add persistent message files and changes for PAMI BGQ
author Yanhua Sun <sun51@illinois.edu>
Fri, 25 Jan 2013 16:45:10 +0000 (16:45 +0000)
committer Nikhil Jain <nikhil@illinois.edu>
Fri, 1 Feb 2013 01:51:52 +0000 (19:51 -0600)
src/arch/pamilrts/conv-common.h
src/arch/pamilrts/machine-persistent.c [new file with mode: 0644]
src/arch/pamilrts/machine-persistent.h [new file with mode: 0644]
src/arch/pamilrts/machine.c
src/arch/util/persist-comm.c

index 454d278d70c6efb10a85e21513b3b3b3840947e3..b0ae16cae44ff4b745d024457b611deeefc2087e 100644 (file)
@@ -6,12 +6,22 @@
 #define CMK_HANDLE_SIGUSR                                  1
 
 //#define CMK_ENABLE_ASYNC_PROGRESS                          1
+/* Uncomment to build with DELTA_COMPRESS.  When it is set, the message header
+ * below gains four extra trailing fields (compressStart, compress_flag, xxhdl,
+ * persistRecvHandler) and LrtsSendPersistentMsg may call CompressPersistentMsg
+ * before sending. */
+//#define  DELTA_COMPRESS                                     1
 
+/* Four header layouts: {DELTA_COMPRESS on/off} x {CMK_ENABLE_ASYNC_PROGRESS on/off}.
+ * The async-progress variants additionally carry dstnode and a work[] area. */
+#if DELTA_COMPRESS
+#if CMK_ENABLE_ASYNC_PROGRESS
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, stratid; unsigned char cksum, magic; int root, size, dstnode; CmiUInt2 redID, padding; char work[6*sizeof(uintptr_t)]; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler;
+#else
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, stratid; unsigned char cksum, magic; int root, size; CmiUInt2 redID, padding; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler;
+#endif
+#else
 #if CMK_ENABLE_ASYNC_PROGRESS
 #define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, stratid; unsigned char cksum, magic; int root, size, dstnode; CmiUInt2 redID, padding; char work[6*sizeof(uintptr_t)]; 
 #else
 #define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, stratid; unsigned char cksum, magic; int root, size; CmiUInt2 redID, padding; 
 #endif
+#endif
+
 #define CMK_MSG_HEADER_BASIC  CMK_MSG_HEADER_EXT
 #define CMK_MSG_HEADER_EXT    { CMK_MSG_HEADER_EXT_ }
 #define CMK_MSG_HEADER_BIGSIM_    { CMK_MSG_HEADER_EXT_ CMK_BIGSIM_FIELDS }
diff --git a/src/arch/pamilrts/machine-persistent.c b/src/arch/pamilrts/machine-persistent.c
new file mode 100644 (file)
index 0000000..664666f
--- /dev/null
@@ -0,0 +1,227 @@
+/*
+  included in machine.c
+    Yanhua 01/27/2013
+*/
+
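+/*
+  Machine-specific persistent communication functions for the PAMI layer.
+  Defined in this file:
+  * rzv_persist_pkt_dispatch, rzv_persist_recv_done  // receive side of a persistent rendezvous
+  * LrtsSendPersistentMsg
+  * PerFree               // persistent message memory free function
+  * persist_machine_init  // machine-specific initialization call
+  * initSendSlot, initRecvSlot, setupRecvSlot, clearRecvSlot, getPersistentHandle
+  Named in the original list but not defined in this file:
+  * CmiSyncSendPersistent
+  * PumpPersistent
+  * PerAlloc              // persistent message memory allocation function
+*/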
+/* Header of a persistent rendezvous request.  It is filled in by
+ * LrtsSendPersistentMsg and read by rzv_persist_pkt_dispatch, which uses it to
+ * issue the RDMA get.  The struct travels as the PAMI message header, so the
+ * field layout must be the same on both sides. */
+typedef struct _cmi_pami_rzv_persist {
+  void             * srcPtr;       /* address of the message on the sender */
+  void             * destPtr;      /* persistent buffer on the receiver that the data is fetched into */
+  size_t           offset;         /* srcPtr minus the base address of the sender's memregion 0 */
+  int              bytes;          /* size of the message in bytes */
+  int              dst_context;    /* receiver context index; selects the receiver's local memregion */
+}CmiPAMIRzvPersist_t;  
+
+
+
+/*
+ * Completion callback for the RDMA get issued by rzv_persist_pkt_dispatch.
+ *
+ * clientdata is the CmiPAMIRzvRecv_t record that the dispatch routine placed
+ * inside the persistent buffer, right after the message bytes.  The record is
+ * copied to the stack first; the message is then handed to recv_done and the
+ * stack copy is used to acknowledge the sender.
+ *
+ * The result argument is not inspected: recv_done is always called with
+ * PAMI_SUCCESS.
+ */
+void rzv_persist_recv_done (pami_context_t   ctxt,
+    void           * clientdata,
+    pami_result_t    result)
+{
+  const CmiPAMIRzvRecv_t *cookie = (const CmiPAMIRzvRecv_t *)clientdata;
+  CmiPAMIRzvRecv_t        info   = *cookie;
+
+  /* NOTE(review): presumably the extra reference keeps the persistent buffer
+   * alive when the consumer later frees the message -- confirm. */
+  CmiReference(info.msg);
+  recv_done(ctxt, info.msg, PAMI_SUCCESS);
+  sendAck(ctxt, &info);
+}
+
+/*
+ * PAMI dispatch handler for persistent rendezvous requests.
+ *
+ * header_addr points at a CmiPAMIRzvPersist_t describing the transfer, and
+ * pipe_addr carries the sender's pami_memregion_t.  The handler does not
+ * allocate a receive buffer: the data is pulled with PAMI_Rget straight into
+ * the pre-allocated buffer named by the header (destPtr).  A CmiPAMIRzvRecv_t
+ * record is written immediately after the message bytes in that buffer and is
+ * passed as the cookie to rzv_persist_recv_done.
+ *
+ * clientdata is unused; recv is asserted to be NULL.
+ */
+void rzv_persist_pkt_dispatch (pami_context_t    context,
+    void             * clientdata,
+    const void       * header_addr,
+    size_t             header_size,
+    const void       * pipe_addr,
+    size_t             pipe_size,
+    pami_endpoint_t    origin,
+    pami_recv_t      * recv)
+{
+  INCR_ORECVS();
+
+  /* Sanity checks on what PAMI delivered. */
+  CmiAssert (header_size == sizeof(CmiPAMIRzvPersist_t));
+  CmiAssert (recv == NULL);
+  CmiAssert (pipe_addr != NULL);
+  CmiAssert (pipe_size == sizeof(pami_memregion_t));
+
+  const CmiPAMIRzvPersist_t *hdr = (const CmiPAMIRzvPersist_t *) header_addr;
+  pami_memregion_t *remote_mr    = (pami_memregion_t *) pipe_addr;
+  char *dest_buf                 = hdr->destPtr;
+  int   msg_bytes                = hdr->bytes;
+
+  /* Bookkeeping record lives in the persistent buffer, just past the payload. */
+  CmiPAMIRzvRecv_t *tail = (CmiPAMIRzvRecv_t *)(dest_buf + msg_bytes);
+  tail->msg        = dest_buf;
+  tail->src_ep     = origin;
+  tail->src_buffer = hdr->srcPtr;
+
+  //Rzv inj fifos are on the 17th core shared by all contexts
+  /* NOTE(review): only the members assigned below are set; the rest of rget
+   * is left uninitialized, exactly as before. */
+  pami_rget_simple_t  rget;
+  rget.rma.dest    = origin;
+  rget.rma.bytes   = hdr->bytes;
+  rget.rma.cookie  = tail;
+  rget.rma.done_fn = rzv_persist_recv_done;
+  rget.rma.hints.buffer_registered = PAMI_HINT_ENABLE;
+  rget.rma.hints.use_rdma          = PAMI_HINT_ENABLE;
+
+  /* Local side: offset of the destination buffer within this context's region. */
+  rget.rdma.local.mr      = &cmi_pami_memregion[hdr->dst_context].mregion;
+  rget.rdma.local.offset  = (size_t)dest_buf -
+    (size_t)cmi_pami_memregion[hdr->dst_context].baseVA;
+
+  /* Remote side: region handle from the payload, offset from the header. */
+  rget.rdma.remote.mr     = remote_mr;
+  rget.rdma.remote.offset = hdr->offset;
+
+  PAMI_Rget (context, &rget);
+}
+
+
+/*
+ * Send msg over the persistent channel identified by h.
+ *
+ *   h        - persistent handle; really a PersistentSendsTable *.  Aborts if NULL.
+ *   destNode - destination node; asserted to equal CmiNodeOf(slot->destPE).
+ *   size     - message size in bytes; aborts if it exceeds slot->sizeMax.
+ *   msg      - message to send.
+ *
+ * If the remote buffer address for the current slot index is known, a
+ * CmiPAMIRzvPersist_t header plus this node's memregion 0 handle is sent with
+ * PAMI_Send_immediate on CMI_PAMI_RZV_PERSIST_DISPATCH.  Otherwise the message
+ * is parked in slot->messageBuf / slot->messageSize (aborting if something is
+ * already parked there).
+ */
+void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *msg)
+{
+    int         to_lock; 
+    int         destIndex; 
+    PersistentSendsTable *slot = (PersistentSendsTable *)h;
+    if (h==NULL) {
+        CmiAbort("LrtsSendPersistentMsg: not a valid PersistentHandle");
+    }
+    CmiAssert(CmiNodeOf(slot->destPE) == destNode);
+    if (size > slot->sizeMax) {
+        /* Print the destination PE under the "destPe" label (was printing destNode). */
+        CmiPrintf("size: %d sizeMax: %d mype=%d destPe=%d\n", size, slot->sizeMax, CmiMyPe(), slot->destPE);
+        CmiAbort("Abort: Invalid size\n");
+    }
+
+    destIndex = slot->addrIndex;
+    if (slot->destBuf[destIndex].destAddress) {
+        /* Advance to the next remote buffer for the following send. */
+        slot->addrIndex = (destIndex+1)%PERSIST_BUFFERS_NUM;
+#if  DELTA_COMPRESS
+        if(slot->compressFlag)
+        {
+            size = CompressPersistentMsg(h, size, msg);
+        }
+#endif
+  
+        CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
+        CMI_MSG_SIZE(msg) = size;
+        CMI_SET_CHECKSUM(msg, size);
+        to_lock = CpvAccess(uselock);
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+        /* Pick the context from the destination node.  The original used an
+         * identifier "node" that is not declared in this function. */
+        int c = destNode % cmi_pami_numcontexts;
+        pami_context_t context = cmi_pami_contexts[c];    
+#else
+        pami_context_t context = MY_CONTEXT();    
+#endif
+
+        pami_endpoint_t target;
+#if CMK_PAMI_MULTI_CONTEXT
+        size_t dst_context = myrand(&r_seed) % cmi_pami_numcontexts;
+#else
+        size_t dst_context = 0;
+#endif
+        PAMI_Endpoint_create (cmi_pami_client, (pami_task_t)destNode, dst_context, &target);
+
+        /* The offset is relative to memregion 0, the same region whose handle
+         * is sent as the payload below. */
+        CmiPAMIRzvPersist_t   rzv;
+        rzv.bytes       = size;
+        rzv.srcPtr      = msg;
+        rzv.destPtr     = slot->destBuf[destIndex].destAddress;
+        rzv.offset      = (size_t)msg - (size_t)cmi_pami_memregion[0].baseVA;
+        rzv.dst_context = dst_context;
+
+        pami_send_immediate_t parameters;
+        parameters.dispatch        = CMI_PAMI_RZV_PERSIST_DISPATCH;
+        parameters.header.iov_base = &rzv;
+        parameters.header.iov_len  = sizeof(rzv);
+        parameters.data.iov_base   = &cmi_pami_memregion[0].mregion;      
+        parameters.data.iov_len    = sizeof(pami_memregion_t);
+        parameters.dest = target;
+
+        if(to_lock)
+            PAMIX_CONTEXT_LOCK(context);
+
+        PAMI_Send_immediate (context, &parameters);
+
+        if(to_lock)
+            PAMIX_CONTEXT_UNLOCK(context);
+    } else {
+
+#if 1
+        /* Remote buffer address not known yet: park the message in the slot. */
+        if (slot->messageBuf != NULL) {
+            CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
+            CmiAbort("");
+        }
+        slot->messageBuf = msg;
+        slot->messageSize = size;
+#else
+    /* normal send */
+        PersistentHandle  *phs_tmp = phs;
+        int phsSize_tmp = phsSize;
+        phs = NULL; phsSize = 0;
+        CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
+        CmiSyncSendAndFree(slot->destPE, size, msg);
+        phs = phs_tmp; phsSize = phsSize_tmp;
+#endif
+    }
+}
+
+
+extern void CmiReference(void *blk);
+
+                                                                                
+/* Free a persistent message buffer; forwards msg to CmiFree. */
+void PerFree(char *msg) { CmiFree(msg); }
+
+/*
+ * Machine-dependent initialization hook for persistent communication.
+ * This implementation has an empty body.
+ */
+void persist_machine_init(void)
+{
+  /* nothing to do */
+}
+
+/*
+ * Reset a send-side persistent slot to its unused state: no destination PE
+ * (-1), zero size limit, no destination handle or buffers, no parked message,
+ * unlinked from any list.
+ *
+ * addrIndex is also reset to 0.  LrtsSendPersistentMsg uses it to index
+ * destBuf, so it must not be left holding an arbitrary value.
+ *
+ * NOTE(review): the DELTA_COMPRESS-only members are not touched here, as
+ * before -- confirm they are initialized by the code that enables compression.
+ */
+void initSendSlot(PersistentSendsTable *slot)
+{
+  slot->destPE = -1;
+  slot->sizeMax = 0;
+  slot->destHandle = 0;
+  memset(slot->destBuf, 0, sizeof slot->destBuf);
+  slot->messageBuf = NULL;
+  slot->messageSize = 0;
+  slot->prev = slot->next = NULL;
+  slot->addrIndex = 0;
+}
+
+/*
+ * Reset a receive-side persistent slot: clear all destination buffer entries,
+ * zero the size limit and buffer index, and unlink the slot from any list.
+ *
+ * The buffers themselves are allocated later by setupRecvSlot.  The index
+ * member is deliberately left alone, as in the original (its assignment was
+ * already commented out there).
+ */
+void initRecvSlot(PersistentReceivesTable *slot)
+{
+  memset(slot->destBuf, 0, sizeof slot->destBuf);
+  slot->sizeMax = 0;
+  slot->addrIndex = 0;
+  slot->prev = slot->next = NULL;
+}
+
+/*
+ * Allocate and zero the receive buffers of a persistent slot.
+ *
+ *   slot     - receive slot to populate.
+ *   maxBytes - largest message the slot must accept; stored in slot->sizeMax.
+ *
+ * Each of the PERSIST_BUFFERS_NUM buffers is maxBytes plus
+ * sizeof(CmiPAMIRzvRecv_t) long.  The extra space holds the record that
+ * rzv_persist_pkt_dispatch writes directly after the message bytes.
+ * addrIndex is reset to 0.
+ */
+void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
+{
+  const size_t bufBytes = maxBytes + sizeof(CmiPAMIRzvRecv_t);
+  int i;
+  for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
+    char *buf = CmiAlloc(bufBytes);
+    _MEMCHECK(buf);
+    /* One clear is enough; the original cleared the same buffer twice. */
+    memset(buf, 0, bufBytes);
+    slot->destBuf[i].destAddress = buf;
+  }
+  slot->sizeMax = maxBytes;
+  slot->addrIndex = 0;
+}
+
+/*
+ * Hook for clearing a receive slot.  This implementation has an empty body
+ * and leaves slot untouched.
+ */
+void clearRecvSlot(PersistentReceivesTable *slot)
+{
+  /* nothing to do */
+}
+
+/*
+ * Return the handle to use for h.  In this implementation the handle is
+ * returned unchanged and toindex is ignored.
+ */
+PersistentHandle getPersistentHandle(PersistentHandle h, int toindex)
+{
+    (void)toindex;
+    return h;
+}
diff --git a/src/arch/pamilrts/machine-persistent.h b/src/arch/pamilrts/machine-persistent.h
new file mode 100644 (file)
index 0000000..d93c7ff
--- /dev/null
@@ -0,0 +1,81 @@
+/** @file
+ * General implementation of persistent communication support
+ * @ingroup Machine
+ */
+
+/**
+ * \addtogroup Machine
+*/
+/*@{*/
+
+#define PERSIST_MIN_SIZE               EAGER_CUTOFF 
+//#define COPY_HISTORY                          1
+// PERSIST_BUFFERS_NUM is the number of destination buffers per persistent slot.
+// With DELTA_COMPRESS and without COPY_HISTORY it is 2: one is for receive,
+// one is to store the previous msg.  Every other configuration uses 1.
+#if DELTA_COMPRESS
+#if COPY_HISTORY 
+#define PERSIST_BUFFERS_NUM             1
+#else
+#define PERSIST_BUFFERS_NUM             2
+#endif
+#else
+#define PERSIST_BUFFERS_NUM             1
+#endif
+
+#define PERSIST_SEQ                     0xFFFFFFF
+
+/* Nonzero when REFFIELD(ptr) is greater than PERSIST_SEQ/2.  The macro now
+ * tests its own argument; it used to expand to REFFIELD(msg) and so silently
+ * picked up whatever variable named msg was in scope at the call site. */
+#define IS_PERSISTENT_MEMORY(ptr)          (REFFIELD(ptr) > PERSIST_SEQ/2)
+
+/* One destination buffer of a persistent channel. */
+typedef struct  _PersistentBuf {
+  void *destAddress;   /* buffer address; NULL (zeroed by the init*Slot routines) until one is assigned */
+//  void *destSizeAddress;
+} PersistentBuf;
+
+/* Sender-side state of one persistent channel.  A PersistentHandle passed to
+ * LrtsSendPersistentMsg is a pointer to one of these. */
+typedef struct _PersistentSendsTable {
+  int destPE;                                      /* destination PE; -1 after initSendSlot */
+  int sizeMax;                                     /* largest message size accepted by LrtsSendPersistentMsg */
+  PersistentHandle   destHandle; 
+  PersistentBuf     destBuf[PERSIST_BUFFERS_NUM];  /* destination buffer addresses, sent as destPtr in the rendezvous header */
+  void *messageBuf;                                /* message parked while destBuf[addrIndex] has no address yet */
+  int messageSize;                                 /* size of the parked message */
+  struct _PersistentSendsTable *prev, *next;       /* list links */
+#if DELTA_COMPRESS
+  PersistentHandle destDataHandle;
+  void  *previousMsg;
+  int   previousSize;
+  int   compressStart;
+  int   compressSize;
+  int  dataType;
+  int   compressFlag;                              /* when nonzero, LrtsSendPersistentMsg calls CompressPersistentMsg */
+#endif
+  int addrIndex;                                   /* index of the destBuf entry used by the next send; advanced modulo PERSIST_BUFFERS_NUM */
+} PersistentSendsTable;
+
+/* Receiver-side state of one persistent channel. */
+typedef struct _PersistentReceivesTable {
+  PersistentBuf     destBuf[PERSIST_BUFFERS_NUM];  /* receive buffers allocated by setupRecvSlot */
+  int sizeMax;                                     /* maxBytes given to setupRecvSlot */
+  size_t               index;                      /* NOTE(review): not set by initRecvSlot or setupRecvSlot -- confirm who owns it */
+  struct _PersistentReceivesTable *prev, *next;    /* list links */
+  int           addrIndex;                         /* reset to 0 by setupRecvSlot */
+#if DELTA_COMPRESS
+  int   compressStart;
+  int  dataType;
+  void  *history;
+#endif
+} PersistentReceivesTable;
+
+CpvExtern(PersistentReceivesTable *, persistentReceivesTableHead);
+CpvExtern(PersistentReceivesTable *, persistentReceivesTableTail);
+
+CpvExtern(PersistentHandle *, phs);
+CpvExtern(int, phsSize);
+CpvExtern(int, curphs);
+
+/* Persistent-communication entry points.  Of these, getPersistentHandle,
+ * PerFree, setupRecvSlot and clearRecvSlot are defined in
+ * machine-persistent.c.  PerAlloc, PumpPersistent and the two swap*SlotBuffers
+ * routines are only declared here and are not defined in that file.
+ * NOTE(review): confirm where (or whether) those four are implemented. */
+PersistentHandle getPersistentHandle(PersistentHandle h, int toindex);
+void *PerAlloc(int size);
+void PerFree(char *msg);
+int PumpPersistent();
+void swapSendSlotBuffers(PersistentSendsTable *slot);
+void swapRecvSlotBuffers(PersistentReceivesTable *slot);
+void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes);
+void clearRecvSlot(PersistentReceivesTable *slot);
+
index e0464757ce4ba1590faa3074052364ce684b0560..2a73b61d0ba4948d6e26a4f52537a6250898f7c7 100644 (file)
@@ -44,6 +44,12 @@ char *ALIGN_32(char *p) {
 #define SHORT_CUTOFF   128
 #define EAGER_CUTOFF   4096
 
+#if CMK_PERSISTENT_COMM
+/* Handler for persistent rendezvous requests.  It is defined in
+ * machine-persistent.c, which is #included at the end of this file. */
+void rzv_persist_pkt_dispatch (pami_context_t context, void *clientdata, const void *header_addr, size_t header_size, const void * pipe_addr,  size_t pipe_size,  pami_endpoint_t origin, pami_recv_t  * recv); 
+/* Dispatch id under which rzv_persist_pkt_dispatch is registered in LrtsInit.
+ * NOTE(review): must not collide with the other CMI_PAMI_*_DISPATCH ids -- confirm. */
+#define CMI_PAMI_RZV_PERSIST_DISPATCH            11 
+#include "machine-persistent.h"
+#endif
+
 #if !CMK_OPTIMIZE
 static int checksum_flag = 0;
 extern unsigned char computeCheckSum(unsigned char *data, int len);
@@ -304,7 +310,6 @@ static void short_pkt_dispatch (pami_context_t       context,
   //CmiPushPE(CMI_DEST_RANK(smsg), (void *)msg);
 }
 
-
 void rzv_pkt_dispatch (pami_context_t       context,   
     void               * clientdata,
     const void         * header_addr,
@@ -538,6 +543,15 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
         NULL,
         options);      
 
+#if CMK_PERSISTENT_COMM
+      pfn.p2p = rzv_persist_pkt_dispatch;
+      PAMI_Dispatch_set (cmi_pami_contexts[i],
+          CMI_PAMI_RZV_PERSIST_DISPATCH,
+          pfn,
+          NULL,
+          options);     
+#endif      
+
     pfn.p2p = short_pkt_dispatch;
     PAMI_Dispatch_set (cmi_pami_contexts[i],
         CMI_PAMI_SHORT_DISPATCH,
@@ -995,3 +1009,7 @@ void ack_pkt_dispatch (pami_context_t       context,
 }
 
 #include "cmimemcpy_qpx.h"
+
+#if CMK_PERSISTENT_COMM
+#include "machine-persistent.c"
+#endif
index 08fe0933c7e678a7ad12f24e15f618224a863e45..5201f4bdf1b1a011d2e324a73542637568b0e4e0 100644 (file)
@@ -753,8 +753,8 @@ void CmiDestoryAllPersistent()
     PersistentReceivesTable *next = slot->next;
     int i;
     for (i=0; i<PERSIST_BUFFERS_NUM; i++)  {
-      if (slot->destBuf[i].destSizeAddress)
-        CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
+      //if (slot->destBuf[i].destSizeAddress)
+      //  CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
       if (slot->destBuf[i].destAddress) PerFree((char*)slot->destBuf[i].destAddress);
     }
     free(slot);