Merge branch 'charm' into virtualDebug
[charm.git] / src / langs / bluegene / blue.C
index a91e56ddf102462476959982a4cd2796b5518be0..3679dcba4a2e74efef40ea3db399d5a0eda72290 100644 (file)
@@ -517,6 +517,9 @@ void addBgNodeInbuffer(char *msgPtr, int lnodeID)
        
   nInfo.addBgNodeInbuffer(msgPtr);
 }
+extern "C" void addBgNodeInbuffer_c(char *msgPtr, int lnodeID) {
+  addBgNodeInbuffer(msgPtr, lnodeID);
+}
 
 /** BG API Func 
  *  called by a comm thread
@@ -1193,11 +1196,12 @@ void BgSetWorkerThreadStart(BgStartHandler f)
 extern "C" void CthResumeNormalThread(CthThreadToken* token);
 
 // kernel function for processing a bluegene message
-void BgProcessMessage(threadInfo *tinfo, char *msg)
+void BgProcessMessageDefault(threadInfo *tinfo, char *msg)
 {
   DEBUGM(5, ("=====Begin of BgProcessing a msg on node[%d]=====\n", BgMyNode()));
   int handler = CmiBgMsgHandle(msg);
-  DEBUGF(("[%d] call handler %d\n", BgMyNode(), handler));
+  //CmiPrintf("[%d] call handler %d\n", BgMyNode(), handler);
+  CmiAssert(handler < 1000);
 
   BgHandlerInfo *handInfo;
 #if  CMK_BLUEGENE_NODE
@@ -1237,6 +1241,7 @@ void BgProcessMessage(threadInfo *tinfo, char *msg)
   DEBUGM(5, ("=====End of BgProcessing a msg on node[%d]=====\n\n", BgMyNode()));
 }
 
+void  (*BgProcessMessage)(threadInfo *t, char *msg) = BgProcessMessageDefault;
 
 void scheduleWorkerThread(char *msg)
 {
@@ -2121,6 +2126,34 @@ int BgIsReplay()
     return cva(bgMach).replay != -1;
 }
 
+extern "C" void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
+  ((workThreadInfo*)cta(threadinfo))->reduceMsg = msg;
+  //CmiPrintf("Called CkReduce from %d %hd\n",CmiMyPe(),cta(threadinfo)->globalId);
+  int numLocal = 0, count = 0;
+  for (int j=0; j<cva(numNodes); j++){
+    for(int i=0;i<cva(bgMach).numWth;i++){
+      workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
+      if (t->reduceMsg == NULL) return; /* we are not yet ready to reduce */
+      numLocal ++;
+    }
+  }
+  void **msgLocal = (void**)malloc(sizeof(void*)*(numLocal-1));
+  for (int j=0; j<cva(numNodes); j++){
+    for(int i=0;i<cva(bgMach).numWth;i++){
+      workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
+      if (t == cta(threadinfo)) break;
+      msgLocal[count++] = t->reduceMsg;
+      t->reduceMsg = NULL;
+    }
+  }
+  CmiAssert(count==numLocal-1);
+  msg = mergeFn(&size, msg, msgLocal, numLocal-1);
+  CmiReduce(msg, size, mergeFn);
+  CmiPrintf("Called CmiReduce %d\n",CmiMyPe());
+  for (int i=0; i<numLocal-1; ++i) CmiFree(msgLocal[i]);
+  free(msgLocal);
+}
+
 // for record/replay, to fseek back
 void BgRewindRecord()
 {