fixed msg next bug
authorYanhua Sun <sun51@hopper07.(none)>
Sun, 7 Aug 2011 21:38:24 +0000 (14:38 -0700)
committerYanhua Sun <sun51@hopper07.(none)>
Sun, 7 Aug 2011 21:38:24 +0000 (14:38 -0700)
src/arch/gemini_gni/machine.c

index 4f39d6603261e9a4df908c71f3643ed6b02c0842..e30cc1e8c49148b2bd60b7a92a7f4962bb252531 100644 (file)
@@ -74,7 +74,7 @@ static void sleep(int secs) {
 #undef GNI_RC_CHECK
 #endif
 #ifdef DEBUG
-#define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("%s; err=%s\n",msg,gni_err_str[rc]); exit(911); } } while(0)
+#define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("%s; err=%s\n",msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
 #else
 #define GNI_RC_CHECK(msg,rc)
 #endif
@@ -131,8 +131,8 @@ typedef struct control_msg
 }CONTROL_MSG;
 
 /* reuse PendingMsg memory */
-static CONTROL_MSG  *control_freelist=NULL;
-static MSG_LIST     *msglist_freelist=NULL;
+static CONTROL_MSG  *control_freelist=0;
+static MSG_LIST     *msglist_freelist=0;
 
 #define FreeControlMsg(d)       \
   do {  \
@@ -412,6 +412,7 @@ static int send_with_smsg(int destNode, int size, char *msg)
             msg_tmp->msg                    = control_msg_tmp;
             msg_tmp->tag                    = tag_control;
         }
+        msg_tmp->next               = 0;
         buffered_smsg_tail->next    = msg_tmp;
         buffered_smsg_tail          = msg_tmp;
         return 0;
@@ -435,6 +436,7 @@ static int send_with_smsg(int destNode, int size, char *msg)
                 msg_tmp->size       = size;
                 msg_tmp->msg        = msg;
                 msg_tmp->tag        = tag_data;
+                msg_tmp->next       = 0;
                 /* store into buffer smsg_list and send later */
                 buffered_smsg_head  = msg_tmp;
                 buffered_smsg_tail  = msg_tmp;
@@ -470,6 +472,7 @@ static int send_with_smsg(int destNode, int size, char *msg)
                 msg_tmp ->size      = sizeof(CONTROL_MSG);
                 msg_tmp->msg        = control_msg_tmp;
                 msg_tmp->tag        = tag_control;
+                msg_tmp->next       = 0;
                 /* store into buffer smsg_list and send later */
                 buffered_smsg_head = msg_tmp;
                 buffered_smsg_tail = msg_tmp;
@@ -503,15 +506,15 @@ void LrtsPostNonLocal(){}
 /* pooling CQ to receive network message */
 static void PumpNetworkMsgs()
 {
-    void *header;
+    void                *header;
     uint8_t             msg_tag, data_tag, control_tag, ack_tag;
-    gni_return_t status;
-    uint64_t inst_id;
-    gni_cq_entry_t event_data;
-    int msg_nbytes;
-    void *msg_data;
-    gni_mem_handle_t msg_mem_hndl;
-    CONTROL_MSG *request_msg;
+    gni_return_t        status;
+    uint64_t            inst_id;
+    gni_cq_entry_t      event_data;
+    int                 msg_nbytes;
+    void                *msg_data;
+    gni_mem_handle_t    msg_mem_hndl;
+    CONTROL_MSG         *request_msg;
     gni_post_descriptor_t *pd;
 
     data_tag        = DATA_TAG;
@@ -523,8 +526,13 @@ static void PumpNetworkMsgs()
     if(status == GNI_RC_SUCCESS)
     {
         inst_id = GNI_CQ_GET_INST_ID(event_data);
-    }else
+    }else if (status == GNI_RC_NOT_DONE)
+    {
         return;
+    }else
+    {
+        GNI_RC_CHECK("CQ Get event", status);
+    }
   
     msg_tag = GNI_SMSG_ANY_TAG;
     status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
@@ -535,7 +543,7 @@ static void PumpNetworkMsgs()
         /* copy msg out and then put into queue */
         if(msg_tag == data_tag)
         {
-            msg_nbytes  = *(int*)header;
+            memcpy(&msg_nbytes, header, sizeof(int));
             msg_data    = CmiAlloc(msg_nbytes);
             memcpy(msg_data, (char*)header+sizeof(int), msg_nbytes);
             handleOneRecvedMsg(msg_nbytes, msg_data);
@@ -585,13 +593,7 @@ static void PumpNetworkMsgs()
             CmiAbort("Unknown tag\n");
         }
         GNI_SmsgRelease(ep_hndl_array[inst_id]);
-    }else
-    {
-        //CmiPrintf("Message not ready\n");
-        //
-        return;
-    }
-
+    }  //else not match smsg, maybe larger message
     }   // end of while loop
 }
 
