Checking in a new broadcast strategy which broadcasts to arrays along a load-balanced...
[charm.git] / src / ck-com / StreamingStrategy.C
index f2c8c1f9108b87399da01820934ad61c37aff369..c306ca63f59396d75937e6a74cdb4d3565a362c5 100644 (file)
@@ -1,6 +1,24 @@
 #include "StreamingStrategy.h"
 #include "MsgPacker.h"
 
+void StreamingHandlerFn(void *msg) {
+    CombinedMessage hdr;
+    
+    CkPrintf("In streaming handler fn\n");
+
+    PUP_fromCmiAllocMem fp(msg);
+    fp | hdr;
+    
+    for(int count = 0; count < hdr.nmsgs; count ++) {
+        char *msg;
+        fp.pupCmiAllocBuf((void **)&msg);
+        int size = SIZEFIELD(msg);
+        CmiSyncSendAndFree(CkMyPe(), size, msg);
+    }
+    CmiFree(msg);
+    return;
+}
+
 StreamingStrategy::StreamingStrategy(int periodMs,int bufferMax_)
     : PERIOD(periodMs), bufferMax(bufferMax_), CharmStrategy()
 {
@@ -8,6 +26,8 @@ StreamingStrategy::StreamingStrategy(int periodMs,int bufferMax_)
     streamingMsgCount=NULL;
     shortMsgPackingFlag = CmiFalse;
     idleFlush = CmiTrue;
+    streaming_handler_id = 0;
+    setType(ARRAY_STRATEGY);
 }
 
 void StreamingStrategy::insertMessage(CharmMessageHolder *cmsg) {
@@ -18,10 +38,10 @@ void StreamingStrategy::insertMessage(CharmMessageHolder *cmsg) {
     int size = env->getTotalsize();
 
     if(size > MAX_STREAMING_MESSAGE_SIZE) {//AVOID COPYING
-      ComlibPrintf("StreamingStrategy::insertMessage: direct send\n");
-      CmiSyncSendAndFree(pe, size, (char *)env);
-      delete cmsg;
-      return;
+        ComlibPrintf("StreamingStrategy::insertMessage: direct send\n");
+        CmiSyncSendAndFree(pe, size, (char *)env);
+        delete cmsg;
+        return;
     }
 
     ComlibPrintf("StreamingStrategy::insertMessage: buffering t=%d, n=%d, s=%d\n",  
@@ -39,57 +59,61 @@ void StreamingStrategy::doneInserting() {
 
 /// Send off all accumulated messages for this PE:
 void StreamingStrategy::flushPE(int pe) {
+
+  //CkPrintf("Checking %d\n", pe);
+
   if(streamingMsgCount[pe] == 0)
-    return; //Nothing to do.
+      return; //Nothing to do.
   
   CharmMessageHolder *cmsg, *toBeDeleted = NULL;
+  int size = 0;
   if(shortMsgPackingFlag){
-    MsgPacker mpack(streamingMsgBuf[pe], streamingMsgCount[pe]);
-    CombinedMessage *msg; 
-    int size;
-    mpack.getMessage(msg, size);
-    ComlibPrintf("[%d] StreamingStrategy::flushPE: packed %d short messages to %d\n", 
-                CkMyPe(), streamingMsgCount[pe], pe); 
-    CmiSyncSendAndFree(pe, size, (char *)msg);
-    streamingMsgCount[pe] = 0;
+      MsgPacker mpack(streamingMsgBuf[pe], streamingMsgCount[pe]);
+      CombinedMessage *msg; 
+      mpack.getMessage(msg, size);
+      ComlibPrintf("[%d] StreamingStrategy::flushPE: packed %d short messages to %d\n", 
+                   CkMyPe(), streamingMsgCount[pe], pe); 
+      CmiSyncSendAndFree(pe, size, (char *)msg);
+      streamingMsgCount[pe] = 0;
   }
   else {
     // Build a CmiMultipleSend list of messages to be sent off:
     int msg_count=streamingMsgCount[pe], msg_pe=0;
     if(msg_count == 1) {
-      cmsg = streamingMsgBuf[pe].deq();
-      char *msg = cmsg->getCharmMessage();
-      envelope *env = UsrToEnv(msg);
-      int size = env->getTotalsize();
-      CmiSyncSendAndFree(pe, size, (char *)env);
-      ComlibPrintf("[%d] StreamingStrategy::flushPE: one message to %d\n", 
-                  CkMyPe(), pe);            
-      delete cmsg;
-      streamingMsgCount[pe] = 0;
-      return;
+        cmsg = streamingMsgBuf[pe].deq();
+        char *msg = cmsg->getCharmMessage();
+        envelope *env = UsrToEnv(msg);
+        int size = env->getTotalsize();
+        CmiSyncSendAndFree(pe, size, (char *)env);
+        ComlibPrintf("[%d] StreamingStrategy::flushPE: one message to %d\n", 
+                     CkMyPe(), pe);            
+        delete cmsg;
+        streamingMsgCount[pe] = 0;
+        return;
     }
+    
     char **msgComps = new char*[msg_count];
     int *sizes = new int[msg_count];
     ComlibPrintf("[%d] StreamingStrategy::flushPE: %d messages to %d\n", 
-                CkMyPe(), msg_count, pe);            
+                 CkMyPe(), msg_count, pe);            
     while (!streamingMsgBuf[pe].isEmpty()) {
-      cmsg = streamingMsgBuf[pe].deq();
-      char *msg = cmsg->getCharmMessage();
-      envelope *env = UsrToEnv(msg);
-      sizes[msg_pe] = env->getTotalsize();
-      msgComps[msg_pe] = (char *)env;
-      msg_pe++;
-      
-      // Link cmsg into the toBeDeleted list:
-      cmsg->next = toBeDeleted;
-      toBeDeleted = cmsg;            
+        cmsg = streamingMsgBuf[pe].deq();
+        char *msg = cmsg->getCharmMessage();
+        envelope *env = UsrToEnv(msg);
+        sizes[msg_pe] = env->getTotalsize();
+        msgComps[msg_pe] = (char *)env;
+        msg_pe++;
+        
+        // Link cmsg into the toBeDeleted list:
+        cmsg->next = toBeDeleted;
+        toBeDeleted = cmsg;            
     }
     
     if (msg_count!=msg_pe) 
-      CkAbort("streamingMsgCount doesn't match streamingMsgBuf!\n");
-
+        CkAbort("streamingMsgCount doesn't match streamingMsgBuf!\n");
+    
     ComlibPrintf("--> Sending %d Messages to PE %d\n", msg_count, pe);
-
+    
     CmiMultipleSend(pe, msg_count, sizes, msgComps);
     delete [] msgComps;
     delete [] sizes;
@@ -103,38 +127,111 @@ void StreamingStrategy::flushPE(int pe) {
         delete cmsg;
         cmsg = toBeDeleted;            
     }     
+    
+    /*
+    PUP_cmiAllocSizer sp;
+    CombinedMessage hdr;
+    
+    sp | hdr;
+
+    int nmsgs = streamingMsgCount[pe];
+    for(int count = 0; count < nmsgs; count++) {
+        cmsg = streamingMsgBuf[pe][count];
+        char *msg = cmsg->getCharmMessage();
+        envelope *env = UsrToEnv(msg);
+        size = env->getTotalsize();
+        
+        sp.pupCmiAllocBuf((void **)&env, size);
+    }
+    
+    char *newmsg = (char *)CmiAlloc(sp.size());
+    PUP_toCmiAllocMem mp(newmsg);
+    
+    hdr.aid.setZero();
+    hdr.srcPE = CkMyPe();
+    hdr.nmsgs = nmsgs;
+    mp | hdr;
+    
+    for(int count = 0; count < nmsgs; count++) {
+        cmsg = streamingMsgBuf[pe][count];
+        char *msg = cmsg->getCharmMessage();
+        envelope *env = UsrToEnv(msg);
+        size = env->getTotalsize();
+        
+        mp.pupCmiAllocBuf((void **)&env, size);
+    }
+
+    for(int count = 0; count < nmsgs; count++) {
+        cmsg = streamingMsgBuf[pe][count];
+        delete cmsg;
+    }    
+    
+    streamingMsgCount[pe] = 0;
+    CmiSetHandler(newmsg, streaming_handler_id);
+    CmiSyncSendAndFree(pe, sp.size(), newmsg); 
+    */
   }
 }
 
 void StreamingStrategy::periodicFlush() {
-  for (int pe=0; pe<CkNumPes(); pe++) flushPE(pe);
+    for (int proc = 0; proc < CkNumPes(); proc++) 
+        flushPE(proc);
+}
+
+struct MsgStruct {
+    char header[CmiReservedHeaderSize];
+    void *addr;
+};
+
+
+void testHandler(void *msg) {
+    StreamingStrategy *s;
+
+    MsgStruct *mstruct = (MsgStruct *)msg;
+
+    s = (StreamingStrategy *) (mstruct->addr);
+    s->periodicFlush();
+
+    CmiSyncSendAndFree(CkMyPe(), sizeof(MsgStruct), (char *)msg);
 }
 
 /// This routine is called via CcdCallFnAfter to flush all messages:
 static void call_delayFlush(void *arg,double curWallTime) {
-  StreamingStrategy *s=(StreamingStrategy *)arg;
-  s->periodicFlush();
-  s->registerFlush(); //Set ourselves up to be called again
+    StreamingStrategy *s=(StreamingStrategy *)arg;
+    s->periodicFlush();
+    s->registerFlush(); //Set ourselves up to be called again
 }
 
 void StreamingStrategy::registerFlush(void) {
-  // CkPrintf("[%d] Will call function again every %d ms\n",CkMyPe(),PERIOD);
-  CcdCallFnAfterOnPE((CcdVoidFn)call_delayFlush, (void *)this, PERIOD, CkMyPe());
+    //CkPrintf("[%d] Will call function again every %d ms\n",CkMyPe(),PERIOD);
+    CcdCallFnAfterOnPE(call_delayFlush, (void *)this, PERIOD, CkMyPe());
 }
 
 /// This routine is called via CcdCallOnCondition to flush all messages:
 static void call_idleFlush(void *arg,double curWallTime) {
-  StreamingStrategy *s=(StreamingStrategy *)arg;
-  s->periodicFlush();
+    StreamingStrategy *s=(StreamingStrategy *)arg;
+    s->periodicFlush();
 }
 
 // When we're finally ready to go, register for timeout and idle flush.
 void StreamingStrategy::beginProcessing(int ignored) {
-  registerFlush();
-  if(idleFlush)
-    CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE,
-                              (CcdVoidFn)call_idleFlush, 
-                              (void *)this, CkMyPe());
+    registerFlush();
+    //if(idleFlush)
+    //  CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE,
+    //                             (CcdVoidFn)call_idleFlush, 
+    //                             (void *)this, CkMyPe());
+    
+    streaming_handler_id = CkRegisterHandler(StreamingHandlerFn);
+    
+    /*
+      int handler = CkRegisterHandler(testHandler);
+      
+      MsgStruct *msg = (MsgStruct *)CmiAlloc(sizeof(MsgStruct));
+      msg->addr = this;
+      CmiSetHandler(msg, handler);
+      
+      CmiSyncSendAndFree(CkMyPe(), sizeof(MsgStruct), (char *)msg);
+    */
 }
 
 void StreamingStrategy::pup(PUP::er &p){
@@ -144,12 +241,13 @@ void StreamingStrategy::pup(PUP::er &p){
   p | bufferMax;
   p | shortMsgPackingFlag;
   p | idleFlush;
+  p | streaming_handler_id;
 
   if(p.isUnpacking()) {
-    streamingMsgBuf = new CkQ<CharmMessageHolder *>[CkNumPes()];
-    streamingMsgCount = new int[CkNumPes()];
-    for(int count = 0; count < CkNumPes(); count ++)
-      streamingMsgCount[count] = 0;
+      streamingMsgBuf = new CkQ<CharmMessageHolder *>[CkNumPes()];
+      streamingMsgCount = new int[CkNumPes()];
+      for(int count = 0; count < CkNumPes(); count ++)
+          streamingMsgCount[count] = 0;
   }
 }