added a quiescence detection module to converse, message counting to convcore.c and
authorSameer Paranjpye <paranjpy@uiuc.edu>
Fri, 25 Feb 2000 02:22:09 +0000 (02:22 +0000)
committerSameer Paranjpye <paranjpy@uiuc.edu>
Fri, 25 Feb 2000 02:22:09 +0000 (02:22 +0000)
quiescence function declarations to converse.h

also prepared a callback interface for additional message processing at sends and
receives, though not integrated yet.

src/conv-core/convcore.c
src/conv-core/converse.h
src/conv-core/msgcallbacks.c [new file with mode: 0644]
src/conv-core/quiescence.c [new file with mode: 0644]
src/conv-core/quiescence.h [new file with mode: 0644]

index 4ce1ecfd7dbd0963181f8a2b98699de7306016f4..d7e8908896d5b1c001f6136a8ac4c5b20f21cd63 100644 (file)
@@ -19,6 +19,7 @@ extern void CmiMemoryInit(char **);
 extern void CldModuleInit(void);
 extern int  CqsPrioGT(prio, prio);
 extern prio CqsGetPriority(Queue);
+#define DEBUGF(x)  printf x
 #endif
 
 /*
@@ -114,8 +115,6 @@ CsvDeclare(CmiNodeLock, CsdNodeQueueLock);
 #endif
 CpvDeclare(int,   CsdStopFlag);
 
-
-
 /*****************************************************************************
  *
  * Some of the modules use this in their argument parsing.
@@ -797,7 +796,7 @@ int CsdScheduler(int maxmsgs)
   CpvExtern(int, freezeModeFlag);
 #endif
 
-  int *msg;
+  int *msg, csdMsgFlag = 0; /* To signal a message coming from the CsdNodeQueue */
   void *localqueue = CpvAccess(CmiLocalQueue);
   int cycle = CpvAccess(CsdStopFlag);
   
