persistent comm works again with the new mem registration scheme.
authorGengbin Zheng <gzheng@illinois.edu>
Sun, 18 Mar 2012 23:13:13 +0000 (16:13 -0700)
committerGengbin Zheng <gzheng@illinois.edu>
Sun, 18 Mar 2012 23:13:13 +0000 (16:13 -0700)
still using ack_msg, can be optimized later
tested on simple tests with both SMP and  non-SMP.

src/arch/gemini_gni/conv-common.h
src/arch/gemini_gni/machine-persistent.c
src/arch/gemini_gni/machine-persistent.h
src/arch/gemini_gni/machine.c
src/arch/util/machine-common-core.c
src/arch/util/persist-comm.c
src/conv-core/converse.h

index 7b84b7f6d432c4437974c170b6cd64bfd5513bc9..34a915a8557ea3df94552fddef56c910a472e241 100644 (file)
@@ -9,7 +9,11 @@
 
 #define CMK_HANDLE_SIGUSR                                  0
 
+#if CMK_ERROR_CHECKING
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt2 seq; unsigned char cksum, magic; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root; 
+#else
 #define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,stratid,redID; CmiInt4 root; 
+#endif
 
 #define CMK_MSG_HEADER_BASIC  CMK_MSG_HEADER_EXT
 #define CMK_MSG_HEADER_EXT    { CMK_MSG_HEADER_EXT_ }
index 5bcb72e577d4eeefca9b6db2a830a0b2e00f037a..2064c38e247e1d88c67776fd3cfcad11ad183f98 100644 (file)
   * persist_machine_init  // machine specific initialization call
 */
 
+#if USE_LRTS_MEMPOOL
 #define LRTS_GNI_RDMA_PUT_THRESHOLD  2048
