Implemented CkReduce to mimic CmiReduce but within the virtualized BigSim.
authorFilippo Gioachin <gioachin@uiuc.edu>
Sat, 3 Apr 2010 18:55:00 +0000 (13:55 -0500)
committerFilippo Gioachin <gioachin@uiuc.edu>
Sat, 3 Apr 2010 18:55:00 +0000 (13:55 -0500)
For now this does not support anything else than global reductions, and there cannot be more than one in flight at a time

src/ck-core/middle-blue.h
src/ck-core/middle-conv.h
src/conv-ccs/conv-ccs.c
src/langs/bluegene/blue.C
src/langs/bluegene/blue_impl.h

index 5ba3a7da23556319e16cf1f96a885238e63d4a09..78bde06d2bae97e44d43d37f76e97bf28e073e52 100644 (file)
@@ -154,6 +154,8 @@ static inline void CksdScheduler(int ret) { BgScheduler(ret); }
 static inline void CksdExitScheduler() { BgExitScheduler(); }
 static inline void CkDeliverMsgs(int nmsg)     { BgDeliverMsgs(nmsg); }
 
+void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn);
+
 }  /* end of namespace */
 
 #endif
index 8a47bbb09de3009842fe47a1b83fc2965e65ed02..423cd71dc31a81d8ba667252f52e436d684146a1 100644 (file)
@@ -17,6 +17,8 @@
 #define CksvInitialize            CsvInitialize
 #define CksvAccess        CsvAccess
 
+#define CkReduce    CmiReduce
+
 #undef CkMyPe
 #undef CkNumPes
 
index ab33516b3d8dae4eca788c6a3c2a10deb84896b0..342e5fb6c5ff2d6e010a8f20f65a3b96b72a35a1 100644 (file)
@@ -195,7 +195,7 @@ int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
     if (repPE == -1) {
       /* CCS Broadcast */
-      CmiReduce(msg, len, fn->mergeFn);
+      CkReduce(msg, len, fn->mergeFn);
     } else {
       /* CCS Multicast */
       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
index a34cf478a1476310896b3d559bfb13a77dc289b4..c81f77c5cc5693c3c77aa7ae81cd72c76f8c2b07 100644 (file)
@@ -2092,3 +2092,30 @@ int BgIsReplay()
     return cva(bgMach).replay != -1;
 }
 
+extern "C" void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
+  ((workThreadInfo*)cta(threadinfo))->reduceMsg = msg;
+  //CmiPrintf("Called CkReduce from %d %hd\n",CmiMyPe(),cta(threadinfo)->globalId);
+  int numLocal = 0, count = 0;
+  for (int j=0; j<cva(numNodes); j++){
+    for(int i=0;i<cva(bgMach).numWth;i++){
+      workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
+      if (t->reduceMsg == NULL) return; /* we are not yet ready to reduce */
+      numLocal ++;
+    }
+  }
+  void **msgLocal = (void**)malloc(sizeof(void*)*(numLocal-1));
+  for (int j=0; j<cva(numNodes); j++){
+    for(int i=0;i<cva(bgMach).numWth;i++){
+      workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
+      if (t == cta(threadinfo)) break;
+      msgLocal[count++] = t->reduceMsg;
+      t->reduceMsg = NULL;
+    }
+  }
+  CmiAssert(count==numLocal-1);
+  msg = mergeFn(&size, msg, msgLocal, numLocal-1);
+  CmiReduce(msg, size, mergeFn);
+  CmiPrintf("Called CmiReduce %d\n",CmiMyPe());
+  for (int i=0; i<numLocal-1; ++i) CmiFree(msgLocal[i]);
+  free(msgLocal);
+}
index c86029e5a7325b5404f45be4287c080c88cc70bc..bffe35994970649ce46b8689cc72cc7d924ec0d7 100644 (file)
@@ -519,8 +519,10 @@ class workThreadInfo : public threadInfo {
 private:
   int CsdStopFlag;
 public:
+  void* reduceMsg;
+  
   workThreadInfo(int _id, nodeInfo *_node): 
-        threadInfo(_id, WORK_THREAD, _node) { 
+        threadInfo(_id, WORK_THREAD, _node), reduceMsg(NULL) { 
     CsdStopFlag=0; 
     watcher = NULL;
     if (_id != -1) {