Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / conv-com / StreamingStrategy.C
index dfb566be1f5c465ce52584e48e7b361bb9e1d96b..001ea4d256e8fa8be2b9475e2039b484f1a83f56 100644 (file)
@@ -9,7 +9,7 @@
 #include "pup_cmialloc.h"
 
 /** The handler registerd to StreamingHandlerFn */
-CkpvDeclare(int, streaming_handler_id);
+CpvDeclare(int, streaming_handler_id);
 /**
  * Handler used to receive incoming combined messages, split them into the
  * individual messages and deliver all of them to the application.
@@ -17,7 +17,7 @@ CkpvDeclare(int, streaming_handler_id);
 void StreamingHandlerFn(void *msg) {
     StreamingMessage hdr;
     
-    ComlibPrintf("[%d] In streaming handler fn\n",CkMyPe());
+    ComlibPrintf("[%d] In streaming handler fn\n",CmiMyPe());
 
     PUP_fromCmiAllocMem fp(msg);
     fp | hdr;
@@ -26,7 +26,7 @@ void StreamingHandlerFn(void *msg) {
         char *msg;
         fp.pupCmiAllocBuf((void **)&msg);
         int size = SIZEFIELD(msg);
-        CmiSyncSendAndFree(CkMyPe(), size, msg);
+        CmiSyncSendAndFree(CmiMyPe(), size, msg);
     }
     CmiFree(msg);
     return;
@@ -66,14 +66,14 @@ void StreamingStrategy::insertMessage(MessageHolder *cmsg) {
     int size = cmsg->getSize(); // env->getTotalsize();
 
     if(size > msgSizeMax) {//AVOID COPYING
-        ComlibPrintf("[%d] StreamingStrategy::insertMessage: to %d direct send %d\n",CkMyPe(),pe,size);
+        ComlibPrintf("[%d] StreamingStrategy::insertMessage: to %d direct send %d\n",CmiMyPe(),pe,size);
         CmiSyncSendAndFree(pe, size, msg);
         delete cmsg;
         return;
     }
 
     ComlibPrintf("[%d] StreamingStrategy::insertMessage: buffering t=%g, n=%d, d=%d, s=%d\n",
-                CkMyPe(), PERIOD, bufferMax, pe, size);
+                CmiMyPe(), PERIOD, bufferMax, pe, size);
     
     streamingMsgBuf[pe].enq(cmsg);
     streamingMsgCount[pe]++;
@@ -82,7 +82,7 @@ void StreamingStrategy::insertMessage(MessageHolder *cmsg) {
 }
 
 void StreamingStrategy::doneInserting() {
-  ComlibPrintf("[%d] StreamingStrategy::doneInserting\n", CkMyPe());
+  ComlibPrintf("[%d] StreamingStrategy::doneInserting\n", CmiMyPe());
   //Do nothing
 
   periodicFlush();
@@ -96,12 +96,12 @@ void StreamingStrategy::flushPE(int pe) {
   if(streamingMsgCount[pe] == 0)
       return; //Nothing to do.
   
-  MessageHolder *cmsg, *toBeDeleted = NULL;
+  MessageHolder *cmsg;
   int size = 0;
  
 
     // Build a CmiMultipleSend list of messages to be sent off:
-    int msg_count=streamingMsgCount[pe], msg_pe=0;
+    int msg_count=streamingMsgCount[pe];
 
     // If we have a single message we don't want to copy it
     if(msg_count == 1) {
@@ -111,7 +111,7 @@ void StreamingStrategy::flushPE(int pe) {
         int size = cmsg->getSize();
         CmiSyncSendAndFree(pe, size, msg);
         ComlibPrintf("[%d] StreamingStrategy::flushPE: one message to %d\n", 
-                     CkMyPe(), pe);            
+                     CmiMyPe(), pe);            
         delete cmsg;
         streamingMsgCount[pe] = 0;
        bufSize[pe] = 0;
@@ -138,7 +138,7 @@ void StreamingStrategy::flushPE(int pe) {
     char *newmsg = (char *)CmiAlloc(sp.size());
     PUP_toCmiAllocMem mp(newmsg);
     
-    hdr.srcPE = CkMyPe();
+    hdr.srcPE = CmiMyPe();
     hdr.nmsgs = nmsgs;
     mp | hdr;
     
@@ -160,13 +160,13 @@ void StreamingStrategy::flushPE(int pe) {
     
     streamingMsgCount[pe] = 0;
     bufSize[pe] = 0;
-    CmiSetHandler(newmsg, CkpvAccess(streaming_handler_id));
+    CmiSetHandler(newmsg, CpvAccess(streaming_handler_id));
     CmiSyncSendAndFree(pe, sp.size(), newmsg); 
     //}
 }
 
 void StreamingStrategy::periodicFlush() {
-    for (int proc = 0; proc < CkNumPes(); proc++) 
+    for (int proc = 0; proc < CmiNumPes(); proc++) 
         flushPE(proc);
 }
 
@@ -185,7 +185,7 @@ void testHandler(void *msg) {
     s = (StreamingStrategy *) (mstruct->addr);
     s->periodicFlush();
 
-    CmiSyncSendAndFree(CkMyPe(), sizeof(MsgStruct), (char *)msg);
+    CmiSyncSendAndFree(CmiMyPe(), sizeof(MsgStruct), (char *)msg);
 }
 */
 
