Merge branch 'charm' into virtualDebug
authorFilippo Gioachin <gioachin@uiuc.edu>
Wed, 21 Apr 2010 22:50:19 +0000 (17:50 -0500)
committerFilippo Gioachin <gioachin@uiuc.edu>
Wed, 21 Apr 2010 22:50:19 +0000 (17:50 -0500)
Conflicts:
src/ck-core/ck.C
src/langs/bluegene/blue.C

15 files changed:
.cproject
src/arch/net/charmrun/charmrun.c
src/ck-core/debug-charm.C
src/ck-core/init.C
src/ck-core/middle-blue.h
src/ck-core/middle-conv.h
src/conv-ccs/ccs-builtins.C
src/conv-ccs/conv-ccs.c
src/conv-ccs/conv-ccs.h
src/conv-ccs/middle-ccs.C [new file with mode: 0644]
src/conv-core/debug-conv.c
src/langs/bluegene/blue.C
src/langs/bluegene/blue_impl.h
src/scripts/Make.depends
src/scripts/Makefile

index 3dbb439aa86e5245be9ce7b193d37c9392026893..6c86e4d8dfc0341a3c238b718812dbb01ee0ef53 100644 (file)
--- a/.cproject
+++ b/.cproject
@@ -65,7 +65,7 @@
 </toolChain>
 </folderInfo>
 <sourceEntries>
-<entry excluding="bin/|include/|lib/|lib_so/|tmp/|net-**" flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name=""/>
+<entry excluding="bin/|include/|lib/|lib_so/|tmp/|net-**|mpi-**" flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name=""/>
 </sourceEntries>
 </configuration>
 </storageModule>
index c906c2322dad2898c2ae1e06c6da3a65322c6b61..de6206f6cf190ef632479f7deb70d68aea20b0f8 100644 (file)
@@ -1402,13 +1402,17 @@ void req_ccs_connect(void)
 
   if (pe<=-nodetab_size || pe>=nodetab_size) {
     /*Treat out of bound values as errors. Helps detecting bugs*/
+    /* But when virtualized with Bigemulator, we can have more pes than nodetabs */
+    /* TODO: We should somehow check boundaries also for bigemulator... */
+#if ! CMK_BLUEGENE_CHARM
     if (pe==-nodetab_size) fprintf(stderr,"Invalid processor index in CCS request: are you trying to do a broadcast instead?");
     else fprintf(stderr,"Invalid processor index in CCS request.");
     CcsServer_sendReply(&h.hdr,0,0);
     free(reqData);
     return;
+#endif
   }
-  else if (pe == -1) {
+  if (pe == -1) {
     /*Treat -1 as broadcast and sent to 0 as root of the spanning tree*/
     pe = 0;
   }
@@ -1425,12 +1429,16 @@ void req_ccs_connect(void)
 #if LOOPBACK /*Immediately reply "there's nothing!" (for performance testing)*/
     CcsServer_sendReply(&h.hdr,0,0);
 #else
+    int destpe = pe;
+#if CMK_BLUEGENE_CHARM
+    destpe = destpe % nodetab_size;
+#endif
     /*Fill out the charmrun header & forward the CCS request*/
     ChMessageHeader_new("req_fw",sizeof(h.hdr)+reqBytes,&h.ch);  
 
     bufs[0]=&h; lens[0]=sizeof(h);
     bufs[1]=reqData; lens[1]=reqBytes;
-    skt_sendV(nodetab_ctrlfd(pe),2,bufs,lens);
+    skt_sendV(nodetab_ctrlfd(destpe),2,bufs,lens);
 
 #endif
   }
index ee31def9499813bb1e295327fc9de546c662f9c7..fe9e4ac07461d307ae624439163f14c702fdd64e 100644 (file)
@@ -362,7 +362,7 @@ void hostInfo(void *itemIter, pup_er pp, CpdListItemsRequest *req) {
 }
 
 /************ Message CPD Lists ****************/
-CpvCExtern(void *,debugQueue);
+CkpvExtern(void *,debugQueue);
 CpvCExtern(int, skipBreakpoint);
 
 // Interpret data in a message in a user-friendly way.
@@ -429,7 +429,7 @@ void CpdPupMessage(PUP::er &p, void *msg)
   p.synchronize(PUP::sync_end_object);
 }
 
