fix a bug caused by pakcing/unpacking in compression
authorYanhua Sun <sun51@illinois.edu>
Thu, 6 Dec 2012 18:25:27 +0000 (12:25 -0600)
committerYanhua Sun <sun51@illinois.edu>
Thu, 6 Dec 2012 18:25:27 +0000 (12:25 -0600)
src/arch/gemini_gni/conv-common.h
src/arch/gemini_gni/machine-persistent.c
src/arch/gemini_gni/machine-persistent.h
src/arch/util/persist-comm.c
src/conv-core/persistent.h

index 38d270c62aad3e27bc7649bec9f0bc27236f2f72..6a64eab5792d81dcb76e69279ba8b541c4de33bd 100644 (file)
@@ -13,9 +13,9 @@
 
 #if DELTA_COMPRESS
 #if CMK_ERROR_CHECKING
-#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt2 seq; unsigned char cksum, magic; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root; CmiUInt4 original_size; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; 
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt2 seq; unsigned char cksum, magic; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; 
 #else
-#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root; CmiUInt4 original_size; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; 
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; 
 #endif
 #else 
 #if CMK_ERROR_CHECKING
index 61ccb523500dc004e032bef925bba4d3ddd109f3..5f11a78e3b8960eda569ad670994be3eed3e740d 100644 (file)
@@ -41,7 +41,7 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
         // uGNI part
     
         slot->addrIndex = (destIndex+1)%PERSIST_BUFFERS_NUM;
-#if  DELTA_COMPRESS 
+#if  DELTA_COMPRESS
         size = CompressPersistentMsg(h, size, m);
 #endif
         MallocPostDesc(pd);
index 14653cb5742ac64d1c91afcd852779116084b662..413e7393b78222b1f864c488e69cc71fff68df18 100644 (file)
 
 #include "gni_pub.h"
 #define PERSIST_MIN_SIZE                SMSG_MAX_MSG
-
+//#define COPY_HISTORY                          1
 // one is for receive one is to store the previous msg
 #if DELTA_COMPRESS
+#if COPY_HISTORY 
+#define PERSIST_BUFFERS_NUM             1
+#else
 #define PERSIST_BUFFERS_NUM             2
+#endif
 #else
 #define PERSIST_BUFFERS_NUM             1
 #endif