-void LrtsSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
+#else
+#define LRTS_GNI_RDMA_PUT_THRESHOLD  16384
+#endif
+
+void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
 {
     gni_post_descriptor_t *pd;
     gni_return_t status;
@@ -27,56 +32,56 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
     CmiAssert(h!=NULL);
     PersistentSendsTable *slot = (PersistentSendsTable *)h;
     CmiAssert(slot->used == 1);
-    CmiAssert(slot->destPE == destPE);
+    CmiAssert(CmiNodeOf(slot->destPE) == destNode);
     if (size > slot->sizeMax) {
         CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
         CmiAbort("Abort: Invalid size\n");
     }
 
-    /* CmiPrintf("[%d] LrtsSendPersistentMsg h=%p hdl=%d destPE=%d destAddress=%p size=%d\n", CmiMyPe(), h, CmiGetHandler(m), destPE, slot->destBuf[0].destAddress, size); */
+    // CmiPrintf("[%d] LrtsSendPersistentMsg h=%p hdl=%d destNode=%d destAddress=%p size=%d\n", CmiMyPe(), h, CmiGetHandler(m), destNode, slot->destBuf[0].destAddress, size);
 
     if (slot->destBuf[0].destAddress) {
         // uGNI part
         MallocPostDesc(pd);
-#if USE_LRTS_MEMPOOL
-        if(size <= 2048){
-#else
-        if(size <= 16384){
-#endif
+        if(size <= LRTS_GNI_RDMA_PUT_THRESHOLD) {
             pd->type            = GNI_POST_FMA_PUT;
         }
         else
         {
             pd->type            = GNI_POST_RDMA_PUT;
-#if USE_LRTS_MEMPOOL
-            pd->local_mem_hndl  = GetMemHndl(m);
-#else
-            status = MEMORY_REGISTER(onesided_hnd, nic_hndl,  m, size, &(pd->local_mem_hndl), &omdh);
-#endif
-            GNI_RC_CHECK("Mem Register before post", status);
         }
         pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
         pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
-        pd->length          = size;
+        pd->length          = ALIGN64(size);
         pd->local_addr      = (uint64_t) m;
        
         pd->remote_addr     = (uint64_t)slot->destBuf[0].destAddress;
         pd->remote_mem_hndl = slot->destBuf[0].mem_hndl;
         pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
         pd->rdma_mode       = 0;
+        pd->cqwrite_value   = PERSIST_SEQ;
+        pd->amo_cmd         = 0;
 
+        SetMemHndlZero(pd->local_mem_hndl);
+#if CMK_SMP
+        bufferRdmaMsg(destNode, pd, -1);
+#else
+        status = registerMessage((void*)(pd->local_addr), pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
+        if (status == GNI_RC_SUCCESS) 
+        {
         if(pd->type == GNI_POST_RDMA_PUT) 
-            status = GNI_PostRdma(ep_hndl_array[destPE], pd);
+            status = GNI_PostRdma(ep_hndl_array[destNode], pd);
+        else
+            status = GNI_PostFma(ep_hndl_array[destNode],  pd);
+        }
         else
-            status = GNI_PostFma(ep_hndl_array[destPE],  pd);
+            status = GNI_RC_ERROR_RESOURCE;
         if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
         {
-            MallocRdmaRequest(rdma_request_msg);
-            rdma_request_msg->destNode = destPE;
-            rdma_request_msg->pd = pd;
-            PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
+            bufferRdmaMsg(destNode, pd, -1);
         }else
             GNI_RC_CHECK("AFter posting", status);
+#endif
     }
   else {
 #if 1
@@ -181,13 +186,24 @@ int PumpPersistent()
 
 void *PerAlloc(int size)
 {
-  return CmiAlloc(size);
+//  return CmiAlloc(size);
+  gni_return_t status;
+  void *res = NULL;
+  size = ALIGN64(size) + sizeof(CmiChunkHeader);
+  if (0 != posix_memalign(&res, 64, size))
+      CmiAbort("PerAlloc: failed to allocate memory.");
+  //printf("[%d] PerAlloc %p. \n", myrank, res);
+  MEMORY_REGISTER(onesided_hnd, nic_hndl,  res, size , &MEMHFIELD((char*)res+sizeof(CmiChunkHeader)), &omdh, NULL, status);
+  GNI_RC_CHECK("Mem Register before post", status);
+  return (char*)res+sizeof(CmiChunkHeader);
 }
                                                                                 
 void PerFree(char *msg)
 {
-  //elan_CmiStaticFree(msg);
-  CmiFree(msg);
+//  CmiFree(msg);
+  char *ptr = msg-sizeof(CmiChunkHeader);
+  MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &MEMHFIELD(msg) , &omdh, SIZEFIELD(msg));
+  free(ptr);
 }
 
 /* machine dependent init call */
@@ -198,20 +214,19 @@ void persist_machine_init(void)
 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
 {
   int i;
-  gni_return_t status;
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
     char *buf = PerAlloc(maxBytes+sizeof(int)*2);
     _MEMCHECK(buf);
     memset(buf, 0, maxBytes+sizeof(int)*2);
+    slot->destBuf[i].mem_hndl = MEMHFIELD(buf);
     slot->destBuf[i].destAddress = buf;
     /* note: assume first integer in elan converse header is the msg size */
     slot->destBuf[i].destSizeAddress = (unsigned int*)buf;
 #if USE_LRTS_MEMPOOL
-    slot->destBuf[i].mem_hndl = GetMemHndl(buf);
-#else
-    status = MEMORY_REGISTER(onesided_hnd, nic_hndl,  buf, maxBytes+sizeof(int)*2 , &(slot->destBuf[i].mem_hndl), &omdh);
-    GNI_RC_CHECK("Mem Register before post", status);
+    // assume already registered from mempool
+    // slot->destBuf[i].mem_hndl = GetMemHndl(buf);
 #endif
+    // FIXME:  assume always succeed
   }
   slot->sizeMax = maxBytes;
 }
index 9ea6516ed5e20533164bbc3e66c34523198319bf..cc6f2918dda1dd52aa1971e0547f70d0a4a04deb 100644 (file)
@@ -12,6 +12,8 @@
 
 #define PERSIST_BUFFERS_NUM             1
 
+#define PERSIST_SEQ                     0xFFFFFFFFFFFF
+
 typedef struct  _PersistentBuf {
   void *destAddress;
   void *destSizeAddress;
@@ -41,9 +43,9 @@ typedef struct _PersistentReceivesTable {
 extern PersistentReceivesTable *persistentReceivesTableHead;
 extern PersistentReceivesTable *persistentReceivesTableTail;
 
-extern PersistentHandle  *phs;
-extern int phsSize;
-extern int curphs;
+CpvExtern(PersistentHandle *, phs);
+CpvExtern(int, phsSize);
+CpvExtern(int, curphs);
 
 void *PerAlloc(int size);
 void PerFree(char *msg);
index 2ed356fcbd82faa3b6037697388423193cf1d783..ac6fa488dc579b137a7a1b78bd9a8003b5cd9860 100644 (file)
@@ -713,6 +713,28 @@ inline  void AckPool_freeslot(int s)
 
 #endif
 
+/* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
+#define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
+#define CHARM_MAGIC_NUMBER               126
+
+#if CMK_ERROR_CHECKING
+extern unsigned char computeCheckSum(unsigned char *data, int len);
+static int checksum_flag = 0;
+#define CMI_SET_CHECKSUM(msg, len)      \
+        if (checksum_flag)  {   \
+          ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
+          ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
+        }
+#define CMI_CHECK_CHECKSUM(msg, len)    \
+        if (checksum_flag)      \
+          if (computeCheckSum((unsigned char*)msg, len) != 0)   \
+            CmiAbort("Fatal error: checksum doesn't agree!\n");
+#else
+#define CMI_SET_CHECKSUM(msg, len)
+#define CMI_CHECK_CHECKSUM(msg, len)
+#endif
+/* =====End of Definitions of Message-Corruption Related Macros=====*/
+
 #if CMK_WITH_STATS
 typedef struct comm_thread_stats
 {
@@ -1527,6 +1549,7 @@ inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode,
 inline void LrtsPrepareEnvelope(char *msg, int size)
 {
     CmiSetMsgSize(msg, size);
+    CMI_SET_CHECKSUM(msg, size);
 }
 
 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
@@ -1669,7 +1692,7 @@ static void set_limit()
         MAX_REG_MEM  = _totalmem / numprocesses;
         MAX_BUFF_SEND = MAX_REG_MEM / 2;
         if (CmiMyPe() == 0)
-           printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM, MAX_BUFF_SEND);
+           printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
         {
              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
@@ -1911,7 +1934,7 @@ static void PumpNetworkSmsg()
                 break;
             }
 #if PRINT_SYH
-            printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
+            printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
 #endif
             /* copy msg out and then put into queue (small message) */
             switch (msg_tag) {
@@ -1924,6 +1947,7 @@ static void PumpNetworkSmsg()
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
+                CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
                 handleOneRecvedMsg(msg_nbytes, msg_data);
                 break;
             }
@@ -2009,14 +2033,16 @@ static void PumpNetworkSmsg()
                 break;
             }
 #if CMK_PERSISTENT_COMM
-            case PUT_DONE_TAG: //persistent message
-                void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
+            case PUT_DONE_TAG:  {   //persistent message
+                void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
                 int size = ((CONTROL_MSG *) header)->length;
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 CmiReference(msg);
+                CMI_CHECK_CHECKSUM(msg, size);
                 handleOneRecvedMsg(size, msg); 
                 break;
+            }
 #endif
 #if CMK_DIRECT
             case DIRECT_PUT_DONE_TAG:  //cmi direct 
@@ -2074,6 +2100,44 @@ static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
 }
 #endif
 
+// register memory for a message
+// return mem handle
+static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
+{
+    gni_return_t status = GNI_RC_SUCCESS;
+
+    if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
+
+#if CMK_PERSISTENT_COMM
+        // persistent message is always registered
+    if (seqno == PERSIST_SEQ) {
+        if (!IsMemHndlZero(MEMHFIELD(msg))) {
+            *memh = MEMHFIELD(msg);
+            return GNI_RC_SUCCESS;
+        }
+        CmiAbort("registerMessage> persistent memory should already be registered.");
+    }
+#endif
+    if(seqno == 0)
+    {
+        if(IsMemHndlZero((GetMemHndl(msg))))
+        {
+            msg = (void*)(msg);
+            status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
+            if(status == GNI_RC_SUCCESS)
+                *memh = GetMemHndl(msg);
+        }
+        else {
+            *memh = GetMemHndl(msg);
+        }
+    }
+    else {
+        //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
+        status = registerMemory(msg, size, memh, NULL); 
+    }
+    return status;
+}
+
 // for BIG_MSG called on receiver side for receiving control message
 // LMSG_INIT_TAG
 static void getLargeMsgRequest(void* header, uint64_t inst_id )
@@ -2106,6 +2170,18 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     }
    
     pd->cqwrite_value = request_msg->seq_id;
+
+    SetMemHndlZero(pd->local_mem_hndl);
+    transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
+    status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
+    if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
+        if(NoMsgInRecv( (void*)(msg_data)))
+            register_size = GetMempoolsize((void*)(msg_data));
+        else
+            register_size = 0;
+    }
+
+#if 0
     if( request_msg->seq_id == 0)
     {
         pd->local_mem_hndl= GetMemHndl(msg_data);
@@ -2135,6 +2211,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
         }
     }
