many changes including adding size field to header
authorGengbin Zheng <gzheng@illinois.edu>
Tue, 9 Aug 2011 15:10:49 +0000 (08:10 -0700)
committerGengbin Zheng <gzheng@illinois.edu>
Tue, 9 Aug 2011 15:10:49 +0000 (08:10 -0700)
src/arch/gemini_gni/conv-common.h
src/arch/gemini_gni/machine.c

index 7404b83dc32b7bd49f33ed0913d094f114a5af00..aa2e669d49acabe32c8d73738058d9a2838a6c76 100644 (file)
@@ -7,7 +7,7 @@
 
 #define CMK_HANDLE_SIGUSR                                  1
 
-#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, stratid, redID; CmiInt4 root; 
+#define CMK_MSG_HEADER_EXT_    CmiUInt8 size; CmiUInt2 rank, hdl,xhdl,info, stratid, redID; CmiInt4 root; 
 
 #define CMK_MSG_HEADER_BASIC  CMK_MSG_HEADER_EXT
 #define CMK_MSG_HEADER_EXT    { CMK_MSG_HEADER_EXT_ }
index 0d270f017799a4cc116fa10657c2671f5561f537..6baca6ef4659eaa9cd8e12856578eb2af3f0b843 100644 (file)
 #include <sys/types.h>
 #include <sys/timeb.h>
 
-//#define  USE_ONESIDED 1
-
-#ifdef USE_ONESIDED
-#include "onesided.h"
-#endif
 static void sleep(int secs) {
     Sleep(1000*secs);
 }
 #else
 #include <unistd.h> /*For getpid()*/
 #endif
-#include <stdlib.h> /*For sleep()*/
+
+//#define  USE_ONESIDED 1
+#ifdef USE_ONESIDED
+#include "onesided.h"
+#endif
 
 #include "machine.h"
 
 #include "pcqueue.h"
 
+#define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
+#define CmiSetMsgSize(m,s)  do {((((CmiMsgHeaderExt*)m)->size)=(s));} while(0)
+
 #define DEBUY_PRINT
 
 #ifdef DEBUY_PRINT
