pipelined allreduce for large messages implemented, use -D_PIPELINED_ALLREDUCE_ to...
authorEhsan Totoni <totoni2@illinois.edu>
Fri, 24 Jun 2011 16:21:53 +0000 (11:21 -0500)
committerEhsan Totoni <totoni2@illinois.edu>
Fri, 24 Jun 2011 16:21:53 +0000 (11:21 -0500)
src/ck-core/ckarray.C
src/ck-core/ckarray.ci
src/ck-core/ckarray.h
src/ck-core/ckarrayindex.h
src/ck-core/ckcallback.h
src/ck-core/ckreduction.h

index 9aed8239ecc5908d7b4d8080feaa9a4e9ecab0d3..12124811d2e59a3b04c3be5848dce2a0e041894b 100644 (file)
@@ -268,6 +268,9 @@ void ArrayElement::initBasics(void)
         mlogData->objID.type = TypeArray;
         mlogData->objID.data.array.id = (CkGroupID)thisArrayID;
 #endif
+#ifdef _PIPELINED_ALLREDUCE_
+       allredMgr = NULL;
+#endif
 }
 
 ArrayElement::ArrayElement(void) 
@@ -300,8 +303,98 @@ void ArrayElement::ckJustRestored(void) {
     //empty for out-of-core emulation
 }
 