+#endif
     pd->first_operand = ALIGN64(size);                   //  total length
 
     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
@@ -2390,10 +2467,12 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
 #endif
             case GNI_POST_FMA_PUT:
                 if(tmp_pd->amo_cmd == 1) {
+#if CMK_DIRECT
                     //sender ACK to receiver to trigger it is done
                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
                     msg_tag = DIRECT_PUT_DONE_TAG;
+#endif
                 }
                 else {
                     CmiFree((void *)tmp_pd->local_addr);
@@ -2487,6 +2566,7 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
 #endif
                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
+                    CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
                 }else if(msg_tag == BIG_MSG_TAG){
                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
@@ -2501,6 +2581,7 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->second_operand/1000000.0), (double)((tmp_pd->second_operand+1)/1000000.0), (double)((tmp_pd->second_operand+2)/1000000.0), (void*)tmp_pd->local_addr); 
 #endif
                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
+                        CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
                     }
                 }
@@ -2541,6 +2622,17 @@ static void  SendRdmaMsg()
         gni_post_descriptor_t *pd = ptr->pd;
         status = GNI_RC_SUCCESS;
         
+        msg = (void*)(pd->local_addr);
+        status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
+        if(pd->cqwrite_value == 0) {
+            if(NoMsgInRecv(msg))
+                register_size = GetMempoolsize((void*)(pd->local_addr));
+            else
+                register_size = 0;
+        }
+        else
+            register_size = 0;
+#if 0
         if(pd->cqwrite_value == 0)
         {
             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
@@ -2563,9 +2655,10 @@ static void  SendRdmaMsg()
         {
             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl), NULL); 
         }