@@ -62,12 +64,12 @@ static void sleep(int secs) {
 #define LRTS_GNI_RDMA_THRESHOLD  16384
 
 #define REMOTE_QUEUE_ENTRIES  1048576
-#define LOCAL_QUEUE_ENTRIES 1024
+#define LOCAL_QUEUE_ENTRIES   1024
 /* SMSG is data message */
-#define DATA_TAG        0x38
+#define DATA_TAG          0x38
 /* SMSG is a control message to initialize a BTE */
-#define LMSG_INIT_TAG        0x39 
-#define ACK_TAG         0x37
+#define LMSG_INIT_TAG     0x39 
+#define ACK_TAG           0x37
 
 #define DEBUG
 #ifdef GNI_RC_CHECK
@@ -182,6 +184,7 @@ static RDMA_REQUEST         *rdma_freelist = 0;
 /* reuse gni_post_descriptor_t */
 static gni_post_descriptor_t *post_freelist=NULL;
 
+#if 1
 #define FreePostDesc(d)       \
   do {  \
     (d)->next_descr = post_freelist;\
@@ -194,11 +197,19 @@ static gni_post_descriptor_t *post_freelist=NULL;
      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
      _MEMCHECK(d);\
   } else post_freelist = d->next_descr;
+#else
+
+#define FreePostDesc(d)     free(d);
+#define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
 
+#endif
+
+static int send_pending = 0;
 
 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
 static MSG_LIST *buffered_smsg_head= 0;
 static MSG_LIST *buffered_smsg_tail= 0;
+
 /* SmsgSend return success but message sent is not confirmed by remote side */
 
 static RDMA_REQUEST  *pending_rdma_head = 0;
@@ -347,26 +358,43 @@ void CmiMachineProgressImpl() {
 }
 #endif
 
+inline
+static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
+{
+    MSG_LIST        *msg_tmp;
+    MallocMsgList(msg_tmp);
+    msg_tmp->destNode = destNode;
+    msg_tmp->size   = size;
+    msg_tmp->msg    = msg;
+    msg_tmp->tag    = tag;
+    msg_tmp->next   = NULL;
+    if (buffered_smsg_tail == NULL) {
+      buffered_smsg_head  = buffered_smsg_tail  = msg_tmp;
+    }
+    else {
+      buffered_smsg_tail->next    = msg_tmp;
+      buffered_smsg_tail          = msg_tmp;
+    }
+}
+
 static int send_with_smsg(int destNode, int size, char *msg)
 {
     gni_return_t        status  =   GNI_RC_SUCCESS;
-    MSG_LIST            *msg_tmp;
     CONTROL_MSG         *control_msg_tmp;
-    uint8_t             tag_data    = DATA_TAG;
-    uint8_t             tag_control = LMSG_INIT_TAG ;
+    const uint8_t       tag_data    = DATA_TAG;
+    const uint8_t       tag_control = LMSG_INIT_TAG ;
     uint32_t            vmdh_index  = -1;
 
+    CmiSetMsgSize(msg, size);
+
     /* No mailbox available, buffer this msg and its info */
     if(buffered_smsg_head != 0)
     {
-        MallocMsgList(msg_tmp);
-        msg_tmp->destNode = destNode;
         if(size <=SMSG_MAX_MSG)
         {
-            msg_tmp->size   = size;
-            msg_tmp->msg    = msg;
-            msg_tmp->tag    = tag_data;
-        }else
+            delay_send_small_msg(msg, size, destNode, tag_data);
+        }
+        else
         {
             MallocControlMsg(control_msg_tmp);
             /*
@@ -385,13 +413,8 @@ static int send_with_smsg(int destNode, int size, char *msg)
             control_msg_tmp->length         =size; 
             control_msg_tmp->source_mem_hndl.qword1 = 0;
             control_msg_tmp->source_mem_hndl.qword2 = 0;
-            msg_tmp->size                   = sizeof(CONTROL_MSG);
-            msg_tmp->msg                    = control_msg_tmp;
-            msg_tmp->tag                    = tag_control;
+            delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, tag_control);
         }
-        msg_tmp->next               = 0;
-        buffered_smsg_tail->next    = msg_tmp;
-        buffered_smsg_tail          = msg_tmp;
         return 0;
     }
     else {
@@ -399,24 +422,18 @@ static int send_with_smsg(int destNode, int size, char *msg)
         if(size <= SMSG_MAX_MSG)
         {
             /* send the msg itself */
-            status = GNI_SmsgSendWTag(ep_hndl_array[destNode], &size, sizeof(int), msg, size, 0, tag_data);
+            status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag_data);
+            //CmiPrintf("[%d] send_with_smsg sends a data msg to PE %d status: %s\n", myrank, destNode, gni_err_str[status]);
             if (status == GNI_RC_SUCCESS)
             {
+                send_pending++;
                 CmiFree(msg);
                 return 1;
             }
             else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
             {
                 //CmiPrintf("[%d] data msg add to send queue\n", myrank);
-                MallocMsgList(msg_tmp);
-                msg_tmp->destNode   = destNode;
-                msg_tmp->size       = size;
-                msg_tmp->msg        = msg;
-                msg_tmp->tag        = tag_data;
-                msg_tmp->next       = 0;
-                /* store into buffer smsg_list and send later */
-                buffered_smsg_head  = msg_tmp;
-                buffered_smsg_tail  = msg_tmp;
+                delay_send_small_msg(msg, size, destNode, tag_data);
                 return 0;
             }
             else
@@ -444,8 +461,10 @@ static int send_with_smsg(int destNode, int size, char *msg)
             }else if(status == GNI_RC_SUCCESS)
             {
                 status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, tag_control);
+                CmiPrintf("[%d] send_with_smsg sends a control msg to PE %d status: %d\n", myrank, destNode, status);
                 if(status == GNI_RC_SUCCESS)
                 {
+                    send_pending ++;
                     FreeControlMsg(control_msg_tmp);
                     return 1;
                 }
@@ -460,19 +479,13 @@ static int send_with_smsg(int destNode, int size, char *msg)
             
             // Memory register fails or send fails 
             //CmiPrintf("[%d] control msg add to send queue\n", myrank);
-            MallocMsgList(msg_tmp);
-            msg_tmp->destNode   = destNode;
-            msg_tmp ->size      = sizeof(CONTROL_MSG);
-            msg_tmp->msg        = control_msg_tmp;
-            msg_tmp->tag        = tag_control;
-            msg_tmp->next       = 0;
             /* store into buffer smsg_list and send later */
-            buffered_smsg_head = msg_tmp;
-            buffered_smsg_tail = msg_tmp;
+            delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, tag_control);
             return 0;
         }
     }
 }
