add persistent message compression feature for Gemini
authorYanhua Sun <yanhuas@jyc1.(none)>
Wed, 28 Nov 2012 04:48:48 +0000 (22:48 -0600)
committerYanhua Sun <yanhuas@jyc1.(none)>
Wed, 28 Nov 2012 04:48:48 +0000 (22:48 -0600)
src/arch/gemini_gni/conv-common.h
src/arch/gemini_gni/machine-persistent.c
src/arch/gemini_gni/machine-persistent.h
src/arch/util/compress.C [new file with mode: 0644]
src/arch/util/compress.h [new file with mode: 0644]
src/arch/util/persist-comm.c
src/scripts/Makefile

index 88fd993b2aed69b328679666b3f05518305d2793..38d270c62aad3e27bc7649bec9f0bc27236f2f72 100644 (file)
@@ -7,12 +7,22 @@
 
 #define CMI_MPI_TRACE_USEREVENTS                           0
 
+//#define  DELTA_COMPRESS                                     1
+
 #define CMK_HANDLE_SIGUSR                                  0
 
+#if DELTA_COMPRESS
 #if CMK_ERROR_CHECKING
-#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt2 seq; unsigned char cksum, magic; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root; 
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt2 seq; unsigned char cksum, magic; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root; CmiUInt4 original_size; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; 
 #else
-#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root; 
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root; CmiUInt4 original_size; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; 
+#endif
+#else 
+#if CMK_ERROR_CHECKING
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt2 seq; unsigned char cksum, magic; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root;  
+#else
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root;  
+#endif
 #endif
 
 #define CMK_MSG_HEADER_BASIC  CMK_MSG_HEADER_EXT
index dec23977a48a07d9c0562f3fbc5625e8b2308026..61ccb523500dc004e032bef925bba4d3ddd109f3 100644 (file)
@@ -22,7 +22,7 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
     gni_post_descriptor_t *pd;
     gni_return_t status;
     RDMA_REQUEST        *rdma_request_msg;
-    
+    int         destIndex; 
     PersistentSendsTable *slot = (PersistentSendsTable *)h;
     if (h==NULL) {
         printf("[%d] LrtsSendPersistentMsg: handle from node %d to node %d is NULL. \n", CmiMyPe(), myrank, destNode);
@@ -34,10 +34,16 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
         CmiAbort("Abort: Invalid size\n");
     }
 