@@ -611,91 +613,90 @@ static void PumpLocalTransactions()
     while (1) 
     {
 
-    status = GNI_CqGetEvent(tx_cqh, &ev);
-    if(status == GNI_RC_SUCCESS)
-    {
-        type        = GNI_CQ_GET_TYPE(ev);
-        inst_id     = GNI_CQ_GET_INST_ID(ev);
-        data_addr   = GNI_CQ_GET_DATA(ev);
-    }else
-        return;
-
-    if(type == GNI_CQ_EVENT_TYPE_POST)
-    {
-        status = GNI_GetCompleted(tx_cqh, ev, &tmp_pd);
-        GNI_RC_CHECK("Local CQ completed ", status);
-        //Message is sent, free message , put is not used now
-        if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
+        status = GNI_CqGetEvent(tx_cqh, &ev);
+        if(status == GNI_RC_SUCCESS)
+        {
+            type        = GNI_CQ_GET_TYPE(ev);
+            inst_id     = GNI_CQ_GET_INST_ID(ev);
+            data_addr   = GNI_CQ_GET_DATA(ev);
+        }else if (status == GNI_RC_NOT_DONE)
         {
-            CmiFree((void *)tmp_pd->local_addr);
-        }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
+            return;
+        }else
+        {
+            GNI_RC_CHECK("CQ Get event", status);
+        }
+
+        if(type == GNI_CQ_EVENT_TYPE_POST)
         {
-           /* Send an ACK to remote side */
-           /*ack_pd.type = GNI_POST_CQWRITE;
-           ack_pd.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
-           ack_pd.dlvr_mode = GNI_DLVMODE_NO_ADAPT;
-           ack_pd.cqwrite_value = tmp_pd->remote_addr&0x0000ffffffffffff;
-           ack_pd.remote_mem_hndl = tmp_pd->remote_mem_hndl;
-           status = GNI_PostCqWrite(ep_hndl_array[inst_id], &ack_pd);
-           GNI_RC_CHECK("Ack Post by CQWrite ", status);
-           */
-            //CmiPrintf("\nPE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, SIZEFIELD((void*)tmp_pd->local_addr), tmp_pd->length, tmp_pd->remote_addr); 
-           //CmiPrintf("\n+PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, remote_length , tmp_pd->length, (void*)remote_addr); 
-           MallocControlMsg(ack_msg_tmp);
-           ack_msg_tmp->source = myrank;
-           //CmiPrintf("\n++PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, SIZEFIELD((void*)tmp_pd->local_addr), tmp_pd->length, tmp_pd->remote_addr); 
-           //CmiPrintf("\n+++PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, remote_length , tmp_pd->length, (void*)remote_addr); 
-           ack_msg_tmp->source_addr = tmp_pd->remote_addr;
-           ack_msg_tmp->length=tmp_pd->length; 
-           ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
-           //CmiPrintf("PE:%d sending ACK back addr=%p \n", myrank, ack_msg_tmp->source_addr); 
+            status = GNI_GetCompleted(tx_cqh, ev, &tmp_pd);
+            GNI_RC_CHECK("Local CQ completed ", status);
+            //Message is sent, free message , put is not used now
+            if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
+            {
+                CmiFree((void *)tmp_pd->local_addr);
+            }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
+            {
+                /* Send an ACK to remote side */
+                //CmiPrintf("\nPE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, SIZEFIELD((void*)tmp_pd->local_addr), tmp_pd->length, tmp_pd->remote_addr); 
+                ////CmiPrintf("\n+PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, remote_length , tmp_pd->length, (void*)remote_addr); 
+                MallocControlMsg(ack_msg_tmp);
+                ack_msg_tmp->source = myrank;
+                //CmiPrintf("\n++PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, SIZEFIELD((void*)tmp_pd->local_addr), tmp_pd->length, tmp_pd->remote_addr); 
+                ////CmiPrintf("\n+++PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, remote_length , tmp_pd->length, (void*)remote_addr); 
+                ack_msg_tmp->source_addr = tmp_pd->remote_addr;
+                ack_msg_tmp->length=tmp_pd->length; 
+                ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
+                //CmiPrintf("PE:%d sending ACK back addr=%p \n", myrank, ack_msg_tmp->source_addr); 
            
-           if(buffered_smsg_head!=0)
-           {
-               CmiPrintf("[%d] PumpLocalTransactions: smsg buffered.\n", myrank);
-               MallocMsgList(msg_tmp);
-               msg_tmp->msg = ack_msg_tmp;
-               msg_tmp ->size = sizeof(CONTROL_MSG);
-               msg_tmp->tag = ack_tag;
-               msg_tmp->destNode = inst_id;
-               buffered_smsg_tail->next = msg_tmp;
-               buffered_smsg_tail = msg_tmp;
-           }else
-           {
-               //CmiPrintf("PE:%d sending ACK back addr=%p \n", myrank, ack_msg_tmp->source_addr); 
-               status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, ack_tag);
-               if(status == GNI_RC_SUCCESS)
-               {
-                   FreeControlMsg(ack_msg_tmp);
-               }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
-               {
-                   CmiPrintf("[%d] PumpLocalTransactions: ack smsg buffered.\n", myrank);
-                   MallocMsgList(msg_tmp);
-                   msg_tmp->msg = ack_msg_tmp;
-                   msg_tmp ->size = sizeof(CONTROL_MSG);
-                   msg_tmp->tag = ack_tag;
-                   msg_tmp->destNode = inst_id;
-                   buffered_smsg_head = msg_tmp;
-                   buffered_smsg_tail = msg_tmp;
-               }
-               else
-                   GNI_RC_CHECK("GNI_SmsgSendWTag", status);
-           }
-           handleOneRecvedMsg(SIZEFIELD((void*)tmp_pd->local_addr), (void*)tmp_pd->local_addr); 
+                if(buffered_smsg_head!=0)
+                {
+                    //CmiPrintf("[%d] PumpLocalTransactions: smsg buffered.\n", myrank);
+                    MallocMsgList(msg_tmp);
+                    msg_tmp->msg = ack_msg_tmp;
+                    msg_tmp ->size = sizeof(CONTROL_MSG);
+                    msg_tmp->tag = ack_tag;
+                    msg_tmp->destNode = inst_id;
+                    msg_tmp->next = 0;
+                    buffered_smsg_tail->next = msg_tmp;
+                    buffered_smsg_tail = msg_tmp;
+                }else
+                {
+                    //CmiPrintf("PE:%d sending ACK back addr=%p \n", myrank, ack_msg_tmp->source_addr); 
+                    status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, ack_tag);
+                    if(status == GNI_RC_SUCCESS)
+                    {
+                        FreeControlMsg(ack_msg_tmp);
+                    }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
+                    {
+                        // CmiPrintf("[%d] PumpLocalTransactions: ack smsg buffered.\n", myrank);
+                        MallocMsgList(msg_tmp);
+                        msg_tmp->msg = ack_msg_tmp;
+                        msg_tmp ->size = sizeof(CONTROL_MSG);
+                        msg_tmp->tag = ack_tag;
+                        msg_tmp->next = 0;
+                        msg_tmp->destNode = inst_id;
+                        buffered_smsg_head = msg_tmp;
+                        buffered_smsg_tail = msg_tmp;
+                    }
+                    else
+                        GNI_RC_CHECK("GNI_SmsgSendWTag", status);
+                }
+                handleOneRecvedMsg(SIZEFIELD((void*)tmp_pd->local_addr), (void*)tmp_pd->local_addr); 
+            }
         }
