pipelined allreduce for large messages implemented, use -D_PIPELINED_ALLREDUCE_ to...
[charm.git] / src / ck-core / ckreduction.h
index bd2a46de406e7b36e9e86aaff38247ed48f13455..97ee5cd825551ba46edd92319bf6b949a64887bd 100644 (file)
@@ -26,6 +26,11 @@ The calls needed to use the reduction manager are:
 #define GROUP_LEVEL_REDUCTION           1
 #endif
 
+#ifdef _PIPELINED_ALLREDUCE_
+#define FRAG_SIZE 32768
+#define FRAG_THRESHOLD 65536
+#endif
+
 class CkReductionMsg; //See definition below
 
 
@@ -262,6 +267,10 @@ class CkReductionMsg : public CMessage_CkReductionMsg
        friend class CkNodeReductionMgr;
        friend class CkArrayReductionMgr;
        friend class CkMulticastMgr;
+#ifdef _PIPELINED_ALLREDUCE_
+       friend class ArrayElement;
+       friend class AllreduceMgr;
+#endif
     friend class ck::impl::XArraySectionReducer;
 public:
 
@@ -412,5 +421,35 @@ class Group : public CkReductionMgr
        CK_REDUCTION_CONTRIBUTE_METHODS_DECL
 };
 
+#ifdef _PIPELINED_ALLREDUCE_
+class AllreduceMgr
+{
+public:
+       AllreduceMgr() { fragsRecieved=0; size=0; }
+       friend class ArrayElement;
+       // recieve an allreduce message
+       void allreduce_recieve(CkReductionMsg* msg)
+       {
+               allred_msgs.enq(msg);
+               fragsRecieved++;
+               if(fragsRecieved==1)
+               {
+                       data = new char[FRAG_SIZE*msg->nFrags];
+               }
+               memcpy(data+msg->fragNo*FRAG_SIZE, msg->data, msg->dataSize);
+               size += msg->dataSize;
+               
+               if(fragsRecieved==msg->nFrags)
+                       cb.send(size, (void*)data);
+               
+       }
+       // TODO: check for same reduction
+       CkCallback cb;  
+       int size;
+       char* data;
+       int fragsRecieved;
+       CkMsgQ<CkReductionMsg> allred_msgs;
+};
+#endif // _PIPELINED_ALLREDUCE_
 
 #endif //_CKREDUCTION_H