f2c8c1f9108b87399da01820934ad61c37aff369
[charm.git] / src / ck-com / StreamingStrategy.C
1 #include "StreamingStrategy.h"
2 #include "MsgPacker.h"
3
4 StreamingStrategy::StreamingStrategy(int periodMs,int bufferMax_)
5     : PERIOD(periodMs), bufferMax(bufferMax_), CharmStrategy()
6 {
7     streamingMsgBuf=NULL;
8     streamingMsgCount=NULL;
9     shortMsgPackingFlag = CmiFalse;
10     idleFlush = CmiTrue;
11 }
12
13 void StreamingStrategy::insertMessage(CharmMessageHolder *cmsg) {
14
15     int pe=cmsg->dest_proc;
16     char *msg = cmsg->getCharmMessage();
17     envelope *env = UsrToEnv(msg);
18     int size = env->getTotalsize();
19
20     if(size > MAX_STREAMING_MESSAGE_SIZE) {//AVOID COPYING
21       ComlibPrintf("StreamingStrategy::insertMessage: direct send\n");
22       CmiSyncSendAndFree(pe, size, (char *)env);
23       delete cmsg;
24       return;
25     }
26
27     ComlibPrintf("StreamingStrategy::insertMessage: buffering t=%d, n=%d, s=%d\n",  
28                  PERIOD, bufferMax, size);
29     
30     streamingMsgBuf[pe].enq(cmsg);
31     streamingMsgCount[pe]++;
32     if (streamingMsgCount[pe] > bufferMax) flushPE(pe);
33 }
34
35 void StreamingStrategy::doneInserting() {
36   ComlibPrintf("[%d] In Streaming strategy::doneInserting\n", CkMyPe());
37   //Do nothing
38 }
39
40 /// Send off all accumulated messages for this PE:
41 void StreamingStrategy::flushPE(int pe) {
42   if(streamingMsgCount[pe] == 0)
43     return; //Nothing to do.
44   
45   CharmMessageHolder *cmsg, *toBeDeleted = NULL;
46   if(shortMsgPackingFlag){
47     MsgPacker mpack(streamingMsgBuf[pe], streamingMsgCount[pe]);
48     CombinedMessage *msg; 
49     int size;
50     mpack.getMessage(msg, size);
51     ComlibPrintf("[%d] StreamingStrategy::flushPE: packed %d short messages to %d\n", 
52                  CkMyPe(), streamingMsgCount[pe], pe); 
53     CmiSyncSendAndFree(pe, size, (char *)msg);
54     streamingMsgCount[pe] = 0;
55   }
56   else {
57     // Build a CmiMultipleSend list of messages to be sent off:
58     int msg_count=streamingMsgCount[pe], msg_pe=0;
59     if(msg_count == 1) {
60       cmsg = streamingMsgBuf[pe].deq();
61       char *msg = cmsg->getCharmMessage();
62       envelope *env = UsrToEnv(msg);
63       int size = env->getTotalsize();
64       CmiSyncSendAndFree(pe, size, (char *)env);
65       ComlibPrintf("[%d] StreamingStrategy::flushPE: one message to %d\n", 
66                    CkMyPe(), pe);            
67       delete cmsg;
68       streamingMsgCount[pe] = 0;
69       return;
70     }
71     char **msgComps = new char*[msg_count];
72     int *sizes = new int[msg_count];
73     ComlibPrintf("[%d] StreamingStrategy::flushPE: %d messages to %d\n", 
74                  CkMyPe(), msg_count, pe);            
75     while (!streamingMsgBuf[pe].isEmpty()) {
76       cmsg = streamingMsgBuf[pe].deq();
77       char *msg = cmsg->getCharmMessage();
78       envelope *env = UsrToEnv(msg);
79       sizes[msg_pe] = env->getTotalsize();
80       msgComps[msg_pe] = (char *)env;
81       msg_pe++;
82       
83       // Link cmsg into the toBeDeleted list:
84       cmsg->next = toBeDeleted;
85       toBeDeleted = cmsg;            
86     }
87     
88     if (msg_count!=msg_pe) 
89       CkAbort("streamingMsgCount doesn't match streamingMsgBuf!\n");
90
91     ComlibPrintf("--> Sending %d Messages to PE %d\n", msg_count, pe);
92
93     CmiMultipleSend(pe, msg_count, sizes, msgComps);
94     delete [] msgComps;
95     delete [] sizes;
96     streamingMsgCount[pe] = 0;
97     
98     // Traverse the tobeDeleted list:
99     cmsg = toBeDeleted;
100     while (toBeDeleted) {
101         toBeDeleted = (CharmMessageHolder *)toBeDeleted->next;
102         CkFreeMsg(cmsg->getCharmMessage());
103         delete cmsg;
104         cmsg = toBeDeleted;            
105     }     
106   }
107 }
108
109 void StreamingStrategy::periodicFlush() {
110   for (int pe=0; pe<CkNumPes(); pe++) flushPE(pe);
111 }
112
113 /// This routine is called via CcdCallFnAfter to flush all messages:
114 static void call_delayFlush(void *arg,double curWallTime) {
115   StreamingStrategy *s=(StreamingStrategy *)arg;
116   s->periodicFlush();
117   s->registerFlush(); //Set ourselves up to be called again
118 }
119
120 void StreamingStrategy::registerFlush(void) {
121   // CkPrintf("[%d] Will call function again every %d ms\n",CkMyPe(),PERIOD);
122   CcdCallFnAfterOnPE((CcdVoidFn)call_delayFlush, (void *)this, PERIOD, CkMyPe());
123 }
124
125 /// This routine is called via CcdCallOnCondition to flush all messages:
126 static void call_idleFlush(void *arg,double curWallTime) {
127   StreamingStrategy *s=(StreamingStrategy *)arg;
128   s->periodicFlush();
129 }
130
131 // When we're finally ready to go, register for timeout and idle flush.
132 void StreamingStrategy::beginProcessing(int ignored) {
133   registerFlush();
134   if(idleFlush)
135     CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE,
136                                (CcdVoidFn)call_idleFlush, 
137                                (void *)this, CkMyPe());
138 }
139
140 void StreamingStrategy::pup(PUP::er &p){
141
142   CharmStrategy::pup(p);
143   p | PERIOD;
144   p | bufferMax;
145   p | shortMsgPackingFlag;
146   p | idleFlush;
147
148   if(p.isUnpacking()) {
149     streamingMsgBuf = new CkQ<CharmMessageHolder *>[CkNumPes()];
150     streamingMsgCount = new int[CkNumPes()];
151     for(int count = 0; count < CkNumPes(); count ++)
152       streamingMsgCount[count] = 0;
153   }
154 }
155
156 //PUPable_def(StreamingStrategy);