Refreshed the implementation for tracing comm thread based on the latest tracing...
authorChao Mei <chaomei2@illinois.edu>
Fri, 9 Mar 2012 03:05:54 +0000 (21:05 -0600)
committerChao Mei <chaomei2@illinois.edu>
Fri, 9 Mar 2012 03:05:54 +0000 (21:05 -0600)
src/arch/mpi/machine.c

index f7793ee7b0973d964956450e793b9838b2685466..eaa3022bc2e08ab5cb480d2a447525b4c776aa60 100644 (file)
@@ -85,7 +85,7 @@ static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
 #define CMK_TRACE_COMMOVERHEAD 0
 #endif
 
-#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
+#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
 CpvStaticDeclare(double, projTraceStart);
 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
@@ -93,6 +93,34 @@ CpvStaticDeclare(double, projTraceStart);
 #define  START_EVENT()
 #define  END_EVENT(x)
 #endif
+
+#if CMK_SMP_TRACE_COMMTHREAD
+#define START_TRACE_SENDCOMM(msg)  \
+                        int isTraceEligible = traceBeginCommOp(msg); \
+                        if(isTraceEligible) traceSendMsgComm(msg);
+#define END_TRACE_SENDCOMM(msg) if(isTraceEligible) traceEndCommOp(msg);
+#define START_TRACE_RECVCOMM(msg) CpvAccess(projTraceStart) = CmiWallTimer();
+#define END_TRACE_RECVCOMM(msg) \
+                        if(traceBeginCommOp(msg)){ \
+                            traceChangeLastTimestamp(CpvAccess(projTraceStart)); \
+                            traceSendMsgComm(msg); \
+                            traceEndCommOp(msg); \
+                        }
+#define CONDITIONAL_TRACE_USER_EVENT(x) \
+                        do{ \
+                            double etime = CmiWallTimer(); \
+                            if(etime - CpvAccess(projTraceStart) > 5*1e-6){ \
+                                traceUserBracketEvent(x, CpvAccess(projTraceStart), etime); \
+                            }\
+                        }while(0);
+#else
+#define START_TRACE_SENDCOMM(msg)
+#define END_TRACE_SENDCOMM(msg)
+#define START_TRACE_RECVCOMM(msg)
+#define END_TRACE_RECVCOMM(msg)
+#define CONDITIONAL_TRACE_USER_EVENT(x)
+#endif
+
 /* ###End of machine-layer-tracing related macros ### */
 
 /* ###Beginning of POST_RECV related macros ### */
@@ -274,9 +302,6 @@ typedef struct msg_list {
     char *msg;
     struct msg_list *next;
     int size, destpe, mode;
-#if CMK_SMP_TRACE_COMMTHREAD
-    int srcpe;
-#endif
     MPI_Request req;
 } SMSG_LIST;
 
@@ -388,10 +413,6 @@ static void EnqueueMsg(void *m, int size, int node, int mode) {
     msg_tmp->next = 0;
     msg_tmp->mode = mode;
 
-#if CMK_SMP_TRACE_COMMTHREAD
-    msg_tmp->srcpe = CmiMyPe();
-#endif
-
 #if MULTI_SENDQUEUE
     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
 #else
@@ -422,20 +443,21 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
     if (size>=MPI_POST_RECV_LOWERSIZE && size < MPI_POST_RECV_UPPERSIZE) {
 #if MPI_DYNAMIC_POST_RECV
         int sendTagOffset = (size-MPI_POST_RECV_LOWERSIZE)/MPI_POST_RECV_INC+1;
-        START_EVENT();
+        START_TRACE_SENDCOMM(msg);
         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG+sendTagOffset,MPI_COMM_WORLD,&(smsg->req)))
             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
+        END_TRACE_SENDCOMM(msg);
 #else
-        START_EVENT();
+        START_TRACE_SENDCOMM(msg);
         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
+        END_TRACE_SENDCOMM(msg);
 #endif
-        /*END_EVENT(40);*/
     } else {
-        START_EVENT();
-       if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
-        CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
-        /*END_EVENT(40);*/
+        START_TRACE_SENDCOMM(msg);
+           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
+            CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
+        END_TRACE_SENDCOMM(msg);
     }
 #else
 /* branch not using MPI_POST_RECV */
@@ -445,22 +467,12 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
 #else
        dstrank=node;
 #endif
-    START_EVENT();
+    START_TRACE_SENDCOMM(msg)
     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,MPI_COMM_WORLD,&(smsg->req)))
         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
-    /*END_EVENT(40);*/
+    END_TRACE_SENDCOMM(msg)
 #endif /* end of #if MPI_POST_RECV */
 