+#ifdef _PIPELINED_ALLREDUCE_
+void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
+                                       CMK_REFNUM_TYPE userFlag)
+{
+       CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
+       msg->setUserFlag(userFlag);
+       msg->setMigratableContributor(true);
+       thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+void ArrayElement::contribute2(int dataSize,const void *data,CkReduction::reducerType type,
+                                       const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
+{
+       CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
+       msg->setUserFlag(userFlag);
+       msg->setCallback(cb);
+       msg->setMigratableContributor(true);
+       thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+void ArrayElement::contribute2(CkReductionMsg *msg) 
+{
+       msg->setMigratableContributor(true);
+       thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+void ArrayElement::contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
+{
+       CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
+    msg->setUserFlag(userFlag);
+    msg->setCallback(cb);
+    msg->setMigratableContributor(true);
+    thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+void ArrayElement::contribute2(CMK_REFNUM_TYPE userFlag)
+{
+    CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::random);
+    msg->setUserFlag(userFlag);
+    msg->setMigratableContributor(true);
+    thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+
+void ArrayElement::contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
+                                                         const CkCallback &cb,CMK_REFNUM_TYPE userFlag)
+{
+       // if it is a broadcast to myself and size is large
+       if(cb.type==CkCallback::bcastArray && cb.d.array.id==thisArrayID && dataSize>FRAG_THRESHOLD) 
+       {
+               if (!allredMgr) {
+                       allredMgr = new AllreduceMgr();
+               }
+               // number of fragments
+               int fragNo = dataSize/FRAG_SIZE;
+               int size = FRAG_SIZE;
+               // for each fragment
+               for (int i=0; i<fragNo; i++) {
+                       // callback to defragmentor
+                       CkCallback defrag_cb(CkIndex_ArrayElement::defrag(NULL), thisArrayID);
+                       if ((0 != i) && ((fragNo-1) == i) && (0 != dataSize%FRAG_SIZE)) {
+                               size = dataSize%FRAG_SIZE;
+                       }
+                       CkReductionMsg *msg = CkReductionMsg::buildNew(size, (char*)data+i*FRAG_SIZE);
+                       // initialize the new msg
+                       msg->reducer            = type;
+                       msg->nFrags             = fragNo;
+                       msg->fragNo             = i;
+                       msg->callback           = defrag_cb;
+                       msg->userFlag           = userFlag;
+                       allredMgr->cb           = cb;
+                       allredMgr->cb.type      = CkCallback::sendArray;
+                       allredMgr->cb.d.array.idx = myIndex;
+                       contribute2(msg);
+               }
+               return;
+       }
+       CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);
+       msg->setUserFlag(userFlag);
+       msg->setCallback(cb);
+       msg->setMigratableContributor(true);
+       thisArray->contribute(&*(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],msg);
+}
+
+
+#else
 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(ArrayElement,thisArray,
    *(contributorInfo *)&listenerData[thisArray->reducer->ckGetOffset()],true)
+#endif
+// _PIPELINED_ALLREDUCE_
+void ArrayElement::defrag(CkReductionMsg *msg)
+{
+//     CkPrintf("in defrag\n");
+#ifdef _PIPELINED_ALLREDUCE_
+       allredMgr->allreduce_recieve(msg);
+#endif
+}
 
 /// Remote method: calls destructor
 void ArrayElement::ckDestroy(void)
index c7faa5d52fa185b3dcd4c30dfdd233f31eef897c..2b6306451a2262c0484588cd8a925b2c9e7d1d85 100644 (file)
@@ -33,6 +33,9 @@ module CkArray {
     entry void recvBroadcast(CkMessage *);
     // CMK_MEM_CHECKPOINT
     entry void inmem_checkpoint(CkArrayCheckPTReqMessage *);
+    // _PIPELINED_ALLREDUCE_
+    entry void defrag(CkReductionMsg*);
+
   };
 
 
index d430e0eebee97c87d24d0bc8e8baa9036832c7d8..3153e4b92944ede3ddaf4461b1b860f277451355 100644 (file)
@@ -473,6 +473,9 @@ class ArrayElement : public CkMigratable
   friend class CkArrayListener;
   int numInitialElements; // Number of elements created by ckNew(numElements)
   void initBasics(void);
+#ifdef _PIPELINED_ALLREDUCE_
+AllreduceMgr * allredMgr; // for allreduce
+#endif
 public:
   ArrayElement(void);
   ArrayElement(CkMigrateMessage *m);
@@ -495,8 +498,21 @@ public:
   /// Synonym for ckMigrate
   inline void migrateMe(int toPe) {ckMigrate(toPe);}
 
+#ifdef _PIPELINED_ALLREDUCE_
+       void contribute2(CkArrayIndex myIndex, int dataSize,const void *data,CkReduction::reducerType type,
+                          const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);
+       void contribute2(int dataSize,const void *data,CkReduction::reducerType type, 
+                                       CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); 
+       void contribute2(int dataSize,const void *data,CkReduction::reducerType type, 
+                                       const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1); 
+       void contribute2(CkReductionMsg *msg); 
+       void contribute2(const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);
+       void contribute2(CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);
+#else
   CK_REDUCTION_CONTRIBUTE_METHODS_DECL
-
+#endif
+       // for _PIPELINED_ALLREDUCE_, assembler entry method
+       inline void defrag(CkReductionMsg* msg);
   inline const CkArrayID &ckGetArrayID(void) const {return thisArrayID;}
 
   inline int ckGetArraySize(void) const { return numInitialElements; }
@@ -549,6 +565,31 @@ public:
         mlogData->objID.data.array.idx=thisIndexMax;
 #endif
 }
+#ifdef _PIPELINED_ALLREDUCE_
+       void contribute(int dataSize,const void *data,CkReduction::reducerType type,
+                                               CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1)
+       {
+               contribute2( dataSize,data, type, userFlag);
+
+       }
+       void contribute(int dataSize,const void *data,CkReduction::reducerType type,
+                                               const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1)
+       {
+               contribute2((CkArrayIndex)(thisIndex) ,dataSize, data, type,   cb, userFlag);
+       }
+       void contribute(CkReductionMsg *msg) 
+       {
+               contribute2(msg);
+       }
+       void contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1)
+       {
+               contribute2(cb ,userFlag);
+       }
+       void contribute(CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1)
+       {
+               contribute2(userFlag);
+       }
+#endif
   ArrayElementT(CkMigrateMessage *msg)
        :ArrayElement(msg),
        thisIndex(*(const T *)thisIndexMax.data()) {}
index 20e48afcef3b94d8f8027a9e733c0db957448ac3..cb2b1927c347dbb2305f1063415c884029c61372 100644 (file)
@@ -67,6 +67,9 @@ class CkArrayIndex: public CkArrayIndexBase
     public:
         /// Default
         CkArrayIndex() { nInts=0; dimension=0; for (int i=0; i<CK_ARRAYINDEX_MAXLEN; i++) index[i] = 0; }
+#ifdef _PIPELINED_ALLREDUCE_
+       CkArrayIndex(int idx) {init(1,1,idx);};
+#endif
         /// Return a pointer to the actual index data
         int *data(void)             {return index; }
         /// Return a const pointer to the actual index data
index d2d8f39f3650b9d0a0600dffaae7448c2ccb6ffb..2d8764e0a7bce5880f5be6874f06746a1586cc80 100644 (file)
@@ -29,6 +29,9 @@ class ArrayElement;
 
 class CkCallback {
 public:
+#ifdef _PIPELINED_ALLREDUCE_
+       friend class ArrayElement;
+#endif
        typedef enum {
        invalid=0, //Invalid callback
        ignore, //Do nothing
index bd2a46de406e7b36e9e86aaff38247ed48f13455..97ee5cd825551ba46edd92319bf6b949a64887bd 100644 (file)
@@ -26,6 +26,11 @@ The calls needed to use the reduction manager are:
 #define GROUP_LEVEL_REDUCTION           1
 #endif
 
+#ifdef _PIPELINED_ALLREDUCE_
+#define FRAG_SIZE 32768
+#define FRAG_THRESHOLD 65536
+#endif
+
 class CkReductionMsg; //See definition below
 
 
@@ -262,6 +267,10 @@ class CkReductionMsg : public CMessage_CkReductionMsg
        friend class CkNodeReductionMgr;
        friend class CkArrayReductionMgr;
        friend class CkMulticastMgr;
+#ifdef _PIPELINED_ALLREDUCE_
+       friend class ArrayElement;
+       friend class AllreduceMgr;
+#endif
     friend class ck::impl::XArraySectionReducer;
 public:
 
@@ -412,5 +421,35 @@ class Group : public CkReductionMgr
        CK_REDUCTION_CONTRIBUTE_METHODS_DECL
 };
 
+#ifdef _PIPELINED_ALLREDUCE_
+class AllreduceMgr
+{
+public:
+       AllreduceMgr() { fragsRecieved=0; size=0; }
+       friend class ArrayElement;
+       // recieve an allreduce message
+       void allreduce_recieve(CkReductionMsg* msg)
+       {
+               allred_msgs.enq(msg);
+               fragsRecieved++;
+               if(fragsRecieved==1)
+               {
+                       data = new char[FRAG_SIZE*msg->nFrags];
+               }
+               memcpy(data+msg->fragNo*FRAG_SIZE, msg->data, msg->dataSize);
+               size += msg->dataSize;
+               
+               if(fragsRecieved==msg->nFrags)
+                       cb.send(size, (void*)data);
+               
+       }
+       // TODO: check for same reduction
+       CkCallback cb;  
+       int size;
+       char* data;
+       int fragsRecieved;
+       CkMsgQ<CkReductionMsg> allred_msgs;
+};
+#endif // _PIPELINED_ALLREDUCE_
 
 #endif //_CKREDUCTION_H