Fixed a few bugs on CmiReduce functions. Added new basic reduction routines based...
author Filippo Gioachin <gioachin@illinois.edu>
Tue, 1 Sep 2009 00:17:29 +0000 (00:17 +0000)
committer Filippo Gioachin <gioachin@illinois.edu>
Tue, 1 Sep 2009 00:17:29 +0000 (00:17 +0000)
src/arch/net/charmrun/charmrun.c
src/ck-core/debug-charm.C
src/conv-ccs/ccs-server.c
src/conv-ccs/conv-ccs.c
src/conv-ccs/conv-ccs.h
src/conv-core/convcore.c
src/conv-core/converse.h
src/conv-core/debug-conv.c

index 0cdfc1ae58501974d1efed049534145fe9a3131b..e4dc2127f1c7aa11e0853e97fedb73bc5324d125 100644 (file)
@@ -1382,9 +1382,12 @@ void req_ccs_connect(void)
   reqBytes=ChMessageInt(h.hdr.len);
 
   if (pe<=-nodetab_size || pe>=nodetab_size) {
-    /*Treat out of bound values as value 0*/
-       pe=0;
-       h.hdr.pe=ChMessageInt_new(pe);
+    /*Treat out of bound values as errors. Helps detecting bugs*/
+    if (pe==-nodetab_size) fprintf(stderr,"Invalid processor index in CCS request: are you trying to do a broadcast instead?");
+    else fprintf(stderr,"Invalid processor index in CCS request.");
+    CcsServer_sendReply(&h.hdr,0,0);
+    free(reqData);
+    return;
   }
   else if (pe == -1) {
     /*Treat -1 as broadcast and sent to 0 as root of the spanning tree*/
@@ -1393,21 +1396,22 @@ void req_ccs_connect(void)
   else if (pe < -1) {
     /*Treat negative values as multicast to a number of processors specified by -pe.
       The pes to multicast to follows sits at the beginning of reqData*/
-    pe = *(int*)reqData;
+    reqBytes -= pe * sizeof(ChMessageInt_t);
+    pe = ChMessageInt(*(ChMessageInt_t*)reqData);
   }
   
   if (! check_stdio_header(&h.hdr)) {
 
 #define LOOPBACK 0
 #if LOOPBACK /*Immediately reply "there's nothing!" (for performance testing)*/
-  CcsServer_sendReply(&h.hdr,0,0);
+    CcsServer_sendReply(&h.hdr,0,0);
 #else
-  /*Fill out the charmrun header & forward the CCS request*/
-  ChMessageHeader_new("req_fw",sizeof(h.hdr)+reqBytes,&h.ch);  
-  
-  bufs[0]=&h; lens[0]=sizeof(h);
-  bufs[1]=reqData; lens[1]=reqBytes;
-  skt_sendV(nodetab_ctrlfd(pe),2,bufs,lens);
+    /*Fill out the charmrun header & forward the CCS request*/
+    ChMessageHeader_new("req_fw",sizeof(h.hdr)+reqBytes,&h.ch);  
+
+    bufs[0]=&h; lens[0]=sizeof(h);
+    bufs[1]=reqData; lens[1]=reqBytes;
+    skt_sendV(nodetab_ctrlfd(pe),2,bufs,lens);
 
 #endif
   }
index 96b37b8dcf4063d4e6c6caf0ddf45a7aab704ea0..1579ffde6d440e4167d6694d8da6e3756e87baa9 100644 (file)
@@ -625,6 +625,7 @@ void CpdSetBreakPoint (char *msg)
 {
   char functionName[128];
   int tableSize, tableIdx = 0;
+  int reply = 0;
   sscanf(msg+CmiMsgHeaderSizeBytes, "%s", functionName);
   if (strlen(functionName) > 0)
   {
@@ -636,14 +637,11 @@ void CpdSetBreakPoint (char *msg)
        //if (strstr(_entryTable[tableIdx]->name, functionName) != NULL)
        //{
     tableIdx = atoi(functionName);
-    if (tableIdx < 0 || tableIdx >= tableSize) {
-      CmiPrintf("[ERROR]Entrypoint was not found for function %s\n", functionName);
-      return;
-    }
+    if (tableIdx >= 0 && tableIdx < tableSize) {
            EntryInfo * breakPointEntryInfo = (EntryInfo *)CpvAccess(breakPointEntryTable)->get(tableIdx);
            delete breakPointEntryInfo;
            breakPointEntryInfo = new EntryInfo(_entryTable[tableIdx]->name, _entryTable[tableIdx]->call, _entryTable[tableIdx]->msgIdx, _entryTable[tableIdx]->chareIdx );
-           CmiPrintf("Breakpoint is set for function %s with an epIdx = %ld\n", _entryTable[tableIdx]->name, tableIdx);
+           //CmiPrintf("Breakpoint is set for function %s with an epIdx = %ld\n", _entryTable[tableIdx]->name, tableIdx);
            CpvAccess(breakPointEntryTable)->put(tableIdx) = breakPointEntryInfo;
            _entryTable[tableIdx]->name = "debug_breakpoint_ep";
            _entryTable[tableIdx]->call = (CkCallFnPtr)_call_freeze_on_break_point;
@@ -651,8 +649,9 @@ void CpdSetBreakPoint (char *msg)
            //_entryTable[tableIdx]->chareIdx = CpvAccess(_debugChare);
            //_debugEntryTable[tableIdx].isBreakpoint = CmiTrue;
            //break;
-       //}
-    //}
+           reply = ~0;
+           //}
+    }
     //if (tableIdx == tableSize)
     //{
     //  CmiPrintf("[ERROR]Entrypoint was not found for function %s\n", functionName);
@@ -660,6 +659,7 @@ void CpdSetBreakPoint (char *msg)
     //}
 
   }
+  CcsSendReply(sizeof(int), (void*)&reply);
 
 }
 
@@ -672,30 +672,32 @@ void CpdQuitDebug()
 void CpdRemoveBreakPoint (char *msg)
 {
   char functionName[128];
+  int reply = 0;
   sscanf(msg+CmiMsgHeaderSizeBytes, "%s", functionName);
   if (strlen(functionName) > 0) {
     int idx = atoi(functionName);
-    if (idx < 0 || idx >= _entryTable.size()) {
-      CmiPrintf("[ERROR]Entrypoint was not found for function %s\n", functionName);
-      return;
-    }
-    //void *objPointer;
-    //void *keyPointer;
-    //CkHashtableIterator *it = CpvAccess(breakPointEntryTable)->iterator();
-    //while(NULL!=(objPointer = it->next(&keyPointer)))
-    //{
-    //EntryInfo * breakPointEntryInfo = *(EntryInfo **)objPointer;
-    EntryInfo * breakPointEntryInfo = CpvAccess(breakPointEntryTable)->get(idx);
-    if (breakPointEntryInfo != NULL) {
-      _entryTable[idx]->name =  breakPointEntryInfo->name;
-      _entryTable[idx]->call = (CkCallFnPtr)breakPointEntryInfo->call;
-      _entryTable[idx]->msgIdx = breakPointEntryInfo->msgIdx;
-      _entryTable[idx]->chareIdx = breakPointEntryInfo->chareIdx;
-      //_debugEntryTable[idx].isBreakpoint = CmiFalse;
-      CmiPrintf("Breakpoint is removed for function %s with epIdx %ld\n", _entryTable[idx]->name, idx);
-      //CpvAccess(breakPointEntryTable)->remove(idx);
+    if (idx >= 0 && idx < _entryTable.size()) {
+      //CmiPrintf("[ERROR]Entrypoint was not found for function %s\n", functionName);
+      //void *objPointer;
+      //void *keyPointer;
+      //CkHashtableIterator *it = CpvAccess(breakPointEntryTable)->iterator();
+      //while(NULL!=(objPointer = it->next(&keyPointer)))
+      //{
+      //EntryInfo * breakPointEntryInfo = *(EntryInfo **)objPointer;
+      EntryInfo * breakPointEntryInfo = CpvAccess(breakPointEntryTable)->get(idx);
+      if (breakPointEntryInfo != NULL) {
+        _entryTable[idx]->name =  breakPointEntryInfo->name;
+        _entryTable[idx]->call = (CkCallFnPtr)breakPointEntryInfo->call;
+        _entryTable[idx]->msgIdx = breakPointEntryInfo->msgIdx;
+        _entryTable[idx]->chareIdx = breakPointEntryInfo->chareIdx;
+        reply = ~0 ;
+        //_debugEntryTable[idx].isBreakpoint = CmiFalse;
+        //CmiPrintf("Breakpoint is removed for function %s with epIdx %ld\n", _entryTable[idx]->name, idx);
+        //CpvAccess(breakPointEntryTable)->remove(idx);
+      }
     }
   }
+  CcsSendReply(sizeof(int), (void*)&reply);
 }
 
 void CpdRemoveAllBreakPoints ()
@@ -703,6 +705,7 @@ void CpdRemoveAllBreakPoints ()
   //all breakpoints removed
   void *objPointer;
   void *keyPointer;
+  int reply = 1;
   CkHashtableIterator *it = CpvAccess(breakPointEntryTable)->iterator();
   while(NULL!=(objPointer = it->next(&keyPointer)))
   {
@@ -713,6 +716,7 @@ void CpdRemoveAllBreakPoints ()
     _entryTable[idx]->msgIdx = breakPointEntryInfo->msgIdx;
     _entryTable[idx]->chareIdx = breakPointEntryInfo->chareIdx;
   }
+  CcsSendReply(sizeof(int), (void*)&reply);
 }
 
 extern "C" int CpdIsCharmDebugMessage(void *msg) {
@@ -788,8 +792,11 @@ void CpdCharmInit()
 
   CpdBreakPointInit();
   CcsRegisterHandler("ccs_set_break_point",(CmiHandler)CpdSetBreakPoint);
+  CcsSetMergeFn("ccs_set_break_point",CcsMerge_logical_and);
   CcsRegisterHandler("ccs_remove_break_point",(CmiHandler)CpdRemoveBreakPoint);
+  CcsSetMergeFn("ccs_remove_break_point",CcsMerge_logical_and);
   CcsRegisterHandler("ccs_remove_all_break_points",(CmiHandler)CpdRemoveAllBreakPoints);
+  CcsSetMergeFn("ccs_remove_all_break_points",CmiReduceMergeFn_random);
   CcsRegisterHandler("ccs_continue_break_point",(CmiHandler)CpdContinueFromBreakPoint);
   CcsRegisterHandler("ccs_single_step",(CmiHandler)CpdDeliverSingleMessage);
   CcsRegisterHandler("ccs_debug_quit",(CmiHandler)CpdQuitDebug);
index 469f4f6250301b4b9f98f8375d1f458277d943a4..ef543a9e5ceaf67dfd1585e841cff96fa273b61e 100644 (file)
@@ -379,7 +379,7 @@ static int CcsServer_recvRequestData(SOCKET fd,
                                     CcsImplHeader *hdr,void **reqData)
 {
   CcsMessageHeader req;/*CCS header, from requestor*/
-  int reqBytes;
+  int reqBytes, numPes, destPE;
   const char *err;
   if (NULL!=(err=CcsServer_readHeader(fd,&ccs_clientlist,security,
                                      &hdr->attr,&req))) 
@@ -394,16 +394,14 @@ static int CcsServer_recvRequestData(SOCKET fd,
   hdr->len=req.len;
   hdr->replyFd=ChMessageInt_new(fd);
 
-  {
   /*Is it a multicast?*/
-  int numPes = 0;
-  int destPE = ChMessageInt(hdr->pe);
+  numPes = 0;
+  destPE = ChMessageInt(hdr->pe);
   if (destPE < -1) numPes = -destPE;
   
   /*Grab the user data portion of the message*/
-  reqBytes=ChMessageInt(req.len) + numPes*sizeof(int);
+  reqBytes=ChMessageInt(req.len) + numPes*sizeof(ChMessageInt_t);
   *reqData=(char *)malloc(reqBytes);
-  }
   if (-1==skt_recvN(fd,*reqData,reqBytes)) {
     fprintf(stdout,"CCS ERROR> Retrieving %d message bytes\n",reqBytes);
     free(*reqData);
index 533677de784a60fabf99d12d4b1d563cb0cb8800..4d24c5a909e553e0674df6ae40d0d65251540df5 100644 (file)
@@ -34,6 +34,7 @@ typedef struct CcsHandlerRec {
        void *userPtr;
        CmiReduceMergeFn mergeFn; /*Merge function used for bcast requests*/
        int nCalls; /* Number of times handler has been executed*/
+       CmiUInt2 redID; /*Reduction ID to be used with CmiListReduce*/
 } CcsHandlerRec;
 
 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
@@ -88,8 +89,68 @@ void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
     CmiAbort("CCS: Unknown CCS handler name.\n");
   }
   rec->mergeFn=newMerge;
+  rec->redID=CmiGetGlobalReduction();
 }
 
+void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
+  CcsImplHeader *hdr;
+  int total = *size;
+  int i;
+  for (i=0; i<n; ++i) {
+    hdr = (CcsImplHeader*)(((char*)remote[i])+CmiMsgHeaderSizeBytes);
+    total += ChMessageInt(hdr->len);
+  }
+  void *reply = CmiAlloc(total);
+  memcpy(reply, local, *size);
+  ((CcsImplHeader*)(((char*)reply)+CmiMsgHeaderSizeBytes))->len = ChMessageInt_new(total-CmiMsgHeaderSizeBytes-sizeof(CcsImplHeader));
+  CmiFree(local);
+  char *ptr = ((char*)reply)+*size;
+  for (i=0; i<n; ++i) {
+    int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiMsgHeaderSizeBytes))->len);
+    memcpy(ptr, ((char*)remote[i])+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader), len);
+    ptr += len;
+  }
+  *size = total;
+  return reply;
+}
+
+#define SIMPLE_REDUCTION(name, dataType, loop) \
+void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
+  int i, m; \
+  CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiMsgHeaderSizeBytes); \
+  int lenLocal = ChMessageInt(hdrLocal->len); \
+  int nElem = lenLocal / sizeof(dataType); \
+  dataType *ret = (dataType *) (hdrLocal+1); \
+  CcsImplHeader *hdr; \
+  for (m=0; m<n; ++m) { \
+    hdr = (CcsImplHeader*)(((char*)remote[m])+CmiMsgHeaderSizeBytes); \
+    int len = ChMessageInt(hdr->len); \
+    CmiAssert(lenLocal == len); \
+    dataType *value = (dataType *)(hdr+1); \
+    for (i=0; i<nElem; ++i) loop; \
+  } \
+  return local; \
+}
+
+SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
+SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
+SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
+SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
+
+//Use this macro for reductions that have the same type for all inputs
+#define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
+  SIMPLE_REDUCTION(nameBase##_int, int, loop) \
+  SIMPLE_REDUCTION(nameBase##_float, float, loop) \
+  SIMPLE_REDUCTION(nameBase##_double, double, loop)
+
+SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
+SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
+SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
+SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
+
+#undef SIMPLE_REDUCTION
+#undef SIMPLE_POLYMORPH_REDUCTION
+
 int CcsEnabled(void)
 {
   return 1;
@@ -119,21 +180,20 @@ int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
     int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+repLen;
     char *msg=CmiAlloc(len);
     char *r=msg+CmiMsgHeaderSizeBytes;
+    char *handlerStr;
     rep->len = ChMessageInt_new(repLen);
     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
     memcpy(r,repData,repLen);
     CmiSetHandler(msg,rep_fw_handler_idx);
-    {
-    char *handlerStr=rep->handler;
+    handlerStr=rep->handler;
     CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
-    if (repPE <= -1) {
+    if (repPE == -1) {
       /* CCS Broadcast */
       CmiReduce(msg, len, fn->mergeFn);
     } else {
       /* CCS Multicast */
-      CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn);
-    }
+      CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
     }
   } else {
     CcsImpl_reply(rep, repLen, repData);
@@ -225,6 +285,7 @@ static void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
 int _ccsHandlerIdx;/*Converse handler index of below routine*/
 static void req_fw_handler(char *msg)
 {
+  int offset = CmiMsgHeaderSizeBytes + sizeof(CcsImplHeader);
   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiMsgHeaderSizeBytes);
   int destPE = (int)ChMessageInt(hdr->pe);
   if (CmiMyPe() == 0 && destPE == -1) {
@@ -234,23 +295,25 @@ static void req_fw_handler(char *msg)
   }
   else if (destPE < -1) {
     /* Multicast the message to your children */
-    int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(int);
-    int index;
+    int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(ChMessageInt_t);
+    offset -= destPE * sizeof(ChMessageInt_t);
+    int index, child, i;
     int *pes = (int*)(msg+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader));
+    ChMessageInt_t *pes_nbo = (ChMessageInt_t *)pes;
+    if (ChMessageInt(pes_nbo[0]) == CmiMyPe()) {
+      for (index=0; index<-destPE; ++index) pes[index] = ChMessageInt(pes_nbo[index]);
+    }
     for (index=0; index<-destPE; ++index) {
       if (pes[index] == CmiMyPe()) break;
     }
-    {
-    int child = (index << 2) + 1;
-    int i;
+    child = (index << 2) + 1;
     for (i=0; i<4; ++i) {
       if (child+i < -destPE) {
         CmiSyncSend(pes[child+i], len, msg);
       }
     }
-    }
   }
-  CcsHandleRequest(hdr, msg+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader));
+  CcsHandleRequest(hdr, msg+offset);
   CmiFree(msg);
 }
 
@@ -260,16 +323,16 @@ char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_le
 {
   int reqLen=ChMessageInt(hdr->len);
   int destPE = ChMessageInt(hdr->pe);
-  if (destPE < -1) reqLen += destPE*sizeof(int);
-  {
-  int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+reqLen;
-  char *msg=(char *)CmiAlloc(len);
+  int len;
+  char *msg;
+  if (destPE < -1) reqLen -= destPE*sizeof(int);
+  len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+reqLen;
+  msg=(char *)CmiAlloc(len);
   memcpy(msg+CmiMsgHeaderSizeBytes,hdr,sizeof(CcsImplHeader));
   memcpy(msg+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader),data,reqLen);
   CmiSetHandler(msg, _ccsHandlerIdx);
   if (ret_len!=NULL) *ret_len=len;
   return msg;
-  }
 }
 
 /*Receives reply messages passed up from
@@ -360,24 +423,27 @@ Closes it.
  */
 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
 {
+  char *msg;
   int len,repPE=ChMessageInt(hdr->pe);
   if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
-    repPE=0;
-    hdr->pe=ChMessageInt_new(repPE);
+    /*Treat out of bound values as errors. Helps detecting bugs*/
+    if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
+    else CmiPrintf("Invalid processor index in CCS request.");
+    CpvAccess(ccsReq)=hdr;
+    CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
+    return;
   }
 