@@ -197,8 +197,8 @@ static void call_delayFlush(void *arg,double curWallTime) {
 }
 
 void StreamingStrategy::registerFlush(void) {
-    //CkPrintf("[%d] Will call function again every %d ms\n",CkMyPe(),PERIOD);
-    CcdCallFnAfterOnPE(call_delayFlush, (void *)this, PERIOD, CkMyPe());
+    //CkPrintf("[%d] Will call function again every %d ms\n",CmiMyPe(),PERIOD);
+    CcdCallFnAfterOnPE(call_delayFlush, (void *)this, PERIOD, CmiMyPe());
 }
 
 /// This routine is called via CcdCallOnCondition to flush all messages:
@@ -214,7 +214,7 @@ void StreamingStrategy::beginProcessing(int ignored) {
     //if(idleFlush)
     //  CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE,
     //                             (CcdVoidFn)call_idleFlush, 
-    //                             (void *)this, CkMyPe());
+    //                             (void *)this, CmiMyPe());
     
     streaming_handler_id = CkRegisterHandler(StreamingHandlerFn);
     
@@ -224,7 +224,7 @@ void StreamingStrategy::beginProcessing(int ignored) {
 //       msg->addr = this;
 //       CmiSetHandler(msg, handler);
       
-//       CmiSyncSendAndFree(CkMyPe(), sizeof(MsgStruct), (char *)msg);
+//       CmiSyncSendAndFree(CmiMyPe(), sizeof(MsgStruct), (char *)msg);
 
 }
 */
@@ -241,10 +241,10 @@ void StreamingStrategy::pup(PUP::er &p){
   //p | streaming_handler_id;
 
   if(p.isPacking() || p.isUnpacking()) {
-      streamingMsgBuf = new CkQ<MessageHolder *>[CkNumPes()];
-      streamingMsgCount = new int[CkNumPes()];
-      bufSize = new int[CkNumPes()];
-      for(int count = 0; count < CkNumPes(); count ++) {
+      streamingMsgBuf = new CkQ<MessageHolder *>[CmiNumPes()];
+      streamingMsgCount = new int[CmiNumPes()];
+      bufSize = new int[CmiNumPes()];
+      for(int count = 0; count < CmiNumPes(); count ++) {
        streamingMsgCount[count] = 0;
        bufSize[count] = 0;
       }
@@ -254,6 +254,6 @@ void StreamingStrategy::pup(PUP::er &p){
   if (p.isPacking() || p.isUnpacking()) registerFlush();
 }
 
-PUPable_def(StreamingStrategy);
+PUPable_def(StreamingStrategy)
 
 /*@}*/