-CpvStaticDeclare(void *, lastBreakPointMsg);
+CkpvStaticDeclare(void *, lastBreakPointMsg);
 
 //Cpd Lists for local and scheduler queues
 class CpdList_localQ : public CpdListAccessor {
@@ -438,20 +438,20 @@ public:
   CpdList_localQ() {}
   virtual const char * getPath(void) const {return "converse/localqueue";}
   virtual size_t getLength(void) const {
-    int x = CdsFifo_Length((CdsFifo)(CpvAccess(debugQueue)));
+    int x = CdsFifo_Length((CdsFifo)(CkpvAccess(debugQueue)));
     //CmiPrintf("*******Returning fifo length %d*********\n", x);
     //return CdsFifo_Length((CdsFifo)(CpvAccess(CmiLocalQueue)));
-    if (CpvAccess(lastBreakPointMsg) != NULL) x++;
+    if (CkpvAccess(lastBreakPointMsg) != NULL) x++;
     return x;
   }
   virtual void pup(PUP::er &p, CpdListItemsRequest &req) {
-    int length = CdsFifo_Length((CdsFifo)(CpvAccess(debugQueue)));
-    void ** messages = CdsFifo_Enumerate(CpvAccess(debugQueue));
+    int length = CdsFifo_Length((CdsFifo)(CkpvAccess(debugQueue)));
+    void ** messages = CdsFifo_Enumerate(CkpvAccess(debugQueue));
     int curObj=0;
 
-    if (CpvAccess(lastBreakPointMsg) != NULL) {
+    if (CkpvAccess(lastBreakPointMsg) != NULL) {
       beginItem(p, -1);
-      envelope *env=(envelope *)UsrToEnv(CpvAccess(lastBreakPointMsg));
+      envelope *env=(envelope *)UsrToEnv(CkpvAccess(lastBreakPointMsg));
       p.comment("name");
       char *type=(char*)"Breakpoint";
       p(type,strlen(type));
@@ -471,8 +471,12 @@ public:
         const char *type="Converse";
         p.comment("name");
         char name[128];
+#if ! CMK_BLUEGENE_CHARM
         if (CmiGetHandler(msg)==_charmHandlerIdx) {isCharm=1; type="Local Charm";}
         if (CmiGetXHandler(msg)==_charmHandlerIdx) {isCharm=1; type="Network Charm";}
+#else
+        isCharm=1; type="BG";
+#endif
         sprintf(name,"%s %d: %s (%d)","Message",curObj,type,CmiGetHandler(msg));
         p(name, strlen(name));
 
@@ -543,30 +547,32 @@ extern int  CpdIsFrozen(void);
 CpvStaticDeclare(int, _debugMsg);
 CpvStaticDeclare(int, _debugChare);
 
-CpvStaticDeclare(CpdBpFuncTable_t *, breakPointEntryTable);
+CkpvStaticDeclare(CpdBpFuncTable_t *, breakPointEntryTable);
 
 //CpvStaticDeclare(void *, lastBreakPointMsg);
-CpvStaticDeclare(void *, lastBreakPointObject);
-CpvStaticDeclare(int, lastBreakPointIndex);
+CkpvStaticDeclare(void *, lastBreakPointObject);
+CkpvStaticDeclare(int, lastBreakPointIndex);
 
 void CpdBreakPointInit()
 {
-  CpvInitialize(void *, lastBreakPointMsg);
-  CpvInitialize(void *, lastBreakPointObject);
-  CpvInitialize(int, lastBreakPointIndex);
+  CkpvInitialize(void *, lastBreakPointMsg);
+  CkpvInitialize(void *, lastBreakPointObject);
+  CkpvInitialize(int, lastBreakPointIndex);
   CpvInitialize(int, _debugMsg);
   CpvInitialize(int, _debugChare);
-  CpvInitialize(CpdBpFuncTable_t *, breakPointEntryTable);
-  CpvAccess(lastBreakPointMsg) = NULL;
-  CpvAccess(lastBreakPointObject) = NULL;
-  CpvAccess(lastBreakPointIndex) = 0;
+  CkpvInitialize(CpdBpFuncTable_t *, breakPointEntryTable);
+  CkpvAccess(lastBreakPointMsg) = NULL;
+  CkpvAccess(lastBreakPointObject) = NULL;
+  CkpvAccess(lastBreakPointIndex) = 0;
   CpvAccess(_debugMsg) = CkRegisterMsg("debug_msg",0,0,0,0);
   CpvAccess(_debugChare) = CkRegisterChare("debug_Chare",0,TypeChare);
   CkRegisterChareInCharm(CpvAccess(_debugChare));
-  CpvAccess(breakPointEntryTable) = new CpdBpFuncTable_t(10,0.5,CkHashFunction_int,CkHashCompare_int );
+  CkpvAccess(breakPointEntryTable) = new CpdBpFuncTable_t(10,0.5,CkHashFunction_int,CkHashCompare_int );
 }
 
-
+#if CMK_BLUEGENE_CHARM
+#include "blue_impl.h"
+#endif
 
 static void _call_freeze_on_break_point(void * msg, void * object)
 {
@@ -576,17 +582,21 @@ static void _call_freeze_on_break_point(void * msg, void * object)
 
   // If the counter "skipBreakpoint" is not zero we actually do not freeze and deliver the regular message
   if (CpvAccess(skipBreakpoint) > 0) {
-    EntryInfo * breakPointEntryInfo = CpvAccess(breakPointEntryTable)->get(CkMessageToEpIdx(msg));
+    EntryInfo * breakPointEntryInfo = CkpvAccess(breakPointEntryTable)->get(CkMessageToEpIdx(msg));
     CkAssert(breakPointEntryInfo != NULL);
     breakPointEntryInfo->call(msg, object);
     CpvAccess(skipBreakpoint) --;
   } else {
-      CpvAccess(lastBreakPointMsg) = msg;
-      CpvAccess(lastBreakPointObject) = object;
-      CpvAccess(lastBreakPointIndex) = CkMessageToEpIdx(msg);
-      EntryInfo * breakPointEntryInfo = CpvAccess(breakPointEntryTable)->get(CpvAccess(lastBreakPointIndex));
+      CkpvAccess(lastBreakPointMsg) = msg;
+      CkpvAccess(lastBreakPointObject) = object;
+      CkpvAccess(lastBreakPointIndex) = CkMessageToEpIdx(msg);
+      EntryInfo * breakPointEntryInfo = CkpvAccess(breakPointEntryTable)->get(CkpvAccess(lastBreakPointIndex));
       CpdNotify(CPD_BREAKPOINT,breakPointEntryInfo->name);
       CpdFreeze();
+#if CMK_BLUEGENE_CHARM
+      stopVTimer();
+      ((workThreadInfo*)cta(threadinfo))->scheduler(-1);
+#endif
   }
 }
 
@@ -594,20 +604,27 @@ static void _call_freeze_on_break_point(void * msg, void * object)
 extern "C"
 void CpdDeliverSingleMessage () {
   if (!CpdIsFrozen()) return; /* Do something only if we are in freeze mode */
-  if ( (CpvAccess(lastBreakPointMsg) != NULL) && (CpvAccess(lastBreakPointObject) != NULL) ) {
-    EntryInfo * breakPointEntryInfo = CpvAccess(breakPointEntryTable)->get(CpvAccess(lastBreakPointIndex));
+  if ( (CkpvAccess(lastBreakPointMsg) != NULL) && (CkpvAccess(lastBreakPointObject) != NULL) ) {
+    EntryInfo * breakPointEntryInfo = CkpvAccess(breakPointEntryTable)->get(CkpvAccess(lastBreakPointIndex));
     if (breakPointEntryInfo != NULL) {
-      breakPointEntryInfo->call(CpvAccess(lastBreakPointMsg), CpvAccess(lastBreakPointObject));
+      breakPointEntryInfo->call(CkpvAccess(lastBreakPointMsg), CkpvAccess(lastBreakPointObject));
     }
-    CpvAccess(lastBreakPointMsg) = NULL;
-    CpvAccess(lastBreakPointObject) = NULL;
+    CkpvAccess(lastBreakPointMsg) = NULL;
+    CkpvAccess(lastBreakPointObject) = NULL;
+#if CMK_BLUEGENE_CHARM
+    ((workThreadInfo*)cta(threadinfo))->stopScheduler();
+#endif
   }
   else {
     // we were not stopped at a breakpoint, then deliver the first message in the debug queue
-    if (!CdsFifo_Empty(CpvAccess(debugQueue))) {
+    if (!CdsFifo_Empty(CkpvAccess(debugQueue))) {
       CpvAccess(skipBreakpoint) = 1;
-      char *queuedMsg = (char *)CdsFifo_Dequeue(CpvAccess(debugQueue));
+      char *queuedMsg = (char *)CdsFifo_Dequeue(CkpvAccess(debugQueue));
+#if CMK_BLUEGENE_CHARM
+      BgProcessMessageDefault(cta(threadinfo), queuedMsg);
+#else
       CmiHandleMessage(queuedMsg);
+#endif
       CpvAccess(skipBreakpoint) = 0;
     }
   }
@@ -618,18 +635,21 @@ extern "C"
 void CpdContinueFromBreakPoint ()
 {
     CpdUnFreeze();
-    if ( (CpvAccess(lastBreakPointMsg) != NULL) && (CpvAccess(lastBreakPointObject) != NULL) )
+    if ( (CkpvAccess(lastBreakPointMsg) != NULL) && (CkpvAccess(lastBreakPointObject) != NULL) )
     {
-        EntryInfo * breakPointEntryInfo = CpvAccess(breakPointEntryTable)->get(CpvAccess(lastBreakPointIndex));
+        EntryInfo * breakPointEntryInfo = CkpvAccess(breakPointEntryTable)->get(CkpvAccess(lastBreakPointIndex));
         if (breakPointEntryInfo != NULL) {
-           breakPointEntryInfo->call(CpvAccess(lastBreakPointMsg), CpvAccess(lastBreakPointObject));
+           breakPointEntryInfo->call(CkpvAccess(lastBreakPointMsg), CkpvAccess(lastBreakPointObject));
+#if CMK_BLUEGENE_CHARM
+           ((workThreadInfo*)cta(threadinfo))->stopScheduler();
+#endif
         } else {
           // This means that the breakpoint got deleted in the meanwhile
           
         }
     }
-    CpvAccess(lastBreakPointMsg) = NULL;
-    CpvAccess(lastBreakPointObject) = NULL;
+    CkpvAccess(lastBreakPointMsg) = NULL;
+    CkpvAccess(lastBreakPointObject) = NULL;
 }
 
 //ccs handler to set a breakpoint with entry function name msg
@@ -638,7 +658,7 @@ void CpdSetBreakPoint (char *msg)
   char functionName[128];
   int tableSize, tableIdx = 0;
   int reply = 0;
-  sscanf(msg+CmiMsgHeaderSizeBytes, "%s", functionName);
+  sscanf(msg+CmiReservedHeaderSize, "%s", functionName);
   if (strlen(functionName) > 0)
   {
     tableSize = _entryTable.size();
@@ -650,11 +670,11 @@ void CpdSetBreakPoint (char *msg)
        //{
     tableIdx = atoi(functionName);
     if (tableIdx >= 0 && tableIdx < tableSize) {
-           EntryInfo * breakPointEntryInfo = (EntryInfo *)CpvAccess(breakPointEntryTable)->get(tableIdx);
+           EntryInfo * breakPointEntryInfo = (EntryInfo *)CkpvAccess(breakPointEntryTable)->get(tableIdx);
            delete breakPointEntryInfo;
            breakPointEntryInfo = new EntryInfo(_entryTable[tableIdx]->name, _entryTable[tableIdx]->call, _entryTable[tableIdx]->msgIdx, _entryTable[tableIdx]->chareIdx );
            //CmiPrintf("Breakpoint is set for function %s with an epIdx = %ld\n", _entryTable[tableIdx]->name, tableIdx);
-           CpvAccess(breakPointEntryTable)->put(tableIdx) = breakPointEntryInfo;
+           CkpvAccess(breakPointEntryTable)->put(tableIdx) = breakPointEntryInfo;
            _entryTable[tableIdx]->name = "debug_breakpoint_ep";
            _entryTable[tableIdx]->call = (CkCallFnPtr)_call_freeze_on_break_point;
            //_entryTable[tableIdx]->msgIdx = CpvAccess(_debugMsg);
@@ -685,18 +705,18 @@ void CpdRemoveBreakPoint (char *msg)
 {
   char functionName[128];
   int reply = 0;
-  sscanf(msg+CmiMsgHeaderSizeBytes, "%s", functionName);
+  sscanf(msg+CmiReservedHeaderSize, "%s", functionName);
   if (strlen(functionName) > 0) {
     int idx = atoi(functionName);
     if (idx >= 0 && idx < _entryTable.size()) {
       //CmiPrintf("[ERROR]Entrypoint was not found for function %s\n", functionName);
       //void *objPointer;
       //void *keyPointer;
-      //CkHashtableIterator *it = CpvAccess(breakPointEntryTable)->iterator();
+      //CkHashtableIterator *it = CkpvAccess(breakPointEntryTable)->iterator();
       //while(NULL!=(objPointer = it->next(&keyPointer)))
       //{
       //EntryInfo * breakPointEntryInfo = *(EntryInfo **)objPointer;
-      EntryInfo * breakPointEntryInfo = CpvAccess(breakPointEntryTable)->get(idx);
+      EntryInfo * breakPointEntryInfo = CkpvAccess(breakPointEntryTable)->get(idx);
       if (breakPointEntryInfo != NULL) {
         _entryTable[idx]->name =  breakPointEntryInfo->name;
         _entryTable[idx]->call = (CkCallFnPtr)breakPointEntryInfo->call;
@@ -705,7 +725,7 @@ void CpdRemoveBreakPoint (char *msg)
         reply = ~0 ;
         //_debugEntryTable[idx].isBreakpoint = CmiFalse;
         //CmiPrintf("Breakpoint is removed for function %s with epIdx %ld\n", _entryTable[idx]->name, idx);
-        //CpvAccess(breakPointEntryTable)->remove(idx);
+        //CkpvAccess(breakPointEntryTable)->remove(idx);
       }
     }
   }
@@ -718,7 +738,7 @@ void CpdRemoveAllBreakPoints ()
   void *objPointer;
   void *keyPointer;
   int reply = 1;
-  CkHashtableIterator *it = CpvAccess(breakPointEntryTable)->iterator();
+  CkHashtableIterator *it = CkpvAccess(breakPointEntryTable)->iterator();
   while(NULL!=(objPointer = it->next(&keyPointer)))
   {
     EntryInfo * breakPointEntryInfo = *(EntryInfo **)objPointer;
@@ -738,7 +758,17 @@ extern "C" int CpdIsCharmDebugMessage(void *msg) {
          env->getMsgtype() == FillVidMsg || _entryTable[env->getEpIdx()]->inCharm;
 }
 
-
+#if CMK_BLUEGENE_CHARM
+CpvExtern(int, _bgCcsHandlerIdx);
+extern "C" int CpdIsBgCharmDebugMessage(void *msg) {
+  envelope *env = (envelope*)msg;
+  if (CmiBgMsgFlag(msg) == BG_CLONE) {
+    env=*(envelope**)(((char*)msg)+CmiBlueGeneMsgHeaderSizeBytes);
+  }
+  return (((CmiBlueGeneMsgHeader*)msg)->hID) == CpvAccess(_bgCcsHandlerIdx) || env->getMsgtype() == ForVidMsg ||
+         env->getMsgtype() == FillVidMsg || _entryTable[env->getEpIdx()]->inCharm;
+}
+#endif
 
 CpvExtern(char *, displayArgument);
 
@@ -802,7 +832,7 @@ void CpdCharmInit()
   //CpdListRegister(new CpdListAccessor_c("converse/memory/leak",cpd_memory_length,0,cpd_memory_leak,0));
   CpdListRegister(new CpdListAccessor_c("converse/memory/data",cpd_memory_getLength,0,cpd_memory_get,0,false));
 
-  CpdBreakPointInit();
+  //CpdBreakPointInit();
   CcsRegisterHandler("ccs_set_break_point",(CmiHandler)CpdSetBreakPoint);
   CcsSetMergeFn("ccs_set_break_point",CcsMerge_logical_and);
   CcsRegisterHandler("ccs_remove_break_point",(CmiHandler)CpdRemoveBreakPoint);
@@ -822,6 +852,9 @@ void CpdCharmInit()
   CpdListRegister(new CpdList_message());
   CpdListRegister(new CpdList_msgStack());
   CpdIsDebugMessage = CpdIsCharmDebugMessage;
+#if CMK_BLUEGENE_CHARM
+  CpdIsDebugMessage = CpdIsBgCharmDebugMessage;
+#endif
 }
 
 #else
index 382081598602c1f445782d1faef2e689d79a099b..bd1a039254f684d3d51f97218da76f9c0217f17a 100644 (file)
@@ -853,6 +853,17 @@ extern "C" void initQd(char **argv)
         }
 }
 
+#if CMK_BLUEGENE_CHARM && CMK_CCS_AVAILABLE
+CpvExtern(int, _bgCcsHandlerIdx);
+CpvExtern(int, _bgCcsAck);
+extern "C" void req_fw_handler(char*);
+CkpvExtern(void *, debugQueue);
+CkpvExtern(int, freezeModeFlag);
+#include "blue_impl.h"
+extern void BgProcessMessageFreezeMode(threadInfo *, char *);
+#endif
+void CpdBreakPointInit();
+
 /**
   This is the main charm setup routine.  It's called
   on all processors after Converse initialization.
@@ -1047,6 +1058,10 @@ void _initCharm(int unused_argc, char **argv)
                CkRegisterMainModule();
                _registerDone();
        }
+       /* The following will happen on every virtual processor in BigEmulator, not just on once per real processor */
+       if (CkMyRank() == 0) {
+         CpdBreakPointInit();
+       }
        CmiNodeAllBarrier();
 
     // Execute the initcalls registered in modules
@@ -1133,6 +1148,25 @@ void _initCharm(int unused_argc, char **argv)
         CmiInitCPUTopology(argv);
     }
 
+#if CMK_BLUEGENE_CHARM
+        // Register the BG handler for CCS. Notice that this is put into a variable shared by
+        // the whole real processor. This because converse needs to find it. We check that all
+        // virtual processors register the same index for this handler.
+        int bgCcsHandlerIdx = CkRegisterHandler((CmiHandler)req_fw_handler);
+        if (CpvAccess(_bgCcsHandlerIdx) == 0) CpvAccess(_bgCcsHandlerIdx) = bgCcsHandlerIdx;
+        CkAssert(CpvAccess(_bgCcsHandlerIdx)==bgCcsHandlerIdx);
+        CpvAccess(_bgCcsAck) ++;
+        CcsReleaseMessages();
+        
+        CkpvInitialize(int, freezeModeFlag);
+        CkpvAccess(freezeModeFlag) = 0;
+
+        CkpvInitialize(void *, debugQueue);
+        CkpvAccess(debugQueue) = CdsFifo_Create();
+        
+        BgProcessMessage = BgProcessMessageFreezeMode;
+#endif
+
        if (faultFunc) {
                if (CkMyPe()==0) _allStats = new Stats*[CkNumPes()];
                if (!inCommThread) {
@@ -1219,6 +1253,7 @@ void _initCharm(int unused_argc, char **argv)
         }
 
 #if CMK_CCS_AVAILABLE
+        // Should not use CpdFreeze inside a thread (since this processor is really a user-level thread)
        if (CpvAccess(cpdSuspendStartup))
        { 
           //CmiPrintf("In Parallel Debugging mode .....\n");
index b353a35ad48e4a9185aaf4d458084b54e5dc4e7e..120626d2cb192dbc7074cf984faee64a398ce1ba 100644 (file)
@@ -154,6 +154,8 @@ static inline void CksdScheduler(int ret) { BgScheduler(ret); }
 static inline void CksdExitScheduler() { BgExitScheduler(); }
 static inline void CkDeliverMsgs(int nmsg)     { BgDeliverMsgs(nmsg); }
 
+void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn);
+
 }  /* end of namespace */
 
 #endif
index 8a47bbb09de3009842fe47a1b83fc2965e65ed02..423cd71dc31a81d8ba667252f52e436d684146a1 100644 (file)
@@ -17,6 +17,8 @@
 #define CksvInitialize            CsvInitialize
 #define CksvAccess        CsvAccess
 
+#define CkReduce    CmiReduce
+
 #undef CkMyPe
 #undef CkNumPes
 
index c5cf82981bb9fb7293f4741da29ba2571623c8c1..c66262cc9c0d5952713dea5b0a5e6bcada74d495 100644 (file)
 
 #if CMK_CCS_AVAILABLE
 
-/**********************************************
-  "ccs_getinfo"-- takes no data
-    Return the number of parallel nodes, and
-      the number of processors per node as an array
-      of 4-byte big-endian ints.
-*/
-
-static void ccs_getinfo(char *msg)
-{
-  int nNode=CmiNumNodes();
-  int len=(1+nNode)*sizeof(ChMessageInt_t);
-  ChMessageInt_t *table=(ChMessageInt_t *)malloc(len);
-  int n;
-  table[0]=ChMessageInt_new(nNode);
-  for (n=0;n<nNode;n++)
-    table[1+n]=ChMessageInt_new(CmiNodeSize(n));
-  CcsSendReply(len,(const char *)table);
-  free(table);
-  CmiFree(msg);
-}
+void ccs_getinfo(char *msg);
 
 /**********************************************
   "ccs_killport"-- takes one 4-byte big-endian port number
@@ -61,7 +42,7 @@ static killPortStruct *killList=NULL;
 static void ccs_killport(char *msg)
 {
   killPortStruct *oldList=killList;
-  int port=ChMessageInt(*(ChMessageInt_t *)(msg+CmiMsgHeaderSizeBytes));
+  int port=ChMessageInt(*(ChMessageInt_t *)(msg+CmiReservedHeaderSize));
   skt_ip_t ip;
   unsigned int connPort;
   CcsCallerId(&ip,&connPort);
@@ -175,7 +156,7 @@ static CpdListAccessor *CpdListLookup(const ChMessageInt_t *lenAndPath)
 //Get the length of the given list:
 static void CpdList_ccs_list_len(char *msg)
 {
-  const ChMessageInt_t *req=(const ChMessageInt_t *)(msg+CmiMsgHeaderSizeBytes);
+  const ChMessageInt_t *req=(const ChMessageInt_t *)(msg+CmiReservedHeaderSize);
   CpdListAccessor *acc=CpdListLookup(req);
   if (acc!=NULL) {
     ChMessageInt_t reply=ChMessageInt_new(acc->getLength());
@@ -194,9 +175,9 @@ static void CpdList_ccs_list_len(char *msg)
 static CpdListAccessor *CpdListHeader_ccs_list_items(char *msg,
             CpdListItemsRequest &h)
 {
-  int msgLen=CmiSize((void *)msg)-CmiMsgHeaderSizeBytes;
+  int msgLen=CmiSize((void *)msg)-CmiReservedHeaderSize;
   CpdListAccessor *ret=NULL;
-  const ChMessageInt_t *req=(const ChMessageInt_t *)(msg+CmiMsgHeaderSizeBytes);
+  const ChMessageInt_t *req=(const ChMessageInt_t *)(msg+CmiReservedHeaderSize);
   h.lo=ChMessageInt(req[0]); // first item to send
   h.hi=ChMessageInt(req[1]); // last item to send+1
   h.extraLen=ChMessageInt(req[2]); // extra data length
@@ -269,6 +250,10 @@ void CpdMachineArchitecture(char *msg) {
   char firstByte = *((char*)&value);
   if (firstByte == 1) reply[1] = 1;
   else reply[1] = 2;
+  // add the third bit if we are in bigsim
+#if CMK_BLUEGENE_CHARM
+  reply[1] |= 4;
+#endif
   // get the size of an "int"
   reply[2] = sizeof(int);
   // get the size of an "long"
@@ -454,7 +439,7 @@ CCS Client->CWebHandler->...  (processor 0)
 #define MAXFNS 20 /*Largest number of performance functions to expect*/
 
 typedef struct {
-       char hdr[CmiMsgHeaderSizeBytes];
+       char hdr[CmiReservedHeaderSize];
        int fromPE;/*Source processor*/
        int perfData[MAXFNS];/*Performance numbers*/
 } CWeb_CollectedData;
@@ -582,9 +567,9 @@ static void CWebHandler(void){
       
       /*Start collecting data on each processor*/
       for(i = 0; i < CmiNumPes(); i++){
-        char *msg = (char *)CmiAlloc(CmiMsgHeaderSizeBytes);
+        char *msg = (char *)CmiAlloc(CmiReservedHeaderSize);
         CmiSetHandler(msg, CWeb_CollectIndex);
-        CmiSyncSendAndFree(i, CmiMsgHeaderSizeBytes,msg);
+        CmiSyncSendAndFree(i, CmiReservedHeaderSize,msg);
       }
     }
   }
index 80a22c5f26fd8564ac1e03e3f45af1e607227812..342e5fb6c5ff2d6e010a8f20f65a3b96b72a35a1 100644 (file)
@@ -55,8 +55,8 @@ static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
            Pack user data into a converse message (cripes! why bother?);
            user will delete the message. 
          */
-               char *cmsg = (char *) CmiAlloc(CmiMsgHeaderSizeBytes+reqLen);
-               memcpy(cmsg+CmiMsgHeaderSizeBytes, reqData, reqLen);
+               char *cmsg = (char *) CmiAlloc(CmiReservedHeaderSize+reqLen);
+               memcpy(cmsg+CmiReservedHeaderSize, reqData, reqLen);
                (c->fnOld)(cmsg);
        }
        else { /* Pass read-only copy of data straight to user */
@@ -99,17 +99,17 @@ void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
   char *ptr;
   int i;
   for (i=0; i<n; ++i) {
-    hdr = (CcsImplHeader*)(((char*)remote[i])+CmiMsgHeaderSizeBytes);
+    hdr = (CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize);
     total += ChMessageInt(hdr->len);
   }
   reply = CmiAlloc(total);
   memcpy(reply, local, *size);
-  ((CcsImplHeader*)(((char*)reply)+CmiMsgHeaderSizeBytes))->len = ChMessageInt_new(total-CmiMsgHeaderSizeBytes-sizeof(CcsImplHeader));
+  ((CcsImplHeader*)(((char*)reply)+CmiReservedHeaderSize))->len = ChMessageInt_new(total-CmiReservedHeaderSize-sizeof(CcsImplHeader));
   CmiFree(local);
   ptr = ((char*)reply)+*size;
   for (i=0; i<n; ++i) {
-    int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiMsgHeaderSizeBytes))->len);
-    memcpy(ptr, ((char*)remote[i])+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader), len);
+    int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize))->len);
+    memcpy(ptr, ((char*)remote[i])+CmiReservedHeaderSize+sizeof(CcsImplHeader), len);
     ptr += len;
   }
   *size = total;