+
 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
 {
     //PRINT_INFO("Calling LrtsSend")
@@ -480,18 +493,40 @@ static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
     {
         send_with_smsg(destNode, size, msg); 
     }
+    else {
+        CmiAssert(0);
+    }
     return 0;
 }
 
 static void LrtsPreCommonInit(int everReturn){}
-static void LrtsPostCommonInit(int everReturn){}
+
+/* Idle-state related functions: called in non-smp mode */
+void CmiNotifyIdleForGemini(void) {
+    LrtsAdvanceCommunication();
+}
+
+static void LrtsPostCommonInit(int everReturn)
+{
+#if CMK_SMP
+    CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
+    CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
+#else
+    CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
+#endif
+
+}
+
 
 void LrtsPostNonLocal(){}
 /* pooling CQ to receive network message */
 static void PumpNetworkMsgs()
 {
     void                *header;
-    uint8_t             msg_tag, data_tag, control_tag, ack_tag;
+    uint8_t             msg_tag;
+    const uint8_t       data_tag = DATA_TAG;
+    const uint8_t       control_tag = LMSG_INIT_TAG;
+    const uint8_t       ack_tag = ACK_TAG;
     gni_return_t        status;
     uint64_t            inst_id;
     gni_cq_entry_t      event_data;
@@ -502,10 +537,6 @@ static void PumpNetworkMsgs()
     RDMA_REQUEST        *rdma_request_msg;
     gni_post_descriptor_t *pd;
 
-    data_tag        = DATA_TAG;
-    control_tag     = LMSG_INIT_TAG;
-    ack_tag         = ACK_TAG;
-    
     while (1) {
     status = GNI_CqGetEvent(rx_cqh, &event_data);
     if(status == GNI_RC_SUCCESS)
@@ -519,25 +550,29 @@ static void PumpNetworkMsgs()
         GNI_RC_CHECK("CQ Get event", status);
     }
   
+    //printf("[%d]  PumpNetworkMsgs GNI_CQ_GET_TYPE %d from %d. \n", myrank, GNI_CQ_GET_TYPE(event_data), inst_id);
+
     msg_tag = GNI_SMSG_ANY_TAG;
     status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
 
-    //CmiPrintf("++++## PumpNetwork Small msg is received on PE:%d message tag=%c\n", myrank, msg_tag);
     if(status  == GNI_RC_SUCCESS)
     {
         /* copy msg out and then put into queue */
         if(msg_tag == data_tag)
         {
+            //CmiPrintf("[%d] PumpNetwork data msg is received\n", myrank);
             //memcpy(&msg_nbytes, header, sizeof(int));
-            msg_nbytes = *(int*)header;
+            //msg_nbytes = *(int*)header;
+            msg_nbytes = CmiGetMsgSize(header);
             msg_data    = CmiAlloc(msg_nbytes);
             //CmiPrintf("[%d] PumpNetworkMsgs: get datamsg, size: %d msg id:%d\n", myrank, msg_nbytes, GNI_CQ_GET_MSG_ID(event_data));
-            memcpy(msg_data, (char*)header+sizeof(int), msg_nbytes);
+            memcpy(msg_data, (char*)header, msg_nbytes);
             handleOneRecvedMsg(msg_nbytes, msg_data);
             GNI_SmsgRelease(ep_hndl_array[inst_id]);
         }
         else if(msg_tag == control_tag) 
         {
+            //CmiPrintf("[%d] PumpNetwork control msg is received\n", myrank);
             /* initial a get to transfer data from the sender side */
             request_msg = (CONTROL_MSG *) header;
             msg_data = CmiAlloc(request_msg->length);
@@ -605,6 +640,7 @@ static void PumpNetworkMsgs()
                 GNI_RC_CHECK("AFter posting", status);
         }
         else if(msg_tag == ack_tag) {
+            CmiPrintf("[%d] PumpNetwork tag msg is received\n", myrank);
             /* Get is done, release message . Now put is not used yet*/
             request_msg = (CONTROL_MSG *) header;
             //CmiPrintf("++++## ACK msg is received on PE:%d message size=%d, addr=%p\n", myrank, request_msg->length, (void*)request_msg->source_addr);
@@ -622,6 +658,9 @@ static void PumpNetworkMsgs()
             CmiAbort("Unknown tag\n");
         }
     }  //else not match smsg, maybe larger message
+    else {
+       // CmiPrintf("[%d] GNI_SmsgGetNextWTag returns %s\n", myrank, gni_err_str[status]);
+    }
     }   // end of while loop
 }
 
