pipelined allreduce for large messages implemented, use -D_PIPELINED_ALLREDUCE_ to...
[charm.git] / src / ck-core / ckarray.C
index 9aed8239ecc5908d7b4d8080feaa9a4e9ecab0d3..12124811d2e59a3b04c3be5848dce2a0e041894b 100644 (file)
@@ -268,6 +268,9 @@ void ArrayElement::initBasics(void)
         mlogData->objID.type = TypeArray;
         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
 #endif
+#ifdef _PIPELINED_ALLREDUCE_
+       allredMgr = NULL;
+#endif
 }
 
 ArrayElement::ArrayElement(void) 
@@ -300,8 +303,98 @@ void ArrayElement::ckJustRestored(void) {
     //empty for out-of-core emulation
 }
 
+#ifdef _PIPELINED_ALLREDUCE_
+void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
+                                       CMK_REFNUM_TYPE userFlag)
+{
+       CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
+       msg->setUserFlag(userFlag);
+       msg->setMigratableContributor(true);
+       thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
+                                       const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
+{
+       CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
+       msg->setUserFlag(userFlag);
+       msg->setCallback(cb);
+       msg->setMigratableContributor(true);
+       thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+void ArrayElement::contribute2(CkReductionMsg *msg) 
+{
+       msg->setMigratableContributor(true);
+       thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
+{
+       CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
+    msg->setUserFlag(userFlag);
+    msg->setCallback(cb);
+    msg->setMigratableContributor(true);
+    thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
+{
+    CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
+    msg->setUserFlag(userFlag);
+    msg->setMigratableContributor(true);
+    thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+
+void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
+                                                         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
+{
+       // if it is a broadcast to myself and size is large
+       if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD) 
+       {
+               if (!allredMgr) {
+                       allredMgr = new AllreduceMgr();
+               }
+               // number of fragments
+               int fragNo = dataSize/FRAG_SIZE;
+               int size = FRAG_SIZE;
+               // for each fragment
+               for (int i=0; i<fragNo; i++) {
+                       // callback to defragmentor
+                       CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
+                       if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
+                               size = dataSize%FRAG_SIZE;
+                       }
+                       CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
+                       // initialize the new msg
+                       msg->reducer            = type;
+                       msg->nFrags             = fragNo;
+                       msg->fragNo             = i;
+                       msg->callback           = defrag_cb;
+                       msg->userFlag           = userFlag;
+                       allredMgr->cb           = cb;
+                       allredMgr->cb.type      = CkCallback::sendArray;
+                       allredMgr->cb.d.array.idx = myIndex;
+                       contribute2(msg);
+               }
+               return;
+       }
+       CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
+       msg->setUserFlag(userFlag);
+       msg->setCallback(cb);
+       msg->setMigratableContributor(true);
+       thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+
+
+#else
 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
+#endif
+// _PIPELINED_ALLREDUCE_
+void ArrayElement::defrag(CkReductionMsg *msg)
+{
+//     CkPrintf("in defrag\n");
+#ifdef _PIPELINED_ALLREDUCE_
+       allredMgr->allreduce_recieve(msg);
+#endif
+}
 
 /// Remote method: calls destructor
 void ArrayElement::ckDestroy(void)