-    if (slot->destBuf[0].destAddress) {
-        // CmiPrintf("[%d] LrtsSendPersistentMsg h=%p hdl=%d destNode=%d destAddress=%p size=%d\n", CmiMyPe(), h, CmiGetHandler(m), destNode, slot->destBuf[0].destAddress, size);
+    destIndex = slot->addrIndex;
+    if (slot->destBuf[destIndex].destAddress) {
+         //CmiPrintf("[%d===%d] LrtsSendPersistentMsg h=%p hdl=%d destNode=%d destAddress=%p size=%d\n", CmiMyPe(), destNode, h, CmiGetHandler(m), destNode, slot->destBuf[0].destAddress, size);
 
         // uGNI part
+    
+        slot->addrIndex = (destIndex+1)%PERSIST_BUFFERS_NUM;
+#if  DELTA_COMPRESS 
+        size = CompressPersistentMsg(h, size, m);
+#endif
         MallocPostDesc(pd);
         if(size <= LRTS_GNI_RDMA_THRESHOLD) {
             pd->type            = GNI_POST_FMA_PUT;
@@ -51,8 +57,8 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
         pd->length          = ALIGN64(size);
         pd->local_addr      = (uint64_t) m;
        
-        pd->remote_addr     = (uint64_t)slot->destBuf[0].destAddress;
-        pd->remote_mem_hndl = slot->destBuf[0].mem_hndl;
+        pd->remote_addr     = (uint64_t)slot->destBuf[destIndex].destAddress;
+        pd->remote_mem_hndl = slot->destBuf[destIndex].mem_hndl;
 #if MULTI_THREAD_SEND
         pd->src_cq_hndl     = rdma_tx_cqh;
 #else
@@ -67,6 +73,7 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
 #endif
         SetMemHndlZero(pd->local_mem_hndl);
 
+        //CmiPrintf("[%d] sending   %p  with handler=%p\n", CmiMyPe(), m, ((CmiMsgHeaderExt*)m)-> persistRecvHandler);
         //TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
          /* always buffer */
 #if CMK_SMP || 1
@@ -292,8 +299,10 @@ void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
     slot->destBuf[i].destAddress = buf;
       /* note: assume first integer in elan converse header is the msg size */
     slot->destBuf[i].destSizeAddress = (unsigned int*)buf;
+    memset(buf, 0, maxBytes+sizeof(int)*2);
   }
   slot->sizeMax = maxBytes;
+  slot->addrIndex = 0;
 #if REMOTE_EVENT
 #if !MULTI_THREAD_SEND
   CmiLock(persistPool.lock);    /* lock in function */
index 31a9bfa47f9637ab7e84a1e705531bc9acc4b953..14653cb5742ac64d1c91afcd852779116084b662 100644 (file)
@@ -9,10 +9,14 @@
 /*@{*/
 
 #include "gni_pub.h"
-
 #define PERSIST_MIN_SIZE                SMSG_MAX_MSG
 
+// one is for receive one is to store the previous msg
+#if DELTA_COMPRESS
+#define PERSIST_BUFFERS_NUM             2
+#else
 #define PERSIST_BUFFERS_NUM             1
+#endif
 
 #define PERSIST_SEQ                     0xFFFFFFF
 
@@ -27,22 +31,30 @@ typedef struct  _PersistentBuf {
 typedef struct _PersistentSendsTable {
   int destPE;
   int sizeMax;
-  PersistentHandle   destHandle;  
+  PersistentHandle   destHandle; 
   PersistentBuf     destBuf[PERSIST_BUFFERS_NUM];
   void *messageBuf;
   int messageSize;
   struct _PersistentSendsTable *prev, *next;
+#if DELTA_COMPRESS
+  PersistentHandle destDataHandle;
+  void  *previousMsg;
+  int   compressStart;
+  int   compressSize;
+#endif
+  int addrIndex;
 } PersistentSendsTable;
 
 typedef struct _PersistentReceivesTable {
-#if 0
-  void *messagePtr[PERSIST_BUFFERS_NUM];      /* preallocated message buffer of size "sizeMax" */
-  unsigned int *recvSizePtr[PERSIST_BUFFERS_NUM];   /* pointer to the size */
-#endif
   PersistentBuf     destBuf[PERSIST_BUFFERS_NUM];
   int sizeMax;
   size_t               index;
   struct _PersistentReceivesTable *prev, *next;
+  int           addrIndex;
+#if DELTA_COMPRESS
+  int   compressStart;
+  int   compressSize;
+#endif
 } PersistentReceivesTable;
 
 CpvExtern(PersistentReceivesTable *, persistentReceivesTableHead);
diff --git a/src/arch/util/compress.C b/src/arch/util/compress.C
new file mode 100644 (file)
index 0000000..f5eeb2c
--- /dev/null
@@ -0,0 +1,257 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  Compress.C
+ *
+ *    Description: Floating point compression/Decompression algorithm 
+ *
+ *        Version:  1.0
+ *        Created:  09/02/2012 02:53:08 PM
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  YOUR NAME (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <math.h>
+#include "compress.h"
+#include "charm++.h"
+
+#define     SETBIT(dest, i)  (dest[i>>3]) |= (1 << i%8 )
+#define     TESTBIT(dest, i) ((dest[i>>3]) >>  (i%8)) & 1 
+//#define     COMPRESS 1 
+#define     DEBUG  1
+#define     CHAR_BIT 8
+#define     FLOAT_BIT CHAR_BIT*sizeof(float)
+#define     FLOAT_BYTE sizeof(float)
+
+//#define  COMPRESS_EXP 1
+
+
+#if  COMPRESS_EXP
+
+void compressFloatingPoint(void *src, void *dst, int s, int *compressSize, void *bData) 
+{
+    int size = s/FLOAT_BYTE;
+    float *source = (float*)src;
+    float *dest = (float*)dst;
+    float *baseData = (float*)bData;
+    unsigned int *bptr = (unsigned int*) baseData;
+    unsigned int  *uptr = (unsigned int *) source;
+    char *uchar;
+    int i;
+#if DEBUG
+    double startTimer = CmiWallTimer();
+    printf("[%d]starting compressing.....  orig size:%d ", CmiMyPe(), size);
+#endif
+
+#if !COMPRESS 
+    memcpy(dest, source, size*sizeof(float)); 
+    *compressSize = s;
+#else
+    // Is this the first time we're sending stuff to this node?
+    if (baseData == NULL) {
+        baseData = (float*)malloc(size*sizeof(float));
+        memcpy(baseData, source, size*sizeof(float));
+        memcpy(dest, source, size*sizeof(float)); 
+        *compressSize = s;
+    } else {
+        // Create message to receive the compressed buffer.
+        unsigned char *cdst = (unsigned char*)dest; 
+        int _dataIndex = (size+7)/8;
+        memset(cdst, 0, (size+7)/8 );
+        for (i = 0; i < size; ++i) {
+            // Bitmask everything but the exponents, then check if they match.
+            unsigned int prevExp = bptr[i] & 0x7f800000;
+            unsigned int currExp = uptr[i] & 0x7f800000;
+            if (currExp != prevExp) {
+                // If not, mark this exponent as "different" and store it to send with the message.
+                SETBIT(cdst, i);
+                memcpy(cdst+_dataIndex, &(uptr[i]), 4);
+                _dataIndex += 4;
+            }else
+            {
+                unsigned int ui = uptr[i];
+                ui = (ui << 1) | (ui >> 31);
+                memcpy(cdst+_dataIndex, &ui, 3);
+                _dataIndex += 3;
+            }
+
+        }
+        *compressSize = _dataIndex;
+    }
+#endif
+#if DEBUG
+    printf("[%d] ===>done compressingcompressed size:(%d===>%d) (reduction:%d) ration=%f Timer:%f ms\n\n", CmiMyPe(), size*sizeof(float), *compressSize, (size*sizeof(float)-*compressSize), (1-(float)*compressSize/(size*sizeof(float)))*100, (CmiWallTimer()-startTimer)*1000);
+    //printf(" ===>done compressingcompressed size:(%d===>%d) (reduction:%d) ration=%f \n", size*sizeof(float), *compressSize, (size*sizeof(float)-*compressSize), (1-(float)*compressSize/(size*sizeof(float)))*100);
+#endif
+}
+
+void decompressFloatingPoint(void *cData, void *dData, int s, int compressSize, void *bData) {
+    int size = s/FLOAT_BYTE;
+#if DEBUG
+    double startTimer  = CmiWallTimer();
+    printf("starting decompressing..... ");
+#endif
+#if !COMPRESS
+    memcpy(dData, cData, size*sizeof(float));
+#else
+    float *compressData = (float*)cData;
+    float *decompressData =(float*)dData;
+    float *baseData = (float*)bData;
+    int _sdataIndex = (size+7)/8;
+    char *src = (char*)compressData;
+    int exponent;
+    unsigned int mantissa;
+    unsigned int *bptr = (unsigned int*)baseData;
+    int i;
+    for(i=0; i<size; ++i)
+    {
+       if(TESTBIT(src, i)) // different
+       {
+
+           decompressData[i] = *((float*)(src+_sdataIndex));
+           _sdataIndex += 4;
+       }else        //same exponet
+       {
+           exponent = bptr[i]  & 0x7f800000;
+           mantissa = *((unsigned int*)(src+_sdataIndex)) & 0x00FFFFFF;
+           mantissa = (mantissa >> 1) | (mantissa << 31) ;
+           mantissa |= exponent;
+           decompressData[i] = *((float*)&mantissa);   
+           _sdataIndex += 3;
+       }
+    }
+#endif
+#if DEBUG
+    printf("done decompressing.....  orig size:%d\n time:%f ms", size, (CmiWallTimer()-startTimer)*1000);
+#endif
+
+}
+
+
+#else
+
+#include <bitset>
+#include <vector>
+using namespace std;
+
+void compressFloatingPoint(void *src, void *dst, int s, int *compressSize, void *bData)
+{
+    float *source = (float*)src;
+    float *dest = (float*)dst;
+    float *baseData = (float*)bData;
+    int size = s/sizeof(float);
+#if DEBUG
+    double startTimer = CmiWallTimer();
+    printf("[%d]starting compressing.....  orig size:%d ", CmiMyPe(), size);
+#endif
+    
+#if !COMPRESS 
+    memcpy(dest, source, size*sizeof(float)); 
+    *compressSize = s;
+#else
+    std::bitset<FLOAT_BIT> comp_data;    
+    int index = 0;
+    int f_index = 0;
+    unsigned long tmp;
+    for (int i = 0; i < size; ++i) {
+        std::bitset<FLOAT_BIT> data(*(unsigned long*)(&(source[i])));
+        std::bitset<FLOAT_BIT> prev_data(*(unsigned long*)(&(baseData[i])));
+        std::bitset<FLOAT_BIT> xor_data = data^prev_data;
+        int zers = 0;
+        bool flag = false;
+        for(int b=xor_data.size()-1; b>=0; b--){
+            if(!flag){
+                if(!xor_data[b] && zers<15){
+                    zers++;
+                }
+                else{
+                    flag = true;
+                    std:bitset<4> bs(zers);
+                    for(int j=3;j>=0; j--)
+                    {
+                        comp_data[index++]=bs[j];
+                        if(index == FLOAT_BIT){ tmp = comp_data.to_ulong(); index = 0;  dest[f_index++] = *(float*)(&tmp);}
+                     }
+                        comp_data[index++]=xor_data[b];
+                        if(index == FLOAT_BIT){ tmp = comp_data.to_ulong(); index = 0;  dest[f_index++] = *(float*)(&tmp);}
+                }
+            }
+            else{
+                comp_data[index++]=xor_data[b];
+                if(index == FLOAT_BIT){ tmp = comp_data.to_ulong(); index = 0;  dest[f_index++] = *(float*)(&tmp);}
+            }
+        }
+    }
+    if(index > 0)
+    { tmp = comp_data.to_ulong(); dest[f_index++] = *(float*)(&tmp);}
+
+    *compressSize = f_index*sizeof(float);
+    float compressRatio = (1-(float)(*compressSize)/s)*100;
+    
+#if DEBUG
+    printf("[%d] ===>done compressingcompressed size:(%d===>%d) (reduction:%d) ration=%f Timer:%f ms\n\n", CmiMyPe(), size*sizeof(float), *compressSize, (size*sizeof(float)-*compressSize), (1-(float)*compressSize/(size*sizeof(float)))*100, (CmiWallTimer()-startTimer)*1000);
+#endif
+
+#endif
+}
+
+void decompressFloatingPoint(void *cData, void *dData, int s, int compressSize, void *bData) {
+    int size = s/sizeof(float);
+#if DEBUG
+    double startTimer  = CmiWallTimer();
+    printf("starting decompressing..... ");
+#endif
+#if !COMPRESS
+    memcpy(dData, cData, size*sizeof(float));
+#else
+    float*compData = (float*)cData;
+    float *decompressData = (float*)dData;
+    float *baseData = (float*)bData;
+    int compp = 0;
+    vector<bool>  compressData;
+    std::bitset<FLOAT_BIT> xor_data;
+    std::bitset<FLOAT_BIT> data(0ul);
+    int index = 0;
+    for(int i=0; i<compressSize/sizeof(float); i++)
+    {
+        std::bitset<FLOAT_BIT> cbits(*(unsigned long*) (&(compData[i])));
+        for(int j=0; j<FLOAT_BIT; j++)
+            compressData.push_back(cbits[j]);
+    }
+
+    for (int i=0; i<size; i++) {
+        int index = FLOAT_BIT-1;
+        std::bitset<FLOAT_BIT> prev_data(*(unsigned long*)(&(baseData[i])));
+        //read 4 bits and puts index acccordingly
+        for (int f=3; f>=0; f--,compp++) {
+            if(compressData[compp] == 1){
+                for (int ff=0; ff<pow(2,f); ff++) {
+                    data[index] = 0; index--;
+                }
+            }
+        }
+        while(index>=0){
+            data[index] = compressData[compp];
+            index--; compp++;
+        }
+     
+        xor_data = data^prev_data;
+        unsigned long tmp = xor_data.to_ulong();
+        decompressData[i] = *(reinterpret_cast<float*>(&tmp));
+    }
+
+#if DEBUG
+    printf("done decompressing.....  orig size:%d\n time:%f ms", size, (CmiWallTimer()-startTimer)*1000);
+#endif
+
+#endif
+}
+
+#endif
diff --git a/src/arch/util/compress.h b/src/arch/util/compress.h
new file mode 100644 (file)
index 0000000..d58a233
--- /dev/null
@@ -0,0 +1,12 @@
+#ifndef  __COMPRESS_H_
+#define __COMPRESS_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif 
+extern void compressFloatingPoint(void *src, void *dst, int s, int *compressSize, void *bData);
+extern void decompressFloatingPoint(void *cData, void *dData, int s, int compressSize, void *bData);
+#ifdef __cplusplus
+}
+#endif
+#endif
index 42c7a743fb9d884ddfb36a417c0dc05a278a0959..aaa95775f1cb5e570cef3886914d49d4e2c1bd3e 100644 (file)
@@ -9,8 +9,7 @@
 /*@{*/
 
 #include "converse.h"
-
-#if CMK_PERSISTENT_COMM
+#include "compress.h"
 
 #include "machine-persistent.h"
 
@@ -27,6 +26,8 @@ typedef struct _PersistentRequestMsg {
   int requestorPE;
   int maxBytes;
   PersistentHandle sourceHandler;
+  int compressStart;
+  int compressSize;
 } PersistentRequestMsg;
 
 typedef struct _PersistentReqGrantedMsg {
@@ -38,6 +39,7 @@ typedef struct _PersistentReqGrantedMsg {
   PersistentBuf    buf[PERSIST_BUFFERS_NUM];
   PersistentHandle sourceHandler;
   PersistentHandle destHandler;
+  PersistentHandle destDataHandler;
 } PersistentReqGrantedMsg;
 
 typedef struct _PersistentDestoryMsg {
@@ -49,6 +51,8 @@ typedef struct _PersistentDestoryMsg {
 int persistentRequestHandlerIdx;
 int persistentReqGrantedHandlerIdx;
 int persistentDestoryHandlerIdx;
+int persistentDecompressHandlerIdx;
+int persistentNoDecompressHandlerIdx;
 
 CpvDeclare(PersistentHandle *, phs);
 CpvDeclare(int, phsSize);
@@ -146,26 +150,47 @@ PersistentHandle getFreeRecvSlot()
         message.
 ******************************************************************************/
 
-PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
+PersistentHandle CmiCreateCompressPersistent(int destPE, int maxBytes, int compressStart, int compressSize)
 {
   PersistentHandle h;
   PersistentSendsTable *slot;
 
   if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
 
-/*
-  if (CmiMyPe() == destPE) {
-    CmiPrintf("[%d] CmiCreatePersistent Error>  setting up persistent communication to the same processor is not allowed.\n", CmiMyPe());
-    CmiAbort("CmiCreatePersistent");
-  }
-*/
-
   h = getFreeSendSlot();
   slot = (PersistentSendsTable *)h;
 
   slot->destPE = destPE;
   slot->sizeMax = maxBytes;
+  slot->addrIndex = 0;
+  PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
+  msg->maxBytes = maxBytes;
+  msg->sourceHandler = h;
+  msg->requestorPE = CmiMyPe();
+#if DELTA_COMPRESS
+  slot->previousMsg  = NULL; 
+  slot->compressStart = msg->compressStart = compressStart;
+  slot->compressSize = msg->compressSize = compressSize;
+#endif
+  CmiSetHandler(msg, persistentRequestHandlerIdx);
+  CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
 
+  return h;
+}
+
+PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
+{
+  PersistentHandle h;
+  PersistentSendsTable *slot;
+
+  if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
+
+  h = getFreeSendSlot();
+  slot = (PersistentSendsTable *)h;
+
+  slot->destPE = destPE;
+  slot->sizeMax = maxBytes;
+  slot->addrIndex = 0;
   PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
   msg->maxBytes = maxBytes;
   msg->sourceHandler = h;
@@ -177,6 +202,161 @@ PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
   return h;
 }
 
+#if DELTA_COMPRESS
+static void persistentNoDecompressHandler(void *msg)
+{
+    //no msg is compressed, just update history
+    PersistentReceivesTable *slot = (PersistentReceivesTable *) (((CmiMsgHeaderExt*)msg)-> persistRecvHandler);
+    int size = ((CmiMsgHeaderExt*)msg)->size;
+    slot->addrIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
+    // uncompress data from historyIndex data
+    //int historyIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
+    //char *history = (char*)(slot->destBuf[historyIndex].destAddress);
+    //CmiPrintf("[%d] uncompress[NONO]  history = %p h=%p index=%d\n", CmiMyPe(), history, slot, historyIndex);
+    CldRestoreHandler(msg);
+    (((CmiMsgHeaderExt*)msg)->xhdl) =  (((CmiMsgHeaderExt*)msg)->xxhdl);
+    CmiHandleMessage(msg);
+}
+
+static void persistentDecompressHandler(void *msg)
+{
+#if 1
+    //  decompress  delta
+    //  recovery message based on previousRecvMsg
+    PersistentReceivesTable *slot = (PersistentReceivesTable *) (((CmiMsgHeaderExt*)msg)-> persistRecvHandler);
+    int     historyIndex;
+    int     i;
+    char    *msgdata = (char*)msg;
+    int     size = ((CmiMsgHeaderExt*)msg)->size;
+    int     compressSize = *(int*)(msg+slot->compressStart);
+    char    *decompressData =(char*) malloc(slot->compressSize);
+    historyIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
+    slot->addrIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
+    // uncompress data from historyIndex data
+    char *history = (char*)(slot->destBuf[historyIndex].destAddress);
+
+    //CmiPrintf("[%d] begin uncompress message is decompressed [%d:%d]history = %p h=%p index=%d", CmiMyPe(), size, compressSize, history, slot, historyIndex);
+    char *base_dst = msg+size-1;
+    char *base_src = msg+size-slot->compressSize+compressSize+sizeof(int)-1;
+    for(i=0; i<size - slot->compressStart - slot->compressSize-sizeof(int); i++)
+    {
+       *base_dst = *base_src;
+       base_dst--;
+       base_src--;
+    }
+
+    decompressFloatingPoint(msg+slot->compressStart+sizeof(int), decompressData, slot->compressSize, compressSize, history+slot->compressStart);
+    memcpy(msg+slot->compressStart, decompressData, slot->compressSize);
+    free(decompressData);
+    CldRestoreHandler(msg);
+    (((CmiMsgHeaderExt*)msg)->xhdl) =  (((CmiMsgHeaderExt*)msg)->xxhdl);
+    //CmiPrintf("[%d] end uncompress message is decompressed history = %p h=%p index=%d", CmiMyPe(), history, slot, historyIndex);
+    CmiHandleMessage(msg);
+#else
+    CmiPrintf("[%d] msg is decompressed\n", CmiMyPe());
+    CmiPrintf("\n[%d ] decompress switching   %d:%d\n", CmiMyPe(), CmiGetXHandler(msg), CmiGetHandler(msg));
+    CldRestoreHandler(msg);
+    (((CmiMsgHeaderExt*)msg)->xhdl) =  (((CmiMsgHeaderExt*)msg)->xxhdl);
+    CmiPrintf("\n[%d ] decompress after switching   %d:%d\n", CmiMyPe(), CmiGetXHandler(msg), CmiGetHandler(msg));
+    CmiHandleMessage(msg);
+#endif
+}
+
+int CompressPersistentMsg(PersistentHandle h, int size, void *msg)
+{
+    PersistentSendsTable *slot = (PersistentSendsTable *)h;
+#if 0  
+    char *user_buffer = (char*) msg + sizeof(CmiMsgHeaderExt);
+    char *old_buffer = (char*)( h->sentPreviousMsg);
+    
+    char *delta_msg = (char*) malloc(size-sizeof(CmiMsgHeaderExt));
+    for(int i=0; i<size-sizeof(CmiMsgHeaderExt); i++)
+    {
+        delta_msg[i] = user_buffer - old_buffer;
+    }
+
+    memcpy(h->sentPreviousMsg, user_buffer, size-sizeof(CmiMsgHeaderExt));
+    void  (*compressFn) (void*, void*, int, int*); 
+    void *compress_msg = msg;
+    switch(compress_mode)
+    {
+    case CMODE_ZLIB:  compressFn = zlib_compress; break;
+    case CMODE_QUICKLZ: compressFn = quicklz_compress; break;
+    case CMODE_LZ4:  compressFn = lz4_wrapper_compress; break;
+    }
+    
+    ((CmiMsgHeaderExt*)compress_msg)->compress_flag = 1;
+    ((CmiMsgHeaderExt*)compress_msg)->original_size  = size;
+    compressFn(delta_msg, compress_msg+sizeof(CmiMsgHeaderExt), size-sizeof(CmiMsgHeaderExt), &compress_size);
+
+    CldSwitchHandler(compress_msg, persistentDecompressHandlerIdx);
+    /* ** change handler  */
+    return compress_size+ sizeof(CmiMsgHeaderExt);
+#else
+#if 0
+    ((CmiMsgHeaderExt*)msg)->size = size;
+    char *history = slot->previousMsg;
+    char tmp;
+    int i;
+    char *msgdata = (char*)msg;
+    for(i=sizeof(CmiMsgHeaderExt); i < size; i++)
+    {
+        //tmp = msgdata[i]; 
+        //msgdata[i] = msgdata[i]-history[i]; 
+        msgdata[i] = msgdata[i] & 255; 
+        //history[i] = tmp;
+    }
+    ((CmiMsgHeaderExt*)msg)-> persistRecvHandler = slot->destDataHandle;
+    CmiPrintf("\n[%d ] compressing... before   old:new %d ===>%d \n", CmiMyPe(), CmiGetXHandler(msg), CmiGetHandler(msg));
+    (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
+    CldSwitchHandler(msg, persistentDecompressHandlerIdx);
+    CmiPrintf("\n[%d ] after switching   %d:%d\n", CmiMyPe(), CmiGetXHandler(msg), CmiGetHandler(msg));
+    return size;
+#else
+    int  newSize;
+    void *history = slot->previousMsg;
+    void *dest;
+    int compressSize;
+    if(history == NULL)
+    {
+        newSize = size;
+        slot->previousMsg = msg;
+        CmiReference(msg);
+        (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
+        CldSwitchHandler(msg, persistentNoDecompressHandlerIdx);
+    }else
+    {
+        dest = malloc(size+(size+7)/8);
+        compressFloatingPoint(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
+        if(compressSize>= slot->compressSize-10*sizeof(int)) //not compress
+        {
+            newSize = size;
+            (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
+            CldSwitchHandler(msg, persistentNoDecompressHandlerIdx);
+            CmiFree(slot->previousMsg);
+            slot->previousMsg = msg;
+            CmiReference(msg);
+        }else
+        {
+            *(int*)(msg+slot->compressStart) = compressSize;
+            memcpy(history+slot->compressStart, msg+slot->compressStart, slot->compressSize); 
+            memcpy(msg+slot->compressStart+sizeof(int), dest, compressSize);
+            memcpy(msg+slot->compressStart+compressSize+sizeof(int), msg+slot->compressStart+slot->compressSize, size-slot->compressStart-slot->compressSize);
+            newSize = size-slot->compressSize+compressSize;
+            (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
+            CldSwitchHandler(msg, persistentDecompressHandlerIdx);
+        }
+        free(dest);
+    }
+    ((CmiMsgHeaderExt*)msg)-> persistRecvHandler = slot->destDataHandle;
+    ((CmiMsgHeaderExt*)msg)->size = size;
+    return newSize;
+#endif
+#endif
+}
+#else
+#endif
+
 /* for SMP */
 PersistentHandle CmiCreateNodePersistent(int destNode, int maxBytes)
 {
@@ -194,11 +374,16 @@ static void persistentRequestHandler(void *env)
 
   PersistentHandle h = getFreeRecvSlot();
   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
+
   /*slot->messagePtr = elan_CmiStaticAlloc(msg->maxBytes);*/
 
   /* build reply message */
   PersistentReqGrantedMsg *gmsg = CmiAlloc(sizeof(PersistentReqGrantedMsg));
 
+#if DELTA_COMPRESS
+  slot->compressStart = msg->compressStart;
+  slot->compressSize = msg->compressSize;
+#endif
   setupRecvSlot(slot, msg->maxBytes);
 
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
@@ -212,7 +397,10 @@ static void persistentRequestHandler(void *env)
 
   gmsg->sourceHandler = msg->sourceHandler;
   gmsg->destHandler = getPersistentHandle(h, 1);
-
+#if  DELTA_COMPRESS
+  gmsg->destDataHandler = h;
+  //CmiPrintf("[%d] receiver slot=%p, current=%d, h=%p  =%p \n", CmiMyPe(), slot, slot->addrIndex, h, gmsg->destDataHandler);
+#endif
   CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
   CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
 
@@ -238,7 +426,10 @@ static void persistentReqGrantedHandler(void *env)
 #endif
   }
   slot->destHandle = msg->destHandler;
-
+#if DELTA_COMPRESS
+  slot->destDataHandle = msg->destDataHandler;
+  //CmiPrintf("+++[%d] req grant %p\n", CmiMyPe(), slot->destDataHandle);
+#endif
   if (slot->messageBuf) {
     LrtsSendPersistentMsg(h, CmiNodeOf(slot->destPE), slot->messageSize, slot->messageBuf);
     slot->messageBuf = NULL;
@@ -392,6 +583,7 @@ void CmiDestoryAllPersistent()
   CpvAccess(persistentReceivesTableCount) = 0;
 }
 
+
 void CmiPersistentInit()
 {
   int i;
@@ -403,7 +595,13 @@ void CmiPersistentInit()
   persistentDestoryHandlerIdx = 
        CmiRegisterHandler((CmiHandler)persistentDestoryHandler);
 
+#if DELTA_COMPRESS
+  persistentDecompressHandlerIdx = 
+      CmiRegisterHandler((CmiHandler)persistentDecompressHandler);
+  persistentNoDecompressHandlerIdx = 
+      CmiRegisterHandler((CmiHandler)persistentNoDecompressHandler);
+#endif
+
   CpvInitialize(PersistentHandle*, phs);
   CpvAccess(phs) = NULL;
   CpvInitialize(int, phsSize);
@@ -425,7 +623,6 @@ void CmiPersistentInit()
   CpvAccess(persistentReceivesTableCount) = 0;
 }
 
-
 void CmiUsePersistentHandle(PersistentHandle *p, int n)
 {
   if (n==1 && *p == NULL) { p = NULL; n = 0; }
@@ -446,6 +643,5 @@ void CmiPersistentOneSend()
   if (CpvAccess(phs)) CpvAccess(curphs)++;
 }
 
-#endif
 
 /*@}*/
index 8fcfc478e1ab899af364c064273af6a21e9850f9..1b8639ed0d9676d27f55893c331af143436f4a3c 100644 (file)
@@ -422,7 +422,7 @@ LIBCONV_UTIL=pup_util.o pup_toNetwork.o pup_toNetwork4.o            \
        pup_xlater.o pup_c.o pup_paged.o pup_cmialloc.o                 \
        ckimage.o ckdll.o ckhashtable.o sockRoutines.o                  \
        conv-lists.o RTH.o persist-comm.o mempool.o graph.o \
-       TopoManager.o CrayNid.o crc32.o
+       TopoManager.o CrayNid.o crc32.o compress.o
 
 LIBCONV_UTILF=pup_f.o
 
@@ -464,6 +464,7 @@ include Makefile.machine
 $(L)/libconv-cplus-n.a: machine.c $(CVHEADERS)
        @-test -f $(INC)/mpi.h && mv -f $(INC)/mpi.h $(INC)/mpi.BAK || true
        $(CHARMC) -o machine.o  -DFOR_CPLUS=1 machine.c
+       $(CHARMC) -o compress.o  -DFOR_CPLUS=1 compress.C
        @-test -f ./bglmachine.C && $(CHARMC) -o machine.o  -DFOR_CPLUS=1 bglmachine.C || true
        $(CHARMC) -o $(L)/libconv-cplus-y.a machine.o
        $(CHARMC) -o $(L)/libconv-cplus-n.a -DFOR_CPLUS=0 machine.c