@@ -822,6 +821,7 @@ int CsdScheduler(int maxmsgs)
 
         if(msg != 0){
           if(strncmp((char *)((char *)msg+CmiMsgHeaderSizeBytes),"req",3)!=0) {
+            /*CQdCreate(CpvAccess(cQdState), 1);*/
             CsdEndIdle();
             FIFO_EnQueue(CpvAccess(debugQueue), msg);
             continue;
@@ -839,6 +839,7 @@ int CsdScheduler(int maxmsgs)
 #endif
       if (msg==0) FIFO_DeQueue(localqueue, (void **)&msg);
 #if CMK_NODE_QUEUE_AVAILABLE
+      csdMsgFlag = 0;
       if (msg==0) msg = CmiGetNonLocalNodeQ();
       if (msg==0 && !CqsEmpty(CsvAccess(CsdNodeQueue))
                  && !CqsPrioGT(CqsGetPriority(CsvAccess(CsdNodeQueue)), 
@@ -846,9 +847,11 @@ int CsdScheduler(int maxmsgs)
         CmiLock(CsvAccess(CsdNodeQueueLock));
         CqsDequeue(CsvAccess(CsdNodeQueue),&msg);
         CmiUnlock(CsvAccess(CsdNodeQueueLock));
+               csdMsgFlag = 1;
       }
 #endif
-      if (msg==0) CqsDequeue(CpvAccess(CsdSchedQueue),&msg);
+         if (msg && (!csdMsgFlag)) CQdProcess(CpvAccess(cQdState), 1);
+         if (msg==0) CqsDequeue(CpvAccess(CsdSchedQueue),&msg);
       if (msg) {
         CmiHandleMessage(msg);
         maxmsgs--;
@@ -873,6 +876,7 @@ int CsdScheduler(int maxmsgs)
 
       if(msg != 0){
        if(strncmp((char *)((char *)msg+CmiMsgHeaderSizeBytes),"req",3)!=0){
+          /*CQdCreate(CpvAccess(cQdState), 1);*/
          CsdEndIdle();
          FIFO_EnQueue(CpvAccess(debugQueue), msg);
          continue;
@@ -890,6 +894,7 @@ int CsdScheduler(int maxmsgs)
 #endif
     if (msg==0) FIFO_DeQueue(localqueue, (void**)&msg);
 #if CMK_NODE_QUEUE_AVAILABLE
+       csdMsgFlag = 0;
     if (msg==0) msg = CmiGetNonLocalNodeQ();
     if (msg==0 && !CqsEmpty(CsvAccess(CsdNodeQueue))
                && !CqsPrioGT(CqsGetPriority(CsvAccess(CsdNodeQueue)), 
@@ -897,9 +902,11 @@ int CsdScheduler(int maxmsgs)
       CmiLock(CsvAccess(CsdNodeQueueLock));
       CqsDequeue(CsvAccess(CsdNodeQueue),&msg);
       CmiUnlock(CsvAccess(CsdNodeQueueLock));
+         csdMsgFlag = 1;
     }
 #endif
-    if (msg==0) CqsDequeue(CpvAccess(CsdSchedQueue),&msg);
+    if (msg && (!csdMsgFlag)) CQdProcess(CpvAccess(cQdState), 1);
+       if (msg==0) CqsDequeue(CpvAccess(CsdSchedQueue),&msg);
     if (msg) {
       CsdEndIdle();
       CmiHandleMessage(msg);
@@ -931,6 +938,7 @@ int handler;
     else      FIFO_DeQueue(localqueue, (void**)&msg);
     if (msg) {
       if (CmiGetHandler(msg)==handler) {
+       CQdProcess(CpvAccess(cQdState), 1);
        CsdEndIdle();
        CmiHandleMessage(msg);
        return;
@@ -1669,6 +1677,7 @@ void ConverseCommonInit(char **argv)
   CmiGroupInit();
   CmiMulticastInit();
   CmiInitMultipleSend();
+  CQdInit();
 #if CMK_CCS_AVAILABLE
   CcsInit();
 #endif
index 994a3f4b446610a4e804be1b8cd883b12bfa8fa6..5b965e507c786980534b90a022c5702325c43d72 100644 (file)
@@ -1066,6 +1066,7 @@ typedef void (*CcdVoidFn)();
 #define CcdPROCESSORIDLE 1
 #define CcdSIGUSR1 2
 #define CcdSIGUSR2 3
+#define CcdQUIESCENCE 4
 
 void CcdCallFnAfter(CcdVoidFn fnp, void *arg, unsigned int msecs);
 void CcdPeriodicallyCall(CcdVoidFn fnp, void *arg);
@@ -1142,6 +1143,33 @@ void CWebPlateDataDeposit (int timestep, int cellx, int celly,
 CpvExtern(void*, CmiLocalQueue);
 #endif
 
+/*****************************************************************************
+ *
+ *    Converse Quiescence Detection
+ *
+ *****************************************************************************/
+
+struct ConvQdMsg;
+struct ConvQdState;
+typedef struct ConvQdMsg    *CQdMsg;
+typedef struct ConvQdState  *CQdState;
+typedef CcdVoidFn CQdVoidFn; 
+
+CpvExtern(CQdState, cQdState);
+
+void CQdInit(void);
+void CQdCreate(CQdState, int);
+void CQdProcess(CQdState, int);
+int  CQdGetCreated(CQdState);
+int  CQdGetProcessed(CQdState);
+void CQdRegisterCallback(CQdVoidFn, void *);
+void CmiStartQD(CQdVoidFn, void *);
+
+/*****************************************************************************
+ *
+ *    Converse Random Numbers
+ *
+ *****************************************************************************/
 
 typedef struct rngen_
 {
diff --git a/src/conv-core/msgcallbacks.c b/src/conv-core/msgcallbacks.c
new file mode 100644 (file)
index 0000000..841a17f
--- /dev/null
@@ -0,0 +1,103 @@
+/***********************************************************************************************************
+Callbacks for converse messages. Sameer Paranjpye 2/24/2000.
+This module defines an interface for registering callbacks for converse messages. 
+This is meant to be a machine level interface through which additional message processing capabilities can 
+be added to converse. The idea is that if a message needs additional processing when it is sent or received
+such as encryption/decryption, maintaining quiescence counters etc., then a callback is registered that provides 
+this processing capability. Its only current application is keeping quiescence counts.
+
+For now I'm assuming that callbacks are only registered in pairs, for every send callback there is a receice 
+callback. But this can be easily changed by making CMsgRegisterCallback non-static.
+
+CMsgCallbacksInit - Initializes the callback mechanism.
+
+CMsgRegisterCallbackPair - Registers a message callback pair, one at the send side one at the recv side.
+
+CMsgInvokeCallbacks - Invokes registered callbacks
+
+************************************************************************************************************/
+
+
+#include "converse.h"
+#define CALLBACKSETSIZE     5
+#define MAXCALLBACKSETS     5 
+#define SENDCALLBACK        0
+#define RECVCALLBACK        1
+
+typedef struct CMsgCallback {
+  CMsgProcFn fn;
+} CMSGCALLBACK;
+
+struct CMsgCallbackQ {
+       CMSGCALLBACK **cbQ;
+       int          size;
+};
+
+struct CMsgCallbackQ  cMsgSendCbQ;
+struct CMsgCallbackQ  cMsgRecvCbQ;   
+
+void CMsgCallbacksInit(void)
+{
+       cMsgSendCbQ.cbQ  = (CMSGCALLBACK **) malloc(MAXCALLBACKSETS* 
+                                                                                               sizeof(CMSGCALLBACK*));
+       cMsgSendCbQ.size = 0;
+       cMsgRecvCbQ.cbQ  = (CMSGCALLBACK **) malloc(MAXCALLBACKSETS* 
+                                                                                               sizeof(CMSGCALLBACK*));
+       cMsgRecvCbQ.size = 0;
+}
+
+static int CMsgRegisterCallback(CMsgProcFn fnp, int type)
+{
+       int setNum, setIdx;
+       struct CMsgCallbackQ* Q;
+       
+       if (type < 2)
+               Q = (type)? (&cMsgRecvCbQ) : (&cMsgSendCbQ);
+       else 
+       {
+               CmiPrintf("Unknown callback type, cannot register");
+               return -1;
+       }
+
+       setNum = Q->size/CALLBACKSETSIZE;
+       setIdx = Q->size%CALLBACKSETSIZE;
+       
+       if (setNum >= MAXCALLBACKSETS) {
+               CmiPrintf("Too many message callbacks, cannot register\n");
+               return -1;
+       }
+       
+       if (!setIdx)
+               Q->cbQ[setNum] = (CMSGCALLBACK *) malloc(CALLBACKSETSIZE*sizeof(CMSGCALLBACK));
+       
+       (Q->cbQ[setNum])[setIdx].fn = fnp;
+       Q->size++;
+
+       return 0;
+}
+
+int CMsgRegisterCallbackPair(CMsgProcFn sendFn, CMsgProcFn recvFn)
+{
+       if(CMsgRegisterCallback(sendFn, SENDCALLBACK) < 0) 
+               return -1;
+       if(CMsgRegisterCallback(recvFn, RECVCALLBACK) < 0) 
+               return -1;
+       return 0;
+}
+
+void CMsgInvokeCallbacks(int type, char *msg)
+{
+       int  i;
+       struct CMsgCallbackQ* Q;
+
+       if (type < 2)
+               Q = (type)? (&cMsgRecvCbQ) : (&cMsgSendCbQ);
+       else 
+       {
+               CmiPrintf("Unknown callback type, cannot invoke");
+               return;
+       }
+
+       for(i=0; i < Q->size; i++)
+               ((Q->cbQ[i/CALLBACKSETSIZE])[i%CALLBACKSETSIZE].fn)(msg);
+}
diff --git a/src/conv-core/quiescence.c b/src/conv-core/quiescence.c
new file mode 100644 (file)
index 0000000..b38bd91
--- /dev/null
@@ -0,0 +1,288 @@
+#include "converse.h"
+#include "quiescence.h"
+#include <assert.h>
+#include <stdio.h>
+#ifndef  DEBUGF
+#define  DEBUGF(x) printf x 
+#endif
+
+CpvDeclare(CQdState, cQdState);
+unsigned int CQdHandlerIdx;
+unsigned int CQdAnnounceHandlerIdx;
+
+
+int  CQdMsgGetPhase(CQdMsg msg) 
+{ return msg->phase; }
+
+void CQdMsgSetPhase(CQdMsg msg, int p) 
+{ msg->phase = p; }
+
+int  CQdMsgGetCreated(CQdMsg msg) 
+{ assert(msg->phase==1); return msg->u.p1.created; }
+
+void CQdMsgSetCreated(CQdMsg msg, int c) 
+{ assert(msg->phase==1); msg->u.p1.created = c; }
+
+int  CQdMsgGetProcessed(CQdMsg msg) 
+{ assert(msg->phase==1); return msg->u.p1.processed; }
+
+void CQdMsgSetProcessed(CQdMsg msg, int p) 
+{ assert(msg->phase==1); msg->u.p1.processed = p; }
+
+int  CQdMsgGetDirty(CQdMsg msg) 
+{ assert(msg->phase==2); return msg->u.p2.dirty; }
+
+void CQdMsgSetDirty(CQdMsg msg, int d) 
+{ assert(msg->phase==2); msg->u.p2.dirty = d; }
+
+
+int CQdGetCreated(CQdState state)
+{ return state->mCreated; }
+
+void CQdCreate(CQdState state, int n)
+{ state->mCreated += n; }
+
+int CQdGetProcessed(CQdState state)
+{ return state->mProcessed; }
+
+void CQdProcess(CQdState state, int n)
+{ state->mProcessed += n; }
+
+
+void CQdPropagate(CQdState state, CQdMsg msg) 
+{   
+       int i;
+       CmiSetHandler(msg, CQdHandlerIdx);
+    for(i=0; i<state->nChildren; i++) {
+               CQdCreate(state, -1);
+               CmiSyncSend(state->children[i], sizeof(struct ConvQdMsg), (char *)msg);
+    }
+}
+
+int  CQdGetParent(CQdState state) 
+{ return state->parent; }
+    
+int  CQdGetCCreated(CQdState state) 
+{ return state->cCreated; }
+
+int  CQdGetCProcessed(CQdState state) 
+{ return state->cProcessed; }
+
+void CQdSubtreeCreate(CQdState state, int c) 
+{ state->cCreated += c; }
+
+void CQdSubtreeProcess(CQdState state, int p) 
+{ state->cProcessed += p; }
+
+int  CQdGetStage(CQdState state) 
+{ return state->stage; }
+
+void CQdSetStage(CQdState state, int p) 
+{ state->stage = p; }
+
+void CQdReported(CQdState state) 
+{ state->nReported++; }
+
+int  CQdAllReported(CQdState state) 
+{ return state->nReported==(state->nChildren+1);}
+
+void CQdReset(CQdState state) 
+{ state->nReported=0; state->cCreated=0; state->cProcessed=0; state->cDirty=0; }
+
+void CQdMarkProcessed(CQdState state) 
+{ state->oProcessed = state->mProcessed; }
+
+int  CQdIsDirty(CQdState state) 
+{ return ((state->mProcessed > state->oProcessed) || state->cDirty); }
+
+void CQdSubtreeSetDirty(CQdState state, int d) 
+{ state->cDirty = state->cDirty || d; }
+
+CQdState CQdStateCreate(void)
+{
+       CQdState state = (CQdState) malloc(sizeof(struct ConvQdState));
+       _MEMCHECK(state);
+       state->mCreated = 0;
+       state->mProcessed = 0;
+       state->stage = 0;
+       state->nReported = 0;
+       state->oProcessed = 0;
+       state->cCreated = 0;
+       state->cProcessed = 0;
+       state->cDirty = 0;
+       state->nChildren = CmiNumSpanTreeChildren(CmiMyPe());
+       state->parent = CmiSpanTreeParent(CmiMyPe());
+       state->children = (int *) malloc(state->nChildren*sizeof(int));
+       _MEMCHECK(state->children);
+       CmiSpanTreeChildren(CmiMyPe(), state->children);
+
+       return state;
+}
+
+
+static void CQdBcastQD1(CQdState state, CQdMsg msg)
+{  
+       CQdMsgSetPhase(msg, 0); 
+       CQdPropagate(state, msg); 
+       CQdMsgSetPhase(msg, 1); 
+       CQdMsgSetCreated(msg, CQdGetCreated(state)); 
+       CQdMsgSetProcessed(msg, CQdGetProcessed(state)); 
+       CQdCreate(state, -1);
+       CmiSyncSendAndFree(CmiMyPe(), sizeof(struct ConvQdMsg), (char *) msg);
+       CQdMarkProcessed(state); 
+       CQdReset(state); 
+       CQdSetStage(state, 1); 
+}
+
+
+static void CQdBcastQD2(CQdState state, CQdMsg msg)
+{
+       CQdMsgSetPhase(msg, 1); 
+       CQdPropagate(state, msg); 
+       CQdMsgSetPhase(msg, 2); 
+       CQdMsgSetDirty(msg, CQdIsDirty(state)); 
+       CQdCreate(state, -1);
+       CmiSyncSendAndFree(CmiMyPe(), sizeof(struct ConvQdMsg), (char *) msg);
+       CQdReset(state); 
+       CQdSetStage(state, 2); 
+}
+
+
+static void CQdHandlePhase0(CQdState state, CQdMsg msg)
+{
+       assert(CmiMyPe()==0 || CQdGetStage(state)==0);
+       if(CQdGetStage(state)==0)
+               CQdBcastQD1(state, msg);
+       else
+               CmiFree(msg);
+}
+
+
+static void CQdHandlePhase1(CQdState state, CQdMsg msg)
+{
+       switch(CQdGetStage(state)) 
+       {               
+       case 0 :
+               assert(CmiMyPe()!=0);
+               CQdBcastQD2(state, msg);
+               break;
+       case 1 :
+               CQdSubtreeCreate(state, CQdMsgGetCreated(msg)); 
+               CQdSubtreeProcess(state, CQdMsgGetProcessed(msg)); 
+               CQdReported(state); 
+               
+               if(CQdAllReported(state)) 
+               {
+                       if(CmiMyPe()==0) 
+                       {
+                               if(CQdGetCCreated(state) == CQdGetCProcessed(state)) 
+                                       CQdBcastQD2(state, msg); 
+                               else 
+                                       CQdBcastQD1(state, msg);
+                       } 
+                       else 
+                       {
+                               CQdMsgSetCreated(msg, CQdGetCCreated(state)); 
+                               CQdMsgSetProcessed(msg, CQdGetCProcessed(state)); 
+                               CQdCreate(state, -1);
+                               CmiSyncSendAndFree(CQdGetParent(state), sizeof(struct ConvQdMsg), (char *) msg);
+                               DEBUGF(("PE = %d, My parent = %d\n", CmiMyPe(), CQdGetParent(state)));
+                               CQdReset(state); 
+                               CQdSetStage(state, 0); 
+                       }
+               } 
+               else
+                       CmiFree(msg);
+               break;
+       default: 
+               CmiAbort("Internal QD Error. Contact Developers.!\n");
+       }
+}
+
+
+static void CQdHandlePhase2(CQdState state, CQdMsg msg)
+{
+       assert(CQdGetStage(state)==2);
+       CQdSubtreeSetDirty(state, CQdMsgGetDirty(msg));    
+       CQdReported(state);
+       if(CQdAllReported(state)) 
+       { 
+               if(CmiMyPe()==0) 
+               {
+                       if(CQdIsDirty(state)) 
+                               CQdBcastQD1(state, msg);
+                       else 
+                       {
+                               CmiSetHandler(msg, CQdAnnounceHandlerIdx);
+                               CQdCreate(state, 0-CmiNumPes());
+                               CmiSyncBroadcastAllAndFree(sizeof(struct ConvQdMsg), (char *) msg);
+                               CQdReset(state); 
+                               CQdSetStage(state, 0); 
+               }
+               } 
+               else 
+               {
+                       CQdMsgSetDirty(msg, CQdIsDirty(state)); 
+                       CQdCreate(state, -1);
+                       CmiSyncSendAndFree(CQdGetParent(state), sizeof(struct ConvQdMsg), (char *) msg);
+                       CQdReset(state); 
+                       CQdSetStage(state, 0); 
+               }
+       } 
+       else
+               CmiFree(msg);
+}
+
+
+static void CQdCallWhenIdle(CQdMsg msg)
+{
+       CQdState state = CpvAccess(cQdState);
+  
+       switch(CQdMsgGetPhase(msg)) 
+       {
+    case 0 : CQdHandlePhase0(state, msg); break;
+    case 1 : CQdHandlePhase1(state, msg); break;
+    case 2 : CQdHandlePhase2(state, msg); break;
+    default: CmiAbort("Internal QD Error. Contact Developers.!\n");
+       }
+}
+
+
+void CQdHandler(CQdMsg msg)
+{
+       CmiGrabBuffer((void **)&msg);
+       CQdProcess(CpvAccess(cQdState), -1);
+       CcdCallOnCondition(CcdPROCESSORIDLE, (CcdVoidFn)CQdCallWhenIdle, (void*) msg);  
+}
+
+
+void CQdRegisterCallback(CQdVoidFn fn, void *arg)
+{
+       CcdCallOnCondition(CcdQUIESCENCE, fn, arg);
+}
+
+void CQdAnnounceHandler(CQdMsg msg)
+{
+       CQdProcess(CpvAccess(cQdState), -1);
+       CcdRaiseCondition(CcdQUIESCENCE);
+}
+
+
+void CQdInit(void)
+{
+       CpvInitialize(CQdState, cQdState);
+       CpvAccess(cQdState) = CQdStateCreate();
+       CQdHandlerIdx = CmiRegisterHandler((CmiHandler)CQdHandler);
+       CQdAnnounceHandlerIdx = CmiRegisterHandler((CmiHandler)CQdAnnounceHandler);
+}
+
+void CmiStartQD(CQdVoidFn fn, void *arg)
+{
+       register CQdMsg msg = (CQdMsg) CmiAlloc(sizeof(struct ConvQdMsg)); 
+       CQdRegisterCallback(fn, arg);
+       CQdMsgSetPhase(msg, 0);  
+       CmiSetHandler(msg, CQdHandlerIdx);
+       CQdCreate(CpvAccess(cQdState), -1);
+       CmiSyncSendAndFree(0, sizeof(struct ConvQdMsg), (char *)msg);
+}
diff --git a/src/conv-core/quiescence.h b/src/conv-core/quiescence.h
new file mode 100644 (file)
index 0000000..f7e11b7
--- /dev/null
@@ -0,0 +1,64 @@
+#ifndef _QUIESCENCE_H_
+#define _QUIESCENCE_H_
+
+struct ConvQdMsg 
+{  
+       char core[CmiMsgHeaderSizeBytes];
+       int phase; /* 0..2*/
+       union 
+       {
+               struct { int created; int processed; } p1;
+               struct { int dirty; } p2;
+       } u;
+};
+
+
+struct ConvQdState 
+{
+    int stage; /* 0..2*/
+    int oProcessed;
+    int mCreated, mProcessed;
+    int cCreated, cProcessed;
+    int cDirty;
+    int nReported;
+    int nChildren;
+    int parent;
+    int *children;
+};
+
+
+/* Declarations for CQdMsg related operations */
+int  CQdMsgGetPhase(CQdMsg); 
+void CQdMsgSetPhase(CQdMsg, int); 
+int  CQdMsgGetCreated(CQdMsg); 
+void CQdMsgSetCreated(CQdMsg, int); 
+int  CQdMsgGetProcessed(CQdMsg); 
+void CQdMsgSetProcessed(CQdMsg, int); 
+int  CQdMsgGetDirty(CQdMsg); 
+void CQdMsgSetDirty(CQdMsg, int); 
+
+/* Declarations for CQdState related operations */
+void CQdInit(void);
+int  CQdGetCreated(CQdState);
+void CQdCreate(CQdState, int);
+int  CQdGetProcessed(CQdState);
+void CQdProcess(CQdState, int);
+void CQdPropagate(CQdState, CQdMsg); 
+int  CQdGetParent(CQdState); 
+int  CQdGetCCreated(CQdState); 
+int  CQdGetCProcessed(CQdState); 
+void CQdSubtreeCreate(CQdState, int); 
+void CQdSubtreeProcess(CQdState, int); 
+int  CQdGetStage(CQdState); 
+void CQdSetStage(CQdState, int); 
+void CQdReported(CQdState); 
+int  CQdAllReported(CQdState); 
+void CQdReset(CQdState); 
+void CQdMarkProcessed(CQdState); 
+int  CQdIsDirty(CQdState); 
+void CQdSubtreeSetDirty(CQdState, int); 
+
+CQdState CQdStateCreate(void);
+void CQdHandler(CQdMsg);
+
+#endif