@@ -633,7 +672,6 @@ static void PumpLocalTransactions()
     uint64_t type, inst_id, data_addr;
     uint8_t         ack_tag = ACK_TAG;
     gni_post_descriptor_t *tmp_pd;
-    MSG_LIST *msg_tmp;
     //gni_post_descriptor_t   ack_pd;
     MSG_LIST  *ptr;
     CONTROL_MSG *ack_msg_tmp;
@@ -655,7 +693,13 @@ static void PumpLocalTransactions()
             GNI_RC_CHECK("CQ Get event", status);
         }
 
-        if(type == GNI_CQ_EVENT_TYPE_POST)
+        // printf("[%d]  PumpLocalTransactions GNI_CQ_GET_TYPE %d. \n", myrank, GNI_CQ_GET_TYPE(ev));
+
+        if (type == GNI_CQ_EVENT_TYPE_SMSG) {
+            send_pending --;
+            //CmiPrintf("[%d] PumpLocalTransactions smsg pending: %d\n", myrank, send_pending);
+        }
+        else if(type == GNI_CQ_EVENT_TYPE_POST)
         {
             status = GNI_GetCompleted(tx_cqh, ev, &tmp_pd);
             GNI_RC_CHECK("Local CQ completed ", status);
@@ -680,32 +724,19 @@ static void PumpLocalTransactions()
                 if(buffered_smsg_head!=0)
                 {
                     //CmiPrintf("[%d] PumpLocalTransactions: smsg buffered.\n", myrank);
-                    MallocMsgList(msg_tmp);
-                    msg_tmp->msg = ack_msg_tmp;
-                    msg_tmp ->size = sizeof(CONTROL_MSG);
-                    msg_tmp->tag = ack_tag;
-                    msg_tmp->destNode = inst_id;
-                    msg_tmp->next = 0;
-                    buffered_smsg_tail->next = msg_tmp;
-                    buffered_smsg_tail = msg_tmp;
+                    delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, ack_tag);
                 }else
                 {
                     //CmiPrintf("PE:%d sending ACK back addr=%p \n", myrank, ack_msg_tmp->source_addr); 
                     status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, ack_tag);
                     if(status == GNI_RC_SUCCESS)
                     {
+                        send_pending++;
                         FreeControlMsg(ack_msg_tmp);
                     }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
                     {
                         // CmiPrintf("[%d] PumpLocalTransactions: ack smsg buffered.\n", myrank);
-                        MallocMsgList(msg_tmp);
-                        msg_tmp->msg = ack_msg_tmp;
-                        msg_tmp ->size = sizeof(CONTROL_MSG);
-                        msg_tmp->tag = ack_tag;
-                        msg_tmp->next = 0;
-                        msg_tmp->destNode = inst_id;
-                        buffered_smsg_head = msg_tmp;
-                        buffered_smsg_tail = msg_tmp;
+                        delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, ack_tag);
                     }
                     else
                         GNI_RC_CHECK("GNI_SmsgSendWTag", status);