-    }
     }   /* end of while loop */
 }
 
 static void SendBufferMsg()
 {
-    MSG_LIST *ptr;
-    uint8_t tag_data, tag_control, tag_ack;
-    gni_return_t status;
+    MSG_LIST        *ptr;
+    uint8_t         tag_data, tag_control, tag_ack;
+    gni_return_t     status;
 
-    tag_data = DATA_TAG;
+    tag_data    = DATA_TAG;
     tag_control = LMSG_INIT_TAG;
-    tag_ack = ACK_TAG;
+    tag_ack     = ACK_TAG;
     /* can add flow control here to control the number of messages sent before handle message */
     while(buffered_smsg_head != 0)
     {
@@ -948,8 +949,8 @@ static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rx_cqh);
     GNI_RC_CHECK("Create CQ (rx)", status);
     
-    status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &remote_bte_cq_hndl);
-    GNI_RC_CHECK("Create BTE CQ", status);
+    //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &remote_bte_cq_hndl);
+    //GNI_RC_CHECK("Create BTE CQ", status);
 
     /* create the endpoints. they need to be bound to allow later CQWrites to them */
     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
@@ -976,7 +977,7 @@ static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
         _init_smallMsgwithFma(mysize);
     }
     free(MPID_UGNI_AllAddr);
-    PRINT_INFO("\nDone with LrtsInit")
+    //PRINT_INFO("\nDone with LrtsInit")
 }
 
 #define ALIGNBUF                64
@@ -1007,9 +1008,6 @@ void  LrtsFree(void *msg)
       free(msg);
     else
       free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
-/*
-    CmiFree(msg);
-*/
 }
 
 static void* LrtsAllocRegister(int n_bytes, gni_mem_handle_t* mem_hndl)
@@ -1036,6 +1034,7 @@ static void LrtsExit()
 
 void CmiAbort(const char *message) {
 
+    CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
     PMI_Abort(-1, message);
 }