updated syntax of the AllToAll message, in order to prepare for the two new
authorFilippo Gioachin <gioachin@illinois.edu>
Mon, 18 Apr 2005 09:12:00 +0000 (09:12 +0000)
committerFilippo Gioachin <gioachin@illinois.edu>
Mon, 18 Apr 2005 09:12:00 +0000 (09:12 +0000)
zero-copy routines

src/conv-com/petable.C

index 007d187d9e2e7883b3729c1e90ca1c634d3ce2a8..632b6fc93ca58a3810f10f6022cb43518fcb85ed 100644 (file)
@@ -245,8 +245,9 @@ char * PeTable ::ExtractAndPack(comID id, int ufield, int npe,
        return(NULL);
     }
     
        return(NULL);
     }
     
-    int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
-        + (npe + 4 + nummsgs) * sizeof(int);  
+    int msg_offset = sizeof(struct AllToAllHdr) + (npe + nummsgs + 1) * sizeof(int);
+    //int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
+    //    + (npe + 4 + nummsgs) * sizeof(int);  
 
     msg_offset = ALIGN8(msg_offset);
     
 
     msg_offset = ALIGN8(msg_offset);
     
@@ -265,6 +266,7 @@ char * PeTable ::ExtractAndPack(comID id, int ufield, int npe,
     PACK(int, refno);
     PACK(comID, id);
     PACK(int, ufield);
     PACK(int, refno);
     PACK(comID, id);
     PACK(int, ufield);
+    PACK(int, nummsgs);
     PACK(int, npe);
     
     int lesspe=0;
     PACK(int, npe);
     
     int lesspe=0;
@@ -290,9 +292,10 @@ char * PeTable ::ExtractAndPack(comID id, int ufield, int npe,
             else {
                npacked ++;
                 
             else {
                npacked ++;
                 
-               offset=msg_offset;
+               //offset=msg_offset;
+               offset=npacked;
                PeList[index][j]->magic=magic;
                PeList[index][j]->magic=magic;
-               PeList[index][j]->offset=msg_offset;
+               PeList[index][j]->offset=offset;
                PTinfo *tempmsg=PeList[index][j];
                
                 CmiChunkHeader hdr;
                PTinfo *tempmsg=PeList[index][j];
                
                 CmiChunkHeader hdr;
@@ -316,14 +319,16 @@ char * PeTable ::ExtractAndPack(comID id, int ufield, int npe,
         }
         msgnum[index]=0;
     }
         }
         msgnum[index]=0;
     }
-    offset=-1;
-    PACK(int, offset);
-    
+    //offset=-1;
+    //PACK(int, offset);
+
+    /*    
     if (lesspe) {
         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
        npe=npe-lesspe;
        PACK(int, npe);
     }
     if (lesspe) {
         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
        npe=npe-lesspe;
        PACK(int, npe);
     }
+    */
 
     return(p);
 } 
 
     return(p);
 } 
