Ccd callbacks now take the current wall clock time as their second argument.
[charm.git] / src / libs / ck-libs / commlib / StreamingStrategy.C
1 #include "StreamingStrategy.h"
2 #include "MsgPacker.h"
3
4 StreamingStrategy::StreamingStrategy(int periodMs,int bufferMax_)
5         : PERIOD(periodMs), bufferMax(bufferMax_)
6 {
7     streamingMsgBuf=NULL;
8     streamingMsgCount=NULL;
9 }
10
11 void StreamingStrategy::insertMessage(CharmMessageHolder *cmsg) {
12     int pe=cmsg->dest_proc;
13     streamingMsgBuf[pe].enq(cmsg);
14     streamingMsgCount[pe]++;
15     if (streamingMsgCount[pe]>bufferMax) flushPE(pe);
16 }
17
18 void StreamingStrategy::doneInserting(){
19     ComlibPrintf("[%d] In Streaming strategy::doneInserting\n", CkMyPe());
20     //Do nothing
21 }
22
23 /// Send off all accumulated messages for this PE:
24 void StreamingStrategy::flushPE(int pe) {
25     if(streamingMsgCount[pe] == 0)
26         return; //Nothing to do.
27
28     CharmMessageHolder *cmsg, *toBeDeleted = NULL;
29     
30     if(shortMsgPackingFlag){
31         MsgPacker mpack(streamingMsgBuf[pe], streamingMsgCount[pe]);
32         CombinedMessage *msg; 
33         int size;
34         mpack.getMessage(msg, size);
35         
36         CmiSyncSendAndFree(pe, size, (char *)msg);
37         streamingMsgCount[pe] = 0;
38     }
39     else {
40         // Build a CmiMultipleSend list of messages to be sent off:
41         int msg_count=streamingMsgCount[pe], msg_pe=0;
42         char **msgComps = new char*[msg_count];
43         int *sizes = new int[msg_count];
44         while (!streamingMsgBuf[pe].isEmpty()) {
45             cmsg = streamingMsgBuf[pe].deq();
46             char *msg = cmsg->getCharmMessage();
47             envelope *env = UsrToEnv(msg);
48             sizes[msg_pe] = env->getTotalsize();
49             msgComps[msg_pe] = (char *)env;
50             msg_pe++;
51             
52             // Link cmsg into the toBeDeleted list:
53             cmsg->next = toBeDeleted;
54             toBeDeleted = cmsg;            
55         }
56     
57         if (msg_count!=msg_pe) 
58             CkAbort("streamingMsgCount doesn't match streamingMsgBuf!\n");
59     
60         CmiMultipleSend(pe, msg_count, sizes, msgComps);
61         delete [] msgComps;
62         delete [] sizes;
63         streamingMsgCount[pe] = 0;
64                 
65         // Traverse the tobeDeleted list:
66         cmsg = toBeDeleted;
67         while (toBeDeleted) {
68             toBeDeleted = toBeDeleted->next;
69             CkFreeMsg(cmsg->getCharmMessage());
70             delete cmsg;
71             cmsg = toBeDeleted;            
72         }     
73     }
74 }
75
76 void StreamingStrategy::periodicFlush(){
77     for (int pe=0; pe<CkNumPes(); pe++) flushPE(pe);
78 }
79
80 /// This routine is called via CcdCallFnAfter to flush all messages:
81 static void call_delayFlush(void *arg,double curWallTime){
82     StreamingStrategy *s=(StreamingStrategy *)arg;
83     s->periodicFlush();
84     s->registerFlush(); //Set ourselves up to be called again
85 }
86
87 void StreamingStrategy::registerFlush(void) {
88     // CkPrintf("[%d] Will call function again every %d ms\n",CkMyPe(),PERIOD);
89     CcdCallFnAfter((CcdVoidFn)call_delayFlush, (void *)this, PERIOD);
90 }
91
92 /// This routine is called via CcdCallOnCondition to flush all messages:
93 static void call_idleFlush(void *arg,double curWallTime){
94     StreamingStrategy *s=(StreamingStrategy *)arg;
95     s->periodicFlush();
96 }
97
98 // When we're finally ready to go, register for timeout and idle flush.
99 void StreamingStrategy::beginProcessing(int ignored) {
100     registerFlush();
101     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)call_idleFlush,
102                            (void *)this);
103 }
104
105 void StreamingStrategy::pup(PUP::er &p){
106     p | PERIOD;
107     p | bufferMax;
108     p | shortMsgPackingFlag;
109
110     if(p.isUnpacking()) {
111         streamingMsgBuf = new CkQ<CharmMessageHolder *>[CkNumPes()];
112         streamingMsgCount = new int[CkNumPes()];
113         for(int count = 0; count < CkNumPes(); count ++)
114             streamingMsgCount[count] = 0;
115     }
116 }
117
118 //PUPable_def(StreamingStrategy);