@@ -718,6 +749,7 @@ static void PumpLocalTransactions()
                 handleOneRecvedMsg(SIZEFIELD((void*)tmp_pd->local_addr), (void*)tmp_pd->local_addr); 
                 SendRdmaMsg(); 
             }
+            FreePostDesc(tmp_pd);
         }
     }   /* end of while loop */
 }
@@ -782,10 +814,14 @@ static int SendBufferMsg()
             ptr = buffered_smsg_head;
             if(ptr->tag == tag_data)
             {
-                status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], &(ptr->size), (uint32_t)sizeof(int), ptr->msg, ptr->size, 0, tag_data);
-                if(status == GNI_RC_SUCCESS)
+                status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], NULL, 0, ptr->msg, ptr->size, 0, tag_data);
+                //CmiPrintf("[%d] SendBufferMsg sends a data msg to PE %d status: %s\n", myrank, ptr->destNode, gni_err_str[status]);
+                if(status == GNI_RC_SUCCESS) {
+                    send_pending++;
                     CmiFree(ptr->msg);
-            }else if(ptr->tag ==tag_control)
+                }
+            }
+            else if(ptr->tag ==tag_control)
             {
                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
@@ -795,34 +831,40 @@ static int SendBufferMsg()
 #else
                     status = GNI_MemRegister(nic_hndl, (uint64_t)control_msg_tmp->source_addr, 
                         control_msg_tmp->length, rx_cqh,
-                        GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
+                        GNI_MEM_READ_ONLY | GNI_MEM_USE_GART | GNI_MEM_PI_FLUSH,
                         -1, &(control_msg_tmp->source_mem_hndl));
 #endif 
                     if(status != GNI_RC_SUCCESS)
                         break;
                 }
                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, tag_control);
-                if(status == GNI_RC_SUCCESS)
+                //CmiPrintf("[%d] SendBufferMsg sends a control msg to PE %d status: %d\n", myrank, ptr->destNode, status);
+                if(status == GNI_RC_SUCCESS) {
                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
+                    send_pending ++;
+                }
             }else if (ptr->tag == tag_ack)
             {
                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, tag_ack);
-                if(status == GNI_RC_SUCCESS)
+                //CmiPrintf("[%d] SendBufferMsg sends a tag msg to PE %d status: %d\n", myrank, ptr->destNode, status);
+                if(status == GNI_RC_SUCCESS) {
                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
+                    send_pending++;
+                }
             }
         } else if(useStaticMSGQ)
         {
-            CmiPrintf("MSGQ Send not done\n");
+            CmiAbort("MSGQ Send not done\n");
         }else
         {
-            CmiPrintf("FMA Send not done\n");
+            CmiAbort("FMA Send not done\n");
         }
         if(status == GNI_RC_SUCCESS)
         {
-            buffered_smsg_head= buffered_smsg_head->next;
+            buffered_smsg_head = buffered_smsg_head->next;
             FreeMsgList(ptr);
         }else
-            break;
+            return 0;
     }
     return 1;
 }
@@ -848,6 +890,8 @@ void remoteEventHandle(gni_cq_entry_t *event_data, void *context)
     uint8_t             tag_data;
     uint8_t             tag_control;
 
+    CmiAssert(0);
+
     tag_data = DATA_TAG;
     tag_control = LMSG_INIT_TAG;
     /* pool the CQ to check which smsg endpoint to get the data */