2a45b6b95448e8cea3fbd250f9fd786c7ed75dd0
[charm.git] / src / ck-com / ComlibManager.C
1 /**
2    @addtogroup CharmComlib
3    @{
4    @file
5    Implementation of the functions in ComlibManager.h and handler for message
6    transportation.
7 */
8
9 #include "ComlibManager.h"
10 #include "comlib.h"
11 #include "ck.h"
12 #include "envelope.h"
13
14
15 // We only want to print debug information for a single strategy. Otherwise we'll get incredibly confused
16 #undef ComlibManagerPrintf
17 //#define ComlibManagerPrintf  if(instid==1)ComlibPrintf
18 #define ComlibManagerPrintf  ComlibPrintf
19
20 #define getmax(a,b) ((a)>(b)?(a):(b))
21
22 CkpvExtern(int, RecvdummyHandle);
23  
24 CkpvDeclare(CkGroupID, cmgrID);
25
26 /***************************************************************************
27  * Handlers section:
28  *
29  * all the handlers used by the ComlibManager to coordinate the work, and
30  * propagate the messages from one processor to another
31  ***************************************************************************/
32
33 //Handler to receive array messages
34 CkpvDeclare(int, RecvmsgHandle);
35
36 void recv_array_msg(void *msg){
37
38   //    ComlibPrintf("%d:In recv_msg\n", CkMyPe());
39
40         if(msg == NULL)
41                 return;
42
43         register envelope* env = (envelope *)msg;
44         env->setUsed(0);
45         env->getsetArrayHops()=1; 
46         CkUnpackMessage(&env);
47
48         int srcPe = env->getSrcPe();
49         int sid = ((CmiMsgHeaderExt *) env)->stratid;
50
51         //      ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), sid, env->getTotalsize(), srcPe);
52
53         RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
54
55         CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
56         
57         a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
58
59         //      ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
60         return;
61 }
62
63
64
65 /**
66    A debugging routine that will periodically print out the status of the message queues. 
67    The Definition is at bottom of this file.
68  */
69 static void periodicDebugPrintStatus(void* ptr, double currWallTime);
70
71
72
73
74
75 /***************************************************************************
76  * Initialization section:
77  *
78  * Routines used by Comlib to initialize itself and all the strategies on all
79  * the processors. Used only at the beginning of the program.
80  ***************************************************************************/
81
82 // // initialized at startup before the main chare main methods are called:
83 // void initCharmComlibManager(){
84 //   CkPrintf("[%d] initCharmComlibManager()\n", CkMyPe());
85 //   fflush(stdout);
86 // }
87
88
89
90 ComlibManager::ComlibManager(){
91         init();
92 }
93
94 void ComlibManager::init(){
95
96    CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, (void*)this, 4000, CkMyPe());
97
98
99   if(CkNumPes() == 1 ){
100     ComlibPrintf("Doing nothing in ComlibManager::init() because we are running on 1 pe.\n");
101   } else {
102
103         if (CkMyRank() == 0) {
104                 PUPable_reg(CharmMessageHolder);
105         }
106
107         numStatsReceived = 0;
108         curComlibController = 0;
109         clibIteration = 0;
110
111         CkpvInitialize(comRectHashType *, com_rect_ptr); 
112         CkpvAccess(com_rect_ptr)= new comRectHashType;
113
114         CkpvInitialize(int, RecvmsgHandle);
115         CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
116
117         bcast_pelist = new int [CkNumPes()];
118         for(int brcount = 0; brcount < CkNumPes(); brcount++)
119                 bcast_pelist[brcount] = brcount;
120
121         section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
122
123         CkpvInitialize(CkGroupID, cmgrID);
124         CkpvAccess(cmgrID) = thisgroup;
125
126         dummyArrayIndex.nInts = 0;
127
128         CkAssert(CkpvInitialized(conv_com_object));
129         converseManager = &CkpvAccess(conv_com_object);
130
131         setupComplete = 0;
132
133         CkpvInitialize(int, migrationDoneHandlerID);
134         CkpvAccess(migrationDoneHandlerID) = 
135                 CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
136
137         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
138         cgproxy[curComlibController].barrier();
139   }
140 }
141
142 //First barrier makes sure that the communication library group 
143 //has been created on all processors
144 void ComlibManager::barrier(){
145         static int bcount = 0;
146         ComlibPrintf("barrier %d\n", bcount);
147         if(CkMyPe() == 0) {
148                 bcount ++;
149                 if(bcount == CkNumPes()){
150                         bcount = 0;
151                         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
152                         cgproxy.resumeFromSetupBarrier();
153                 }
154         }
155 }
156
157
158
159 /**
160    Due to the possibility of race conditions in the initialization of charm, this
161    barrier prevents comlib from being activated before all group branches are created.
162    This function completes the initialization of the charm layer of comlib.
163
164    In this function we also call ComlibDoneCreating for the user (basically
165    triggering the broadcast of the strategies created in Main::Main. Here the
166    Main::Main has for sure executed, otherwise we will not have received
167    confirmation by all other processors.
168    
169    
170  */
171 void ComlibManager::resumeFromSetupBarrier(){
172         ComlibPrintf("[%d] resumeFromSetupBarrier Charm group ComlibManager setup finished\n", CkMyPe());
173
174         setupComplete = 1;
175         ComlibDoneCreating();
176         ComlibPrintf("[%d] resumeFromSetupBarrier calling ComlibDoneCreating to tell converse layer strategies to set themselves up\n", CkMyPe());
177         sendBufferedMessagesAllStrategies();
178
179 }
180
181 /***************************************************************************
182  Determine whether the delegated messages should be buffered until the 
183  strategy has recovered from any error conditions and startup. Once the 
184  buffers can be flushed, ComlibManager::sendBufferedMessages() will be called
185 ***************************************************************************/
186 bool ComlibManager::shouldBufferMessagesNow(int instid){
187   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
188   return (!setupComplete) || myEntry->errorMode == ERROR_MODE || myEntry->errorMode == CONFIRM_MODE || myEntry->bufferOutgoing;
189 }
190
191
192 /***************************************************************************
193  Calls ComlibManager::sendBufferedMessages for each strategy.
194 ***************************************************************************/
195 void ComlibManager::sendBufferedMessagesAllStrategies(){
196   int nstrats = converseManager->getNumStrats();
197   for(int i=0;i<nstrats;i++){
198     sendBufferedMessages(i);
199   }
200 }
201
202
203 /***************************************************************************
204    Send all the buffered messages once startup has completed, and we have 
205    recovered from all errors.
206 ***************************************************************************/
207 void ComlibManager::sendBufferedMessages(int instid){
208   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
209   
210   if(shouldBufferMessagesNow(instid)){
211     ComlibManagerPrintf("[%d] sendBufferedMessages is not flushing buffered messages for strategy %d because shouldBufferMessagesNow()==true\n", CkMyPe(), instid);  
212   } else if(delayMessageSendBuffer[instid].size() == 0){
213     ComlibManagerPrintf("[%d] sendBufferedMessages: no bufferedmessages to send for strategy %d\n", CkMyPe(), instid);  
214   } else{
215     ComlibManagerPrintf("[%d] sendBufferedMessages Sending %d buffered messages for instid=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size(), instid);
216
217     for (std::set<CharmMessageHolder*>::iterator iter = delayMessageSendBuffer[instid].begin(); iter != delayMessageSendBuffer[instid].end(); ++iter) {
218       CharmMessageHolder* cmsg = *iter;
219           
220       switch(cmsg->type){
221             
222       case CMH_ARRAYSEND:
223         CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
224         CkpvAccess(conv_com_object).doneInserting(instid);
225         break;
226             
227       case CMH_GROUPSEND:
228         CkAbort("CMH_GROUPSEND unimplemented");
229         break;
230             
231       case CMH_ARRAYBROADCAST:
232       case CMH_ARRAYSECTIONSEND: 
233       case CMH_GROUPBROADCAST:
234         // Multicast/broadcast to an array or a section:
235         cmsg->sec_id = cmsg->copy_of_sec_id;
236         CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
237         CkpvAccess(conv_com_object).doneInserting(instid);
238         break;
239             
240       default:
241         CkAbort("Unknown cmsg->type was found in buffer of delayed messages\n");
242       }
243           
244     }
245
246     delayMessageSendBuffer[instid].clear();
247   }
248
249 }
250
251
252
253 /***************************************************************************
254  * Routine for bracketed strategies, for detecting count errors after 
255  * objects migrate.
256  * 
257  * This function must only be called after all messages from the previous 
258  * iteration have been delivered. This is the application's responsibility to 
259  * ensure. The iteration values provided must increase monotonically, 
260  * and must be greater than STARTUP_ITERATION.
261  *
262  ***************************************************************************/
263 /// Called when the array/group element starts sending messages
264 void ComlibManager::beginIteration(int instid, int iteration){
265         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
266                             
267         ComlibManagerPrintf("[%d] beginIteration iter=%d lastKnownIteration=%d  %s %s %s\n", CkMyPe(), iteration, myEntry->lastKnownIteration, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
268
269
270
271         if(iteration > myEntry->lastKnownIteration){
272               ComlibManagerPrintf("[%d] beginIteration Starting Next Iteration ( # %d )\n", CkMyPe(), iteration);
273
274               // Verify & update errorModeServer:
275               if(CkMyPe()==0){
276                 CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || 
277                          myEntry->errorModeServer == STARTUP_MODE_SERVER ||  
278                          myEntry->errorModeServer == ERROR_FIXED_MODE_SERVER);
279                 myEntry->errorModeServer = NORMAL_MODE_SERVER;
280               } else {
281                 CkAssert(myEntry->errorModeServer == NON_SERVER_MODE_SERVER || 
282                          myEntry->errorModeServer == STARTUP_MODE_SERVER);
283                 myEntry->errorModeServer = NON_SERVER_MODE_SERVER;
284               }
285               // Verify & update errorMode:
286               CkAssert(myEntry->errorMode == ERROR_FIXED_MODE || myEntry->errorMode == NORMAL_MODE  );
287               myEntry->errorMode = NORMAL_MODE;   
288               
289               ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
290
291               if(myEntry->lastKnownIteration == STARTUP_ITERATION){
292                 // At this point, myEntry->numElements == 0
293                 // An error will be detected below, so messages will be buffered.
294                 // Since this is startup, bracketedStartErrorRecoveryProcess 
295                 // will wait until the comlib layer is setup before continuing
296               } else {
297                 // switch to using the newly updated source and destination lists (describing which objects are local to a PE)
298                 // This must be done here because the list can only be updated once all sends 
299                 // have completed from the iteration during which the error was detected.
300                 myInfo->useNewSourceAndDestinations();  
301               }
302
303
304               myEntry->lastKnownIteration = iteration;
305               myEntry->nBeginItr = 1; // we are the first time to be called this iteration
306               myEntry->nEndItr = 0;
307               myEntry->nProcSync = 0;
308               myEntry->totalEndCounted = 0;
309               myEntry->discoveryMode = NORMAL_DISCOVERY_MODE;
310               myEntry->nEndSaved = 0;
311                       
312         } else {
313                 ComlibManagerPrintf("[%d] beginIteration continuing iteration # %d\n", CkMyPe(), iteration);
314                 myEntry->nBeginItr++;
315         }
316         
317         
318         // We need to check for error conditions here as well as EndIteration.
319         // This will ensure that if we are the processor that detects this error, 
320         // we won't deliver at least this message until the strategy is fixed
321         if (myEntry->nBeginItr > myEntry->numElements) {
322           ComlibManagerPrintf("[%d] beginIteration BUFFERING OUTGOING because nBeginItr=%d > numElements=%d\n",CkMyPe(), myEntry->nBeginItr, myEntry->numElements);
323           myEntry->bufferOutgoing = 1;
324         }
325         
326 }
327
328
329 /** Called by user program when each element has finished sending its messages.
330
331     If no errors are detected, ConvComlibManager::doneInserting() is called on the 
332     underlying converse strategy. If an error is detected, then ConvComlibManager::doneInserting
333     is NOT called. This likely causes the the underlying converse strategy to buffer the 
334     messages until we recover from the error mode, although we have buffered at least some of
335     the messages, so the user program cannot run ahead and start the next iteration.
336
337 */
338
339 void ComlibManager::endIteration(int instid, int step){ 
340         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
341         
342         // migration is not allowed between begin and end of iteration, 
343         // so each element must call end iteration on the same processor 
344         // as it called begin iteration
345         CkAssert(myEntry->nEndItr <= myEntry->nBeginItr);
346         CkAssert(step == myEntry->lastKnownIteration);
347
348
349         myEntry->nEndItr++;
350         
351         ComlibManagerPrintf("[%d] endIteration called\n",CkMyPe());
352         
353         
354         if (myEntry->bufferOutgoing) {
355           // If migration was detected in ComlibManager::beginIteration and hence messages have been buffered:
356           CkAssert(delayMessageSendBuffer[instid].size() > 0);
357           CProxy_ComlibManager myProxy(thisgroup);
358           myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
359         } 
360         else if(myEntry->nEndItr == myEntry->numElements) {
361           // If all the objects have called beginIteration and endIteration and no errors were detected
362           CkAssert(converseManager->isReady(instid));
363           converseManager->doneInserting(instid);
364         }
365         
366 }
367
368
369 /** Start recovery of errors. 
370     This entry method calls itself repeatedly if the underlying 
371     converse strategy is not yet ready.
372
373     If the PE is already in error mode, then only the difference 
374     in the counts will be contributed.
375 */
376 void ComlibManager::bracketedStartErrorRecoveryProcess(int instid, int step){  
377   CProxy_ComlibManager myProxy(thisgroup);
378   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);     
379
380   
381
382   if(converseManager->isReady(instid)){
383     ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess(instid=%d step=%d) %s %s %s\n", CkMyPe(), instid, step, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
384
385     CkAssert(myEntry->strategy != NULL);
386     CkAssert(myEntry->errorMode == NORMAL_MODE || myEntry->errorMode == ERROR_MODE);
387           
388     if (!myEntry->strategy->isBracketed()) {
389       CkPrintf("[%d] endIteration called unecessarily for a non-bracketed strategy\n", CkMyPe());
390       return;
391     }
392        
393     if (myEntry->errorMode == NORMAL_MODE) {
394       ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess()\n", CkMyPe());
395       myEntry->nEndSaved = myEntry->nEndItr;
396       myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
397       myEntry->errorMode = ERROR_MODE;
398       bracketedStartDiscovery(instid);
399     } else {
400       // Send the updated count
401       int update = myEntry->nEndItr - myEntry->nEndSaved;
402       if (update > 0) {
403         //      ComlibManagerPrintf("bracketedStartErrorRecoveryProcess sending update to bracketedReceiveCount\n");
404         CProxy_ComlibManager myProxy(thisgroup);
405         myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
406         myEntry->nEndSaved = myEntry->nEndItr;
407       }
408       
409     }
410   } else {
411     ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess() REENQUEUE\n", CkMyPe() );
412     // Re-enqueue myself because we can't start error recovery process until converse strategy is ready
413     myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
414   }
415 }
416
417
418 /// Invoked on all processors to inform that an error has been detected.
419 void ComlibManager::bracketedErrorDetected(int instid, int step) {
420         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
421         
422         bracketedCatchUpPE(instid,step);
423         CkAssert(step == myEntry->lastKnownIteration);
424         CkAssert(myEntry->errorMode == NORMAL_MODE || myEntry->errorMode == ERROR_MODE);
425
426         ComlibManagerPrintf("[%d] bracketedErrorDetected()\n", CkMyPe());
427
428         if (myEntry->errorMode == NORMAL_MODE) {
429           // save the value we are sending to bracketedReceiveCount
430           myEntry->nEndSaved = myEntry->nEndItr; // save the value we are sending to bracketedReceiveCount
431           CProxy_ComlibManager myProxy(thisgroup);
432           myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
433           bracketedStartDiscovery(instid);
434           myEntry->errorMode = ERROR_MODE;
435
436         } else { // ERROR_MODE
437           // If we have an update count, send it
438           int update = myEntry->nEndItr - myEntry->nEndSaved;
439           ComlibManagerPrintf("bracketedErrorDetected update=%d\n", update);
440           if (update > 0) {
441             ComlibManagerPrintf("bracketedErrorDetected sending update to bracketedReceiveCount\n");
442             CProxy_ComlibManager myProxy(thisgroup);
443             myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
444             myEntry->nEndSaved = myEntry->nEndItr;
445           }
446         }
447         
448 }
449
450 /// Invoked on all processors. After processor 0 has a count match, it sends out
451 /// a broadcast on this entry method to get a confirmation from all others.
452 void ComlibManager::bracketedConfirmCount(int instid) {
453         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
454         CkAssert(myEntry->errorMode == ERROR_MODE);
455         myEntry->errorMode = CONFIRM_MODE;
456         CProxy_ComlibManager myProxy(thisgroup);
457         ComlibManagerPrintf("[%d] bracketedConfirmCount\n", CkMyPe());
458         myProxy[0].bracketedCountConfirmed(instid, myEntry->nEndSaved, myEntry->lastKnownIteration);
459 }
460
461 /// Invoked on processor 0 upon a request to confirm the count. If the count is
462 /// correct a "NewPeList" is sent out, otherwise the error mode is returned with
463 /// "ErrorDetected"
464 void ComlibManager::bracketedCountConfirmed(int instid, int count, int step) {
465         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
466         CkAssert(CkMyPe() == 0);
467         // Advance PE0 to current step if we had no local objects
468         bracketedCatchUpPE(instid, step);
469         CkAssert(myEntry->errorModeServer == CONFIRM_MODE_SERVER);
470         CkAssert(step == myEntry->lastKnownIteration);
471
472         myEntry->total += count;
473         
474         ComlibManagerPrintf("[%d] bracketedCountConfirmed\n", CkMyPe());
475         
476         if (++myEntry->peConfirmCounter == CkNumPes()) {
477                 myEntry->peConfirmCounter = 0;
478
479                 CkAssert(myEntry->total == myEntry->totalEndCounted);
480                   
481                 CProxy_ComlibManager(thisgroup).bracketedReceiveNewCount(instid);
482                 myEntry->errorModeServer = ERROR_FIXED_MODE_SERVER;
483                   
484                 myEntry->total = 0;     
485         }
486
487 }
488
489
490
491 /** Update the state for a PE that was likely lacking any local objects.
492     If no local objects exist, noone will call begin iteration, and hence,
493     the lastKnownIteration value will be old. We need to advance to the 
494     current iteration before we can proceed.
495 */
496 void ComlibManager::bracketedCatchUpPE(int instid, int step){
497   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
498   CkAssert(step >= myEntry->lastKnownIteration);
499   if(step > myEntry->lastKnownIteration){
500     
501     myEntry->total = 0;
502     myEntry->lastKnownIteration = step;
503     myEntry->nBeginItr = 0;
504     myEntry->nEndItr = 0;
505     myEntry->nProcSync = 0;
506     myEntry->totalEndCounted = 0;
507     myEntry->nEndSaved = 0;
508     myEntry->errorMode = NORMAL_MODE;
509     myEntry->discoveryMode = NORMAL_DISCOVERY_MODE;
510
511     if(CkMyPe()==0){
512       CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || myEntry->errorModeServer == ERROR_FIXED_MODE_SERVER);
513       ComlibManagerPrintf("[%d] NORMAL_MODE_SERVER *******************AAAAAAAAAAAA**********************\n", CkMyPe());
514       myEntry->errorModeServer = NORMAL_MODE_SERVER;
515     } else {
516       CkAssert(myEntry->errorModeServer == NON_SERVER_MODE_SERVER || myEntry->errorModeServer == STARTUP_MODE_SERVER);
517       myEntry->errorModeServer == NON_SERVER_MODE_SERVER;
518     }
519     
520     
521   }
522 }
523
524 /// Invoked on processor 0 when a processor sends a count of how many elements
525 /// have already deposited. This number is incremental, and it refers to the
526 /// previous one sent by that processor. Processor 0 stores temporarily all the
527 /// numbers it receives. When a match happens, processor 0 switches to "confirm"
528 /// mode and send out a request for confirmation to all other processors.
529 /// The final parameter, "step", is used so that if  no objects exist on a processor,
530 /// then the counts sent by the processor will be tagged with an old timestep, and can 
531 /// safely be ignored. If no objects are on a PE, then beginIteration will never
532 /// be called there.
533 void ComlibManager::bracketedReceiveCount(int instid, int pe, int count, int isFirst, int step) {
534         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
535         CkAssert(CkMyPe() == 0);
536         CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || myEntry->errorModeServer == ERROR_MODE_SERVER );
537
538         ComlibManagerPrintf("[%d] bracketedReceiveCount step=%d \n", CkMyPe(), step);
539
540         
541         // Advance PE0 to current step if we had no local objects
542         bracketedCatchUpPE(instid, step);
543
544         // Encountering a message from an old step
545         CkAssert(step == myEntry->lastKnownIteration);
546
547         myEntry->totalEndCounted += count;
548
549         ComlibManagerPrintf("[%d] bracketedReceiveCount step=%d totalEndCounted=%d count=%d\n", CkMyPe(), step, myEntry->totalEndCounted, count);
550
551         
552         myEntry->nProcSync += isFirst; // isFirst is 1 the first time a processor send a message,
553         CkAssert(myEntry->nProcSync <= CkNumPes());
554         
555         if (myEntry->errorModeServer == NORMAL_MODE_SERVER) { 
556                 // first time this is called
557                 CkAssert(myEntry->nProcSync == 1);
558                 CProxy_ComlibManager(thisgroup).bracketedErrorDetected(instid, step);
559                 myEntry->errorModeServer = ERROR_MODE_SERVER;
560                 ComlibManagerPrintf("[%d] bracketedReceiveCount first time\n", CkMyPe());
561         } else { // ERROR_MODE
562         
563           CharmStrategy* s = dynamic_cast<CharmStrategy*>(myEntry->strategy);
564           ComlibArrayInfo ainfo = s->ainfo;
565           int totalsrc =  ainfo.getTotalSrc() ;
566           
567           if(myEntry->nProcSync == CkNumPes() && myEntry->totalEndCounted == totalsrc) {
568             // ok, we received notifications from all PEs and all objects have called endIteration
569             myEntry->errorModeServer = CONFIRM_MODE_SERVER; 
570             ComlibManagerPrintf("[%d] bracketedReceiveCount errorModeServer is now CONFIRM_MODE calling bracketedConfirmCount totalsrc=%d\n", CkMyPe(), (int)totalsrc);
571             CProxy_ComlibManager(thisgroup).bracketedConfirmCount(instid);
572                         
573           }
574         }
575                 
576 }
577
578
579 /// Invoked on all processors. After the count has been checked and it matches
580 /// the number of emements involved in the bracketed operation, processor 0
581 /// sends a broadcast to acknowledge the success. 
582 /// The strategy is disabled until a new Pe list is received.
583 void ComlibManager::bracketedReceiveNewCount(int instid) {
584         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
585         CkAssert(myEntry->errorMode == CONFIRM_MODE);
586
587         myEntry->errorMode = ERROR_FIXED_MODE;
588         
589         myEntry->nEndItr -= myEntry->nEndSaved;
590         myEntry->nBeginItr -= myEntry->nEndSaved;
591
592         myEntry->nEndSaved = 0;
593
594         bracketedFinalBarrier(instid);
595 }
596
597
598
599
600 /// Invoked on all processors. When all processors have discovered all elements
601 /// involved in the operation, processor 0 uses this method to broadcast the
602 /// entire processor list to all. Currently the discovery process HAS to go
603 /// together with the counting process, otherwise the strategy is updated at an
604 /// undetermined state. In future it may be useful to implement the discovery
605 /// process alone.
606 void ComlibManager::bracketedReceiveNewPeList(int instid, int *count) {
607         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
608         CkAssert(myEntry->discoveryMode == STARTED_DISCOVERY_MODE);
609         myEntry->discoveryMode = FINISHED_DISCOVERY_MODE;
610
611         myEntry->strategy->bracketedUpdatePeKnowledge(count);
612         
613         ComlibManagerPrintf("[%d] bracketedReceiveNewPeList Updating numElements\n", CkMyPe());
614         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
615         CkAssert((unsigned long)myInfo > 0x1000);
616         myEntry->numElements = myInfo->getLocalSrc();
617         
618         ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n",CkMyPe(), instid, delayMessageSendBuffer[instid].size() );
619         ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n", CkMyPe(), instid, delayMessageSendBuffer[instid].size());
620                 
621         bracketedFinalBarrier(instid);
622 }
623
624
625 /** Start a barrier phase where all processes will enter it once both 
626     the counting and discovery processes complete.
627
628     ComlibManager::bracketedReceiveNewPeList calls this method. 
629     ComlibManager::bracketedReceiveNewPeList is called as an array broadcast to thisProxy, 
630     so every PE will call this method.
631  */
632 void ComlibManager::bracketedFinalBarrier(int instid) {
633   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
634   ComlibManagerPrintf("[%d] ComlibManager::bracketedFinalBarrier %s %s %s\n", CkMyPe(), myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
635
636
637   if (myEntry->discoveryMode == FINISHED_DISCOVERY_MODE &&  myEntry->errorMode == ERROR_FIXED_MODE) {    
638     CProxy_ComlibManager myProxy(thisgroup);
639     myProxy[0].bracketedReleaseCount(instid);
640   }
641 }
642
643
644 /** 
645     Once all PEs report here, we will allow them to release the buffered messages from this iteration
646  */
647 void ComlibManager::bracketedReleaseCount(int instid) {
648     ComlibManagerPrintf("[%d] ComlibManager::bracketedReleaseCount\n", CkMyPe());
649
650     StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
651     CkAssert(CkMyPe() == 0);
652     CkAssert(myEntry->errorModeServer == ERROR_FIXED_MODE_SERVER);
653     
654     myEntry->numBufferReleaseReady++;
655     if(myEntry->numBufferReleaseReady == CkNumPes()) {
656       myEntry->errorModeServer = NORMAL_MODE_SERVER;
657       CProxy_ComlibManager(thisgroup).bracketedReleaseBufferedMessages(instid);
658       myEntry->numBufferReleaseReady = 0;
659     }
660 }
661
662 /** 
663     Release any buffered messages.
664  */
665 void ComlibManager::bracketedReleaseBufferedMessages(int instid) {
666   ComlibManagerPrintf("[%d] ComlibManager::bracketedReleaseBufferedMessages\n", CkMyPe());
667
668   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
669   
670   myEntry->bufferOutgoing = 0;
671   sendBufferedMessages(instid);
672   
673   converseManager->doneInserting(instid);
674 }
675
676
677
678
679
680
681 /** Called when an error is discovered. Such errors occur when it is observed
682     that an array element has migrated to a new PE. Specifically if a strategy
683     on some PE determines that it has received messages for more array elements
684     than the strategy knew were local, then some array element must have 
685     migrated to the PE. The strategy instance detecting the error will call this
686     method to initiate a global update operation that determines the locations
687     of all array elements.
688
689     This is called on each PE by the bracketedErrorDetected() method. 
690
691     Each array element previously located on this PE is examined to determine if
692     it is still here. If the array element has migrated away, then the
693     bracketedDiscover() method is called on the new PE.
694
695     the new source and destination element arrays will be empty at this point,
696     so myInfo->addNewDestinationList(e); will add a new record that
697     will be used for future iterations.
698
699 */
700 void ComlibManager::bracketedStartDiscovery(int instid) {
701         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
702         CkAssert(myEntry->discoveryMode == NORMAL_DISCOVERY_MODE);
703         myEntry->discoveryMode = STARTED_DISCOVERY_MODE;
704         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
705         const CProxy_ComlibManager myProxy(thisgroup);
706
707         ComlibManagerPrintf("[%d] bracketedStartDiscovery\n", CkMyPe());
708
709         int countSrc = 0;
710         int countDest = 0;
711
712         if (myInfo->isSourceArray()) {
713           CkAssert(myInfo->newSourceListSize() == 0);
714
715           const CkVec<CkArrayIndexMax> & srcElements = myInfo->getSourceElements();
716           const int nelem = srcElements.size();
717           const CkArrayID aid = myInfo->getSourceArrayID(); 
718           const CkArray *a = (CkArray*)_localBranch(aid);
719
720           for (int i=0; i<nelem; ++i) {
721             int pe = a->lastKnown(srcElements[i]);
722             if (pe == CkMyPe()) {
723               countSrc++;
724               myInfo->addNewLocalSource(srcElements[i]);
725             }
726             else {
727               myProxy[pe].bracketedDiscover(instid, aid, srcElements[i], true);
728             }
729           }
730
731         }
732
733         if (myInfo->isDestinationArray()) {
734           CkAssert(myInfo->newDestinationListSize() == 0);
735
736           const CkVec<CkArrayIndexMax> & destElements = myInfo->getDestinationElements();
737           const int nelem = destElements.size();
738           const CkArrayID aid = myInfo->getDestinationArrayID();
739           const CkArray *a = (CkArray*)_localBranch(aid);
740
741           for (int i=0; i<nelem; ++i) {
742             int pe = a->lastKnown(destElements[i]);
743             if (pe == CkMyPe()) {
744               countDest++;
745               myInfo->addNewLocalDestination(destElements[i]);
746             }
747             else {
748               myProxy[pe].bracketedDiscover(instid, aid, destElements[i], false);
749             }
750           }
751         }
752
753         // Report the number of elements that are now local to this PE (if >0).
754         // The new owner PEs will report the counts for those objects that are no longer local to this PE
755         if (countSrc > 0 || countDest > 0) {
756           myProxy[0].bracketedContributeDiscovery(instid, CkMyPe(), countSrc, countDest, myEntry->lastKnownIteration);
757         }
758         
759 }
760
761
762
763 /** Determine the PE of a given array index. This will be called for any 
764     array element by the previous owner PE.
765
766     If the index is local to the processor, then record this in the local 
767     strategy. Also send a message to PE 0 informing PE 0 that the array
768     element was located.
769
770     If the array element is not found locally, call this method on the last 
771     known location for the element.
772
773 */
774 void ComlibManager::bracketedDiscover(int instid, CkArrayID aid, CkArrayIndexMax &idx, int isSrc) {
775         ComlibManagerPrintf("[%d] bracketedDiscover\n", CkMyPe());
776         CkArray *a = (CkArray *)_localBranch(aid);
777         int pe = a->lastKnown(idx);
778         CProxy_ComlibManager myProxy(thisgroup);
779         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
780
781         if (pe == CkMyPe()) {
782                 // Object was found locally
783          
784                 // notify PE 0
785                 myProxy[0].bracketedContributeDiscovery(instid, pe, isSrc?1:0, isSrc?0:1, myEntry->lastKnownIteration);
786           
787
788                 // Find local strategy instance
789
790                 // ComlibArrayInfo *myInfo = &(dynamic_cast<CharmStrategy*>(getStrategy(instid))->ainfo);
791                 CkAssert((unsigned long)myEntry->strategy > 0x1000);
792                 ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
793                 CkAssert((unsigned long)myInfo > 0x1000);
794
795                 if (isSrc) {
796                         // Add the element as a source element for the strategy
797                         ComlibManagerPrintf("[%d] bracketedDiscover addSource\n", CkMyPe());
798                         CkAssert((unsigned long)myInfo > 0x1000);
799                         myInfo->addNewLocalSource(idx);
800
801                         ComlibManagerPrintf("[%d] bracketedDiscover updating numElements\n", CkMyPe());
802                         myEntry->numElements = myInfo->getLocalSrc();           
803                 }
804                 else {
805                         // Add the element as a Destination element for the strategy
806                         ComlibManagerPrintf("[%d] bracketedDiscover addNewDestination\n", CkMyPe());
807                         myInfo->addNewLocalDestination(idx);
808                 }
809         } else {
810                 ComlibManagerPrintf("Keep On Forwarding*********************\n");
811                 // forward to next potential processor
812                 myProxy[pe].bracketedDiscover(instid, aid, idx, isSrc);
813         }
814 }
815
816
817
818 /** On PE 0, record the notifications from all PEs about the actual locations of each
819     array element. Count the number of elements discovered. Once the new location of 
820     every elements has been discovered, broadcast the new locations to all PEs by 
821     invoking bracketedReceiveNewPeList.
822 */
823 void ComlibManager::bracketedContributeDiscovery(int instid, int pe, int nsrc, int ndest, int step) {
824         CkAssert(CkMyPe() == 0);
825         ComlibManagerPrintf("[%d] bracketedContributeDiscovery pe=%d nsrc=%d ndest=%d step=%d\n", CkMyPe(), pe, nsrc, ndest, step);
826         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
827         if (myEntry->peList == 0) {
828                 myEntry->peList = new int[CkNumPes()+2];
829                 // peList[CkNumPes()] keeps the sum of all source objects discovered
830                 // peList[CkNumPes()+1] keeps the sum of all destination objects discovered
831                 for (int i=0; i<CkNumPes()+2; ++i) myEntry->peList[i]=0;
832                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery zeroing new peList\n", CkMyPe());
833         }
834         myEntry->peList[CkNumPes()] += nsrc;
835         myEntry->peList[CkNumPes()+1] += ndest;
836         // update the count for the sender processor. peList[i] is:
837         // 0 if proc "i" has no objects,
838         // 1 if proc "i" has only source objects,
839         // 2 if proc "i" has only destination objects,
840         // 3 if proc "i" has both source and destination objects
841         // the following code maintains the property of peList!
842         if (nsrc > 0) myEntry->peList[pe] |= 1;
843         if (ndest > 0) myEntry->peList[pe] |= 2;
844
845         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
846         CkAssert((unsigned long)myInfo > 0x1000);
847
848         
849 //      ComlibManagerPrintf("[%d] bracketedContributeDiscovery myEntry->peList[CkNumPes()]=%d  myInfo->getTotalSrc()=%d\n", CkMyPe(), myEntry->peList[CkNumPes()], myInfo->getTotalSrc());
850 //      ComlibManagerPrintf("[%d] bracketedContributeDiscovery myEntry->peList[CkNumPes()+1]=%d  myInfo->getTotalDest()=%d\n", CkMyPe(), myEntry->peList[CkNumPes()+1], myInfo->getTotalDest());
851 //              
852         if (myEntry->peList[CkNumPes()] == myInfo->getTotalSrc() &&
853                         myEntry->peList[CkNumPes()+1] == myInfo->getTotalDest()) {
854                 // discovery process finished, broadcast the new pe list
855
856                 CProxy_ComlibManager myProxy(thisgroup);
857                 
858                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery calling bracketedReceiveNewPeList %d/%d, %d/%d\n",       
859                                 CkMyPe(), 
860                                 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(), 
861                                 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
862                 
863                 printPeList("bracketedContributeDiscovery peList=", myEntry->peList);
864                 myProxy.bracketedReceiveNewPeList(instid, myEntry->peList);
865                 delete myEntry->peList;
866                 myEntry->peList = NULL;
867         } else {
868                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery NOT calling bracketedReceiveNewPeList %d/%d, %d/%d\n", 
869                                 CkMyPe(), 
870                                 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(), 
871                                 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
872         }
873
874 }
875
876
877 void ComlibManager::printPeList(const char* note, int *count)
878 {
879         char *buf = (char*)malloc(1024*64);
880         sprintf(buf, "[%d] %s ", CkMyPe(), note);
881         for(int i=0;i<CkNumPes();i++){
882                 switch (count[i]){
883                 case 0:
884                         sprintf(buf+strlen(buf), " %d:no ", i, count[i]);
885                         break;
886                 case 1:
887                         sprintf(buf+strlen(buf), " %d:source ", i, count[i]);
888                         break;
889                 case 2:
890                         sprintf(buf+strlen(buf), " %d:dest ", i, count[i]);
891                         break;
892                 case 3:
893                         sprintf(buf+strlen(buf), " %d:both ", i, count[i]);
894                         break;
895                 }
896         }
897         
898         sprintf(buf+strlen(buf), ", all source objects discovered =  %d, all destination objects discovered = %d",  count[CkNumPes()] ,  count[CkNumPes()+1] );
899                 
900         ComlibPrintf("%s\n", buf);
901 }
902
903 /***************************************************************************
904  * Delegation framework section:
905  *
906  * Reimplementation of the main routines needed for the delegation framework:
907  * this routines will be called when a message is sent through a proxy which has
908  * been associated with comlib.
909  ***************************************************************************/
910
911 extern int _charmHandlerIdx;
912 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid);
913
914
915 /** 
916     Handle messages sent via ArraySend. These are single point to point messages 
917     to array elements.
918
919     This method should not optimize direct sends in the case where buffering occurs
920
921  */
922 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
923                 const CkArrayIndexMax &idx, CkArrayID a){
924
925         CkAssert(pd != NULL);
926         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
927         int instid = ci->getID();
928         CkAssert(instid != 0);
929
930         // Reading from two hash tables is a big overhead
931         CkArray *amgr = CkArrayID::CkLocalBranch(a);
932         int dest_proc = amgr->lastKnown(idx);
933
934         register envelope * env = UsrToEnv(msg);
935         msg_prepareSend_noinline((CkArrayMessage*)msg, ep, a);
936
937         env->getsetArrayIndex()=idx;
938         env->setUsed(0);
939         ((CmiMsgHeaderExt *)env)->stratid = instid;
940
941         CkPackMessage(&env);
942         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));        
943         
944         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_ARRAYSEND);
945         
946         if(shouldBufferMessagesNow(instid)){
947           delayMessageSendBuffer[instid].insert(cmsg);
948           ComlibManagerPrintf("[%d] ComlibManager::ArraySend BUFFERED OUTGOING: now buffer contains %d messages\n",CkMyPe(), delayMessageSendBuffer[instid].size() );
949         } else {
950           ComlibPrintf("ComlibManager::ArraySend NOT BUFFERING inserting message into strategy %d\n",instid);
951           
952           if(dest_proc == CkMyPe()){  
953             // Directly send if object is local
954             amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
955             return;
956           } else {
957             // Send through converse level strategy if non-local
958             converseManager->insertMessage(cmsg, instid);
959           }
960           
961         }
962
963 }
964
965
966 #include "qd.h"
967 //CkpvExtern(QdState*, _qd);
968
969 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
970
971         CkAssert(pd != NULL);
972         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
973         int instid = ci->getID();
974
975         int dest_proc = onPE;
976
977         ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
978                         UsrToEnv(msg)->getTotalsize());
979
980         register envelope * env = UsrToEnv(msg);
981         if(dest_proc == CkMyPe()){
982                 _SET_USED(env, 0);
983                 CkSendMsgBranch(ep, msg, dest_proc, gid);
984                 return;
985         }
986
987         ((CmiMsgHeaderExt *)env)->stratid = instid;
988         CpvAccess(_qd)->create(1);
989
990         env->setMsgtype(ForBocMsg);
991         env->setEpIdx(ep);
992         env->setGroupNum(gid);
993         env->setSrcPe(CkMyPe());
994         env->setUsed(0);
995
996         CkPackMessage(&env);
997         CmiSetHandler(env, _charmHandlerIdx);
998
999         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_GROUPSEND); 
1000
1001         if(shouldBufferMessagesNow(instid)){
1002           ComlibPrintf("ComlibManager::GroupSend Buffering message for %d\n",instid);
1003           delayMessageSendBuffer[instid].insert(cmsg);
1004         } else {
1005           ComlibPrintf("ComlibManager::GroupSend inserting message into strategy %d\n",instid);
1006           converseManager->insertMessage(cmsg, instid);
1007         }
1008
1009
1010 }
1011
1012 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
1013         ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
1014
1015         CkAssert(pd != NULL);
1016         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1017         int instid = ci->getID();
1018
1019         register envelope * env = UsrToEnv(m);
1020         msg_prepareSend_noinline((CkArrayMessage*)m, ep, a);
1021
1022         env->getsetArrayIndex()= dummyArrayIndex;
1023         ((CmiMsgHeaderExt *)env)->stratid = instid;
1024
1025         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
1026
1027         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_BROADCAST, CMH_ARRAYBROADCAST);
1028         cmsg->npes = 0;
1029         cmsg->sec_id = NULL;
1030         cmsg->array_id = a;
1031
1032         multicast(cmsg, instid);
1033 }
1034
1035 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
1036                 CkArrayID a, CkSectionID &s, int opts) {
1037
1038         CkAssert(pd != NULL);
1039         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1040         int instid = ci->getID();
1041
1042         ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
1043
1044
1045         //Provide a dummy dest proc as it does not matter for mulitcast 
1046         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_SECTION_MULTICAST, CMH_ARRAYSECTIONSEND);
1047         cmsg->npes = 0;
1048         cmsg->sec_id = &s;
1049         cmsg->array_id = a;
1050         
1051
1052         msg_prepareSend_noinline((CkArrayMessage*)m, ep, a);
1053         
1054         register envelope * env = UsrToEnv(m);
1055         env->getsetArrayIndex()= dummyArrayIndex;
1056         ((CmiMsgHeaderExt *)env)->stratid = instid;
1057         
1058         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
1059         
1060         env->setUsed(0);
1061         CkPackMessage(&env);
1062         
1063
1064         CkSectionInfo minfo;
1065         minfo.type = COMLIB_MULTICAST_MESSAGE;
1066         minfo.sInfo.cInfo.instId = ci->getID();
1067         //minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
1068         minfo.sInfo.cInfo.id = 0; 
1069         minfo.pe = CkMyPe();
1070         ((CkMcastBaseMsg *)env)->_cookie = minfo;    
1071         
1072         multicast(cmsg, instid);
1073 }
1074
1075 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
1076
1077         CkAssert(pd != NULL);
1078         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1079         int instid = ci->getID();
1080         CkAssert(instid!=0);
1081
1082         register envelope * env = UsrToEnv(m);
1083
1084         CpvAccess(_qd)->create(1);
1085
1086         env->setMsgtype(ForBocMsg);
1087         env->setEpIdx(ep);
1088         env->setGroupNum(g);
1089         env->setSrcPe(CkMyPe());
1090         env->setUsed(0);
1091         ((CmiMsgHeaderExt *)env)->stratid = instid;
1092
1093         CkPackMessage(&env);
1094         CmiSetHandler(env, _charmHandlerIdx);
1095
1096         //Provide a dummy dest proc as it does not matter for mulitcast 
1097         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST, CMH_GROUPBROADCAST);
1098
1099         cmsg->npes = 0;
1100         //cmsg->pelist = NULL;
1101
1102         multicast(cmsg, instid);
1103 }
1104
1105
1106 /** 
1107     Multicast the message with the specified strategy(instid).
1108     This method is used in: ArrayBroadcast, ArraySectionSend, and GroupBroadcast
1109  */
1110 void ComlibManager::multicast(CharmMessageHolder *cmsg, int instid) {
1111         CkAssert(instid != 0);
1112         register envelope * env = UsrToEnv(cmsg->getCharmMessage());
1113
1114 #if DEBUG
1115         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1116         ComlibPrintf("[%d] multicast setupComplete=%d errorMode=%d errorModeServer=%d\n", CkMyPe(), setupComplete, myEntry->errorMode, myEntry->errorModeServer);
1117 #endif
1118  
1119         env->setUsed(0);
1120         CkPackMessage(&env);
1121         
1122         if(shouldBufferMessagesNow(instid)){
1123           cmsg->saveCopyOf_sec_id();
1124           delayMessageSendBuffer[instid].insert(cmsg);
1125         } else {
1126           converseManager->insertMessage(cmsg, instid);
1127         }
1128         
1129 }
1130
1131
1132 void ComlibManager::printDiagnostics(int instid){
1133   CkPrintf("[%d]     delayMessageSendBuffer.size()=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size());
1134 }
1135
1136
1137 void ComlibManager::printDiagnostics(){
1138
1139
1140   std::map<ComlibInstanceHandle, std::set<CharmMessageHolder*> >::iterator iter;
1141   for(iter = delayMessageSendBuffer.begin(); iter != delayMessageSendBuffer.end(); ++iter){
1142     int instid = iter->first;
1143     int size = iter->second.size();
1144     
1145     if(size>0 || true){
1146       CkPrintf("[%d] delayMessageSendBuffer[instid=%d] contains %d messages\n", CkMyPe(), instid, size);
1147       
1148       if(! shouldBufferMessagesNow(instid)){
1149         CkPrintf("[%d] printDiagnostics: No messages should be still in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
1150       } else {
1151         CkPrintf("[%d] printDiagnostics: Messages still ought to be delayed in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
1152       }
1153       
1154       StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1155       CkPrintf("[%d] printDiagnostics[instid=%d] setupComplete=%d %s %s %s bufferOutgoing=%d\n", (int)CkMyPe(), (int)instid, (int)setupComplete, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString(), (int)myEntry->bufferOutgoing);
1156
1157
1158
1159     }
1160   }
1161   
1162   CkpvAccess(conv_com_object).printDiagnostics();
1163   
1164 }
1165
1166
1167
1168 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
1169         //ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(data);
1170         data->ref();
1171         return data;
1172         //return (new ComlibDelegateData(inst->getID()));
1173 }
1174
1175 CkDelegateData* ComlibManager::DelegatePointerPup(PUP::er &p,
1176                 CkDelegateData *pd) {
1177         if (!p.isUnpacking() && pd == NULL) CkAbort("Tryed to pup a null ComlibDelegateData!\n");
1178         //CmiBool isNotNull = pd!=NULL ? CmiTrue : CmiFalse;
1179         ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(pd);
1180
1181         // call a migration constructor
1182         if (p.isUnpacking()) inst = new ComlibDelegateData((CkMigrateMessage*)0);
1183         inst->pup(p);
1184         /*
1185   p | isNotNull;
1186   if (isNotNull) {
1187     if (p.isUnpacking()) inst = new ComlibInstanceHandle();
1188     inst->pup(p);
1189   }
1190          */
1191         return inst;
1192 }
1193
1194
1195 //Collect statistics from all the processors, also gets the list of
1196 //array elements on each processor.
1197 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe) {
1198 }
1199
1200
1201 /// @TODO: eliminate AtSync and move toward anytime migration!
1202 void ComlibManager::AtSync() {
1203
1204 }
1205
1206
1207 /***************************************************************************
1208  * User section:
1209  *
1210  * Interface available to the user to interact with the ComlibManager
1211  ***************************************************************************/
1212
1213 /** Permanently associate the given proxy with comlib, to use the instance
1214     represented by cinst. All messages sent through the proxy will be forwarded
1215     by comlib. */
1216 void ComlibAssociateProxy(ComlibInstanceHandle cinst, CProxy &proxy) {
1217   if(CkNumPes() > 1){
1218         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1219         proxy.ckDelegate(cgproxy.ckLocalBranch(), new ComlibDelegateData(cinst));
1220   } else {
1221     ComlibPrintf("Doing nothing in ComlibAssociateProxy because we have only 1 pe\n"); 
1222   }
1223 }
1224
1225 /** Permanently assiciate the given proxy with comlib, to use the strategy
1226     represented by strat. All messages sent through the proxy will be forwarded
1227     by comlib. */
1228 void ComlibAssociateProxy(Strategy *strat, CProxy &proxy) {
1229         ComlibAssociateProxy(strat->getHandle(), proxy);
1230 }  
1231
1232 /* OLD DESCRIPTION! Register a strategy to the comlib framework, and return a
1233     handle to be used in the future with ComlibAssociateProxy to associate the
1234     strategy with a proxy. If one strategy is registered more than once, the
1235     handles will be different, but they has to be considered as aliases. */
1236
1237 /** Provided only for backward compatibility. A strategy is registered at the
1238     converse level as soon as it is created (see Strategy constructor). This
1239     function just retrieve a handle from the strategy itself. */
1240 ComlibInstanceHandle ComlibRegister(Strategy *strat) {
1241         return strat->getHandle();
1242 }
1243
1244 /** Call beginIteration on the ComlibManager with the instance handle for the strategy associated with the proxy. 
1245  *   If no strategy has been associated with the proxy, nothing is done in this function.
1246  */
1247 void ComlibBegin(CProxy &proxy, int iteration) {
1248   if(CkNumPes() > 1){
1249         ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
1250         if(cinst==NULL)
1251                 return;
1252         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1253         (cgproxy.ckLocalBranch())->beginIteration(cinst->getID(), iteration);
1254   } else {
1255     ComlibPrintf("Doing nothing in ComlibBegin because we have only 1 pe");  
1256   }
1257 }
1258
1259 /** Call endIteration on the ComlibManager with the instance handle for the strategy associated with the proxy. 
1260  *   If no strategy has been associated with the proxy, nothing is done in this function.
1261  */
1262 void ComlibEnd(CProxy &proxy, int iteration) {
1263   if(CkNumPes() > 1){
1264         ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
1265         if(cinst==NULL)
1266                 return;
1267         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1268         (cgproxy.ckLocalBranch())->endIteration(cinst->getID(), iteration);
1269   } else {
1270     ComlibPrintf("Doing nothing in ComlibEnd because we have only 1 pe");  
1271   }
1272 }
1273
1274 char *routerName;
1275 int sfactor=0;
1276
1277
1278 /** A mainchare, used to initialize the comlib framework at the program startup.
1279     Its main purpose is to create the ComlibManager group. */
1280 class ComlibManagerMain {
1281 public:
1282         ComlibManagerMain(CkArgMsg *msg) {
1283
1284                 if(CkMyPe() == 0 && msg !=  NULL)
1285                         CmiGetArgString(msg->argv, "+strategy", &routerName);         
1286
1287                 if(CkMyPe() == 0 && msg !=  NULL)
1288                         CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1289
1290                 CProxy_ComlibManager::ckNew();
1291         }
1292 };
1293
1294 /***************************************************************************
1295  * ComlibInstanceHandle section:
1296  *
1297  * Implementation of the functions defined in the ComlibInstanceHandle class.
1298  ***************************************************************************/
1299
1300 ComlibDelegateData::ComlibDelegateData(int instid) : CkDelegateData(), _instid(instid) {
1301         ComlibManagerPrintf("[%d] Constructing ComlibDelegateData\n", CkMyPe());
1302         ref();
1303 }
1304
1305
1306 void ComlibInitSectionID(CkSectionID &sid){
1307
1308         sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1309         sid._cookie.pe = CkMyPe();
1310
1311         sid._cookie.sInfo.cInfo.id = 0;    
1312         sid.npes = 0;
1313         sid.pelist = NULL;
1314 }
1315
1316
1317 /** For backward compatibility - for old name commlib. The function
1318     _registercomlib() is generated by the translator. */
1319 void _registercommlib(void)
1320 {
1321         static int _done = 0; 
1322         if(_done) 
1323                 return; 
1324         _done = 1;
1325         _registercomlib();
1326 }
1327
1328
1329 void ComlibNotifyMigrationDoneHandler(void *msg) {
1330         CmiFree(msg);
1331         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1332         ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1333         if(cmgr_ptr)
1334                 cmgr_ptr->AtSync();    
1335 }
1336
1337
1338
1339
1340
1341
1342 static void periodicDebugPrintStatus(void* ptr, double currWallTime){
1343   CkPrintf("[%d] periodicDebugPrintStatus()\n", CkMyPe());
1344
1345   ComlibManager *mgr = (ComlibManager*)ptr;
1346   mgr->printDiagnostics();
1347
1348   CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, ptr, 4000, CkMyPe());
1349
1350 }
1351
1352
1353
1354
1355
1356 #include "comlib.def.h"
1357
1358 /*@}*/