@@ -119,7 +119,7 @@ void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
 #define SIMPLE_REDUCTION(name, dataType, loop) \
 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
   int i, m; \
-  CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiMsgHeaderSizeBytes); \
+  CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiReservedHeaderSize); \
   int lenLocal = ChMessageInt(hdrLocal->len); \
   int nElem = lenLocal / sizeof(dataType); \
   dataType *ret = (dataType *) (hdrLocal+1); \
@@ -127,7 +127,7 @@ void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
   for (m=0; m<n; ++m) { \
     int len; \
     dataType *value; \
-    hdr = (CcsImplHeader*)(((char*)remote[m])+CmiMsgHeaderSizeBytes); \
+    hdr = (CcsImplHeader*)(((char*)remote[m])+CmiReservedHeaderSize); \
     len = ChMessageInt(hdr->len); \
     value = (dataType *)(hdr+1); \
     CmiAssert(lenLocal == len); \
@@ -182,9 +182,9 @@ int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
   if (repPE <= -1) {
     /* Reduce the message to get the final reply */
     CcsHandlerRec *fn;
-    int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+repLen;
+    int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+repLen;
     char *msg=CmiAlloc(len);
-    char *r=msg+CmiMsgHeaderSizeBytes;
+    char *r=msg+CmiReservedHeaderSize;
     char *handlerStr;
     rep->len = ChMessageInt_new(repLen);
     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
@@ -195,7 +195,7 @@ int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
     if (repPE == -1) {
       /* CCS Broadcast */
-      CmiReduce(msg, len, fn->mergeFn);
+      CkReduce(msg, len, fn->mergeFn);
     } else {
       /* CCS Multicast */
       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
@@ -261,7 +261,7 @@ delivery.
   Deliver the given message data to the given
 CCS handler.
 */
-static void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
+void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
 {
   char *cmsg;
   int reqLen=ChMessageInt(hdr->len);
@@ -286,42 +286,6 @@ static void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
     CcsSendReply(0,NULL);/*Send an empty reply if not*/
 }
 
-/*Unpacks request message to call above routine*/
-int _ccsHandlerIdx = 0;/*Converse handler index of below routine*/
-static void req_fw_handler(char *msg)
-{
-  int offset = CmiMsgHeaderSizeBytes + sizeof(CcsImplHeader);
-  CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiMsgHeaderSizeBytes);
-  int destPE = (int)ChMessageInt(hdr->pe);
-  if (CmiMyPe() == 0 && destPE == -1) {
-    /* Broadcast message to all other processors */
-    int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+ChMessageInt(hdr->len);
-    CmiSyncBroadcast(len, msg);
-  }
-  else if (destPE < -1) {
-    /* Multicast the message to your children */
-    int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(ChMessageInt_t);
-    int index, child, i;
-    int *pes = (int*)(msg+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader));
-    ChMessageInt_t *pes_nbo = (ChMessageInt_t *)pes;
-    offset -= destPE * sizeof(ChMessageInt_t);
-    if (ChMessageInt(pes_nbo[0]) == CmiMyPe()) {
-      for (index=0; index<-destPE; ++index) pes[index] = ChMessageInt(pes_nbo[index]);
-    }
-    for (index=0; index<-destPE; ++index) {
-      if (pes[index] == CmiMyPe()) break;
-    }
-    child = (index << 2) + 1;
-    for (i=0; i<4; ++i) {
-      if (child+i < -destPE) {
-        CmiSyncSend(pes[child+i], len, msg);
-      }
-    }
-  }
-  CcsHandleRequest(hdr, msg+offset);
-  CmiFree(msg);
-}
-
 #if ! NODE_0_IS_CONVHOST
 /* The followings are necessary to prevent CCS requests to be processed before
  * CCS has been initialized. Really it matters only when NODE_0_IS_CONVHOST=0, but
@@ -331,6 +295,71 @@ static int CcsNumBufferedMsgs = 0;
 #define CCS_MAX_NUM_BUFFERED_MSGS  100
 #endif
 
+void CcsBufferMessage(char *msg) {
+  //CmiPrintf("Buffering CCS message\n");
+  CmiAssert(CcsNumBufferedMsgs < CCS_MAX_NUM_BUFFERED_MSGS);
+  if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
+  if (bufferedMessages == NULL) bufferedMessages = malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
+  bufferedMessages[CcsNumBufferedMsgs] = msg;
+  CcsNumBufferedMsgs ++;
+}
+  
+/*Unpacks request message to call above routine*/
+int _ccsHandlerIdx = 0;/*Converse handler index of routine req_fw_handler*/
+
+#if CMK_BLUEGENE_CHARM
+CpvDeclare(int, _bgCcsHandlerIdx);
+CpvDeclare(int, _bgCcsAck);
+/* This routine is needed when the application is built on top of the bigemulator
+ * layer of Charm. In this case, the real CCS handler must be called within a
+ * worker thread. The function of this function is to receive the CCS message in
+ * the bottom converse layer and forward it to the emulated layer. */
+static void bg_req_fw_handler(char *msg) {
+  if (CpvAccess(_bgCcsAck) < BgNodeSize()) {
+    CcsBufferMessage(msg);
+    return;
+  }
+  /* Get out of the message who is the destination pe */
+  int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
+  CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
+  int destPE = (int)ChMessageInt(hdr->pe);
+  if (destPE == -1) destPE = 0;
+  if (destPE < -1) {
+    ChMessageInt_t *pes_nbo = (ChMessageInt_t *)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
+    destPE = ChMessageInt(pes_nbo[0]);
+  }
+  //CmiAssert(destPE >= 0); // FixME: should cover also broadcast and multicast -> create generic function to extract destpe
+  (((CmiBlueGeneMsgHeader*)msg)->tID) = 0;
+  (((CmiBlueGeneMsgHeader*)msg)->n) = 0;
+  (((CmiBlueGeneMsgHeader*)msg)->flag) = 0;
+  (((CmiBlueGeneMsgHeader*)msg)->t) = 0;
+  (((CmiBlueGeneMsgHeader*)msg)->hID) = CpvAccess(_bgCcsHandlerIdx);
+  /* Get the right thread to deliver to (for now assume it is using CyclicMapInfo) */
+  addBgNodeInbuffer_c(msg, destPE/CmiNumPes());
+  //CmiPrintf("message CCS added %d to %d\n",((CmiBlueGeneMsgHeader*)msg)->hID, ((CmiBlueGeneMsgHeader*)msg)->tID);
+}
+#define req_fw_handler bg_req_fw_handler
+#endif
+extern void req_fw_handler(char *msg);
+
+void CcsReleaseMessages() {
+#if ! NODE_0_IS_CONVHOST
+#if CMK_BLUEGENE_CHARM
+  if (CpvAccess(_bgCcsAck) == 0 || CpvAccess(_bgCcsAck) < BgNodeSize()) return;
+#endif
+  if (CcsNumBufferedMsgs > 0) {
+    int i;
+    for (i=0; i<CcsNumBufferedMsgs; ++i) {
+      CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
+      CmiPushPE(0, bufferedMessages[i]);
+    }
+    free(bufferedMessages);
+    bufferedMessages = NULL;
+    CcsNumBufferedMsgs = -1;
+  }
+#endif
+}
+
 /*Convert CCS header & message data into a converse message 
  addressed to handler*/
 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
@@ -340,10 +369,10 @@ char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_le
   int len;
   char *msg;
   if (destPE < -1) reqLen -= destPE*sizeof(int);
-  len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+reqLen;
+  len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+reqLen;
   msg=(char *)CmiAlloc(len);
-  memcpy(msg+CmiMsgHeaderSizeBytes,hdr,sizeof(CcsImplHeader));
-  memcpy(msg+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader),data,reqLen);
+  memcpy(msg+CmiReservedHeaderSize,hdr,sizeof(CcsImplHeader));
+  memcpy(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader),data,reqLen);
   if (ret_len!=NULL) *ret_len=len;
   if (_ccsHandlerIdx != 0) {
     CmiSetHandler(msg, _ccsHandlerIdx);
@@ -352,12 +381,7 @@ char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_le
 #if NODE_0_IS_CONVHOST
     CmiAbort("Why do we need to buffer messages when node 0 is Convhost?");
 #else
-    //CmiPrintf("Buffering CCS message\n");
-    CmiAssert(CcsNumBufferedMsgs < CCS_MAX_NUM_BUFFERED_MSGS);
-    if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
-    if (bufferedMessages == NULL) bufferedMessages = malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
-    bufferedMessages[CcsNumBufferedMsgs] = msg;
-    CcsNumBufferedMsgs ++;
+    CcsBufferMessage(msg);
     return NULL;
 #endif
   }