-  {
-    char *msg=CcsImpl_ccs2converse(hdr,reqData,&len);
-    if (repPE >= 0) {
-      CmiSyncSendAndFree(repPE,len,msg);
-    } else if (repPE == -1) {
-      /* Broadcast to all processors */
-      CmiPushPE(0, msg);
-    } else {
-      /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
-      int firstPE = *(int*)reqData;
-      CmiSyncSendAndFree(firstPE,len,msg);
-    }
+  msg=CcsImpl_ccs2converse(hdr,reqData,&len);
+  if (repPE >= 0) {
+    CmiSyncSendAndFree(repPE,len,msg);
+  } else if (repPE == -1) {
+    /* Broadcast to all processors */
+    CmiPushPE(0, msg);
+  } else {
+    /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
+    int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
+    CmiSyncSendAndFree(firstPE,len,msg);
   }
 }
 
index b0a3bde2c5ecccf8e52c00cd8173717bc8d1b799..dd7815a1478426922920347c01d502f75862109a 100644 (file)
@@ -51,6 +51,23 @@ void CcsRegisterHandlerFn(const char *ccs_handlername, CcsHandlerFn fn, void *us
  * Set the merging function for this CCS handler to newMerge.
  */
 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge);
+/* A few standard functions for merging CCS messages */
+#define SIMPLE_REDUCTION(name) void * CcsMerge_##name(int *size,void *local,void **remote,int n)
+#define SIMPLE_POLYMORPH_REDUCTION(nameBase) \
+  SIMPLE_REDUCTION(nameBase##_int); \
+  SIMPLE_REDUCTION(nameBase##_float); \
+  SIMPLE_REDUCTION(nameBase##_double)
+SIMPLE_REDUCTION(concat);
+SIMPLE_REDUCTION(logical_and);
+SIMPLE_REDUCTION(logical_or);
+SIMPLE_REDUCTION(bitvec_and);
+SIMPLE_REDUCTION(bitvec_and);
+SIMPLE_POLYMORPH_REDUCTION(sum);
+SIMPLE_POLYMORPH_REDUCTION(product);
+SIMPLE_POLYMORPH_REDUCTION(max);
+SIMPLE_POLYMORPH_REDUCTION(min);
+#undef SIMPLE_REDUCTION
+#undef SIMPLE_POLYMORPH_REDUCTION
 
 void CcsInit(char **argv);
 int CcsEnabled(void);
@@ -67,6 +84,7 @@ typedef void *CcsDelayedReply;
 #define CcsInit(argv) /*empty*/
 #define CcsRegisterHandler(x,y) 0
 #define CcsRegisterHandlerFn(x,y,p) 0
+#define CcsSetMergeFn(x,y) 0
 #define CcsEnabled() 0
 #define CcsIsRemoteRequest() 0
 #define CcsCallerId(x,y)  /*empty*/
@@ -75,7 +93,6 @@ typedef void *CcsDelayedReply;
 #define CcsSendDelayedReply(d,s,r); 
 #define CcsNoReply() /*empty*/
 #define CcsNoDelayedReply(d) /*empty*/
-#define CcsSetMergeFn(x,y)      /* empty */
 #endif
 
 #ifdef __cplusplus
index aee9ad05b77dcee81939f2766ef967c59cfabd22..a8be196150d05ef7e1fd94d38a3d766eccc061a8 100644 (file)
@@ -1881,71 +1881,34 @@ void CmiSyncVectorSendAndFree(int destPE, int n, int *sizes, char **msgs) {
  ****************************************************************************/
 
 CpvStaticDeclare(int, CmiReductionMessageHandler);
-/*
-CpvStaticDeclare(int, _reduce_num_children);
-CpvStaticDeclare(int, _reduce_parent);
-CpvStaticDeclare(int, _reduce_received);
-CpvStaticDeclare(char**, _reduce_msg_list);
-CpvStaticDeclare(void*, _reduce_data);
-CpvStaticDeclare(int, _reduce_data_size);
-static CmiHandler _reduce_destination;
-static CmiReduceMergeFn _reduce_mergeFn;
-static CmiReducePupFn _reduce_pupFn;
-static CmiReduceDeleteFn _reduce_deleteFn;
-*/
+CpvStaticDeclare(int, CmiReductionDynamicRequestHandler);
 
 CpvStaticDeclare(CmiReduction**, _reduce_info);
 CpvStaticDeclare(int, _reduce_info_size); /* This is the log2 of the size of the array */
-CpvStaticDeclare(CmiUInt2, _reduce_seqID);
+CpvStaticDeclare(CmiUInt2, _reduce_seqID_global); /* This is used only by global reductions */
+CpvStaticDeclare(CmiUInt2, _reduce_seqID_request);
+CpvStaticDeclare(CmiUInt2, _reduce_seqID_dynamic);
 
-/*
-int CmiGetReductionHandler() { return CpvAccess(CmiReductionMessageHandler); }
-CmiHandler CmiGetReductionDestination() { return _reduce_destination; }
-*/
-
-void CmiReductionsInit() {
-/*
-  int numChildren;
-  CpvInitialize(int, _reduce_num_children);
-  CpvInitialize(int, _reduce_parent);
-  CpvInitialize(int, _reduce_received);
-  CpvInitialize(char**, _reduce_msg_list);
-  CpvInitialize(void*, _reduce_data);
-  CpvInitialize(int, _reduce_data_size);
-  CpvAccess(_reduce_num_children) = 0;
-  CpvAccess(_reduce_received) = 0;
-  numChildren = CmiNumSpanTreeChildren(CmiMyPe());
-  if (numChildren > 0) {
-    CpvAccess(_reduce_msg_list) = (char**)malloc(CmiNumSpanTreeChildren(CmiMyPe())*sizeof(void*));
-  } else {
-    CpvAccess(_reduce_msg_list) = NULL;
-  }
-*/
-  int i;
-  CpvInitialize(int, CmiReductionMessageHandler);
-  CpvAccess(CmiReductionMessageHandler) = CmiRegisterHandler((CmiHandler)CmiHandleReductionMessage);
-  CpvInitialize(CmiUInt2, _reduce_seqID);
-  CpvAccess(_reduce_seqID) = 0;
-  CpvInitialize(int, _reduce_info_size);
-  CpvAccess(_reduce_info_size) = 4;
-  CpvInitialize(CmiReduction*, _reduce_info);
-  CpvAccess(_reduce_info) = (CmiReduction**)malloc(16*sizeof(CmiReduction*));
-  for (i=0; i<16; ++i) CpvAccess(_reduce_info)[i] = NULL;
-}
+enum {
+  CmiReductionID_globalOffset = 0, /* Reductions that involve the whole set of processors */
+  CmiReductionID_requestOffset = 1, /* Reductions IDs that are requested by all the processors (i.e during intialization) */
+  CmiReductionID_dynamicOffset = 2, /* Reductions IDs that are requested by only one processor (typically at runtime) */
+  CmiReductionID_multiplier = 3
+};
 
 CmiReduction* CmiGetReductionCreate(int id, short int numChildren) {
   int index = id & ~((~0)<<CpvAccess(_reduce_info_size));
   CmiReduction *red = CpvAccess(_reduce_info)[index];
   if (red != NULL && red->seqID != id) {
     /* The table needs to be expanded */
-    CmiAbort("Too many simultaneous redctions");
+    CmiAbort("Too many simultaneous reductions");
   }
   if (red == NULL || red->numChildren < numChildren) {
+    CmiReduction *newred;
     if (red != NULL) CmiPrintf("[%d] Reduction structure reallocated\n",CmiMyPe());
     CmiAssert(red == NULL || red->localContributed == 0);
     if (numChildren == 0) numChildren = 4;
-    {
-    CmiReduction *newred = (CmiReduction*)malloc(sizeof(CmiReduction)+numChildren*sizeof(void*));
+    newred = (CmiReduction*)malloc(sizeof(CmiReduction)+numChildren*sizeof(void*));
     newred->numRemoteReceived = 0;
     newred->localContributed = 0;
     newred->seqID = id;
@@ -1957,7 +1920,6 @@ CmiReduction* CmiGetReductionCreate(int id, short int numChildren) {
     red->numChildren = numChildren;
     red->remoteData = (char**)(red+1);
     CpvAccess(_reduce_info)[index] = red;
-    }
   }
   return red;
 }
@@ -1973,10 +1935,50 @@ void CmiClearReduction(int id) {
 }
 
 CmiReduction* CmiGetNextReduction(short int numChildren) {
-  int id = CpvAccess(_reduce_seqID)++;
+  int id = CpvAccess(_reduce_seqID_global);
+  CpvAccess(_reduce_seqID_global) += CmiReductionID_multiplier;
+  if (id > 0xFFF0) CpvAccess(_reduce_seqID_global) = CmiReductionID_globalOffset;
   return CmiGetReductionCreate(id, numChildren);
 }
 
+CmiUInt2 CmiGetGlobalReduction() {
+  return CpvAccess(_reduce_seqID_request)+=CmiReductionID_multiplier;
+}
+
+CmiUInt2 CmiGetDynamicReduction() {
+  return CpvAccess(_reduce_seqID_dynamic)+=CmiReductionID_multiplier;
+}
+
+void CmiReductionHandleDynamicRequest(char *msg) {
+  int *values = (int*)(msg+CmiMsgHeaderSizeBytes);
+  int pe = values[0];
+  int size = CmiMsgHeaderSizeBytes+2*sizeof(int)+values[1];
+  values[0] = CmiGetDynamicReduction();
+  CmiSetHandler(msg, CmiGetXHandler(msg));
+  if (pe >= 0) {
+    CmiSyncSendAndFree(pe, size, msg);
+  } else {
+    CmiSyncBroadcastAllAndFree(size, msg);
+  }
+}
+
+void CmiGetDynamicReductionRemote(int handlerIdx, int pe, int dataSize, void *data) {
+  int size = CmiMsgHeaderSizeBytes+2*sizeof(int)+dataSize;
+  char *msg = (char*)CmiAlloc(size);
+  int *values = (int*)(msg+CmiMsgHeaderSizeBytes);
+  values[0] = pe;
+  values[1] = dataSize;
+  CmiSetXHandler(msg, handlerIdx);
+  if (dataSize) memcpy(msg+CmiMsgHeaderSizeBytes+2*sizeof(int), data, dataSize);
+  if (CmiMyPe() == 0) {
+    CmiReductionHandleDynamicRequest(msg);
+  } else {
+    // send the request to processor 0
+    CmiSetHandler(msg, CpvAccess(CmiReductionDynamicRequestHandler));
+    CmiSyncSendAndFree(0, size, msg);
+  }
+}
+
 void CmiSendReduce(CmiReduction *red) {
   void *mergedData, *msg;
   int msg_size;
@@ -2021,8 +2023,7 @@ void *CmiReduceMergeFn_random(int *size, void *data, void** remote, int n) {
   return data;
 }
 
-void CmiReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
-  CmiReduction *red = CmiGetNextReduction(CmiNumSpanTreeChildren(CmiMyPe()));
+static void CmiGlobalReduce(void *msg, int size, CmiReduceMergeFn mergeFn, CmiReduction *red) {
   CmiAssert(red->localContributed == 0);
   red->localContributed = 1;
   red->localData = msg;
@@ -2032,24 +2033,13 @@ void CmiReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
   red->ops.destination = (CmiHandler)CmiGetHandlerFunction(msg);
   red->ops.mergeFn = mergeFn;
   red->ops.pupFn = NULL;
-  /*CmiPrintf("[%d] CmiReduce::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);*/
+  CmiPrintf("[%d] CmiReduce::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);
   CmiSendReduce(red);
-/*  
-  CpvAccess(_reduce_data) = data;
-  CpvAccess(_reduce_data_size) = size;
-  CpvAccess(_reduce_parent) = CmiSpanTreeParent(CmiMyPe());
-  _reduce_destination = (CmiHandler)CmiGetHandlerFunction(data);
-  _reduce_pupFn = NULL;
-  _reduce_mergeFn = mergeFn;
-  CpvAccess(_reduce_num_children) = CmiNumSpanTreeChildren(CmiMyPe());
-  if (CpvAccess(_reduce_received) == CpvAccess(_reduce_num_children)) CmiSendReduce(red, size);
-*/
 }
 
-void CmiReduceStruct(void *data, CmiReducePupFn pupFn,
+static void CmiGlobalReduceStruct(void *data, CmiReducePupFn pupFn,
                      CmiReduceMergeFn mergeFn, CmiHandler dest,
-                     CmiReduceDeleteFn deleteFn) {
-  CmiReduction *red = CmiGetNextReduction(CmiNumSpanTreeChildren(CmiMyPe()));
+                     CmiReduceDeleteFn deleteFn, CmiReduction *red) {
   CmiAssert(red->localContributed == 0);
   red->localContributed = 1;
   red->localData = data;
@@ -2060,22 +2050,36 @@ void CmiReduceStruct(void *data, CmiReducePupFn pupFn,
   red->ops.mergeFn = mergeFn;
   red->ops.pupFn = pupFn;
   red->ops.deleteFn = deleteFn;
-  /*CmiPrintf("[%d] CmiReduceStruct::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);*/
+  CmiPrintf("[%d] CmiReduceStruct::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);
   CmiSendReduce(red);
-  /*
-  CpvAccess(_reduce_data) = data;
-  CpvAccess(_reduce_parent) = CmiSpanTreeParent(CmiMyPe());
-  _reduce_destination = dest;
-  _reduce_pupFn = pupFn;
-  _reduce_mergeFn = mergeFn;
-  _reduce_deleteFn = deleteFn;
-  CpvAccess(_reduce_num_children) = CmiNumSpanTreeChildren(CmiMyPe());
-  if (CpvAccess(_reduce_received) == CpvAccess(_reduce_num_children)) CmiSendReduce(0);
-*/
 }
 
-void CmiListReduce(int npes, int *pes, void *msg, int size, CmiReduceMergeFn mergeFn) {
+void CmiReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
+  CmiReduction *red = CmiGetNextReduction(CmiNumSpanTreeChildren(CmiMyPe()));
+  CmiGlobalReduce(msg, size, mergeFn, red);
+}
+
+void CmiReduceStruct(void *data, CmiReducePupFn pupFn,
+                     CmiReduceMergeFn mergeFn, CmiHandler dest,
+                     CmiReduceDeleteFn deleteFn) {
   CmiReduction *red = CmiGetNextReduction(CmiNumSpanTreeChildren(CmiMyPe()));
+  CmiGlobalReduceStruct(data, pupFn, mergeFn, dest, deleteFn, red);
+}
+
+void CmiReduceID(void *msg, int size, CmiReduceMergeFn mergeFn, CmiUInt2 id) {
+  CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
+  CmiGlobalReduce(msg, size, mergeFn, red);
+}
+
+void CmiReduceStructID(void *data, CmiReducePupFn pupFn,
+                     CmiReduceMergeFn mergeFn, CmiHandler dest,
+                     CmiReduceDeleteFn deleteFn, CmiUInt2 id) {
+  CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
+  CmiGlobalReduceStruct(data, pupFn, mergeFn, dest, deleteFn, red);
+}
+
+void CmiListReduce(int npes, int *pes, void *msg, int size, CmiReduceMergeFn mergeFn, CmiUInt2 id) {
+  CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
   int myPos;
   CmiAssert(red->localContributed == 0);
   red->localContributed = 1;
@@ -2088,19 +2092,20 @@ void CmiListReduce(int npes, int *pes, void *msg, int size, CmiReduceMergeFn mer
   red->numChildren = npes - (myPos << 2) - 1;
   if (red->numChildren > 4) red->numChildren = 4;
   if (red->numChildren < 0) red->numChildren = 0;
-  red->parent = (myPos - 1) >> 2;
   if (myPos == 0) red->parent = -1;
+  else red->parent = pes[(myPos - 1) >> 2];
   red->ops.destination = (CmiHandler)CmiGetHandlerFunction(msg);
   red->ops.mergeFn = mergeFn;
   red->ops.pupFn = NULL;
+  CmiPrintf("[%d] CmiListReduce::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);
   CmiSendReduce(red);
 }
 
 void CmiListReduceStruct(int npes, int *pes,
                      void *data, CmiReducePupFn pupFn,
                      CmiReduceMergeFn mergeFn, CmiHandler dest,
-                     CmiReduceDeleteFn deleteFn) {
-  CmiReduction *red = CmiGetNextReduction(CmiNumSpanTreeChildren(CmiMyPe()));
+                     CmiReduceDeleteFn deleteFn, CmiUInt2 id) {
+  CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
   int myPos;
   CmiAssert(red->localContributed == 0);
   red->localContributed = 1;
@@ -2122,18 +2127,18 @@ void CmiListReduceStruct(int npes, int *pes,
   CmiSendReduce(red);
 }
 
-void CmiGroupReduce(CmiGroup grp, void *msg, int size, CmiReduceMergeFn mergeFn) {
+void CmiGroupReduce(CmiGroup grp, void *msg, int size, CmiReduceMergeFn mergeFn, CmiUInt2 id) {
   int npes, *pes;
   CmiLookupGroup(grp, &npes, &pes);
-  CmiListReduce(npes, pes, msg, size, mergeFn);
+  CmiListReduce(npes, pes, msg, size, mergeFn, id);
 }
 
 void CmiGroupReduceStruct(CmiGroup grp, void *data, CmiReducePupFn pupFn,
                      CmiReduceMergeFn mergeFn, CmiHandler dest,
-                     CmiReduceDeleteFn deleteFn) {
+                     CmiReduceDeleteFn deleteFn, CmiUInt2 id) {
   int npes, *pes;
   CmiLookupGroup(grp, &npes, &pes);
-  CmiListReduceStruct(npes, pes, data, pupFn, mergeFn, dest, deleteFn);
+  CmiListReduceStruct(npes, pes, data, pupFn, mergeFn, dest, deleteFn, id);
 }
 
 void CmiNodeReduce(void *data, int size, CmiReduceMergeFn mergeFn, int redID, int numChildren, int parent) {
@@ -2193,6 +2198,25 @@ void CmiHandleReductionMessage(void *msg) {
   / *else CmiPrintf("CmiHandleReductionMessage(%d): %d - %d\n",CmiMyPe(),CpvAccess(_reduce_received),CpvAccess(_reduce_num_children));*/
 }
 
+void CmiReductionsInit() {
+  int i;
+  CpvInitialize(int, CmiReductionMessageHandler);
+  CpvAccess(CmiReductionMessageHandler) = CmiRegisterHandler((CmiHandler)CmiHandleReductionMessage);
+  CpvInitialize(int, CmiReductionDynamicRequestHandler);
+  CpvAccess(CmiReductionDynamicRequestHandler) = CmiRegisterHandler((CmiHandler)CmiReductionHandleDynamicRequest);
+  CpvInitialize(CmiUInt2, _reduce_seqID_global);
+  CpvAccess(_reduce_seqID_global) = CmiReductionID_globalOffset;
+  CpvInitialize(CmiUInt2, _reduce_seqID_request);
+  CpvAccess(_reduce_seqID_request) = CmiReductionID_requestOffset;
+  CpvInitialize(CmiUInt2, _reduce_seqID_dynamic);
+  CpvAccess(_reduce_seqID_dynamic) = CmiReductionID_dynamicOffset;
+  CpvInitialize(int, _reduce_info_size);
+  CpvAccess(_reduce_info_size) = 4;
+  CpvInitialize(CmiReduction*, _reduce_info);
+  CpvAccess(_reduce_info) = (CmiReduction**)malloc(16*sizeof(CmiReduction*));
+  for (i=0; i<16; ++i) CpvAccess(_reduce_info)[i] = NULL;
+}
+
 /*****************************************************************************
  *
  * Multicast groups
index 0d89d45a38169422f97cc2113fa17190f8bdaa48..7877417150a3004e0f8da75b3ebb90c31b345c32 100644 (file)
@@ -979,22 +979,24 @@ void CmiReduce(void *msg, int size, CmiReduceMergeFn mergeFn);
 void CmiReduceStruct(void *data, CmiReducePupFn pupFn,
                      CmiReduceMergeFn mergeFn, CmiHandler dest,
                      CmiReduceDeleteFn deleteFn);
-void CmiListReduce(int npes, int *pes, void *msg, int size, CmiReduceMergeFn mergeFn);
+void CmiListReduce(int npes, int *pes, void *msg, int size, CmiReduceMergeFn mergeFn, CmiUInt2 id);
 void CmiListReduceStruct(int npes, int *pes,
                      void *data, CmiReducePupFn pupFn,
                      CmiReduceMergeFn mergeFn, CmiHandler dest,
-                     CmiReduceDeleteFn deleteFn);
-void CmiGroupReduce(CmiGroup grp, void *msg, int size, CmiReduceMergeFn mergeFn);
+                     CmiReduceDeleteFn deleteFn, CmiUInt2 id);
+void CmiGroupReduce(CmiGroup grp, void *msg, int size, CmiReduceMergeFn mergeFn, CmiUInt2 id);
 void CmiGroupReduceStruct(CmiGroup grp, void *data, CmiReducePupFn pupFn,
                      CmiReduceMergeFn mergeFn, CmiHandler dest,
-                     CmiReduceDeleteFn deleteFn);
+                     CmiReduceDeleteFn deleteFn, CmiUInt2 id);
 void CmiNodeReduce(void *msg, int size, CmiReduceMergeFn mergeFn, int, int, int);
 void CmiNodeReduceStruct(void *data, CmiReducePupFn pupFn,
                          CmiReduceMergeFn mergeFn, CmiHandler dest,
                          CmiReduceDeleteFn deleteFn);
-void CmiHandleReductionMessage(void *msg);
 int CmiGetReductionHandler();
 CmiHandler CmiGetReductionDestination();
+CmiUInt2 CmiGetGlobalReduction();
+CmiUInt2 CmiGetDynamicReduction();
+void CmiGetDynamicReductionRemote(int handlerIdx, int pe, int dataSize, void *data);
 
 /* If the second parameter (the number of chunks to send) is negative, then
  * every message will be started aligned with 8 bytes, and a message header will
index 12088c5bb23121f643e5f25508193da56468feff..8080aef5edb7c95f81fb2740cd0d8ff262436e4f 100644 (file)
@@ -179,32 +179,6 @@ static void CpdDebugCallMemStat(char *msg) {
   CmiFree(msg);
 }
 
-static void * CpdDebugMerge(int *size,void *local,void **remote,int n) {
-  void *reply;
-#if CMK_CCS_AVAILABLE
-  char *ptr;
-  CcsImplHeader *hdr;
-  int total = *size;
-  int i;
-  for (i=0; i<n; ++i) {
-    hdr = (CcsImplHeader*)(((char*)remote[i])+CmiMsgHeaderSizeBytes);
-    total += ChMessageInt(hdr->len);
-  }
-  reply = CmiAlloc(total);
-  memcpy(reply, local, *size);
-  ((CcsImplHeader*)(((char*)reply)+CmiMsgHeaderSizeBytes))->len = ChMessageInt_new(total-CmiMsgHeaderSizeBytes-sizeof(CcsImplHeader));
-  CmiFree(local);
-  ptr = ((char*)reply)+*size;
-  for (i=0; i<n; ++i) {
-    int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiMsgHeaderSizeBytes))->len);
-    memcpy(ptr, ((char*)remote[i])+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader), len);
-    ptr += len;
-  }
-  *size = total;
-#endif
-  return reply;
-}
-
 static void CpdDebugHandler(char *msg)
 {
     char name[128];
@@ -233,14 +207,6 @@ static void CpdDebugHandler(char *msg)
       reply[1] = ChMessageInt_new(CpdIsFrozen() ? 0 : 1);
       CcsSendReply(2*sizeof(ChMessageInt_t), reply);
     }
-#endif
-#if 0
-    else if (strncmp(name, "setBreakPoint", strlen("setBreakPoint")) == 0){
-      CmiPrintf("setBreakPoint received\n");
-      temp = strstr(name, "#");
-      temp++;
-      setBreakPoints(temp);
-    }
 #endif
     else{
       CmiPrintf("bad debugger command:%s received,len=%ld\n",name,strlen(name));
@@ -346,7 +312,7 @@ void CpdInit(void)
   CpvAccess(debugQueue) = CdsFifo_Create();
 
   CcsRegisterHandler("ccs_debug", (CmiHandler)CpdDebugHandler);
-  CcsSetMergeFn("ccs_debug", CpdDebugMerge);
+  CcsSetMergeFn("ccs_debug", CcsMerge_concat);
 
   CcsRegisterHandler("ccs_debug_allocationTree", (CmiHandler)CpdDebugCallAllocationTree);
   CpvInitialize(int, CpdDebugCallAllocationTree_Index);