-#if CMK_SMP_TRACE_COMMTHREAD
-    TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
-#endif
-
-#if CMI_MPI_TRACE_MOREDETAILED 
-    char tmp[64];
-    sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
-    traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
-#endif
-
     MACHSTATE(3,"}MPI_Isend end");
     CpvAccess(MsgQueueLen)++;
     if (CpvAccess(sent_msgs)==0)
@@ -623,9 +635,7 @@ static int PumpMsgs(void) {
         if (recvCnt >= dynamicRecvCap) break;
 #endif
 
-#if CMI_SMP_TRACE_COMMTHREAD
-        START_EVENT();
-#endif
+        START_TRACE_RECVCOMM(NULL);
 
         /* First check posted recvs then do  probe unmatched outstanding messages */
 #if MPI_POST_RECV
@@ -654,6 +664,7 @@ static int PumpMsgs(void) {
         if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
 #endif
+        CONDITIONAL_TRACE_USER_EVENT(60); /* MPI_Test related user event */
         if (flg) {
             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
@@ -667,69 +678,65 @@ static int PumpMsgs(void) {
 
             CpvAccess(Cmi_posted_recv_total)++;
         } else {
+            START_EVENT();
             res = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flg, &sts);
             if (res != MPI_SUCCESS)
                 CmiAbort("MPI_Iprobe failed\n");
             if (!flg) break;
             
+            CONDITIONAL_TRACE_USER_EVENT(70); /* MPI_Iprobe related user event */
             recd = 1;
             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
             msg = (char *) CmiAlloc(nbytes);
 
 #if USE_ASYNC_RECV_FUNC
             if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
-#endif
-            START_EVENT();
-            if(doSyncRecv){            
+#endif            
+            if(doSyncRecv){
+                START_EVENT();
                 if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
-                    CmiAbort("PumpMsgs: MPI_Recv failed!\n");            
+                    CmiAbort("PumpMsgs: MPI_Recv failed!\n");                
             }
 #if USE_ASYNC_RECV_FUNC        
             else {
+                START_EVENT();
                 IRecvList one = irecvListEntryAllocate();
                 if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, MPI_COMM_WORLD, &(one->req));
                     CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
                 one->msg = msg;
                 one->size = nbytes;
                 one->next = NULL;
-                waitIrecvListTail->next = one;            
+                waitIrecvListTail->next = one;
+                CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
             }
 #endif
-            /*END_EVENT(30);*/
-
             CpvAccess(Cmi_unposted_recv_total)++;
         }
 #else
         /* Original version */
-#if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
-        double startT = CmiWallTimer();
-#endif
+        START_EVENT();
         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
         if (res != MPI_SUCCESS)
             CmiAbort("MPI_Iprobe failed\n");
 
         if (!flg) break;
-#if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
-        {
-            double endT = CmiWallTimer();
-            /* only trace the probe that last longer than 1ms */
-            if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
-        }
-#endif
+        CONDITIONAL_TRACE_USER_EVENT(70); /* MPI_Iprobe related user event */
+        
         recd = 1;
         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
         msg = (char *) CmiAlloc(nbytes);
 
 #if USE_ASYNC_RECV_FUNC
         if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
-#endif
-        START_EVENT();
-        if(doSyncRecv){            
+#endif        
+        if(doSyncRecv){
+            START_EVENT();
             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");            
         }
 #if USE_ASYNC_RECV_FUNC        
         else {
+            START_EVENT();
             IRecvList one = irecvListEntryAllocate();
             if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, MPI_COMM_WORLD, &(one->req)))
                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
@@ -739,9 +746,9 @@ static int PumpMsgs(void) {
             waitIrecvListTail->next = one;
             waitIrecvListTail = one;
             /*printf("PE[%d]: MPI_Irecv msg=%p, size=%d, entry=%p\n", CmiMyPe(), msg, nbytes, one);*/
+            CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
         }
-#endif        
-        /*END_EVENT(30);*/
+#endif
 
 #endif /*end of not MPI_POST_RECV */
 
@@ -756,20 +763,8 @@ static int PumpMsgs(void) {
         }
 #endif
 
-#if CMK_SMP_TRACE_COMMTHREAD
-        TRACE_COMM_RECV(CpvAccess(projTraceStart), msg);
-#if CMI_MPI_TRACE_MOREDETAILED
-        char tmp[32];
-        sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
-        traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
-#endif
-#elif CMK_TRACE_COMMOVERHEAD
-        char tmp[32];
-        sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
-        traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
-#endif
-
         if(doSyncRecv){
+            END_TRACE_RECVCOMM(msg);
             handleOneRecvedMsg(nbytes, msg);
         }
         
@@ -798,7 +793,7 @@ static int PumpMsgs(void) {
                                          MPI_COMM_WORLD,
                                          &((postedOne->postedRecvReqs)[completed_index])  ))
                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
-            END_EVENT(50);
+            CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
         }
 #else
         if (postedOne) {
@@ -807,7 +802,6 @@ static int PumpMsgs(void) {
 
             /* and repost the recv */
             START_EVENT();
-
             if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
                                          MPI_POST_RECV_SIZE,
                                          MPI_BYTE,
@@ -816,7 +810,7 @@ static int PumpMsgs(void) {
                                          MPI_COMM_WORLD,
                                          &((postedOne->postedRecvReqs)[completed_index])  ))
                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
-            END_EVENT(50);
+            CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
         }
 #endif /* not MPI_DYNAMIC_POST_RECV */
 #endif