@@ -337,76 +342,123 @@ int PeTable :: UnpackAndInsert(void *in)
 {
   char *junk;
   char *t =(char *)in + CmiReservedHeaderSize;
 {
   char *junk;
   char *t =(char *)in + CmiReservedHeaderSize;
-  int i, ufield, npe, pe, tot_msgsize, ptrlistindex=0;
+  int i, ufield, npe, pe, nummsgs, ptrlistindex=0;
+  char *msgend, *msgcur;
   comID id;
   int refno = 0;
 
   comID id;
   int refno = 0;
 
+  register int offset;
+
   UNPACK(int, refno);
   UNPACK(int, refno);
-  
   //ComlibPrintf("%d UnPacking id\n", CkMyPe());
   UNPACK(comID, id);
   UNPACK(int, ufield);
   //ComlibPrintf("%d UnPacking id\n", CkMyPe());
   UNPACK(comID, id);
   UNPACK(int, ufield);
+  UNPACK(int, nummsgs);
   UNPACK(int, npe);
   UNPACK(int, npe);
-  
-  register int offset;
-  for (i=0;i<npe;i++) {
-       UNPACK(int, pe);
-       pe *= -1;
+
+  // unpack all messages into an array
+  msgend = (char*)in + CmiSize(in);
+  msgcur = (char*)in + ALIGN8(sizeof(struct AllToAllHdr) + (npe+nummsgs+1)*sizeof(int));
+  while (msgcur < msgend) {
+    CmiChunkHeader *ch = (CmiChunkHeader *)msgcur;
+
+    PTinfo *temp;
+    PTALLOC(temp);
+    temp->msgsize=ch->size;
+    temp->msg=(void*)&ch[1];
+    temp->refCount=0;
+    temp->magic=0;
+    temp->offset=0;
+
+    ptrvec.insert(++ptrlistindex, temp);
+
+    // fix the ref field of the message
+    ch->ref = (int)((char*)in - (char*)temp->msg);
+    msgcur += ALIGN8(ch->size) + sizeof(CmiChunkHeader);
+  }
+
+  pe = -1;
+  //for (i=0;i<npe;i++) {
+  for (i=0; i<nummsgs; ++i) {
+    //UNPACK(int, pe);
+    //pe *= -1;
+
+    UNPACK(int, offset);
+    if (offset <= 0) {
+      pe = -1 * offset;
+      --i;
+      continue;
+    }
+
+    if (msgnum[pe] >= MaxSize[pe]) {
+      REALLOC(PeList[pe], MaxSize[pe]);
+      MaxSize[pe] *= 2;
+    }
+    PeList[pe][msgnum[pe]] = ptrvec[offset];
+    (ptrvec[offset])->refCount++;
+    msgnum[pe]++;
+
+    /*
+    while (offset > 0) {
+      int tempmsgsize;
+      UNPACKMSG(&(tempmsgsize), (char *)in+offset, sizeof(int));
+      int ptr;
+      UNPACKMSG(&ptr, (char *)in+offset, sizeof(int));
+
+      if (ptr >=0 )  {
+       if (msgnum[pe] >= MaxSize[pe]) {
+         REALLOC(PeList[pe], MaxSize[pe]);
+         MaxSize[pe] *= 2;
+       }
+       PeList[pe][msgnum[pe]]=ptrvec[ptr];
+       (ptrvec[ptr])->refCount++;
+       msgnum[pe]++;
 
        UNPACK(int, offset);
 
        UNPACK(int, offset);
-       while (offset > 0) {
-            int tempmsgsize;
-            UNPACKMSG(&(tempmsgsize), (char *)in+offset, sizeof(int));
-            int ptr;
-            UNPACKMSG(&ptr, (char *)in+offset, sizeof(int));
-
-            if (ptr >=0 )  {
-                if (msgnum[pe] >= MaxSize[pe]) {
-                    REALLOC(PeList[pe], MaxSize[pe]);
-                    MaxSize[pe] *= 2;
-                }
-                PeList[pe][msgnum[pe]]=ptrvec[ptr];
-                (ptrvec[ptr])->refCount++;
-                msgnum[pe]++;
-
-                UNPACK(int, offset);
-                continue;
-            }
+       continue;
+      }
             
             
-            PTinfo *temp;
-            PTALLOC(temp);
-            temp->msgsize=tempmsgsize;
-            temp->refCount=1;
-            temp->magic=0;
-            temp->offset=0;
-
-            ptrvec.insert(ptrlistindex, temp);
-            memcpy((char *)in+offset-sizeof(int), &ptrlistindex, sizeof(int));
-
-            ptrlistindex++;
-            temp->msg=(void *)((char *)in+offset);
-            if (msgnum[pe] >= MaxSize[pe]) {
-
-                REALLOC(PeList[pe], MaxSize[pe]);
-                MaxSize[pe] *= 2;
-            }
-            PeList[pe][msgnum[pe]]=temp;
-            msgnum[pe]++;
-            UNPACK(int, offset);
-       }
-       t -=sizeof(int);
+      PTinfo *temp;
+      PTALLOC(temp);
+      temp->msgsize=tempmsgsize;
+      temp->refCount=1;
+      temp->magic=0;
+      temp->offset=0;
+
+      ptrvec.insert(ptrlistindex, temp);
+      memcpy((char *)in+offset-sizeof(int), &ptrlistindex, sizeof(int));
+
+      ptrlistindex++;
+      temp->msg=(void *)((char *)in+offset);
+      if (msgnum[pe] >= MaxSize[pe]) {
+
+       REALLOC(PeList[pe], MaxSize[pe]);
+       MaxSize[pe] *= 2;
+      }
+      PeList[pe][msgnum[pe]]=temp;
+      msgnum[pe]++;
+      UNPACK(int, offset);
+      //}
+    //t -=sizeof(int);
   }
   *(int *)((char *)in -sizeof(int))=ptrlistindex; 
   }
   *(int *)((char *)in -sizeof(int))=ptrlistindex; 
-  
-  if (ptrlistindex==0)
-      CmiFree(in);
-  
+  */
+  }
+
+  REFFIELD(in) = ptrlistindex;
+  if (ptrlistindex==0) {
+    REFFIELD(in) = 1;
+    CmiFree(in);
+  }
+
+  /*  
   for (i=0;i<ptrlistindex;i++) {
   for (i=0;i<ptrlistindex;i++) {
-      char * actualmsg=(char *)(ptrvec[i]->msg);
-      int *rc=(int *)(actualmsg-sizeof(int));
-      *rc=(int)((char *)in-actualmsg);
-      //ComlibPrintf("I am inserting %d\n", *rc);
+    char * actualmsg=(char *)(ptrvec[i]->msg);
+    int *rc=(int *)(actualmsg-sizeof(int));
+    *rc=(int)((char *)in-actualmsg);
+    //ComlibPrintf("I am inserting %d\n", *rc);
   }
   }
+  */
 
   ptrvec.removeAll();
   
 
   ptrvec.removeAll();
   
@@ -467,7 +519,7 @@ int PeTable :: UnpackAndInsertAll(void *in, int npes, int *pelist){
 
     // Get the size of the message, and set the ref field correctly for CmiFree
     size = SIZEFIELD(msg);
 
     // Get the size of the message, and set the ref field correctly for CmiFree
     size = SIZEFIELD(msg);
-    REFFIELD(msg) = (int)((char *)&REFFIELD(in) - (char *)REFFIELD(msg));
+    REFFIELD(msg) = (int)((char *)in - (char *)msg);
 
     t += ALIGN8(size);
 
 
     t += ALIGN8(size);