@@ -368,7 +392,7 @@ converse to node 0.*/
 static void rep_fw_handler(char *msg)
 {
   int len;
-  char *r=msg+CmiMsgHeaderSizeBytes;
+  char *r=msg+CmiReservedHeaderSize;
   CcsImplHeader *hdr=(CcsImplHeader *)r; 
   r+=sizeof(CcsImplHeader);
   len=ChMessageInt(hdr->len);
@@ -425,10 +449,10 @@ void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
     CcsServer_sendReply(rep,repLen,repData);
   } else {
     /*Forward data & socket # to the replyPE*/
-    int len=CmiMsgHeaderSizeBytes+
+    int len=CmiReservedHeaderSize+
            sizeof(CcsImplHeader)+repLen;
     char *msg=CmiAlloc(len);
-    char *r=msg+CmiMsgHeaderSizeBytes;
+    char *r=msg+CmiReservedHeaderSize;
     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
     memcpy(r,repData,repLen);
     CmiSetHandler(msg,rep_fw_handler_idx);
@@ -524,6 +548,12 @@ void CcsInit(char **argv)
   CpvInitialize(CcsImplHeader *, ccsReq);
   CpvAccess(ccsReq) = NULL;
   _ccsHandlerIdx = CmiRegisterHandler((CmiHandler)req_fw_handler);
+#if CMK_BLUEGENE_CHARM
+  CpvInitialize(int, _bgCcsHandlerIdx);
+  CpvAccess(_bgCcsHandlerIdx) = 0;
+  CpvInitialize(int, _bgCcsAck);
+  CpvAccess(_bgCcsAck) = 0;
+#endif
   CpvInitialize(int, cmiArgDebugFlag);
   CpvInitialize(char *, displayArgument);
   CpvInitialize(int, cpdSuspendStartup);
@@ -572,18 +602,7 @@ void CcsInit(char **argv)
      }
   }
 