@@ -856,22 +850,18 @@ static int PumpMsgs(void) {
     MPI_Status sts;
     while(waitIrecvListHead->next) {
         IRecvList irecvEnt = waitIrecvListHead->next;
-#if CMK_SMP_TRACE_COMMTHREAD
+
         START_EVENT();
-#endif        
-        
+                
         /*printf("PE[%d]: check irecv entry=%p\n", CmiMyPe(), irecvEnt);*/
         if(MPI_SUCCESS != MPI_Test(&(irecvEnt->req), &irecvDone, &sts))
             CmiAbort("PumpMsgs: MPI_Test failed!\n");
         if(!irecvDone) break; /* in-order recv */
 
-#if CMK_SMP_TRACE_COMMTHREAD
-        TRACE_COMM_RECV(CpvAccess(projTraceStart), irecvEnt->msg);
-#endif
-    
+        END_TRACE_RECVCOMM((irecvEnt->msg));
         /*printf("PE[%d]: irecv entry=%p finished with size=%d, msg=%p\n", CmiMyPe(), irecvEnt, irecvEnt->size, irecvEnt->msg);*/
         
-        handleOneRecvedMsg(irecvEnt->size, irecvEnt->msg);        
+        handleOneRecvedMsg(irecvEnt->size, irecvEnt->msg);
         waitIrecvListHead->next = irecvEnt->next;
         irecvListEntryFree(irecvEnt);
         recd = 1;        
@@ -915,24 +905,19 @@ static void PumpMsgsBlocking(void) {
     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
 #endif
 
-    START_EVENT();
-
+    START_TRACE_RECVCOMM(NULL);
     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
-        CmiAbort("PumpMsgs: PMP_Recv failed!\n");
-
-    /*END_EVENT(30);*/
+        CmiAbort("PumpMsgs: PMP_Recv failed!\n");    
 
     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
     msg = (char *) CmiAlloc(nbytes);
     memcpy(msg, buf, nbytes);
+    END_TRACE_RECVCOMM(msg);
 
-#if CMK_SMP_TRACE_COMMTHREAD
-    TRACE_COMM_RECV(CpvAccess(projTraceStart), msg);
-#if CMI_MPI_TRACE_MOREDETAILED
+#if CMK_SMP_TRACE_COMMTHREAD && CMI_MPI_TRACE_MOREDETAILED
     char tmp[32];
     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
-#endif
 #endif
 
     handleOneRecvedMsg(nbytes, msg);
@@ -1244,7 +1229,7 @@ static void registerMPITraceEvents() {
     traceRegisterUserEvent("MPI_Recv", 30);
     traceRegisterUserEvent("MPI_Isend", 40);
     traceRegisterUserEvent("MPI_Irecv", 50);
-    traceRegisterUserEvent("MPI_Test", 60);
+    traceRegisterUserEvent("MPI_Test[any]", 60);
     traceRegisterUserEvent("MPI_Iprobe", 70);
 #endif
 }
@@ -1944,17 +1929,20 @@ static void consumeAllMsgs()
                 /* Indicating this entry has been tested before */
                 if (ptr->postedRecvBufs[i] == NULL) continue;
 
+                START_TRACE_RECVCOMM(NULL);
                 if (MPI_SUCCESS != MPI_Test(ptr->postedRecvReqs+i, &done, &sts))
                     CmiAbort("consumeAllMsgs failed in MPI_Test!\n");
                 if (done) {
                     int nbytes;
-                    char *msg;
+                    char *msg;                    
+                    
                     if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
                         CmiAbort("consumeAllMsgs failed in MPI_Get_count!\n");
                     /* ready to handle this msg */
                     msg = (ptr->postedRecvBufs)[i];
                     (ptr->postedRecvBufs)[i] = NULL;
-
+                    
+                    END_TRACE_RECVCOMM(msg);
                     handleOneRecvedMsg(nbytes, msg);
                 } else {
                     if (MPI_SUCCESS != MPI_Cancel(ptr->postedRecvReqs+i))