Making msgpacker faster by reducing short envelope size and making fewer calls to...
[charm.git] / src / ck-com / MsgPacker.h
index 47f42349c96144f663c7559f1a0ef008176d1614..b358bedcc20d79f53d533484333eeb9c5f02724c 100644 (file)
@@ -3,24 +3,51 @@
 
 #include "charm++.h"
 #include "envelope.h"
+#include "ComlibManager.h"
+#include "register.h"
+#include "pup_cmialloc.h"
+
+#define MAX_MESSAGE_SIZE 32768
+
 
-#define MAX_MESSAGE_SIZE 65535
 
 class short_envelope {
  public:
     UShort epIdx;
     UShort size;  //Can only send messages up to 64KB :)    
+    
     CkArrayIndexMax idx;
     char *data;
 
     short_envelope();
     ~short_envelope();
-    short_envelope(CkMigrateMessage *){}
+    inline short_envelope(CkMigrateMessage *){}
     
     void pup(PUP::er &p);
 };
 
-PUPmarshall(short_envelope);
+inline short_envelope::short_envelope(){
+    epIdx = 0;
+    data = NULL;
+}
+
+inline short_envelope::~short_envelope(){
+    /*
+      if(data) 
+      CmiFree(data);        
+      data = NULL;
+    */
+}
+
+inline void short_envelope::pup(PUP::er &p){
+    p.pupCmiAllocBuf((void **)&data, size);
+    p | idx;
+    
+    int *data = (int *)this;
+    //p | epIdx;
+    //p | size;    
+    p(data, 2);
+}
 
 struct CombinedMessage{
 
@@ -41,9 +68,79 @@ class MsgPacker {
     MsgPacker();
     ~MsgPacker();    
     MsgPacker(CkQ<CharmMessageHolder*> &cmsg_list, int n_msgs);
+    
     void getMessage(CombinedMessage *&msg, int &size);
-
     static void deliver(CombinedMessage *cmb_msg);
 };
 
+inline void MsgPacker::deliver(CombinedMessage *cmb_msg){
+
+    CombinedMessage cmb_hdr;
+
+    PUP_fromCmiAllocMem fp(cmb_msg);
+    fp | cmb_hdr;
+
+    int nmsgs = cmb_hdr.nmsgs;
+
+    ComlibPrintf("In MsgPacker::deliver\n");
+    CkArrayID aid = cmb_hdr.aid;
+    int src_pe = cmb_hdr.srcPE;
+
+    for(int count = 0; count < nmsgs; count ++){
+        short_envelope senv;
+        fp | senv;
+        
+        int ep = senv.epIdx;
+        CkArrayIndexMax idx = senv.idx;
+        int size = senv.size;
+
+        CProxyElement_ArrayBase ap(aid, idx);
+        ArrayElement *a_elem = ap.ckLocal();
+        CkArray *a=(CkArray *)_localBranch(aid);
+
+        int msgIdx = _entryTable[ep]->msgIdx;
+        if(_entryTable[ep]->noKeep && a_elem != NULL) {
+            //Unpack the message
+            senv.data = (char *)_msgTable[msgIdx]->unpack(senv.data); 
+            CkDeliverMessageReadonly(ep, senv.data, a_elem);            
+            CmiFree(senv.data);
+        }
+        else {
+            //envelope *env = (envelope *)CmiAlloc(sizeof(envelope) + size);
+            envelope *env = _allocEnv(ForArrayEltMsg, 
+                                      sizeof(envelope) + size);
+
+            void *data = EnvToUsr(env);
+            memcpy(data, senv.data, size);
+            
+            //Unpack the message
+            data = (char *)_msgTable[msgIdx]->unpack(data); 
+            
+            env->getsetArrayMgr() = aid;
+            env->getsetArrayIndex() = idx;
+            env->getsetArrayEp() = ep;
+            env->setPacked(0); 
+            env->getsetArraySrcPe()=src_pe;  
+            env->getsetArrayHops()=1;  
+            env->setQueueing(CK_QUEUEING_FIFO);            
+            env->setUsed(0);
+            env->setMsgIdx(msgIdx);
+
+            env->setTotalsize(sizeof(envelope) + size);
+
+            //if(a_elem)
+            //  CkDeliverMessageFree(ep, data, a_elem);                     
+            //else
+            //ap.ckSend((CkArrayMessage *)data, ep);
+            
+            a->deliver((CkArrayMessage *)data, CkDeliver_queue);
+
+            CmiFree(senv.data);
+        }        
+    }      
+        
+    CmiFree(cmb_msg);
+}
+
+
 #endif