+#endif
         if(status == GNI_RC_SUCCESS)        //mem register good
         {
-            CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET? rdma_tx_cq_lock:default_tx_cq_lock;
+            CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
             CMI_GNI_LOCK(lock);
 #if REMOTE_EVENT
             if( pd->cqwrite_value == 0)
@@ -2579,7 +2672,7 @@ static void  SendRdmaMsg()
                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
             }
 #endif
-            if(pd->type == GNI_POST_RDMA_GET) 
+            if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT
             {
                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
             }
@@ -3509,12 +3602,18 @@ void* LrtsAlloc(int n_bytes, int header)
         ptr = res + ALIGNBUF - header;
 #endif
     }
+#if CMK_PERSISTENT_COMM
+    if (ptr) SetMemHndlZero(MEMHFIELD((char*)ptr+header));
+#endif
     return ptr;
 }
 
 void  LrtsFree(void *msg)
 {
     CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
+#if CMK_PERSISTENT_COMM
+    if (!IsMemHndlZero(MEMHFIELD((char*)msg+sizeof(CmiChunkHeader)))) return;
+#endif
     if (size <= SMSG_MAX_MSG)
         free(msg);
     else {
index d7356279c2d78fa1e195a7059d6ebb9dd8ed2f04..24f49374aa5fa754f3fa6807ac08eb2781976675 100644 (file)
@@ -521,18 +521,20 @@ void CmiFreeSendFn(int destPE, int size, char *msg) {
     if (CmiMyPe()==destPE) {
         CmiSendSelf(msg);
 #if CMK_PERSISTENT_COMM
-        if (phs) curphs++;
+        if (CpvAccess(phs)) CpvAccess(curphs)++;
 #endif
     } else {
 #if CMK_PERSISTENT_COMM
-        if (phs) {
+        if (CpvAccess(phs)) {
           if (size > 8192) {
-            CmiAssert(curphs < phsSize);
-            LrtsSendPersistentMsg(phs[curphs++], destPE, size, msg);
+            CmiAssert(CpvAccess(curphs) < CpvAccess(phsSize));
+            int destNode = CmiNodeOf(destPE);
+            CMI_DEST_RANK(msg) = CmiRankOf(destPE);
+            LrtsSendPersistentMsg(CpvAccess(phs)[CpvAccess(curphs)++], destNode, size, msg);
             return;
           }
           else
-            curphs++;
+            CpvAccess(curphs)++;
         }
 #endif
         int destNode = CmiNodeOf(destPE);
index 2464bba7186b672a4b133d2a6ab6a3a8a30bcd3a..d320c0f8b198b1d94544c44ffc957a266d938853 100644 (file)
@@ -50,9 +50,9 @@ int persistentRequestHandlerIdx;
 int persistentReqGrantedHandlerIdx;
 int persistentDestoryHandlerIdx;
 
-PersistentHandle  *phs = NULL;
-int phsSize;
-int curphs = 0;
+CpvDeclare(PersistentHandle *, phs);
+CpvDeclare(int, phsSize);
+CpvDeclare(int, curphs);
 
 /******************************************************************************
      Utilities
@@ -231,14 +231,15 @@ static void persistentReqGrantedHandler(void *env)
 {
   int i;
 
-
   PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
   PersistentHandle h = msg->sourceHandlerIndex;
   PersistentSendsTable *slot = (PersistentSendsTable *)h;
-  CmiAssert(slot->used == 1);
 
   /* CmiPrintf("[%d] Persistent handler granted  h:%p\n", CmiMyPe(), h); */
 
+  CmiAssert(slot->used == 1);
+
+
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
 #if 0
     slot->destAddress[i] = msg->msgAddr[i];
@@ -250,7 +251,7 @@ static void persistentReqGrantedHandler(void *env)
   slot->destHandle = msg->destHandlerIndex;
 
   if (slot->messageBuf) {
-    LrtsSendPersistentMsg(h, slot->destPE, slot->messageSize, slot->messageBuf);
+    LrtsSendPersistentMsg(h, CmiNodeOf(slot->destPE), slot->messageSize, slot->messageBuf);
     slot->messageBuf = NULL;
   }
   CmiFree(msg);
@@ -400,6 +401,12 @@ void CmiPersistentInit()
   persistentDestoryHandlerIdx = 
        CmiRegisterHandler((CmiHandler)persistentDestoryHandler);
 
+  CpvInitialize(PersistentHandle*, phs);
+  CpvAccess(phs) = NULL;
+  CpvInitialize(int, phsSize);
+  CpvInitialize(int, curphs);
+  CpvAccess(curphs) = 0;
+
   persist_machine_init();
 
   for (i=0; i<TABLESIZE; i++) {
@@ -421,9 +428,9 @@ void CmiUsePersistentHandle(PersistentHandle *p, int n)
     if (p[i] == NULL) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
   }
 #endif
-  phs = p;
-  phsSize = n;
-  curphs = 0;
+  CpvAccess(phs) = p;
+  CpvAccess(phsSize) = n;
+  CpvAccess(curphs) = 0;
 }
 
 #endif
index 3816233985ee3a76b3af1f4965ade27be5c52ce6..7ec8e3df10531282d251ae04e463d4815d5a273b 100644 (file)
@@ -610,12 +610,19 @@ CpvExtern(int, _curRestartPhase);      /* number of restarts */
 #define MESSAGE_PHASE_CHECK(msg)
 #endif
 
+#if CMK_CONVERSE_GEMINI_UGNI && !CMK_SEQUENTIAL
+#include "gni_pub.h"
+#endif
+
 /** This header goes before each chunk of memory allocated with CmiAlloc. 
     See the comment in convcore.c for details on the fields.
 */
 typedef struct {
   int size;
   int ref;
+#if  CMK_CONVERSE_GEMINI_UGNI && !CMK_SEQUENTIAL
+  gni_mem_handle_t memh;             /* register memory handle */
+#endif
 } CmiChunkHeader;
 
 #if CMK_USE_IBVERBS | CMK_USE_IBUD
@@ -632,6 +639,7 @@ struct infiCmiChunkMetaDataStruct *registerMultiSendMesg(char *msg,int msgSize);
 /* Given a user chunk m, extract the enclosing chunk header fields: */
 #define SIZEFIELD(m) (((CmiChunkHeader *)(m))[-1].size)
 #define REFFIELD(m) (((CmiChunkHeader *)(m))[-1].ref)
+#define MEMHFIELD(m) (((CmiChunkHeader *)(m))[-1].memh)
 #define BLKSTART(m) (((CmiChunkHeader *)(m))-1)
 
 extern void* malloc_nomigrate(size_t size);