-#if ! NODE_0_IS_CONVHOST
-  if (CcsNumBufferedMsgs > 0) {
-    int i;
-    for (i=0; i<CcsNumBufferedMsgs; ++i) {
-      CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
-      CmiPushPE(0, bufferedMessages[i]);
-    }
-    free(bufferedMessages);
-    bufferedMessages = NULL;
-    CcsNumBufferedMsgs = -1;
-  }
-#endif
+  CcsReleaseMessages();
 }
 
 #endif /*CMK_CCS_AVAILABLE*/
index dd7815a1478426922920347c01d502f75862109a..7f87b368519bade3cf90e00c8c049955cfd80eb7 100644 (file)
@@ -69,6 +69,7 @@ SIMPLE_POLYMORPH_REDUCTION(min);
 #undef SIMPLE_REDUCTION
 #undef SIMPLE_POLYMORPH_REDUCTION
 
+void CcsReleaseMessages();
 void CcsInit(char **argv);
 int CcsEnabled(void);
 int CcsIsRemoteRequest(void);
@@ -81,6 +82,7 @@ void CcsNoDelayedReply(CcsDelayedReply d);
 
 #else
 typedef void *CcsDelayedReply;
+#define CcsReleaseMessages() /*empty*/
 #define CcsInit(argv) /*empty*/
 #define CcsRegisterHandler(x,y) 0
 #define CcsRegisterHandlerFn(x,y,p) 0