@@ -39,6 +43,7 @@ typedef struct _PersistentSendsTable {
 #if DELTA_COMPRESS
   PersistentHandle destDataHandle;
   void  *previousMsg;
+  int   previousSize;
   int   compressStart;
   int   compressSize;
 #endif
@@ -53,7 +58,7 @@ typedef struct _PersistentReceivesTable {
   int           addrIndex;
 #if DELTA_COMPRESS
   int   compressStart;
-  int   compressSize;
+  void  *history;
 #endif
 } PersistentReceivesTable;
 
index c4f62406942da570feaf10d0870fa9371acfecef..d1e1233046315fa6e6ffad4e6aeea0782e43fd6e 100644 (file)
@@ -13,7 +13,7 @@
 #include "compress.c"
 #include "machine-persistent.h"
 #define ENVELOP_SIZE 104
-//#define PRINT_DEBUG 1 
+//#define VERIFY 1 
 CpvDeclare(PersistentSendsTable *, persistentSendsTableHead);
 CpvDeclare(PersistentSendsTable *, persistentSendsTableTail);
 CpvDeclare(int, persistentSendsTableCount);
@@ -27,8 +27,9 @@ typedef struct _PersistentRequestMsg {
   int requestorPE;
   int maxBytes;
   PersistentHandle sourceHandler;
-  int compressStart;
-  int compressSize;
+#if DELTA_COMPRESS
+  int   compressStart;
+#endif
 } PersistentRequestMsg;
 
 typedef struct _PersistentReqGrantedMsg {
@@ -170,8 +171,8 @@ PersistentHandle CmiCreateCompressPersistent(int destPE, int maxBytes, int compr
   msg->requestorPE = CmiMyPe();
 #if DELTA_COMPRESS
   slot->previousMsg  = NULL; 
-  slot->compressStart = msg->compressStart = compressStart;
-  slot->compressSize = msg->compressSize = compressSize;
+  slot->compressStart =  msg->compressStart = compressStart;
+  slot->compressSize = compressSize;
 #endif
   CmiSetHandler(msg, persistentRequestHandlerIdx);
   CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
@@ -179,7 +180,7 @@ PersistentHandle CmiCreateCompressPersistent(int destPE, int maxBytes, int compr
   return h;
 }
 
-PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
+PersistentHandle CmiCreatePersistent(int destPE, int maxBytes, int start)
 {
   PersistentHandle h;
   PersistentSendsTable *slot;
@@ -198,9 +199,12 @@ PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
   msg->requestorPE = CmiMyPe();
 
 #if DELTA_COMPRESS
-  slot->previousMsg  = NULL; 
-  slot->compressStart = msg->compressStart = ENVELOP_SIZE;
-  slot->compressSize = msg->compressSize = 0;
+  slot->previousMsg  = NULL;
+  if(start<=0)
+      slot->compressStart =  msg->compressStart = ENVELOP_SIZE;
+  else
+      slot->compressStart =  msg->compressStart = start;
+  slot->compressSize =  0;
 #endif
   CmiSetHandler(msg, persistentRequestHandlerIdx);
   CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
@@ -217,6 +221,9 @@ static void persistentNoDecompressHandler(void *msg)
     slot->addrIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
     // uncompress data from historyIndex data
     CldRestoreHandler(msg);
+#if COPY_HISTORY 
+    memcpy(slot->history, msg, size);
+#endif
     (((CmiMsgHeaderExt*)msg)->xhdl) =  (((CmiMsgHeaderExt*)msg)->xxhdl);
     CmiHandleMessage(msg);
 }
@@ -230,23 +237,23 @@ static void persistentDecompressHandler(void *msg)
     char    *cmsg = (char*)msg;
     int     size = ((CmiMsgHeaderExt*)msg)->size;
     int     compressSize = *(int*)(msg+slot->compressStart);
-
-    if(slot->compressSize == 0)
-    {
-        slot->compressSize = size-ENVELOP_SIZE;
-    }
-    char    *decompressData =(char*) malloc(slot->compressSize);
+    int     originalSize = *(int*)(msg+slot->compressStart+sizeof(int));
+    
+    char    real1 = cmsg[size - originalSize +sizeof(int)+compressSize];
+    char    real2 = cmsg[size - originalSize +sizeof(int)+compressSize+1];
+    char    *decompressData =(char*) malloc(originalSize);
     historyIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
     slot->addrIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
     // uncompress data from historyIndex data
+#if COPY_HISTORY
+    char *history = slot->history;
+#else
     char *history = (char*)(slot->destBuf[historyIndex].destAddress);
-
-    //STH WRONG here
-    CmiPrintf("[%d] begin uncompress message is decompressed [%d:%d:%d start:%d]\n ", CmiMyPe(), size, compressSize, slot->compressSize, slot->compressStart);
+#endif
+    //CmiPrintf("[%d] begin uncompress message is decompressed [%d:%d:%d start:%d]\n ", CmiMyPe(), size, compressSize, originalSize, slot->compressStart);
+    int left_size = size - slot->compressStart - originalSize;
     char *base_dst = cmsg+size-1;
-    char *base_src = cmsg+size-1-slot->compressSize+compressSize+sizeof(int);
-    int left_size = size - slot->compressStart - slot->compressSize;
-    CmiPrintf("[%d] left size=%d\n", CmiMyPe(), left_size);
+    char *base_src = cmsg+ size - originalSize +compressSize+sizeof(int) -1;
     for(i=0; i<left_size; i++)
     {
        *base_dst = *base_src;
@@ -254,25 +261,31 @@ static void persistentDecompressHandler(void *msg)
        base_src--;
     }
     
-    decompressChar(msg+slot->compressStart+sizeof(int), decompressData, slot->compressSize, compressSize, history+slot->compressStart);
-    memcpy(msg+slot->compressStart, decompressData, slot->compressSize);
+    decompressChar(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
+    memcpy(msg+slot->compressStart, decompressData, originalSize);
     free(decompressData);
     CldRestoreHandler(msg);
     (((CmiMsgHeaderExt*)msg)->xhdl) =  (((CmiMsgHeaderExt*)msg)->xxhdl);
-#if PRINT_DEBUG
-    static int step=0;
-    char fname[50];
-    sprintf(fname, "output.%d", step);
-    FILE *fp = fopen(fname, "w");
-    fprintf(fp, "%d\n", size-slot->compressStart);
-    for(i=slot->compressStart; i<size; i++)
-        fprintf(fp, "%c ", ((char*)msg)[i]);
-    fclose(fp);
-    step++;
-#endif
 
+#if VERIFY
+   
+    char checksum1 = cmsg[0];
+    for(i=1; i< slot->compressStart; i++)
+        checksum1 ^= cmsg[i];
+    if(memcmp(&checksum1, &real1, 1))
+        CmiPrintf("receiver chumsum wrong header \n");
+    char  checksum2 = cmsg[slot->compressStart];
+    for(i=slot->compressStart+1; i< size; i++)
+        checksum2 ^= cmsg[i];
+    if(memcmp(&checksum2, &real2, 1))
+        CmiPrintf("receiver chumsum wrong data \n");
+
+#endif
+#if COPY_HISTORY
+    memcpy(slot->history, msg, size);
+#endif
     CmiHandleMessage(msg);
-    CmiPrintf("[%d] done uncompress message is decompressed [%d:%d:%d start:%d]\n ", CmiMyPe(), size, compressSize, slot->compressSize, slot->compressStart);
+    //CmiPrintf("[%d] done uncompress message is decompressed [%d:%d:%d start:%d]\n ", CmiMyPe(), size, compressSize, originalSize, slot->compressStart);
 }
 
 #if 0
@@ -295,7 +308,7 @@ int CompressPersistentMsg(PersistentHandle h, int size, void **m)
     {
         if(slot->compressSize == 0)
         {
-            slot->compressSize = size-ENVELOP_SIZE;
+            slot->compressSize = size - slot->compressStart;
         }
         if(slot->compressSize>100)
         {
@@ -345,42 +358,59 @@ int CompressPersistentMsg(PersistentHandle h, int size, void *msg)
     void *dest=NULL;
     int compressSize=size;
     int i;
+    char *cmsg = (char*)msg;
 
-#if PRINT_DEBUG
-    static int step=0;
-    char fname[50];
-    sprintf(fname, "input.%d", step);
-    FILE *fp = fopen(fname, "w");
-    fprintf(fp, "%d\n", size-slot->compressStart);
-    for(i=slot->compressStart; i<size; i++)
-        fprintf(fp, "%c ", ((char*)msg)[i]);
-    fclose(fp);
-    printf("#####\n\n");
-    printf("%d\n", size-slot->compressStart);
-    for(i=slot->compressStart; i<size; i++)
-        printf("%c ", ((char*)msg)[i]);
-
-    printf("#####\n\n");
-    step++;
-#endif
-
+    char checksum1;
+    char checksum2;
+   
+    ((CmiMsgHeaderExt*)msg)-> persistRecvHandler = slot->destDataHandle;
+    ((CmiMsgHeaderExt*)msg)->size = size;
     
     if(history == NULL)
     {
         newSize = size;
         slot->previousMsg = msg;
+        slot->previousSize = size;
         CmiReference(msg);
         (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
         CldSwitchHandler(msg, persistentNoDecompressHandlerIdx);
-    }else
+    }else if(size != slot->previousSize)    //persistent msg size changes
     {
+        newSize = size;
+        CmiFree(slot->previousMsg);
+        slot->previousMsg = msg;
+        if(slot->compressSize == slot->previousSize - slot->compressStart)
+            slot->compressSize = size - slot->compressStart;
+        slot->previousSize = size;
+        CmiReference(msg);
+        (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
+        CldSwitchHandler(msg, persistentNoDecompressHandlerIdx);
+    }
+    else {
+        
         if(slot->compressSize == 0)
         {
-            slot->compressSize = size-ENVELOP_SIZE;
+            slot->compressSize = size-slot->compressStart;
         }
-
-        dest = malloc(size);
+#if VERIFY
+        void *history_save = CmiAlloc(size);
+        memcpy(history_save, history, size);
+        checksum1 = cmsg[0];
+        for(i=1; i< slot->compressStart; i++)
+            checksum1 ^= cmsg[i];
+        checksum2 = cmsg[slot->compressStart];
+        for(i=slot->compressStart+1; i< size; i++)
+            checksum2 ^= cmsg[i];
+#endif
+        dest = malloc(slot->compressSize);
         compressChar(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
+#if VERIFY
+        void *recover = malloc(slot->compressSize);
+        decompressChar(dest, recover, slot->compressSize, compressSize, history_save+slot->compressStart);
+        if(memcmp(msg+slot->compressStart, recover, slot->compressSize))
+            CmiPrintf("sth wrong with compression\n");
+#endif
         if(slot->compressSize - compressSize <= 100) //not compress
         {
             newSize = size;
@@ -393,17 +423,46 @@ int CompressPersistentMsg(PersistentHandle h, int size, void *msg)
         {
             memcpy(history+slot->compressStart, msg+slot->compressStart, slot->compressSize);
             *(int*)(msg+slot->compressStart) = compressSize;
-            memcpy(msg+slot->compressStart+sizeof(int), dest, compressSize);
-            memcpy(msg+slot->compressStart+compressSize+sizeof(int), msg+slot->compressStart+slot->compressSize, size-slot->compressStart-slot->compressSize);
-            newSize = size-slot->compressSize+compressSize+sizeof(int);
+            *(int*)(msg+slot->compressStart+sizeof(int)) = slot->compressSize;
+            memcpy(msg+slot->compressStart+2*sizeof(int), dest, compressSize);
+            int leftSize = size-slot->compressStart-slot->compressSize;
+            if(leftSize > 0)
+                memcpy(msg+slot->compressStart+compressSize+2*sizeof(int), msg+slot->compressStart+slot->compressSize, leftSize);
+            newSize = slot->compressStart + compressSize + 2*sizeof(int) +leftSize;
             (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
             CldSwitchHandler(msg, persistentDecompressHandlerIdx);
+#if VERIFY
+            memcpy(msg+newSize, &checksum1, 1); 
+            memcpy(msg+newSize+1, &checksum2, 1); 
+            char *orig = CmiAlloc(size);
+            memcpy(orig, msg, newSize);
+             
+            char    *decompressData =(char*) malloc(slot->compressSize);
+            int left_size = size - slot->compressStart - slot->compressSize;
+            char *base_dst = orig+size-1;
+            char *base_src = orig + size - slot->compressSize +compressSize+2*sizeof(int) -1;
+            for(i=0; i<left_size; i++)
+            {
+                *base_dst = *base_src;
+                base_dst--;
+                base_src--;
+            }
+    
+            decompressChar(orig+slot->compressStart+2*sizeof(int), decompressData, slot->compressSize, compressSize, history_save+slot->compressStart);
+            memcpy(orig+slot->compressStart, decompressData, slot->compressSize);
+            free(decompressData);
+            CldRestoreHandler(orig);
+            (((CmiMsgHeaderExt*)orig)->xhdl) =  (((CmiMsgHeaderExt*)orig)->xxhdl);
+            if(memcmp(orig, history, slot->compressStart))
+                CmiPrintf("sth wrong header all \n");
+            if(memcmp(orig+slot->compressStart, history+slot->compressStart, slot->compressSize))
+                CmiPrintf("sth wrong data \n");
+            newSize += 2;
+#endif
             //CmiPrintf("\n[%d ] finish compressing \n", CmiMyPe() );
         }
         free(dest);
     }
-    ((CmiMsgHeaderExt*)msg)-> persistRecvHandler = slot->destDataHandle;
-    ((CmiMsgHeaderExt*)msg)->size = size;
  
     return newSize;
 }
@@ -418,7 +477,7 @@ PersistentHandle CmiCreateNodePersistent(int destNode, int maxBytes)
     /* randomly pick one rank on the destination node is fine for setup.
        actual message will be handled by comm thread anyway */
   int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
-  return CmiCreatePersistent(pe, maxBytes);
+  return CmiCreatePersistent(pe, maxBytes, 0);
 }
 
 static void persistentRequestHandler(void *env)
@@ -437,7 +496,9 @@ static void persistentRequestHandler(void *env)
 
 #if DELTA_COMPRESS
   slot->compressStart = msg->compressStart;
-  slot->compressSize = msg->compressSize;
+#if COPY_HISTORY
+  slot->history = malloc(msg->maxBytes);
+#endif
 #endif
   setupRecvSlot(slot, msg->maxBytes);
 
index ce384ee84bb786b42b177b0b781a0a9e6a9dc38c..78852fa777c57231d1d0656cdc8788a65c48f4f9 100644 (file)
@@ -57,7 +57,7 @@ typedef struct {
 void CmiPersistentInit();
 
 PersistentHandle CmiCreateCompressPersistent(int destPE, int maxBytes, int start, int size);
-PersistentHandle CmiCreatePersistent(int destPE, int maxBytes);
+PersistentHandle CmiCreatePersistent(int destPE, int maxBytes, int start);
 PersistentHandle CmiCreateNodePersistent(int destNode, int maxBytes);
 PersistentReq CmiCreateReceiverPersistent(int maxBytes);
 PersistentHandle CmiRegisterReceivePersistent(PersistentReq req);