comlib: Fix non-initialized cookie that breaks multicast strategies
[charm.git] / src / ck-com / ComlibManager.C
1 /**
2    @addtogroup CharmComlib
3    @{
4    @file
5    Implementation of the functions in ComlibManager.h and handler for message
6    transportation.
7 */
8
9 #include "ComlibManager.h"
10 #include "comlib.h"
11 #include "ck.h"
12 #include "envelope.h"
13
14
15 // We only want to print debug information for a single strategy. Otherwise we'll get incredibly confused
16 #undef ComlibManagerPrintf
17 //#define ComlibManagerPrintf  if(instid==1)ComlibPrintf
18 #define ComlibManagerPrintf  ComlibPrintf
19
20 #define getmax(a,b) ((a)>(b)?(a):(b))
21
22 CkpvExtern(int, RecvdummyHandle);
23  
24 CkpvDeclare(CkGroupID, cmgrID);
25
26 /***************************************************************************
27  * Handlers section:
28  *
29  * all the handlers used by the ComlibManager to coordinate the work, and
30  * propagate the messages from one processor to another
31  ***************************************************************************/
32
33 //Handler to receive array messages
34 CkpvDeclare(int, RecvmsgHandle);
35
36 void recv_array_msg(void *msg){
37
38   //    ComlibPrintf("%d:In recv_msg\n", CkMyPe());
39
40         if(msg == NULL)
41                 return;
42
43         register envelope* env = (envelope *)msg;
44         env->setUsed(0);
45         env->getsetArrayHops()=1; 
46         CkUnpackMessage(&env);
47
48         int srcPe = env->getSrcPe();
49         int sid = ((CmiMsgHeaderExt *) env)->stratid;
50
51         //      ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), sid, env->getTotalsize(), srcPe);
52
53         RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
54
55         CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
56         
57         a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
58
59         //      ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
60         return;
61 }
62
63
64
65 /**
66    A debugging routine that will periodically print out the status of the message queues. 
67    The Definition is at bottom of this file.
68  */
69 static void periodicDebugPrintStatus(void* ptr, double currWallTime);
70
71
72
73
74
75 /***************************************************************************
76  * Initialization section:
77  *
78  * Routines used by Comlib to initialize itself and all the strategies on all
79  * the processors. Used only at the beginning of the program.
80  ***************************************************************************/
81
82 // // initialized at startup before the main chare main methods are called:
83 // void initCharmComlibManager(){
84 //   CkPrintf("[%d] initCharmComlibManager()\n", CkMyPe());
85 //   fflush(stdout);
86 // }
87
88
89
90 ComlibManager::ComlibManager(){
91         init();
92 }
93
94 void ComlibManager::init(){
95
96   //   CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, (void*)this, 4000, CkMyPe());
97
98
99   if(CkNumPes() == 1 ){
100     ComlibPrintf("Doing nothing in ComlibManager::init() because we are running on 1 pe.\n");
101   } else {
102
103         if (CkMyRank() == 0) {
104                 PUPable_reg(CharmMessageHolder);
105         }
106
107         numStatsReceived = 0;
108         curComlibController = 0;
109         clibIteration = 0;
110
111         CkpvInitialize(comRectHashType *, com_rect_ptr); 
112         CkpvAccess(com_rect_ptr)= new comRectHashType;
113
114         CkpvInitialize(int, RecvmsgHandle);
115         CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
116
117         bcast_pelist = new int [CkNumPes()];
118         _MEMCHECK(bcast_pelist);
119         for(int brcount = 0; brcount < CkNumPes(); brcount++)
120                 bcast_pelist[brcount] = brcount;
121
122         section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
123
124         CkpvInitialize(CkGroupID, cmgrID);
125         CkpvAccess(cmgrID) = thisgroup;
126
127         dummyArrayIndex.nInts = 0;
128
129         CkAssert(CkpvInitialized(conv_com_object));
130         converseManager = &CkpvAccess(conv_com_object);
131
132         setupComplete = 0;
133
134         CkpvInitialize(int, migrationDoneHandlerID);
135         CkpvAccess(migrationDoneHandlerID) = 
136                 CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
137
138         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
139         cgproxy[curComlibController].barrier();
140   }
141 }
142
143 //First barrier makes sure that the communication library group 
144 //has been created on all processors
145 void ComlibManager::barrier(){
146         static int bcount = 0;
147         ComlibPrintf("barrier %d\n", bcount);
148         if(CkMyPe() == 0) {
149                 bcount ++;
150                 if(bcount == CkNumPes()){
151                         bcount = 0;
152                         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
153                         cgproxy.resumeFromSetupBarrier();
154                 }
155         }
156 }
157
158
159
160 /**
161    Due to the possibility of race conditions in the initialization of charm, this
162    barrier prevents comlib from being activated before all group branches are created.
163    This function completes the initialization of the charm layer of comlib.
164
165    In this function we also call ComlibDoneCreating for the user (basically
166    triggering the broadcast of the strategies created in Main::Main. Here the
167    Main::Main has for sure executed, otherwise we will not have received
168    confirmation by all other processors.
169    
170    
171  */
172 void ComlibManager::resumeFromSetupBarrier(){
173         ComlibPrintf("[%d] resumeFromSetupBarrier Charm group ComlibManager setup finished\n", CkMyPe());
174
175         setupComplete = 1;
176         ComlibDoneCreating();
177         ComlibPrintf("[%d] resumeFromSetupBarrier calling ComlibDoneCreating to tell converse layer strategies to set themselves up\n", CkMyPe());
178         sendBufferedMessagesAllStrategies();
179
180 }
181
182 /***************************************************************************
183  Determine whether the delegated messages should be buffered until the 
184  strategy has recovered from any error conditions and startup. Once the 
185  buffers can be flushed, ComlibManager::sendBufferedMessages() will be called
186 ***************************************************************************/
187 bool ComlibManager::shouldBufferMessagesNow(int instid){
188   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
189   return (!setupComplete) || myEntry->getErrorMode() == ERROR_MODE || myEntry->getErrorMode() == CONFIRM_MODE || myEntry->bufferOutgoing;
190 }
191
192
193 /***************************************************************************
194  Calls ComlibManager::sendBufferedMessages for each strategy.
195 ***************************************************************************/
196 void ComlibManager::sendBufferedMessagesAllStrategies(){
197   int nstrats = converseManager->getNumStrats();
198   for(int i=0;i<nstrats;i++){
199     sendBufferedMessages(i);
200   }
201 }
202
203
204 /***************************************************************************
205    Send all the buffered messages once startup has completed, and we have 
206    recovered from all errors.
207 ***************************************************************************/
208 void ComlibManager::sendBufferedMessages(int instid, int step){
209   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
210   
211   if(shouldBufferMessagesNow(instid)){
212     ComlibPrintf("[%d] sendBufferedMessages is not flushing buffered messages for strategy %d because shouldBufferMessagesNow()==true step %d\n", CkMyPe(), instid, step);  
213   } else if(delayMessageSendBuffer[instid].size() == 0){
214     ComlibPrintf("[%d] sendBufferedMessages: no bufferedmessages to send for strategy %d step %d\n", CkMyPe(), instid, step);  
215   } else{
216     ComlibPrintf("[%d] sendBufferedMessages Sending %d buffered messages for instid=%d step %d\n", CkMyPe(), delayMessageSendBuffer[instid].size(), instid, step);
217  
218     for (std::set<CharmMessageHolder*>::iterator iter = delayMessageSendBuffer[instid].begin(); iter != delayMessageSendBuffer[instid].end(); ++iter) {
219       CharmMessageHolder* cmsg = *iter;
220           
221       switch(cmsg->type){
222             
223       case CMH_ARRAYSEND:
224         CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
225         CkpvAccess(conv_com_object).doneInserting(instid);
226         break;
227             
228       case CMH_GROUPSEND:
229         CkAbort("CMH_GROUPSEND unimplemented");
230         break;
231             
232       case CMH_ARRAYBROADCAST:
233       case CMH_ARRAYSECTIONSEND: 
234       case CMH_GROUPBROADCAST:
235         // Multicast/broadcast to an array or a section:
236         cmsg->sec_id = cmsg->copy_of_sec_id;
237         CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
238         CkpvAccess(conv_com_object).doneInserting(instid);
239         break;
240             
241       default:
242         CkAbort("Unknown cmsg->type was found in buffer of delayed messages\n");
243       }
244           
245     }
246
247     delayMessageSendBuffer[instid].clear();
248   }
249
250 }
251
252
253
254 /***************************************************************************
255  * Routine for bracketed strategies, for detecting count errors after 
256  * objects migrate.
257  * 
258  * This function must only be called after all messages from the previous 
259  * iteration have been delivered. This is the application's responsibility to 
260  * ensure. The iteration values provided must increase monotonically, 
261  * and must be greater than STARTUP_ITERATION.
262  *
263  ***************************************************************************/
264 /// Called when the array/group element starts sending messages
265 void ComlibManager::beginIteration(int instid, int iteration){
266         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
267                             
268         ComlibManagerPrintf("[%d] beginIteration iter=%d lastKnownIteration=%d  %s %s %s\n", CkMyPe(), iteration, myEntry->lastKnownIteration, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
269
270         CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
271         if(CkMyPe()==0)
272           CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER ||  myEntry->getErrorModeServer() == ERROR_MODE_SERVER);
273
274         if(iteration > myEntry->lastKnownIteration){
275               ComlibManagerPrintf("[%d] beginIteration Starting Next Iteration ( # %d )\n", CkMyPe(), iteration);
276
277               myEntry->lastKnownIteration = iteration;
278               myEntry->nBeginItr = 1; // we are the first time to be called this iteration
279               myEntry->nEndItr = 0;
280               myEntry->nProcSync = 0;
281               myEntry->totalEndCounted = 0;
282               myEntry->nEndSaved = 0;
283                       
284         } else if(iteration == myEntry->lastKnownIteration){
285                 ComlibManagerPrintf("[%d] beginIteration continuing iteration # %d\n", CkMyPe(), iteration);
286                 myEntry->nBeginItr++;
287         } else {
288           CkPrintf("[%d] ERROR: ComlibManager::beginIteration iteration=%d < myEntry->lastKnownIteration=%d", iteration, myEntry->lastKnownIteration);
289           CkAbort("[%d] ERROR: ComlibManager::beginIteration iteration < myEntry->lastKnownIteration");
290         }
291         
292         
293         // We need to check for error conditions here as well as EndIteration.
294         // This will ensure that if we are the processor that detects this error, 
295         // we won't deliver at least this message until the strategy is fixed
296         if (myEntry->nBeginItr > myEntry->numElements) {
297           ComlibManagerPrintf("[%d] beginIteration BUFFERING OUTGOING because nBeginItr=%d > numElements=%d\n",CkMyPe(), myEntry->nBeginItr, myEntry->numElements);
298           myEntry->bufferOutgoing = 1;
299         }
300         
301 }
302
303
304 /** Called by user program when each element has finished sending its messages.
305
306     If no errors are detected, ConvComlibManager::doneInserting() is called on the 
307     underlying converse strategy. If an error is detected, then ConvComlibManager::doneInserting
308     is NOT called. This likely causes the the underlying converse strategy to buffer the 
309     messages until we recover from the error mode, although we have buffered at least some of
310     the messages, so the user program cannot run ahead and start the next iteration.
311
312 */
313
314 void ComlibManager::endIteration(int instid, int step){ 
315         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
316         
317         // migration is not allowed between begin and end of iteration, 
318         // so each element must call end iteration on the same processor 
319         // as it called begin iteration
320         CkAssert(myEntry->nEndItr <= myEntry->nBeginItr);
321         CkAssert(step == myEntry->lastKnownIteration);
322
323         CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
324         if(CkMyPe()==0)
325           CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER ||  myEntry->getErrorModeServer() == ERROR_MODE_SERVER);
326
327         myEntry->nEndItr++;
328         
329         ComlibManagerPrintf("[%d] endIteration called\n",CkMyPe());
330         
331         
332         if (myEntry->bufferOutgoing) {
333           // If migration was detected in ComlibManager::beginIteration and hence messages have been buffered:
334           CkAssert(delayMessageSendBuffer[instid].size() > 0);
335           CProxy_ComlibManager myProxy(thisgroup);
336           myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
337         } 
338         else if(myEntry->nEndItr == myEntry->numElements) {
339           // If all the objects have called beginIteration and endIteration and no errors were detected
340           CkAssert(converseManager->isReady(instid));
341           converseManager->doneInserting(instid);
342         }
343         
344 }
345
346
347 /** Start recovery of errors. 
348     This entry method calls itself repeatedly if the underlying 
349     converse strategy is not yet ready.
350
351     If the PE is already in error mode, then only the difference 
352     in the counts will be contributed.
353 */
354 void ComlibManager::bracketedStartErrorRecoveryProcess(int instid, int step){  
355   CProxy_ComlibManager myProxy(thisgroup);
356   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);     
357   CkAssert(step >= myEntry->lastKnownIteration);
358
359
360   if(converseManager->isReady(instid)){
361     ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess(instid=%d step=%d) %s %s %s\n", CkMyPe(), instid, step, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
362
363     CkAssert(myEntry->strategy != NULL);
364     CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
365           
366     if (!myEntry->strategy->isBracketed()) {
367       CkPrintf("[%d] endIteration called unecessarily for a non-bracketed strategy\n", CkMyPe());
368       return;
369     }
370        
371     if (myEntry->getErrorMode() == NORMAL_MODE) {
372       ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess()\n", CkMyPe());
373       myEntry->nEndSaved = myEntry->nEndItr;
374       myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
375       myEntry->setErrorMode(ERROR_MODE);
376       bracketedStartDiscovery(instid);
377     } else {
378       // Send the updated count
379       int update = myEntry->nEndItr - myEntry->nEndSaved;
380       if (update > 0) {
381         //      ComlibManagerPrintf("bracketedStartErrorRecoveryProcess sending update to bracketedReceiveCount\n");
382         CProxy_ComlibManager myProxy(thisgroup);
383         myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
384         myEntry->nEndSaved = myEntry->nEndItr;
385       }
386       
387     }
388   } else {
389     ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess() REENQUEUE\n", CkMyPe() );
390     // Re-enqueue myself because we can't start error recovery process until converse strategy is ready
391     myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
392   }
393 }
394
395
396 /// Invoked on all processors to inform that an error has been detected.
397 void ComlibManager::bracketedErrorDetected(int instid, int step) {
398         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
399         
400         bracketedCatchUpPE(instid,step);
401         CkAssert(step == myEntry->lastKnownIteration);
402         CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
403
404         ComlibManagerPrintf("[%d] bracketedErrorDetected()\n", CkMyPe());
405
406         if (myEntry->getErrorMode() == NORMAL_MODE) {
407           // save the value we are sending to bracketedReceiveCount
408           myEntry->nEndSaved = myEntry->nEndItr; // save the value we are sending to bracketedReceiveCount
409           CProxy_ComlibManager myProxy(thisgroup);
410           myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
411           bracketedStartDiscovery(instid);
412           myEntry->setErrorMode(ERROR_MODE);
413
414         } else { // ERROR_MODE
415           // If we have an update count, send it
416           int update = myEntry->nEndItr - myEntry->nEndSaved;
417           ComlibManagerPrintf("bracketedErrorDetected update=%d\n", update);
418           if (update > 0) {
419             ComlibManagerPrintf("bracketedErrorDetected sending update to bracketedReceiveCount\n");
420             CProxy_ComlibManager myProxy(thisgroup);
421             myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
422             myEntry->nEndSaved = myEntry->nEndItr;
423           }
424         }
425         
426 }
427
428 /// Invoked on all processors. After processor 0 has a count match, it sends out
429 /// a broadcast on this entry method to get a confirmation from all others.
430 void ComlibManager::bracketedConfirmCount(int instid, int step) {
431         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
432         CkAssert(myEntry->getErrorMode() == ERROR_MODE);
433         myEntry->setErrorMode(CONFIRM_MODE);
434         CProxy_ComlibManager myProxy(thisgroup);
435         ComlibManagerPrintf("[%d] bracketedConfirmCount\n", CkMyPe());
436         myProxy[0].bracketedCountConfirmed(instid, myEntry->nEndSaved, step);
437 }
438
439 /// Invoked on processor 0 upon a request to confirm the count. If the count is
440 /// correct a "NewPeList" is sent out, otherwise the error mode is returned with
441 /// "ErrorDetected"
442 void ComlibManager::bracketedCountConfirmed(int instid, int count, int step) {
443         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
444         CkAssert(CkMyPe() == 0);
445         // Advance PE0 to current step if we had no local objects
446         bracketedCatchUpPE(instid, step);
447         CkAssert(myEntry->getErrorModeServer() == CONFIRM_MODE_SERVER);
448         CkAssert(step == myEntry->lastKnownIteration);
449
450         myEntry->total += count;
451         
452         ComlibManagerPrintf("[%d] bracketedCountConfirmed\n", CkMyPe());
453         
454         if (++myEntry->peConfirmCounter == CkNumPes()) {
455                 myEntry->peConfirmCounter = 0;
456
457                 CkAssert(myEntry->total == myEntry->totalEndCounted);
458                   
459                 CProxy_ComlibManager(thisgroup).bracketedReceiveNewCount(instid, step);
460                 myEntry->setErrorModeServer(ERROR_FIXED_MODE_SERVER);
461                   
462                 myEntry->total = 0;
463         }
464
465 }
466
467
468
469 /** Update the state for a PE that was likely lacking any local objects.
470     If no local objects exist, noone will call begin iteration, and hence,
471     the lastKnownIteration value will be old. We need to advance to the 
472     current iteration before we can proceed.
473 */
474 void ComlibManager::bracketedCatchUpPE(int instid, int step){
475   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
476   CkAssert(step >= myEntry->lastKnownIteration);
477   if(step > myEntry->lastKnownIteration){
478     
479     myEntry->total = 0;
480     myEntry->lastKnownIteration = step;
481     myEntry->nBeginItr = 0;
482     myEntry->nEndItr = 0;
483     myEntry->nProcSync = 0;
484     myEntry->totalEndCounted = 0;
485     myEntry->nEndSaved = 0;
486     myEntry->setErrorMode(NORMAL_MODE);
487     myEntry->setDiscoveryMode(NORMAL_DISCOVERY_MODE);
488
489     if(CkMyPe()==0){
490       CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER || myEntry->getErrorModeServer() == ERROR_FIXED_MODE_SERVER);
491       myEntry->setErrorModeServer(NORMAL_MODE_SERVER);
492     }    
493     
494   }
495 }
496
497 /// Invoked on processor 0 when a processor sends a count of how many elements
498 /// have already deposited. This number is incremental, and it refers to the
499 /// previous one sent by that processor. Processor 0 stores temporarily all the
500 /// numbers it receives. When a match happens, processor 0 switches to "confirm"
501 /// mode and send out a request for confirmation to all other processors.
502 /// The final parameter, "step", is used so that if  no objects exist on a processor,
503 /// then the counts sent by the processor will be tagged with an old timestep, and can 
504 /// safely be ignored. If no objects are on a PE, then beginIteration will never
505 /// be called there.
506 void ComlibManager::bracketedReceiveCount(int instid, int pe, int count, int isFirst, int step) {
507         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
508         ComlibPrintf("[%d] bracketedReceiveCount begins step=%d lastKnownIteration=%d totalEndCounted=%d count=%d\n", CkMyPe(), step, myEntry->lastKnownIteration, myEntry->totalEndCounted, count);
509         CkAssert(step >= myEntry->lastKnownIteration);
510
511         CkAssert(CkMyPe() == 0);
512         CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER || myEntry->getErrorModeServer() == ERROR_MODE_SERVER );
513         
514         
515         // Advance PE0 to current step if we had no local objects
516         bracketedCatchUpPE(instid, step);
517
518         // Encountering a message from an old step
519         CkAssert(step == myEntry->lastKnownIteration);
520
521         myEntry->totalEndCounted += count;
522
523
524         
525         myEntry->nProcSync += isFirst; // isFirst is 1 the first time a processor send a message,
526         CkAssert(myEntry->nProcSync <= CkNumPes());
527         
528         if (myEntry->getErrorModeServer() == NORMAL_MODE_SERVER) { 
529                 // first time this is called
530                 CkAssert(myEntry->nProcSync == 1);
531                 CProxy_ComlibManager(thisgroup).bracketedErrorDetected(instid, step);
532                 myEntry->setErrorModeServer(ERROR_MODE_SERVER);
533                 ComlibManagerPrintf("[%d] bracketedReceiveCount first time\n", CkMyPe());
534         }
535
536
537
538         CharmStrategy* s = dynamic_cast<CharmStrategy*>(myEntry->strategy);
539         ComlibArrayInfo ainfo = s->ainfo;
540         int totalsrc =  ainfo.getTotalSrc() ;
541           
542         if(myEntry->nProcSync == CkNumPes() && myEntry->totalEndCounted == totalsrc) {
543           // ok, we received notifications from all PEs and all objects have called endIteration
544           myEntry->setErrorModeServer(CONFIRM_MODE_SERVER); 
545           ComlibManagerPrintf("[%d] bracketedReceiveCount errorModeServer is now CONFIRM_MODE calling bracketedConfirmCount totalsrc=%d\n", CkMyPe(), (int)totalsrc);
546           CProxy_ComlibManager(thisgroup).bracketedConfirmCount(instid, step);
547                         
548         }
549                 
550 }
551
552
553 /// Invoked on all processors. After the count has been checked and it matches
554 /// the number of emements involved in the bracketed operation, processor 0
555 /// sends a broadcast to acknowledge the success. 
556 /// The strategy is disabled until a new Pe list is received.
557 void ComlibManager::bracketedReceiveNewCount(int instid, int step) {
558         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
559         CkAssert(myEntry->getErrorMode() == CONFIRM_MODE);
560
561         myEntry->setErrorMode(ERROR_FIXED_MODE);
562         
563         myEntry->nEndItr -= myEntry->nEndSaved;
564         myEntry->nBeginItr -= myEntry->nEndSaved;
565
566         myEntry->nEndSaved = 0;
567
568         bracketedFinalBarrier(instid, step);
569 }
570
571
572
573
574 /// Invoked on all processors. When all processors have discovered all elements
575 /// involved in the operation, processor 0 uses this method to broadcast the
576 /// entire processor list to all. Currently the discovery process HAS to go
577 /// together with the counting process, otherwise the strategy is updated at an
578 /// undetermined state. In future it may be useful to implement the discovery
579 /// process alone.
580 void ComlibManager::bracketedReceiveNewPeList(int instid, int step, int *count) {
581         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
582         CkAssert(myEntry->getDiscoveryMode() == STARTED_DISCOVERY_MODE);
583         myEntry->setDiscoveryMode(FINISHED_DISCOVERY_MODE);
584
585         myEntry->strategy->bracketedUpdatePeKnowledge(count);
586         
587         ComlibManagerPrintf("[%d] bracketedReceiveNewPeList Updating numElements\n", CkMyPe());
588         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
589         CkAssert((unsigned long)myInfo > 0x1000);
590         myEntry->numElements = myInfo->getLocalSrc();
591         
592         ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n",CkMyPe(), instid, delayMessageSendBuffer[instid].size() );
593         ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n", CkMyPe(), instid, delayMessageSendBuffer[instid].size());
594                 
595         bracketedFinalBarrier(instid, step);
596 }
597
598
599 /** Start a barrier phase where all processes will enter it once both 
600     the counting and discovery processes complete.
601
602     ComlibManager::bracketedReceiveNewPeList calls this method. 
603     ComlibManager::bracketedReceiveNewPeList is called as an array broadcast to thisProxy, 
604     so every PE will call this method.
605  */
606 void ComlibManager::bracketedFinalBarrier(int instid, int step) {
607   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
608   ComlibManagerPrintf("[%d] ComlibManager::bracketedFinalBarrier %s %s %s\n", CkMyPe(), myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
609
610   
611   if (myEntry->getDiscoveryMode() == FINISHED_DISCOVERY_MODE &&  myEntry->getErrorMode() == ERROR_FIXED_MODE) {       
612     myEntry->setDiscoveryMode(NORMAL_DISCOVERY_MODE);
613     myEntry->setErrorMode(NORMAL_MODE);
614
615     // Update destination and source element lists for use in the next step
616     ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
617     myInfo->useNewSourceAndDestinations();      
618     
619     CProxy_ComlibManager myProxy(thisgroup);
620     myProxy[0].bracketedReleaseCount(instid, step);
621   }
622 }
623
624
625 /** 
626     Once all PEs report here, we will allow them to release the buffered messages from this iteration
627  */
628 void ComlibManager::bracketedReleaseCount(int instid, int step) {
629
630     StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
631     ComlibPrintf("[%d] ComlibManager::bracketedReleaseCount myEntry->numBufferReleaseReady was %d\n", CkMyPe(), myEntry->numBufferReleaseReady);
632
633     CkAssert(CkMyPe() == 0);
634     CkAssert(myEntry->getErrorModeServer() == ERROR_FIXED_MODE_SERVER);
635     
636     myEntry->numBufferReleaseReady++;
637     if(myEntry->numBufferReleaseReady == CkNumPes()) {
638       myEntry->setErrorModeServer(NORMAL_MODE_SERVER);
639       CProxy_ComlibManager(thisgroup).bracketedReleaseBufferedMessages(instid, step);
640       myEntry->numBufferReleaseReady = 0;
641     }
642 }
643
644 /** 
645     Release any buffered messages.
646  */
647 void ComlibManager::bracketedReleaseBufferedMessages(int instid, int step) {
648   ComlibManagerPrintf("[%d] ComlibManager::bracketedReleaseBufferedMessages step=%d\n", CkMyPe(), step);
649   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
650
651   CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER);
652   CkAssert(myEntry->getErrorMode() == NORMAL_MODE);
653   CkAssert(myEntry->getDiscoveryMode() == NORMAL_DISCOVERY_MODE);
654   
655   myEntry->bufferOutgoing = 0;
656   sendBufferedMessages(instid, step);
657   
658   converseManager->doneInserting(instid);
659 }
660
661
662
663
664
665
666 /** Called when an error is discovered. Such errors occur when it is observed
667     that an array element has migrated to a new PE. Specifically if a strategy
668     on some PE determines that it has received messages for more array elements
669     than the strategy knew were local, then some array element must have 
670     migrated to the PE. The strategy instance detecting the error will call this
671     method to initiate a global update operation that determines the locations
672     of all array elements.
673
674     This is called on each PE by the bracketedErrorDetected() method. 
675
676     Each array element previously located on this PE is examined to determine if
677     it is still here. If the array element has migrated away, then the
678     bracketedDiscover() method is called on the new PE.
679
680 */
681 void ComlibManager::bracketedStartDiscovery(int instid) {
682         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
683         CkAssert(myEntry->getDiscoveryMode() == NORMAL_DISCOVERY_MODE);
684         myEntry->setDiscoveryMode(STARTED_DISCOVERY_MODE);
685         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
686         const CProxy_ComlibManager myProxy(thisgroup);
687
688         ComlibManagerPrintf("[%d] bracketedStartDiscovery\n", CkMyPe());
689
690         int countSrc = 0;
691         int countDest = 0;
692
693         if (myInfo->isSourceArray()) {
694
695           const CkVec<CkArrayIndexMax> & srcElements = myInfo->getSourceElements();
696           const int nelem = srcElements.size();
697           const CkArrayID aid = myInfo->getSourceArrayID(); 
698           const CkArray *a = (CkArray*)_localBranch(aid);
699
700           for (int i=0; i<nelem; ++i) {
701             int pe = a->lastKnown(srcElements[i]);
702             if (pe == CkMyPe()) {
703               countSrc++;
704               myInfo->addNewLocalSource(srcElements[i]);
705             }
706             else {
707               myProxy[pe].bracketedDiscover(instid, aid, srcElements[i], true);
708             }
709           }
710
711         }
712
713         if (myInfo->isDestinationArray()) {
714 //        CkAssert(myInfo->newDestinationListSize() == 0);
715
716           const CkVec<CkArrayIndexMax> & destElements = myInfo->getDestinationElements();
717           const int nelem = destElements.size();
718           const CkArrayID aid = myInfo->getDestinationArrayID();
719           const CkArray *a = (CkArray*)_localBranch(aid);
720
721           for (int i=0; i<nelem; ++i) {
722             int pe = a->lastKnown(destElements[i]);
723             if (pe == CkMyPe()) {
724               countDest++;
725               myInfo->addNewLocalDestination(destElements[i]);
726             }
727             else {
728               ComlibPrintf("[%d] destination element %d is no longer local\n", CkMyPe(), (int)destElements[i].data()[0]);
729               myProxy[pe].bracketedDiscover(instid, aid, destElements[i], false);
730             }
731           }
732         }
733
734         // Report the number of elements that are now local to this PE (if >0).
735         // The new owner PEs will report the counts for those objects that are no longer local to this PE
736         if (countSrc > 0 || countDest > 0) {
737           myProxy[0].bracketedContributeDiscovery(instid, CkMyPe(), countSrc, countDest, myEntry->lastKnownIteration);
738         }
739         
740 }
741
742
743
744 /** Determine the PE of a given array index. This will be called for any 
745     array element by the previous owner PE.
746
747     If the index is local to the processor, then record this in the local 
748     strategy. Also send a message to PE 0 informing PE 0 that the array
749     element was located.
750
751     If the array element is not found locally, call this method on the last 
752     known location for the element.
753
754 */
755 void ComlibManager::bracketedDiscover(int instid, CkArrayID aid, CkArrayIndexMax &idx, int isSrc) {
756         ComlibManagerPrintf("[%d] bracketedDiscover\n", CkMyPe());
757         CkArray *a = (CkArray *)_localBranch(aid);
758         int pe = a->lastKnown(idx);
759         CProxy_ComlibManager myProxy(thisgroup);
760         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
761
762         if (pe == CkMyPe()) {
763                 // Object was found locally
764          
765                 // notify PE 0
766                 myProxy[0].bracketedContributeDiscovery(instid, pe, isSrc?1:0, isSrc?0:1, myEntry->lastKnownIteration);
767           
768
769                 // Find local strategy instance
770
771                 // ComlibArrayInfo *myInfo = &(dynamic_cast<CharmStrategy*>(getStrategy(instid))->ainfo);
772                 CkAssert((unsigned long)myEntry->strategy > 0x1000);
773                 ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
774                 CkAssert((unsigned long)myInfo > 0x1000);
775
776                 if (isSrc) {
777                         // Add the element as a source element for the strategy
778                         ComlibManagerPrintf("[%d] bracketedDiscover addSource\n", CkMyPe());
779                         CkAssert((unsigned long)myInfo > 0x1000);
780                         myInfo->addNewLocalSource(idx);
781
782                         ComlibManagerPrintf("[%d] bracketedDiscover updating numElements\n", CkMyPe());
783                         myEntry->numElements = myInfo->getLocalSrc();           
784                 }
785                 else {
786                         // Add the element as a Destination element for the strategy
787                         ComlibManagerPrintf("[%d] bracketedDiscover addNewDestination\n", CkMyPe());
788                         myInfo->addNewLocalDestination(idx);
789                 }
790         } else {
791                 ComlibManagerPrintf("Keep On Forwarding*********************\n");
792                 // forward to next potential processor
793                 myProxy[pe].bracketedDiscover(instid, aid, idx, isSrc);
794         }
795 }
796
797
798
799 /** On PE 0, record the notifications from all PEs about the actual locations of each
800     array element. Count the number of elements discovered. Once the new location of 
801     every elements has been discovered, broadcast the new locations to all PEs by 
802     invoking bracketedReceiveNewPeList.
803 */
804 void ComlibManager::bracketedContributeDiscovery(int instid, int pe, int nsrc, int ndest, int step) {
805         CkAssert(CkMyPe() == 0);
806         ComlibManagerPrintf("[%d] bracketedContributeDiscovery pe=%d nsrc=%d ndest=%d step=%d\n", CkMyPe(), pe, nsrc, ndest, step);
807         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
808         if (myEntry->peList == 0) {
809                 myEntry->peList = new int[CkNumPes()+2];
810                 // peList[CkNumPes()] keeps the sum of all source objects discovered
811                 // peList[CkNumPes()+1] keeps the sum of all destination objects discovered
812                 for (int i=0; i<CkNumPes()+2; ++i) myEntry->peList[i]=0;
813                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery zeroing new peList\n", CkMyPe());
814         }
815         myEntry->peList[CkNumPes()] += nsrc;
816         myEntry->peList[CkNumPes()+1] += ndest;
817         // update the count for the sender processor. peList[i] is:
818         // 0 if proc "i" has no objects,
819         // 1 if proc "i" has only source objects,
820         // 2 if proc "i" has only destination objects,
821         // 3 if proc "i" has both source and destination objects
822         // the following code maintains the property of peList!
823         if (nsrc > 0) myEntry->peList[pe] |= 1;
824         if (ndest > 0) myEntry->peList[pe] |= 2;
825
826         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
827         CkAssert((unsigned long)myInfo > 0x1000);
828
829         
830 //      ComlibManagerPrintf("[%d] bracketedContributeDiscovery myEntry->peList[CkNumPes()]=%d  myInfo->getTotalSrc()=%d\n", CkMyPe(), myEntry->peList[CkNumPes()], myInfo->getTotalSrc());
831 //      ComlibManagerPrintf("[%d] bracketedContributeDiscovery myEntry->peList[CkNumPes()+1]=%d  myInfo->getTotalDest()=%d\n", CkMyPe(), myEntry->peList[CkNumPes()+1], myInfo->getTotalDest());
832 //              
833         if (myEntry->peList[CkNumPes()] == myInfo->getTotalSrc() &&
834                         myEntry->peList[CkNumPes()+1] == myInfo->getTotalDest()) {
835                 // discovery process finished, broadcast the new pe list
836
837                 CProxy_ComlibManager myProxy(thisgroup);
838                 
839                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery calling bracketedReceiveNewPeList %d/%d, %d/%d\n",       
840                                 CkMyPe(), 
841                                 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(), 
842                                 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
843                 
844                 printPeList("bracketedContributeDiscovery peList=", myEntry->peList);
845                 myProxy.bracketedReceiveNewPeList(instid, step, myEntry->peList);
846                 delete myEntry->peList;
847                 myEntry->peList = NULL;
848         } else {
849                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery NOT calling bracketedReceiveNewPeList %d/%d, %d/%d\n", 
850                                 CkMyPe(), 
851                                 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(), 
852                                 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
853         }
854
855 }
856
857
858 void ComlibManager::printPeList(const char* note, int *count)
859 {
860         char *buf = (char*)malloc(1024*64);
861         sprintf(buf, "[%d] %s ", CkMyPe(), note);
862         for(int i=0;i<CkNumPes();i++){
863                 switch (count[i]){
864                 case 0:
865                         sprintf(buf+strlen(buf), " %d:no ", i);
866                         break;
867                 case 1:
868                         sprintf(buf+strlen(buf), " %d:source ", i);
869                         break;
870                 case 2:
871                         sprintf(buf+strlen(buf), " %d:dest ", i);
872                         break;
873                 case 3:
874                         sprintf(buf+strlen(buf), " %d:both ", i);
875                         break;
876                 }
877         }
878         
879         sprintf(buf+strlen(buf), ", all source objects discovered =  %d, all destination objects discovered = %d",  count[CkNumPes()] ,  count[CkNumPes()+1] );
880                 
881         ComlibPrintf("%s\n", buf);
882         free(buf);
883 }
884
885 /***************************************************************************
886  * Delegation framework section:
887  *
888  * Reimplementation of the main routines needed for the delegation framework:
889  * this routines will be called when a message is sent through a proxy which has
890  * been associated with comlib.
891  ***************************************************************************/
892
893 extern int _charmHandlerIdx;
894 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid);
895
896
897 /** 
898     Handle messages sent via ArraySend. These are single point to point messages 
899     to array elements.
900
901     This method should not optimize direct sends in the case where buffering occurs
902
903  */
904 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
905                 const CkArrayIndexMax &idx, CkArrayID a){
906
907         CkAssert(pd != NULL);
908         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
909         int instid = ci->getID();
910         CkAssert(instid != 0);
911
912         // Reading from two hash tables is a big overhead
913         CkArray *amgr = CkArrayID::CkLocalBranch(a);
914         int dest_proc = amgr->lastKnown(idx);
915
916         register envelope * env = UsrToEnv(msg);
917         msg_prepareSend_noinline((CkArrayMessage*)msg, ep, a);
918
919         env->getsetArrayIndex()=idx;
920         env->setUsed(0);
921         ((CmiMsgHeaderExt *)env)->stratid = instid;
922
923         CkPackMessage(&env);
924         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));        
925         
926         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_ARRAYSEND);
927         
928         if(shouldBufferMessagesNow(instid)){
929           delayMessageSendBuffer[instid].insert(cmsg);
930           StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
931           int step = myEntry->lastKnownIteration;
932           ComlibManagerPrintf("[%d] ComlibManager::ArraySend BUFFERED OUTGOING: now buffer contains %d messages step=%d\n",CkMyPe(), delayMessageSendBuffer[instid].size(), step);
933         } else {
934           ComlibPrintf("ComlibManager::ArraySend NOT BUFFERING inserting message into strategy %d\n",instid);
935           
936           if(dest_proc == CkMyPe()){  
937             // Directly send if object is local
938             amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
939             return;
940           } else {
941             // Send through converse level strategy if non-local
942             converseManager->insertMessage(cmsg, instid);
943           }
944           
945         }
946
947 }
948
949
950 #include "qd.h"
951 //CkpvExtern(QdState*, _qd);
952
953 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
954
955         CkAssert(pd != NULL);
956         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
957         int instid = ci->getID();
958
959         int dest_proc = onPE;
960
961         ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
962                         UsrToEnv(msg)->getTotalsize());
963
964         register envelope * env = UsrToEnv(msg);
965         if(dest_proc == CkMyPe()){
966                 _SET_USED(env, 0);
967                 CkSendMsgBranch(ep, msg, dest_proc, gid);
968                 return;
969         }
970
971         ((CmiMsgHeaderExt *)env)->stratid = instid;
972         CpvAccess(_qd)->create(1);
973
974         env->setMsgtype(ForBocMsg);
975         env->setEpIdx(ep);
976         env->setGroupNum(gid);
977         env->setSrcPe(CkMyPe());
978         env->setUsed(0);
979
980         CkPackMessage(&env);
981         CmiSetHandler(env, _charmHandlerIdx);
982
983         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_GROUPSEND); 
984
985         if(shouldBufferMessagesNow(instid)){
986           StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
987           int step = myEntry->lastKnownIteration;
988           ComlibPrintf("ComlibManager::GroupSend Buffering message for %d step=%d\n",instid, step);
989           delayMessageSendBuffer[instid].insert(cmsg);
990         } else {
991           ComlibPrintf("ComlibManager::GroupSend inserting message into strategy %d\n",instid);
992           converseManager->insertMessage(cmsg, instid);
993         }
994
995
996 }
997
998 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
999         ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
1000
1001         CkAssert(pd != NULL);
1002         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1003         int instid = ci->getID();
1004
1005         register envelope * env = UsrToEnv(m);
1006         msg_prepareSend_noinline((CkArrayMessage*)m, ep, a);
1007
1008         env->getsetArrayIndex()= dummyArrayIndex;
1009         ((CmiMsgHeaderExt *)env)->stratid = instid;
1010
1011         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
1012
1013         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_BROADCAST, CMH_ARRAYBROADCAST);
1014         cmsg->npes = 0;
1015         cmsg->sec_id = NULL;
1016         cmsg->array_id = a;
1017
1018         multicast(cmsg, instid);
1019 }
1020
1021 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
1022                 int nsid, CkSectionID *s, int opts) {
1023
1024     CkAssert(nsid == 1);
1025         CkAssert(pd != NULL);
1026         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1027         int instid = ci->getID();
1028
1029         ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
1030
1031
1032         //Provide a dummy dest proc as it does not matter for mulitcast 
1033         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_SECTION_MULTICAST, CMH_ARRAYSECTIONSEND);
1034         cmsg->npes = 0;
1035         cmsg->sec_id = s;
1036         cmsg->array_id = s->_cookie.aid;
1037         
1038
1039         msg_prepareSend_noinline((CkArrayMessage*)m, ep, s->_cookie.aid);
1040         
1041         register envelope * env = UsrToEnv(m);
1042         env->getsetArrayIndex()= dummyArrayIndex;
1043         ((CmiMsgHeaderExt *)env)->stratid = instid;
1044         
1045         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
1046         
1047         env->setUsed(0);
1048         CkPackMessage(&env);
1049         
1050
1051         CkSectionInfo minfo;
1052         minfo.type = COMLIB_MULTICAST_MESSAGE;
1053         minfo.sInfo.cInfo.instId = ci->getID();
1054         //minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
1055         minfo.sInfo.cInfo.id = 0; 
1056         minfo.pe = CkMyPe();
1057         ((CkMcastBaseMsg *)env)->_cookie = minfo;    
1058         
1059         multicast(cmsg, instid);
1060 }
1061
1062 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
1063
1064         CkAssert(pd != NULL);
1065         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1066         int instid = ci->getID();
1067         CkAssert(instid!=0);
1068
1069         register envelope * env = UsrToEnv(m);
1070
1071         CpvAccess(_qd)->create(1);
1072
1073         env->setMsgtype(ForBocMsg);
1074         env->setEpIdx(ep);
1075         env->setGroupNum(g);
1076         env->setSrcPe(CkMyPe());
1077         env->setUsed(0);
1078         ((CmiMsgHeaderExt *)env)->stratid = instid;
1079
1080         CkPackMessage(&env);
1081         CmiSetHandler(env, _charmHandlerIdx);
1082
1083         //Provide a dummy dest proc as it does not matter for mulitcast 
1084         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST, CMH_GROUPBROADCAST);
1085
1086         cmsg->npes = 0;
1087         //cmsg->pelist = NULL;
1088
1089         multicast(cmsg, instid);
1090 }
1091
1092
1093 /** 
1094     Multicast the message with the specified strategy(instid).
1095     This method is used in: ArrayBroadcast, ArraySectionSend, and GroupBroadcast
1096  */
1097 void ComlibManager::multicast(CharmMessageHolder *cmsg, int instid) {
1098         CkAssert(instid != 0);
1099         register envelope * env = UsrToEnv(cmsg->getCharmMessage());
1100
1101 #if DEBUG
1102         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1103         ComlibPrintf("[%d] multicast setupComplete=%d %s %s\n", CkMyPe(), setupComplete, myEntry->getErrorModeString(), myEntry->getErrorModeServerString());
1104 #endif
1105  
1106         env->setUsed(0);
1107         CkPackMessage(&env);
1108         
1109         if(shouldBufferMessagesNow(instid)){
1110           cmsg->saveCopyOf_sec_id();
1111           StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1112           int step = myEntry->lastKnownIteration;
1113           ComlibPrintf("[%d] ComlibManager::multicast Buffering message for %d lastKnownIteration=%d\n", CkMyPe(),instid, step);
1114           delayMessageSendBuffer[instid].insert(cmsg);
1115         } else {
1116           converseManager->insertMessage(cmsg, instid);
1117         }
1118         
1119 }
1120
1121
1122 void ComlibManager::printDiagnostics(int instid){
1123   CkPrintf("[%d]     delayMessageSendBuffer.size()=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size());
1124 }
1125
1126
1127 void ComlibManager::printDiagnostics(){
1128
1129
1130   std::map<ComlibInstanceHandle, std::set<CharmMessageHolder*> >::iterator iter;
1131   for(iter = delayMessageSendBuffer.begin(); iter != delayMessageSendBuffer.end(); ++iter){
1132     int instid = iter->first;
1133     int size = iter->second.size();
1134     
1135     if(size>0 || true){
1136       CkPrintf("[%d] delayMessageSendBuffer[instid=%d] contains %d messages\n", CkMyPe(), instid, size);
1137       
1138       if(! shouldBufferMessagesNow(instid)){
1139         CkPrintf("[%d] printDiagnostics: No messages should be still in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
1140       } else {
1141         CkPrintf("[%d] printDiagnostics: Messages still ought to be delayed in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
1142       }
1143       
1144       StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1145       CkPrintf("[%d] printDiagnostics[instid=%d] setupComplete=%d %s %s %s bufferOutgoing=%d\n", (int)CkMyPe(), (int)instid, (int)setupComplete, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString(), (int)myEntry->bufferOutgoing);
1146
1147
1148
1149     }
1150   }
1151   
1152   CkpvAccess(conv_com_object).printDiagnostics();
1153   
1154 }
1155
1156
1157
1158 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
1159         //ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(data);
1160         data->ref();
1161         return data;
1162         //return (new ComlibDelegateData(inst->getID()));
1163 }
1164
1165 CkDelegateData* ComlibManager::DelegatePointerPup(PUP::er &p,
1166                 CkDelegateData *pd) {
1167         if (!p.isUnpacking() && pd == NULL) CkAbort("Tryed to pup a null ComlibDelegateData!\n");
1168         //CmiBool isNotNull = pd!=NULL ? CmiTrue : CmiFalse;
1169         ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(pd);
1170
1171         // call a migration constructor
1172         if (p.isUnpacking()) inst = new ComlibDelegateData((CkMigrateMessage*)0);
1173         inst->pup(p);
1174         /*
1175   p | isNotNull;
1176   if (isNotNull) {
1177     if (p.isUnpacking()) inst = new ComlibInstanceHandle();
1178     inst->pup(p);
1179   }
1180          */
1181         return inst;
1182 }
1183
1184
1185 //Collect statistics from all the processors, also gets the list of
1186 //array elements on each processor.
1187 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe) {
1188 }
1189
1190
1191 /// @TODO: eliminate AtSync and move toward anytime migration!
1192 void ComlibManager::AtSync() {
1193
1194 }
1195
1196
1197 /***************************************************************************
1198  * User section:
1199  *
1200  * Interface available to the user to interact with the ComlibManager
1201  ***************************************************************************/
1202
1203 /** Permanently associate the given proxy with comlib, to use the instance
1204     represented by cinst. All messages sent through the proxy will be forwarded
1205     by comlib. */
1206 void ComlibAssociateProxy(ComlibInstanceHandle cinst, CProxy &proxy) {
1207   if(CkNumPes() > 1){
1208         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1209         proxy.ckDelegate(cgproxy.ckLocalBranch(), new ComlibDelegateData(cinst));
1210   } else {
1211     ComlibPrintf("Doing nothing in ComlibAssociateProxy because we have only 1 pe\n"); 
1212   }
1213 }
1214
1215 /** Permanently assiciate the given proxy with comlib, to use the strategy
1216     represented by strat. All messages sent through the proxy will be forwarded
1217     by comlib. */
1218 void ComlibAssociateProxy(Strategy *strat, CProxy &proxy) {
1219         ComlibAssociateProxy(strat->getHandle(), proxy);
1220 }  
1221
1222 /* OLD DESCRIPTION! Register a strategy to the comlib framework, and return a
1223     handle to be used in the future with ComlibAssociateProxy to associate the
1224     strategy with a proxy. If one strategy is registered more than once, the
1225     handles will be different, but they has to be considered as aliases. */
1226
1227 /** Provided only for backward compatibility. A strategy is registered at the
1228     converse level as soon as it is created (see Strategy constructor). This
1229     function just retrieve a handle from the strategy itself. */
1230 ComlibInstanceHandle ComlibRegister(Strategy *strat) {
1231         return strat->getHandle();
1232 }
1233
1234 /** Call beginIteration on the ComlibManager with the instance handle for the strategy associated with the proxy. 
1235  *   If no strategy has been associated with the proxy, nothing is done in this function.
1236  */
1237 void ComlibBegin(CProxy &proxy, int iteration) {
1238   if(CkNumPes() > 1){
1239         ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
1240         if(cinst==NULL)
1241                 return;
1242         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1243         (cgproxy.ckLocalBranch())->beginIteration(cinst->getID(), iteration);
1244   } else {
1245     ComlibPrintf("Doing nothing in ComlibBegin because we have only 1 pe");  
1246   }
1247 }
1248
1249 /** Call endIteration on the ComlibManager with the instance handle for the strategy associated with the proxy. 
1250  *   If no strategy has been associated with the proxy, nothing is done in this function.
1251  */
1252 void ComlibEnd(CProxy &proxy, int iteration) {
1253   if(CkNumPes() > 1){
1254         ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
1255         if(cinst==NULL)
1256                 return;
1257         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1258         (cgproxy.ckLocalBranch())->endIteration(cinst->getID(), iteration);
1259   } else {
1260     ComlibPrintf("Doing nothing in ComlibEnd because we have only 1 pe");  
1261   }
1262 }
1263
1264 char *routerName;
1265 int sfactor=0;
1266
1267
1268 /** A mainchare, used to initialize the comlib framework at the program startup.
1269     Its main purpose is to create the ComlibManager group. */
1270 class ComlibManagerMain {
1271 public:
1272         ComlibManagerMain(CkArgMsg *msg) {
1273
1274                 if(CkMyPe() == 0 && msg !=  NULL)
1275                         CmiGetArgString(msg->argv, "+strategy", &routerName);         
1276
1277                 if(CkMyPe() == 0 && msg !=  NULL)
1278                         CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1279
1280                 CProxy_ComlibManager::ckNew();
1281         }
1282 };
1283
1284 /***************************************************************************
1285  * ComlibInstanceHandle section:
1286  *
1287  * Implementation of the functions defined in the ComlibInstanceHandle class.
1288  ***************************************************************************/
1289
1290 ComlibDelegateData::ComlibDelegateData(int instid) : CkDelegateData(), _instid(instid) {
1291         ComlibManagerPrintf("[%d] Constructing ComlibDelegateData\n", CkMyPe());
1292         ref();
1293 }
1294
1295
1296 void ComlibInitSectionID(CkSectionID &sid){
1297
1298         sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1299         sid._cookie.pe = CkMyPe();
1300
1301         sid._cookie.sInfo.cInfo.id = 0;    
1302         sid.npes = 0;
1303         sid.pelist = NULL;
1304 }
1305
1306
1307 /** For backward compatibility - for old name commlib. The function
1308     _registercomlib() is generated by the translator. */
1309 void _registercommlib(void)
1310 {
1311         static int _done = 0; 
1312         if(_done) 
1313                 return; 
1314         _done = 1;
1315         _registercomlib();
1316 }
1317
1318
1319 void ComlibNotifyMigrationDoneHandler(void *msg) {
1320         CmiFree(msg);
1321         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1322         ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1323         if(cmgr_ptr)
1324                 cmgr_ptr->AtSync();    
1325 }
1326
1327
1328
1329
1330
1331
1332 static void periodicDebugPrintStatus(void* ptr, double currWallTime){
1333   CkPrintf("[%d] periodicDebugPrintStatus()\n", CkMyPe());
1334
1335   ComlibManager *mgr = (ComlibManager*)ptr;
1336   mgr->printDiagnostics();
1337
1338   CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, ptr, 4000, CkMyPe());
1339
1340 }
1341
1342
1343
1344
1345
1346 #include "comlib.def.h"
1347
1348 /*@}*/