diff --git a/src/conv-ccs/middle-ccs.C b/src/conv-ccs/middle-ccs.C
new file mode 100644 (file)
index 0000000..262aef9
--- /dev/null
@@ -0,0 +1,118 @@
+#include "converse.h"
+
+#if CMK_BLUEGENE_CHARM
+#include "bgconverse.h"
+#endif
+#include "ccs-server.h"
+
+extern "C" void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData);
+
+extern "C" void req_fw_handler(char *msg)
+{
+  int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
+  CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
+  int destPE = (int)ChMessageInt(hdr->pe);
+  if (CmiMyPe() == 0 && destPE == -1) {
+    /* Broadcast message to all other processors */
+    int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len);
+    CmiSyncBroadcast(len, msg);
+  }
+  else if (destPE < -1) {
+    /* Multicast the message to your children */
+    int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(ChMessageInt_t);
+    int index, child, i;
+    int *pes = (int*)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
+    ChMessageInt_t *pes_nbo = (ChMessageInt_t *)pes;
+    offset -= destPE * sizeof(ChMessageInt_t);
+    if (ChMessageInt(pes_nbo[0]) == CmiMyPe()) {
+      for (index=0; index<-destPE; ++index) pes[index] = ChMessageInt(pes_nbo[index]);
+    }
+    for (index=0; index<-destPE; ++index) {
+      if (pes[index] == CmiMyPe()) break;
+    }
+    child = (index << 2) + 1;
+    for (i=0; i<4; ++i) {
+      if (child+i < -destPE) {
+        CmiSyncSend(pes[child+i], len, msg);
+      }
+    }
+  }
+  CcsHandleRequest(hdr, msg+offset);
+  CmiFree(msg);
+}
+
+extern "C" void CcsSendReply(int replyLen, const void *replyData);
+/**********************************************
+  "ccs_getinfo"-- takes no data
+    Return the number of parallel nodes, and
+      the number of processors per node as an array
+      of 4-byte big-endian ints.
+*/
+
+void ccs_getinfo(char *msg)
+{
+  int nNode=CmiNumNodes();
+  int len=(1+nNode)*sizeof(ChMessageInt_t);
+  ChMessageInt_t *table=(ChMessageInt_t *)malloc(len);
+  int n;
+  table[0]=ChMessageInt_new(nNode);
+  for (n=0;n<nNode;n++)
+    table[1+n]=ChMessageInt_new(CmiNodeSize(n));
+  CcsSendReply(len,(const char *)table);
+  free(table);
+  CmiFree(msg);
+}
+
+//////////////////////////////////////////////////////////////////// middle-debug.C
+
+extern "C" {
+
+CpvDeclare(void *, debugQueue);
+CpvDeclare(int, freezeModeFlag);
+
+/*
+ Start the freeze-- call will not return until unfrozen
+ via a CCS request.
+ */
+void CpdFreeze(void)
+{
+  CpdNotify(CPD_FREEZE,getpid());
+  if (CpvAccess(freezeModeFlag)) return; /*Already frozen*/
+  CpvAccess(freezeModeFlag) = 1;
+#if ! CMK_BLUEGENE_CHARM
+  CpdFreezeModeScheduler();
+#endif
+}
+
+void CpdUnFreeze(void)
+{
+  CpvAccess(freezeModeFlag) = 0;
+}
+
+int CpdIsFrozen(void) {
+  return CpvAccess(freezeModeFlag);
+}
+
+}
+
+#if CMK_BLUEGENE_CHARM
+#include "blue_impl.h"
+void BgProcessMessageFreezeMode(threadInfo *t, char *msg) {
+//  CmiPrintf("BgProcessMessageFreezeMode\n");
+#if CMK_CCS_AVAILABLE
+  void *debugQ=CpvAccess(debugQueue);
+  CmiAssert(msg!=NULL);
+  int processImmediately = CpdIsDebugMessage(msg);
+  if (processImmediately) BgProcessMessageDefault(t, msg);
+  while (!CpvAccess(freezeModeFlag) && !CdsFifo_Empty(debugQ)) {
+    BgProcessMessageDefault(t, (char*)CdsFifo_Dequeue(debugQ));
+  }
+  if (!processImmediately) {
+    if (!CpvAccess(freezeModeFlag)) BgProcessMessageDefault(t, msg); 
+    else CdsFifo_Enqueue(debugQ, msg);
+  }
+#else
+  BgProcessMessageDefault(t, msg);
+#endif
+}
+#endif
index feed238069b8c97d00b9c5f677827f84e41a5eb7..dbe5a5efbb59f6612a27f9e15daa3ab04965cd1f 100644 (file)
@@ -12,10 +12,10 @@ Orion Sky Lawlor, olawlor@acm.org, 4/10/2001
 #include "conv-ccs.h"
 #include <errno.h>
 
