added persistent code in Gemini
authorYanhua Sun <sun51@hopper09.(none)>
Tue, 13 Sep 2011 06:08:36 +0000 (23:08 -0700)
committerYanhua Sun <sun51@hopper09.(none)>
Tue, 13 Sep 2011 06:08:36 +0000 (23:08 -0700)
src/arch/gemini_gni/machine-persistent.c
src/arch/gemini_gni/machine.c

index 4fb7b43bb36f99fd5dbf5ab1603b3fb90dd9ac4d..bfb2c907cbdc4d1dc83ef7b6b4355c09cc6f1bc7 100644 (file)
 
 void LrtsSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
 {
-  CmiAssert(h!=NULL);
-  PersistentSendsTable *slot = (PersistentSendsTable *)h;
-  CmiAssert(slot->used == 1);
-  CmiAssert(slot->destPE == destPE);
-  if (size > slot->sizeMax) {
-    CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
-    CmiAbort("Abort: Invalid size\n");
-  }
+    gni_post_descriptor_t *pd;
+    gni_return_t status;
+    RDMA_REQUEST        *rdma_request_msg;
+    
+    CmiAssert(h!=NULL);
+    PersistentSendsTable *slot = (PersistentSendsTable *)h;
+    CmiAssert(slot->used == 1);
+    CmiAssert(slot->destPE == destPE);
+    if (size > slot->sizeMax) {
+        CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
+        CmiAbort("Abort: Invalid size\n");
+    }
 
 /*CmiPrintf("[%d] LrtsSendPersistentMsg h=%p hdl=%d destPE=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), destPE, slot->destAddress[0], size);*/
 
-  if (slot->destBuf[0].destAddress) {
-#if 0
-    ELAN_EVENT *e1, *e2;
-    int strategy = STRATEGY_ONE_PUT;
-    /* if (size > 280) strategy = STRATEGY_TWO_ELANPUT; */
-    int *footer = (int*)((char*)m + size);
-    footer[0] = size;
-    footer[1] = 1;
-    if (strategy == STRATEGY_ONE_PUT) CMI_MESSAGE_SIZE(m) = size;
-    else CMI_MESSAGE_SIZE(m) = 0;
-    e1 = elan_put(elan_base->state, m, slot->destAddress[0], size+sizeof(int)*2, destPE);
-    switch (strategy ) {
-    case STRATEGY_ONE_PUT:
-    case STRATEGY_TWO_PUT:  {
-      PMSG_LIST *msg_tmp;
-      NEW_PMSG_LIST(e1, m, size, destPE, slot->destSizeAddress[0], h, strategy);
-      APPEND_PMSG_LIST(msg_tmp);
-      swapSendSlotBuffers(slot);
-      break;
-      }
-    case 2:
-      elan_wait(e1, ELAN_POLL_EVENT);
-      e2 = elan_put(elan_base->state, &size, slot->destSizeAddress[0], sizeof(int), destPE);
-      elan_wait(e2, ELAN_POLL_EVENT);
-      CMI_MESSAGE_SIZE(m) = 0;
-      /*CmiPrintf("[%d] elan finished. \n", CmiMyPe());*/
-      CmiFree(m);
+    if (slot->destBuf[0].destAddress) {
+        // uGNI part
+     
+        MallocPostDesc(pd);
+        if(size < LRTS_GNI_RDMA_THRESHOLD) 
+            pd->type            = GNI_POST_FMA_PUT;
+        else
+            pd->type            = GNI_POST_RDMA_PUT;
+
+        pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
+        pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
+        pd->length          = size;
+        pd->local_addr      = (uint64_t) m;
+        pd->local_mem_hndl  = GetMemHndl(m) ;
+        pd->remote_addr     = (uint64_t)slot->destBuf[0].destAddress;
+        pd->remote_mem_hndl = slot->destBuf[0].mem_hndl;
+        pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
+        pd->rdma_mode       = 0;
+
+        if(pd->type == GNI_POST_RDMA_PUT) 
+            status = GNI_PostRdma(ep_hndl_array[destPE], pd);
+        else
+            status = GNI_PostFma(ep_hndl_array[destPE],  pd);
+        if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
+        {
+            MallocRdmaRequest(rdma_request_msg);
+            rdma_request_msg->next = 0;
+            rdma_request_msg->destNode = destPE;
+            rdma_request_msg->pd = pd;
+            if(pending_rdma_head == 0)
+            {
+                pending_rdma_head = rdma_request_msg;
+            }else
+            {
+                pending_rdma_tail->next = rdma_request_msg;
+            }
+            pending_rdma_tail = rdma_request_msg;
+        }else
+            GNI_RC_CHECK("AFter posting", status);
     }
-#else
-     // uGNI part
-#endif
-  }
   else {
 #if 1
     if (slot->messageBuf != NULL) {
@@ -190,7 +202,7 @@ void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
     slot->destBuf[i].destAddress = buf;
     /* note: assume first integer in elan converse header is the msg size */
     slot->destBuf[i].destSizeAddress = (unsigned int*)buf;
-    // slot->destBuf[i].mem_hdl = 0;
+    slot->destBuf[i].mem_hndl = GetMemHndl(buf);
   }
   slot->sizeMax = maxBytes;
 }
index 5cf794ec1bf7b2e9b97ce9cd4f3b4325aea0eef7..ebd7366dc54dc7ee6aa9d0213251fff1a9252c42 100644 (file)
@@ -97,7 +97,7 @@ static int  SMSG_MAX_MSG;
 
 #define REMOTE_QUEUE_ENTRIES  1048576
 #define LOCAL_QUEUE_ENTRIES   32
-
+#define PUT_DONE_TAG      0x29
 #define ACK_TAG           0x30
 /* SMSG is data message */
 #define SMALL_DATA_TAG          0x31
@@ -841,6 +841,10 @@ static void PumpNetworkSmsg()
                 SendRdmaMsg();
                 break;
             }
+            case PUT_DONE_TAG: //persistent message
+            {
+                handleOneRecvedMsg(((CONTROL_MSG *) header)->length,((void*) (CONTROL_MSG *) header)); 
+            }
             default: {
                 CmiPrintf("weird tag problem\n");
                 CmiAbort("Unknown tag\n");
@@ -1010,6 +1014,7 @@ static void PumpLocalRdmaTransactions()
     gni_post_descriptor_t   *tmp_pd;
     MSG_LIST                *ptr;
     CONTROL_MSG             *ack_msg_tmp;
+    uint8_t             msg_tag;
 
     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
     {
@@ -1031,51 +1036,51 @@ static void PumpLocalRdmaTransactions()
             //CmiPrintf("**[%d] SMSGPumpLocalTransactions local done(type=%d) length=%d, size=%d\n", myrank, type, tmp_pd->length, SIZEFIELD((void*)(tmp_pd->local_addr)) );
             ////Message is sent, free message , put is not used now
             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
-            {
+            {   //persistent message 
                 CmiFree((void *)tmp_pd->local_addr);
+                msg_tag = PUT_DONE_TAG;  
             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
             {
-                /* Send an ACK to remote side */
-                MallocControlMsg(ack_msg_tmp);
-                ack_msg_tmp->source             = myrank;
-                ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
-                ack_msg_tmp->length             = tmp_pd->length; 
-                ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
+                msg_tag = ACK_TAG;  
+            }
+            MallocControlMsg(ack_msg_tmp);
+            ack_msg_tmp->source             = myrank;
+            ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
+            ack_msg_tmp->length             = tmp_pd->length; 
+            ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
 #if PRINT_SYH
-                lrts_send_msg_id++;
-                CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
+            lrts_send_msg_id++;
+            CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
 #endif
-                if(smsg_msglist_head[inst_id]!=0)
-                {
-                    delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, ACK_TAG);
-                }else
+            if(smsg_msglist_head[inst_id]!=0)
+            {
+                delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
+            }else
+            {
+                status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, msg_tag);
+                if(status == GNI_RC_SUCCESS)
                 {
-                    status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, ACK_TAG);
-                    if(status == GNI_RC_SUCCESS)
-                    {
 #if PRINT_SYH
-                        lrts_smsg_success++;
-                        CmiPrintf("[%d==>%d] sent ACK done%d\n", myrank, inst_id, lrts_smsg_success);
+                    lrts_smsg_success++;
+                    CmiPrintf("[%d==>%d] sent ACK done%d\n", myrank, inst_id, lrts_smsg_success);
 #endif
-                        FreeControlMsg(ack_msg_tmp);
-                    }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
-                    {
-                        delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, ACK_TAG);
-                    }
-                    else
-                        GNI_RC_CHECK("GNI_SmsgSendWTag", status);
+                    FreeControlMsg(ack_msg_tmp);
+                }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
+                {
+                    delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
                 }
+                else
+                    GNI_RC_CHECK("GNI_SmsgSendWTag", status);
+            }
 #if     !USE_LRTS_MEMPOOL
-                MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
+            MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
 #endif
-                CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
-                //handleOneRecvedMsg(SIZEFIELD((void*)(tmp_pd->local_addr)), (void*)tmp_pd->local_addr); 
-                handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
-                SendRdmaMsg(); 
-            }
-            FreePostDesc(tmp_pd);
+            CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
+            handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
+            SendRdmaMsg(); 
         }
-    }
+        FreePostDesc(tmp_pd);
+    } //end while
 }
 
 static int SendRdmaMsg()
@@ -1096,7 +1101,7 @@ static int SendRdmaMsg()
         }
         if(status == GNI_RC_SUCCESS)
         {
-            if(pd->type == GNI_POST_RDMA_GET) 
+            if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT
                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
             else
                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
@@ -1480,7 +1485,9 @@ static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
 #else
     ptag = get_ptag();
     cookie = get_cookie();
-    
+#if 0
+    modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
+#endif
     //Create and attach to the communication  domain */
     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
     GNI_RC_CHECK("GNI_CdmCreate", status);