-CpvStaticDeclare(int, freezeModeFlag);
+CpvExtern(int, freezeModeFlag);
 CpvStaticDeclare(int, continueFlag);
 CpvStaticDeclare(int, stepFlag);
-CpvDeclare(void *, debugQueue);
+CpvExtern(void *, debugQueue);
 int _debugHandlerIdx;
 CpvDeclare(int, skipBreakpoint); /* This is a counter of how many breakpoints we should skip */
 
@@ -192,7 +192,7 @@ static void CpdDebugCallMemStat(char *msg) {
 static void CpdDebugHandler(char *msg)
 {
     char name[128];
-    sscanf(msg+CmiMsgHeaderSizeBytes, "%s", name);
+    sscanf(msg+CmiReservedHeaderSize, "%s", name);
 
     if (strcmp(name, "freeze") == 0) {
       CpdFreeze();
@@ -225,27 +225,6 @@ static void CpdDebugHandler(char *msg)
 }
 
 
-/*
- Start the freeze-- call will not return until unfrozen
- via a CCS request.
- */
-void CpdFreeze(void)
-{
-  CpdNotify(CPD_FREEZE,getpid());
-  if (CpvAccess(freezeModeFlag)) return; /*Already frozen*/
-  CpvAccess(freezeModeFlag) = 1;
-  CpdFreezeModeScheduler();
-}
-
-void CpdUnFreeze(void)
-{
-  CpvAccess(freezeModeFlag) = 0;
-}
-
-int CpdIsFrozen(void) {
-  return CpvAccess(freezeModeFlag);
-}
-
 /* Deliver a single message in the queue while not unfreezing the program */
 void CpdNext(void) {
 
@@ -268,6 +247,9 @@ int (*CpdIsDebugMessage)(void *);
 
 void CpdFreezeModeScheduler(void)
 {
+#if CMK_BLUEGENE_CHARM
+    CmiAbort("Cannot run CpdFreezeModeScheduler inside BigSim emulated environment");
+#else
 #if CMK_CCS_AVAILABLE
     void *msg;
     void *debugQ=CpvAccess(debugQueue);
@@ -310,18 +292,21 @@ void CpdFreezeModeScheduler(void)
         CmiHandleMessage(queuedMsg);
     }
 #endif
+#endif
 }
 
 void CpdMemoryMarkClean(char *msg);
 
 void CpdInit(void)
 {
+#if ! CMK_BLUEGENE_CHARM
   CpvInitialize(int, freezeModeFlag);
   CpvAccess(freezeModeFlag) = 0;
 
   CpvInitialize(void *, debugQueue);
   CpvAccess(debugQueue) = CdsFifo_Create();
-
+#endif
+  
   CcsRegisterHandler("ccs_debug", (CmiHandler)CpdDebugHandler);
   CcsSetMergeFn("ccs_debug", CcsMerge_concat);
 
index a91e56ddf102462476959982a4cd2796b5518be0..3679dcba4a2e74efef40ea3db399d5a0eda72290 100644 (file)
@@ -517,6 +517,9 @@ void addBgNodeInbuffer(char *msgPtr, int lnodeID)
        
   nInfo.addBgNodeInbuffer(msgPtr);
 }
+extern "C" void addBgNodeInbuffer_c(char *msgPtr, int lnodeID) {
+  addBgNodeInbuffer(msgPtr, lnodeID);
+}
 
 /** BG API Func 
  *  called by a comm thread
@@ -1193,11 +1196,12 @@ void BgSetWorkerThreadStart(BgStartHandler f)
 extern "C" void CthResumeNormalThread(CthThreadToken* token);
 
 // kernel function for processing a bluegene message
-void BgProcessMessage(threadInfo *tinfo, char *msg)
+void BgProcessMessageDefault(threadInfo *tinfo, char *msg)
 {
   DEBUGM(5, ("=====Begin of BgProcessing a msg on node[%d]=====\n", BgMyNode()));
   int handler = CmiBgMsgHandle(msg);
-  DEBUGF(("[%d] call handler %d\n", BgMyNode(), handler));
+  //CmiPrintf("[%d] call handler %d\n", BgMyNode(), handler);
+  CmiAssert(handler < 1000);
 
   BgHandlerInfo *handInfo;
 #if  CMK_BLUEGENE_NODE
@@ -1237,6 +1241,7 @@ void BgProcessMessage(threadInfo *tinfo, char *msg)
   DEBUGM(5, ("=====End of BgProcessing a msg on node[%d]=====\n\n", BgMyNode()));
 }
 
+void  (*BgProcessMessage)(threadInfo *t, char *msg) = BgProcessMessageDefault;
 
 void scheduleWorkerThread(char *msg)
 {
@@ -2121,6 +2126,34 @@ int BgIsReplay()
     return cva(bgMach).replay != -1;
 }
 
+extern "C" void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
+  ((workThreadInfo*)cta(threadinfo))->reduceMsg = msg;
+  //CmiPrintf("Called CkReduce from %d %hd\n",CmiMyPe(),cta(threadinfo)->globalId);
+  int numLocal = 0, count = 0;
+  for (int j=0; j<cva(numNodes); j++){
+    for(int i=0;i<cva(bgMach).numWth;i++){
+      workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
+      if (t->reduceMsg == NULL) return; /* we are not yet ready to reduce */
+      numLocal ++;
+    }
+  }
+  void **msgLocal = (void**)malloc(sizeof(void*)*(numLocal-1));
+  for (int j=0; j<cva(numNodes); j++){
+    for(int i=0;i<cva(bgMach).numWth;i++){
+      workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
+      if (t == cta(threadinfo)) break;
+      msgLocal[count++] = t->reduceMsg;
+      t->reduceMsg = NULL;
+    }
+  }
+  CmiAssert(count==numLocal-1);
+  msg = mergeFn(&size, msg, msgLocal, numLocal-1);
+  CmiReduce(msg, size, mergeFn);
+  CmiPrintf("Called CmiReduce %d\n",CmiMyPe());
+  for (int i=0; i<numLocal-1; ++i) CmiFree(msgLocal[i]);
+  free(msgLocal);
+}
+
 // for record/replay, to fseek back
 void BgRewindRecord()
 {
index c2cbe0ac829bcc2557b8526f9ec70c07bb34bc25..d8e6f8d79b65276a01af8c8da8abd308f7bc9410 100644 (file)
@@ -521,8 +521,10 @@ class workThreadInfo : public threadInfo {
 private:
   int CsdStopFlag;
 public:
+  void* reduceMsg;
+  
   workThreadInfo(int _id, nodeInfo *_node): 
-        threadInfo(_id, WORK_THREAD, _node) { 
+        threadInfo(_id, WORK_THREAD, _node), reduceMsg(NULL) { 
     CsdStopFlag=0; 
     watcher = NULL;
     if (_id != -1) {
@@ -557,7 +559,8 @@ void    resetVTime();
 char * getFullBuffer();
 void   addBgNodeMessage(char *msgPtr);
 void   addBgThreadMessage(char *msgPtr, int threadID);
-void   BgProcessMessage(threadInfo *t, char *msg);
+void   BgProcessMessageDefault(threadInfo *t, char *msg);
+extern void (*BgProcessMessage)(threadInfo *t, char *msg);
 
 
 /* blue gene debug */
index 24ed07daf7c434c6aecff1443a2ff6ca67b180b1..048e6547bedf227e13882f0e45c05d62f95dd886 100644 (file)
@@ -87,6 +87,9 @@ conv-ccs.o: conv-ccs.c converse.h conv-config.h conv-autoconfig.h \
   ckhashtable.h pup.h
        $(CHARMC) -c -I. conv-ccs.c
 
+middle-ccs.o: middle-ccs.C converse.h bgconverse.h
+       $(CHARMC) -c -I. middle-ccs.C
+
 ccs-builtins.o: ccs-builtins.C converse.h conv-config.h conv-autoconfig.h \
   conv-common.h conv-mach.h conv-mach-opt.h pup_c.h queueing.h conv-cpm.h \
   conv-cpath.h conv-qd.h conv-random.h conv-lists.h conv-trace.h \
index f358904acb96b9532293c5baeb61705326423331..e0cdf5bc7a50ddc5726a8d3a83a0ca1505080b56 100644 (file)
@@ -361,7 +361,7 @@ CVLIBS=$(L)/libconv-core.a \
 
 LIBCONV_CORE= convcore.o conv-conds.o queueing.o msgmgr.o \
        cpm.o cpthreads.o futures.o cldb.o topology.o random.o \
-       debug-conv.o generate.o edgelist.o conv-ccs.o ccs-builtins.o \
+       debug-conv.o generate.o edgelist.o conv-ccs.o ccs-builtins.o middle-ccs.o \
        traceCore.o traceCoreCommon.o tracec.o \
        converseProjections.o machineProjections.o \
        quiescence.o isomalloc.o mem-arena.o conv-counter.o \