f2bdbc2fefef0d9fc0b60ab7f1fb6f42fec6e40e
[charm.git] / src / ck-com / ComlibManager.C
1 /**
2    @addtogroup CharmComlib
3    @{
4    @file
5    Implementation of the functions in ComlibManager.h and handler for message
6    transportation.
7 */
8
9 #include "ComlibManager.h"
10 #include "comlib.h"
11 #include "ck.h"
12 #include "envelope.h"
13
14
15 // We only want to print debug information for a single strategy. Otherwise we'll get incredibly confused
16 #undef ComlibManagerPrintf
17 //#define ComlibManagerPrintf  if(instid==1)ComlibPrintf
18 #define ComlibManagerPrintf  ComlibPrintf
19
20 #define getmax(a,b) ((a)>(b)?(a):(b))
21
22 CkpvExtern(int, RecvdummyHandle);
23  
24 CkpvDeclare(CkGroupID, cmgrID);
25
26 /***************************************************************************
27  * Handlers section:
28  *
29  * all the handlers used by the ComlibManager to coordinate the work, and
30  * propagate the messages from one processor to another
31  ***************************************************************************/
32
33 //Handler to receive array messages
34 CkpvDeclare(int, RecvmsgHandle);
35
36 void recv_array_msg(void *msg){
37
38   //    ComlibPrintf("%d:In recv_msg\n", CkMyPe());
39
40         if(msg == NULL)
41                 return;
42
43         register envelope* env = (envelope *)msg;
44         env->setUsed(0);
45         env->getsetArrayHops()=1; 
46         CkUnpackMessage(&env);
47
48         int srcPe = env->getSrcPe();
49         int sid = ((CmiMsgHeaderExt *) env)->stratid;
50
51         //      ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), sid, env->getTotalsize(), srcPe);
52
53         RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
54
55         CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
56         
57         a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
58
59         //      ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
60         return;
61 }
62
63
64
65 /**
66    A debugging routine that will periodically print out the status of the message queues. 
67    The Definition is at bottom of this file.
68  */
69 static void periodicDebugPrintStatus(void* ptr, double currWallTime);
70
71
72
73
74
75 /***************************************************************************
76  * Initialization section:
77  *
78  * Routines used by Comlib to initialize itself and all the strategies on all
79  * the processors. Used only at the beginning of the program.
80  ***************************************************************************/
81
82 ComlibManager::ComlibManager(){
83         init();
84 }
85
86 void ComlibManager::init(){
87
88   CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, (void*)this, 4000, CkMyPe());
89
90   if(CkNumPes() == 1 ){
91     ComlibPrintf("Doing nothing in ComlibManager::init() because we are running on 1 pe.\n");
92   } else {
93
94         initComlibManager();
95
96         if (CkMyRank() == 0) {
97                 PUPable_reg(CharmMessageHolder);
98         }
99
100         numStatsReceived = 0;
101         curComlibController = 0;
102         clibIteration = 0;
103
104         CkpvInitialize(comRectHashType *, com_rect_ptr); 
105         CkpvAccess(com_rect_ptr)= new comRectHashType;
106
107         CkpvInitialize(int, RecvmsgHandle);
108         CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
109
110         bcast_pelist = new int [CkNumPes()];
111         for(int brcount = 0; brcount < CkNumPes(); brcount++)
112                 bcast_pelist[brcount] = brcount;
113
114         section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
115
116         CkpvInitialize(CkGroupID, cmgrID);
117         CkpvAccess(cmgrID) = thisgroup;
118
119         dummyArrayIndex.nInts = 0;
120         converseManager = CkpvAccess(conv_com_ptr);
121
122         setupComplete = 0;
123
124         CkpvInitialize(int, migrationDoneHandlerID);
125         CkpvAccess(migrationDoneHandlerID) = 
126                 CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
127
128         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
129         cgproxy[curComlibController].barrier();
130   }
131 }
132
133 //First barrier makes sure that the communication library group 
134 //has been created on all processors
135 void ComlibManager::barrier(){
136         static int bcount = 0;
137         ComlibPrintf("barrier %d\n", bcount);
138         if(CkMyPe() == 0) {
139                 bcount ++;
140                 if(bcount == CkNumPes()){
141                         bcount = 0;
142                         //barrierReached = 1;
143                         //barrier2Reached = 0;
144
145                         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
146                         cgproxy.resumeFromSetupBarrier();
147                 }
148         }
149 }
150
151
152
153 /**
154    Due to the possibility of race conditions in the initialization of charm, this
155    barrier prevents comlib from being activated before all group branches are created.
156    This function completes the initialization of the charm layer of comlib.
157
158    In this function we also call ComlibDoneCreating for the user (basically
159    triggering the broadcast of the strategies created in Main::Main. Here the
160    Main::Main has for sure executed, otherwise we will not have received
161    confirmation by all other processors.
162  */
163 void ComlibManager::resumeFromSetupBarrier(){
164         ComlibPrintf("[%d] resumeFromSetupBarrier Charm group ComlibManager setup finished\n", CkMyPe());
165
166         setupComplete = 1;
167         ComlibDoneCreating();
168         sendBufferedMessagesAllStrategies();
169 }
170
171
172 /***************************************************************************
173  Determine whether the delegated messages should be buffered until the 
174  strategy has recovered from any error conditions and startup. Once the 
175  buffers can be flushed, ComlibManager::sendBufferedMessages() will be called
176 ***************************************************************************/
177 bool ComlibManager::shouldBufferMessagesNow(int instid){
178   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
179   return (!setupComplete) || myEntry->errorMode == ERROR_MODE || myEntry->errorMode == CONFIRM_MODE || myEntry->bufferOutgoing;
180 }
181
182
183 /***************************************************************************
184  Calls ComlibManager::sendBufferedMessages for each strategy.
185 ***************************************************************************/
186 void ComlibManager::sendBufferedMessagesAllStrategies(){
187   int nstrats = converseManager->getNumStrats();
188   for(int i=0;i<nstrats;i++){
189     sendBufferedMessages(i);
190   }
191 }
192
193
194 /***************************************************************************
195    Send all the buffered messages once startup has completed, and we have 
196    recovered from any errors have recovered.
197 ***************************************************************************/
198 void ComlibManager::sendBufferedMessages(int instid){
199   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
200
201   if(! shouldBufferMessagesNow(instid) && delayMessageSendBuffer[instid].size() > 0){
202     ComlibManagerPrintf("[%d] sendBufferedMessages Sending %d buffered messages for instid=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size(), instid);
203
204
205     for (std::set<CharmMessageHolder*>::iterator iter = delayMessageSendBuffer[instid].begin(); iter != delayMessageSendBuffer[instid].end(); ++iter) {
206       CharmMessageHolder* cmsg = *iter;
207           
208       switch(cmsg->type){
209             
210       case CMH_ARRAYSEND:
211         //          CkAbort("CMH_ARRAYSEND unimplemented");
212         CkpvAccess(conv_com_ptr)->insertMessage(cmsg, instid);
213         CkpvAccess(conv_com_ptr)->doneInserting(instid);
214         break;
215             
216       case CMH_GROUPSEND:
217         CkAbort("CMH_GROUPSEND unimplemented");
218         break;
219             
220       case CMH_ARRAYBROADCAST:
221       case CMH_ARRAYSECTIONSEND: 
222       case CMH_GROUPBROADCAST:
223         // Multicast/broadcast to an array or a section:
224         cmsg->sec_id = cmsg->copy_of_sec_id;
225         CkpvAccess(conv_com_ptr)->insertMessage(cmsg, instid);
226         CkpvAccess(conv_com_ptr)->doneInserting(instid);
227         break;
228             
229       default:
230         CkAbort("Unknown cmsg->type was found in buffer of delayed messages\n");
231       }
232           
233     }
234
235     delayMessageSendBuffer[instid].clear();
236   } 
237   else {
238     ComlibManagerPrintf("sendBufferedMessages is not flushing buffered messages for strategy %d because myEntry->errorMode=%d && setupComplete = %d && myEntry->discoveryMode = %d && myEntry->bufferOutgoing = %d\n", instid, (int)myEntry->errorMode, (int)setupComplete, (int)myEntry->discoveryMode, (int)myEntry->bufferOutgoing);  
239   }
240
241
242
243 }
244
245
246
247 /***************************************************************************
248  * Bracketed section:
249  *
250  * Routines for bracketed strategies, to call the brackets begin and end, and to
251  * restore count errors after some objects migrate.
252  * 
253  * This function must only be called after all messages from the previous 
254  * iteration have been delivered. This is the application's responsibility to 
255  * ensure. The iteration values provided must increase monotonically.
256  ***************************************************************************/
257
258 /// Called when the array/group element starts sending messages
259 void ComlibManager::beginIteration(int instid, int iteration){
260         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
261                             
262         ComlibManagerPrintf("[%d] beginIteration iter=%d lastKnownIteration=%d  %s %s %s\n", CkMyPe(), iteration, myEntry->lastKnownIteration, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
263
264
265         if(iteration > myEntry->lastKnownIteration){
266               // Verify & update errorModeServer:
267               if(CkMyPe()==0){
268                 CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || 
269                          myEntry->errorModeServer == STARTUP_MODE_SERVER ||  
270                          myEntry->errorModeServer == ERROR_FIXED_MODE_SERVER);
271                 myEntry->errorModeServer = NORMAL_MODE_SERVER;
272               } else {
273                 CkAssert(myEntry->errorModeServer == NON_SERVER_MODE_SERVER || 
274                          myEntry->errorModeServer == STARTUP_MODE_SERVER);
275                 myEntry->errorModeServer = NON_SERVER_MODE_SERVER;
276               }
277               // Verify & update errorMode:
278               CkAssert(myEntry->errorMode == ERROR_FIXED_MODE || myEntry->errorMode == NORMAL_MODE  );
279               myEntry->errorMode = NORMAL_MODE;   
280
281               myEntry->lastKnownIteration = iteration;
282               myEntry->nBeginItr = 1; // we are the first time to be called this iteration
283               myEntry->nEndItr = 0;
284               myEntry->nProcSync = 0;
285               myEntry->totalEndCounted = 0;
286               myEntry->discoveryMode = NORMAL_DISCOVERY_MODE;
287               myEntry->nEndSaved = 0;
288               
289               
290               ComlibManagerPrintf("[%d] beginIteration Starting Next Iteration ( # %d )\n", CkMyPe(), iteration);
291         } else {
292                 myEntry->nBeginItr++;
293                 ComlibManagerPrintf("[%d] beginIteration continuing iteration # %d\n", CkMyPe(), iteration);
294         }
295         
296         
297         // We need to check for error conditions here as well as EndIteration.
298         // This will ensure that if we are the processor that detects this error, 
299         // we won't deliver at least this message until the strategy is fixed
300         if (myEntry->nBeginItr > myEntry->numElements) {
301           ComlibManagerPrintf("[%d] beginIteration BUFFERING OUTGOING\n",CkMyPe());                     
302           myEntry->bufferOutgoing = 1;
303         }
304         
305 }
306
307
308 /** Called by user program when each element has finished sending its messages.
309
310     If no errors are detected, ConvComlibManager::doneInserting() is called on the 
311     underlying converse strategy. If an error is detected, then ConvComlibManager::doneInserting
312     is NOT called. This likely causes the the underlying converse strategy to buffer the 
313     messages until we recover from the error mode, although we have buffered at least some of
314     the messages, so the user program cannot run ahead and start the next iteration.
315
316 */
317
318 void ComlibManager::endIteration(int instid, int step){ 
319         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
320         
321         // migration is not allowed between begin and end of iteration, 
322         // so each element must call end iteration on the same processor 
323         // as it called begin iteration
324         CkAssert(myEntry->nEndItr <= myEntry->nBeginItr);
325         CkAssert(step == myEntry->lastKnownIteration);
326
327
328         myEntry->nEndItr++;
329         
330         ComlibManagerPrintf("[%d] endIteration called\n",CkMyPe());
331         
332         
333         if (myEntry->bufferOutgoing) {
334           // If migration was detected in ComlibManager::beginIteration and hence messages have been buffered:
335           CkAssert(delayMessageSendBuffer[instid].size() > 0);
336           CProxy_ComlibManager myProxy(thisgroup);
337           myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
338         } 
339         else if(myEntry->nEndItr == myEntry->numElements) {
340           // If all the objects have called beginIteration and endIteration and no errors were detected
341           CkAssert(converseManager->isReady(instid));
342           converseManager->doneInserting(instid);
343         }
344         
345 }
346
347
348 /** Start recovery of errors. 
349     This entry method calls itself repeatedly if the underlying 
350     converse strategy is not yet ready.
351
352     If the PE is already in error mode, then only the difference 
353     in the counts will be contributed.
354 */
355 void ComlibManager::bracketedStartErrorRecoveryProcess(int instid, int step){  
356   CProxy_ComlibManager myProxy(thisgroup);
357   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);     
358
359   
360
361   if(converseManager->isReady(instid)){
362     ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess() %s %s %s\n", CkMyPe(), myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
363
364     CkAssert(myEntry->strategy != NULL);
365     CkAssert(myEntry->errorMode == NORMAL_MODE || myEntry->errorMode == ERROR_MODE);
366           
367     if (!myEntry->strategy->isBracketed()) {
368       CkPrintf("[%d] endIteration called unecessarily for a non-bracketed strategy\n", CkMyPe());
369       return;
370     }
371        
372     if (myEntry->errorMode == NORMAL_MODE) {
373       ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess()\n", CkMyPe());
374       myEntry->nEndSaved = myEntry->nEndItr;
375       myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
376       myEntry->errorMode = ERROR_MODE;
377       bracketedStartDiscovery(instid);
378     } else {
379       // Send the updated count
380       int update = myEntry->nEndItr - myEntry->nEndSaved;
381       if (update > 0) {
382         //      ComlibManagerPrintf("bracketedStartErrorRecoveryProcess sending update to bracketedReceiveCount\n");
383         CProxy_ComlibManager myProxy(thisgroup);
384         myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
385         myEntry->nEndSaved = myEntry->nEndItr;
386       }
387       
388     }
389   } else {
390     ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess() REENQUEUE\n", CkMyPe() );
391     // Re-enqueue myself because we can't start error recovery process until converse strategy is ready
392     myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
393   }
394 }
395
396
397 /// Invoked on all processors to inform that an error has been detected.
398 void ComlibManager::bracketedErrorDetected(int instid, int step) {
399         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
400         
401         bracketedCatchUpPE(instid,step);
402         CkAssert(step == myEntry->lastKnownIteration);
403         CkAssert(myEntry->errorMode == NORMAL_MODE || myEntry->errorMode == ERROR_MODE);
404
405         ComlibManagerPrintf("[%d] bracketedErrorDetected()\n", CkMyPe());
406
407         if (myEntry->errorMode == NORMAL_MODE) {
408           // save the value we are sending to bracketedReceiveCount
409           myEntry->nEndSaved = myEntry->nEndItr; // save the value we are sending to bracketedReceiveCount
410           CProxy_ComlibManager myProxy(thisgroup);
411           myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
412           bracketedStartDiscovery(instid);
413           myEntry->errorMode = ERROR_MODE;
414
415         } else { // ERROR_MODE
416           // If we have an update count, send it
417           int update = myEntry->nEndItr - myEntry->nEndSaved;
418           ComlibManagerPrintf("bracketedErrorDetected update=%d\n", update);
419           if (update > 0) {
420             ComlibManagerPrintf("bracketedErrorDetected sending update to bracketedReceiveCount\n");
421             CProxy_ComlibManager myProxy(thisgroup);
422             myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
423             myEntry->nEndSaved = myEntry->nEndItr;
424           }
425         }
426         
427 }
428
429 /// Invoked on all processors. After processor 0 has a count match, it sends out
430 /// a broadcast on this entry method to get a confirmation from all others.
431 void ComlibManager::bracketedConfirmCount(int instid) {
432         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
433         CkAssert(myEntry->errorMode == ERROR_MODE);
434         myEntry->errorMode = CONFIRM_MODE;
435         CProxy_ComlibManager myProxy(thisgroup);
436         ComlibManagerPrintf("[%d] bracketedConfirmCount\n", CkMyPe());
437         myProxy[0].bracketedCountConfirmed(instid, myEntry->nEndSaved, myEntry->lastKnownIteration);
438 }
439
440 /// Invoked on processor 0 upon a request to confirm the count. If the count is
441 /// correct a "NewPeList" is sent out, otherwise the error mode is returned with
442 /// "ErrorDetected"
443 void ComlibManager::bracketedCountConfirmed(int instid, int count, int step) {
444         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
445         CkAssert(CkMyPe() == 0);
446         // Advance PE0 to current step if we had no local objects
447         bracketedCatchUpPE(instid, step);
448         CkAssert(myEntry->errorModeServer == CONFIRM_MODE_SERVER);
449         CkAssert(step == myEntry->lastKnownIteration);
450
451         myEntry->total += count;
452         
453         ComlibManagerPrintf("[%d] bracketedCountConfirmed\n", CkMyPe());
454         
455         if (++myEntry->peConfirmCounter == CkNumPes()) {
456                 myEntry->peConfirmCounter = 0;
457
458                 CkAssert(myEntry->total == myEntry->totalEndCounted);
459                   
460                 CProxy_ComlibManager(thisgroup).bracketedReceiveNewCount(instid);
461                 myEntry->errorModeServer = ERROR_FIXED_MODE_SERVER;
462                   
463                 myEntry->total = 0;     
464         }
465
466 }
467
468
469
470 /** Update the state for a PE that was likely lacking any local objects.
471     If no local objects exist, noone will call begin iteration, and hence,
472     the lastKnownIteration value will be old. We need to advance to the 
473     current iteration before we can proceed.
474 */
475 void ComlibManager::bracketedCatchUpPE(int instid, int step){
476   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
477   CkAssert(step >= myEntry->lastKnownIteration);
478   if(step > myEntry->lastKnownIteration){
479     
480     myEntry->total = 0;
481     myEntry->lastKnownIteration = step;
482     myEntry->nBeginItr = 0;
483     myEntry->nEndItr = 0;
484     myEntry->nProcSync = 0;
485     myEntry->totalEndCounted = 0;
486     myEntry->nEndSaved = 0;
487     myEntry->errorMode = NORMAL_MODE;
488     myEntry->discoveryMode = NORMAL_DISCOVERY_MODE;
489
490     if(CkMyPe()==0){
491       CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || myEntry->errorModeServer == ERROR_FIXED_MODE_SERVER);
492       ComlibManagerPrintf("[%d] NORMAL_MODE_SERVER *******************AAAAAAAAAAAA**********************\n", CkMyPe());
493       myEntry->errorModeServer = NORMAL_MODE_SERVER;
494     } else {
495       CkAssert(myEntry->errorModeServer == NON_SERVER_MODE_SERVER || myEntry->errorModeServer == STARTUP_MODE_SERVER);
496       myEntry->errorModeServer == NON_SERVER_MODE_SERVER;
497     }
498     
499     
500   }
501 }
502
503 /// Invoked on processor 0 when a processor sends a count of how many elements
504 /// have already deposited. This number is incremental, and it refers to the
505 /// previous one sent by that processor. Processor 0 stores temporarily all the
506 /// numbers it receives. When a match happens, processor 0 switches to "confirm"
507 /// mode and send out a request for confirmation to all other processors.
508 /// The final parameter, "step", is used so that if  no objects exist on a processor,
509 /// then the counts sent by the processor will be tagged with an old timestep, and can 
510 /// safely be ignored. If no objects are on a PE, then beginIteration will never
511 /// be called there.
512 void ComlibManager::bracketedReceiveCount(int instid, int pe, int count, int isFirst, int step) {
513         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
514         CkAssert(CkMyPe() == 0);
515         CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || myEntry->errorModeServer == ERROR_MODE_SERVER );
516
517         ComlibManagerPrintf("[%d] bracketedReceiveCount step=%d \n", CkMyPe(), step);
518
519         
520         // Advance PE0 to current step if we had no local objects
521         bracketedCatchUpPE(instid, step);
522
523         // Encountering a message from an old step
524         CkAssert(step == myEntry->lastKnownIteration);
525
526         myEntry->totalEndCounted += count;
527
528         ComlibManagerPrintf("[%d] bracketedReceiveCount step=%d totalEndCounted=%d count=%d\n", CkMyPe(), step, myEntry->totalEndCounted, count);
529
530         
531         myEntry->nProcSync += isFirst; // isFirst is 1 the first time a processor send a message,
532         CkAssert(myEntry->nProcSync <= CkNumPes());
533         
534         if (myEntry->errorModeServer == NORMAL_MODE_SERVER) { 
535                 // first time this is called
536                 CkAssert(myEntry->nProcSync == 1);
537                 CProxy_ComlibManager(thisgroup).bracketedErrorDetected(instid, step);
538                 myEntry->errorModeServer = ERROR_MODE_SERVER;
539                 ComlibManagerPrintf("[%d] bracketedReceiveCount first time\n", CkMyPe());
540         } else { // ERROR_MODE
541         
542           CharmStrategy* s = dynamic_cast<CharmStrategy*>(myEntry->strategy);
543           ComlibArrayInfo ainfo = s->ainfo;
544           int totalsrc =  ainfo.getTotalSrc() ;
545           
546           if(myEntry->nProcSync == CkNumPes() && myEntry->totalEndCounted == totalsrc) {
547             // ok, we received notifications from all PEs and all objects have called endIteration
548             myEntry->errorModeServer = CONFIRM_MODE_SERVER; 
549             ComlibManagerPrintf("[%d] bracketedReceiveCount errorModeServer is now CONFIRM_MODE calling bracketedConfirmCount totalsrc=%d\n", CkMyPe(), (int)totalsrc);
550             CProxy_ComlibManager(thisgroup).bracketedConfirmCount(instid);
551                         
552           }
553         }
554                 
555 }
556
557
558 /// Invoked on all processors. After the count has been checked and it matches
559 /// the number of emements involved in the bracketed operation, processor 0
560 /// sends a broadcast to acknowledge the success. 
561 /// The strategy is disabled until a new Pe list is received.
562 void ComlibManager::bracketedReceiveNewCount(int instid) {
563         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
564         CkAssert(myEntry->errorMode == CONFIRM_MODE);
565
566         myEntry->errorMode = ERROR_FIXED_MODE;
567         
568         myEntry->nEndItr -= myEntry->nEndSaved;
569         myEntry->nBeginItr -= myEntry->nEndSaved;
570
571         myEntry->nEndSaved = 0;
572
573         bracketedFinalBarrier(instid);
574 }
575
576
577
578
579 /// Invoked on all processors. When all processors have discovered all elements
580 /// involved in the operation, processor 0 uses this method to broadcast the
581 /// entire processor list to all. Currently the discovery process HAS to go
582 /// together with the counting process, otherwise the strategy is updated at an
583 /// undetermined state. In future it may be useful to implement the discovery
584 /// process alone.
585 void ComlibManager::bracketedReceiveNewPeList(int instid, int *count) {
586         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
587         CkAssert(myEntry->discoveryMode == STARTED_DISCOVERY_MODE);
588         myEntry->discoveryMode = FINISHED_DISCOVERY_MODE;
589
590         myEntry->strategy->bracketedUpdatePeKnowledge(count);
591         
592         ComlibManagerPrintf("[%d] bracketedReceiveNewPeList Updating numElements\n", CkMyPe());
593         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
594         CkAssert((unsigned long)myInfo > 0x1000);
595         myEntry->numElements = myInfo->getLocalSrc();
596         
597         ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n",CkMyPe(), instid, delayMessageSendBuffer[instid].size() );
598         ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n", CkMyPe(), instid, delayMessageSendBuffer[instid].size());
599                 
600         bracketedFinalBarrier(instid);
601 }
602
603
604 /** Start a barrier phase where all processes will enter it once both 
605     the counting and discovery processes complete.
606
607     ComlibManager::bracketedReceiveNewPeList calls this method. 
608     ComlibManager::bracketedReceiveNewPeList is called as an array broadcast to thisProxy, 
609     so every PE will call this method.
610  */
611 void ComlibManager::bracketedFinalBarrier(int instid) {
612   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
613   ComlibManagerPrintf("[%d] ComlibManager::bracketedFinalBarrier %s %s %s\n", CkMyPe(), myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
614
615
616   if (myEntry->discoveryMode == FINISHED_DISCOVERY_MODE &&  myEntry->errorMode == ERROR_FIXED_MODE) {    
617     CProxy_ComlibManager myProxy(thisgroup);
618     myProxy[0].bracketedReleaseCount(instid);
619   }
620 }
621
622
623 /** 
624     Once all PEs report here, we will allow them to release the buffered messages from this iteration
625  */
626 void ComlibManager::bracketedReleaseCount(int instid) {
627     ComlibManagerPrintf("[%d] ComlibManager::bracketedReleaseCount\n", CkMyPe());
628
629     StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
630     CkAssert(CkMyPe() == 0);
631     CkAssert(myEntry->errorModeServer == ERROR_FIXED_MODE_SERVER);
632     
633     myEntry->numBufferReleaseReady++;
634     if(myEntry->numBufferReleaseReady == CkNumPes()) {
635       myEntry->errorModeServer = NORMAL_MODE_SERVER;
636       CProxy_ComlibManager(thisgroup).bracketedReleaseBufferedMessages(instid);
637       myEntry->numBufferReleaseReady = 0;
638     }
639 }
640
641 /** 
642     Release any buffered messages.
643  */
644 void ComlibManager::bracketedReleaseBufferedMessages(int instid) {
645   ComlibManagerPrintf("[%d] ComlibManager::bracketedReleaseBufferedMessages\n", CkMyPe());
646
647   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
648   
649   myEntry->bufferOutgoing = 0;
650   sendBufferedMessages(instid);
651   
652   converseManager->doneInserting(instid);
653 }
654
655
656
657
658
659
660 /** Called when an error is discovered. Such errors occur when it is observed
661     that an array element has migrated to a new PE. Specifically if a strategy
662     on some PE determines that it has received messages for more array elements
663     than the strategy knew were local, then some array element must have 
664     migrated to the PE. The strategy instance detecting the error will call this
665     method to initiate a global update operation that determines the locations
666     of all array elements.
667
668     This is called on each PE by the bracketedErrorDetected() method. 
669
670     Each array element previously located on this PE is examined to determine if
671     it is still here. If the array element has migrated away, then the
672     bracketedDiscover() method is called on the new PE.
673
674 */
675 void ComlibManager::bracketedStartDiscovery(int instid) {
676         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
677         CkAssert(myEntry->discoveryMode == NORMAL_DISCOVERY_MODE);
678         myEntry->discoveryMode = STARTED_DISCOVERY_MODE;
679         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
680                                 
681         ComlibManagerPrintf("[%d] bracketedStartDiscovery\n", CkMyPe());
682
683         int countSrc = 0;
684         int countDest = 0;
685         CkArrayID aid;
686         CkArrayIndexMax *idx;
687         int nelem;
688         CkArray *a;
689         CProxy_ComlibManager myProxy(thisgroup);
690
691         if (myInfo->isSourceArray()) {
692                 myInfo->getSourceArray(aid, idx, nelem);
693                 myInfo->resetSource();
694                 a = (CkArray*)_localBranch(aid);
695                 for (int i=0; i<nelem; ++i) {
696                         int pe = a->lastKnown(idx[i]);
697 //                      ComlibManagerPrintf("[%d] bracketedStartDiscovery src Index %d was lastKnown at pe %d\n", CkMyPe(), idx[i].data()[0], pe);
698                         if (pe == CkMyPe()) {
699                                 countSrc++;
700                                 myInfo->addSource(idx[i]);
701                         }
702                         else {
703                                 myProxy[pe].bracketedDiscover(instid, aid, idx[i], true);
704                         }
705                 }
706                 delete[] idx;
707         }
708
709         if (myInfo->isDestinationArray()) {
710                 myInfo->getDestinationArray(aid, idx, nelem);
711                 myInfo->resetDestination();
712                 a = (CkArray*)_localBranch(aid);
713                 for (int i=0; i<nelem; ++i) {
714                         int pe = a->lastKnown(idx[i]);
715 //                      ComlibManagerPrintf("[%d] bracketedStartDiscovery dest Index %d was lastKnown at pe %d\n", CkMyPe(), idx[i].data()[0], pe);
716                         if (pe == CkMyPe()) {
717                                 countDest++;
718                                 myInfo->addDestination(idx[i]);
719                         }
720                         else {
721                                 myProxy[pe].bracketedDiscover(instid, aid, idx[i], false);
722                         }
723                 }
724                 delete[] idx;
725         }
726
727         if (countSrc > 0 || countDest > 0) {
728                 // contribute only if we have something
729                 myProxy[0].bracketedContributeDiscovery(instid, CkMyPe(), countSrc, countDest, myEntry->lastKnownIteration);
730         }
731
732 }
733
734
735
736 /** Determine the PE of a given array index. This will be called for any 
737     array element by the previous owner PE.
738
739     If the index is local to the processor, then record this in the local 
740     strategy. Also send a message to PE 0 informing PE 0 that the array
741     element was located.
742
743     If the array element is not found locally, call this method on the last 
744     known location for the element.
745
746 */
747 void ComlibManager::bracketedDiscover(int instid, CkArrayID aid, CkArrayIndexMax &idx, int isSrc) {
748         ComlibManagerPrintf("[%d] bracketedDiscover\n", CkMyPe());
749         CkArray *a = (CkArray *)_localBranch(aid);
750         int pe = a->lastKnown(idx);
751         CProxy_ComlibManager myProxy(thisgroup);
752         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
753
754         if (pe == CkMyPe()) {
755                 // Object was found locally
756          
757                 // notify PE 0
758                 myProxy[0].bracketedContributeDiscovery(instid, pe, isSrc?1:0, isSrc?0:1, myEntry->lastKnownIteration);
759           
760
761                 // Find local strategy instance
762
763                 // ComlibArrayInfo *myInfo = &(dynamic_cast<CharmStrategy*>(getStrategy(instid))->ainfo);
764                 CkAssert((unsigned long)myEntry->strategy > 0x1000);
765                 ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
766                 CkAssert((unsigned long)myInfo > 0x1000);
767
768                 if (isSrc) {
769                         // Add the element as a source element for the strategy
770                         ComlibManagerPrintf("[%d] bracketedDiscover addSource\n", CkMyPe());
771                         CkAssert((unsigned long)myInfo > 0x1000);
772                         myInfo->addSource(idx);
773
774                         ComlibManagerPrintf("[%d] bracketedDiscover updating numElements\n", CkMyPe());
775                         myEntry->numElements = myInfo->getLocalSrc();           
776                 }
777                 else {
778                         // Add the element as a Destination element for the strategy
779                         ComlibManagerPrintf("[%d] bracketedDiscover addDestination\n", CkMyPe());
780                         myInfo->addDestination(idx);
781                 }
782         } else {
783                 ComlibManagerPrintf("Keep On Forwarding*********************\n");
784                 // forward to next potential processor
785                 myProxy[pe].bracketedDiscover(instid, aid, idx, isSrc);
786         }
787 }
788
789
790
791 /** On PE 0, record the notifications from all PEs about the actual locations of each
792     array element. Count the number of elements discovered. Once the new location of 
793     every elements has been discovered, broadcast the new locations to all PEs by 
794     invoking bracketedReceiveNewPeList.
795 */
796 void ComlibManager::bracketedContributeDiscovery(int instid, int pe, int nsrc, int ndest, int step) {
797         CkAssert(CkMyPe() == 0);
798         ComlibManagerPrintf("[%d] bracketedContributeDiscovery pe=%d nsrc=%d ndest=%d step=%d\n", CkMyPe(), pe, nsrc, ndest, step);
799         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
800         if (myEntry->peList == 0) {
801                 myEntry->peList = new int[CkNumPes()+2];
802                 // peList[CkNumPes()] keeps the sum of all source objects discovered
803                 // peList[CkNumPes()+1] keeps the sum of all destination objects discovered
804                 for (int i=0; i<CkNumPes()+2; ++i) myEntry->peList[i]=0;
805                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery zeroing new peList\n", CkMyPe());
806         }
807         myEntry->peList[CkNumPes()] += nsrc;
808         myEntry->peList[CkNumPes()+1] += ndest;
809         // update the count for the sender processor. peList[i] is:
810         // 0 if proc "i" has no objects,
811         // 1 if proc "i" has only source objects,
812         // 2 if proc "i" has only destination objects,
813         // 3 if proc "i" has both source and destination objects
814         // the following code maintains the property of peList!
815         if (nsrc > 0) myEntry->peList[pe] |= 1;
816         if (ndest > 0) myEntry->peList[pe] |= 2;
817
818         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
819         CkAssert((unsigned long)myInfo > 0x1000);
820
821         
822 //      ComlibManagerPrintf("[%d] bracketedContributeDiscovery myEntry->peList[CkNumPes()]=%d  myInfo->getTotalSrc()=%d\n", CkMyPe(), myEntry->peList[CkNumPes()], myInfo->getTotalSrc());
823 //      ComlibManagerPrintf("[%d] bracketedContributeDiscovery myEntry->peList[CkNumPes()+1]=%d  myInfo->getTotalDest()=%d\n", CkMyPe(), myEntry->peList[CkNumPes()+1], myInfo->getTotalDest());
824 //              
825         if (myEntry->peList[CkNumPes()] == myInfo->getTotalSrc() &&
826                         myEntry->peList[CkNumPes()+1] == myInfo->getTotalDest()) {
827                 // discovery process finished, broadcast the new pe list
828
829                 CProxy_ComlibManager myProxy(thisgroup);
830                 
831                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery calling bracketedReceiveNewPeList %d/%d, %d/%d\n",       
832                                 CkMyPe(), 
833                                 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(), 
834                                 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
835                 
836                 printPeList("bracketedContributeDiscovery peList=", myEntry->peList);
837                 myProxy.bracketedReceiveNewPeList(instid, myEntry->peList);
838                 delete myEntry->peList;
839                 myEntry->peList = NULL;
840         } else {
841                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery NOT calling bracketedReceiveNewPeList %d/%d, %d/%d\n", 
842                                 CkMyPe(), 
843                                 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(), 
844                                 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
845         }
846
847 }
848
849
850 void ComlibManager::printPeList(char* note, int *count){
851         
852         char *buf = (char*)malloc(1024*64);
853         sprintf(buf, "[%d] %s ", CkMyPe(), note);
854         for(int i=0;i<CkNumPes();i++){
855                 switch (count[i]){
856                 case 0:
857                         sprintf(buf+strlen(buf), " %d:no ", i, count[i]);
858                         break;
859                 case 1:
860                         sprintf(buf+strlen(buf), " %d:source ", i, count[i]);
861                         break;
862                 case 2:
863                         sprintf(buf+strlen(buf), " %d:dest ", i, count[i]);
864                         break;
865                 case 3:
866                         sprintf(buf+strlen(buf), " %d:both ", i, count[i]);
867                         break;
868                 }
869         }
870         
871         sprintf(buf+strlen(buf), ", all source objects discovered =  %d, all destination objects discovered = %d",  count[CkNumPes()] ,  count[CkNumPes()+1] );
872                 
873         ComlibPrintf("%s\n", buf);
874 }
875
876 /***************************************************************************
877  * Delegation framework section:
878  *
879  * Reimplementation of the main routines needed for the delegation framework:
880  * this routines will be called when a message is sent through a proxy which has
881  * been associated with comlib.
882  ***************************************************************************/
883
884 extern int _charmHandlerIdx;
885 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid);
886
887
888 /** 
889     Handle messages sent via ArraySend. These are single point to point messages 
890     to array elements.
891
892     This method should not optimize direct sends in the case where buffering occurs
893
894  */
895 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
896                 const CkArrayIndexMax &idx, CkArrayID a){
897
898         CkAssert(pd != NULL);
899         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
900         int instid = ci->getID();
901         CkAssert(instid != 0);
902
903         // Reading from two hash tables is a big overhead
904         CkArray *amgr = CkArrayID::CkLocalBranch(a);
905         int dest_proc = amgr->lastKnown(idx);
906
907         register envelope * env = UsrToEnv(msg);
908         msg_prepareSend_noinline((CkArrayMessage*)msg, ep, a);
909
910         env->getsetArrayIndex()=idx;
911         env->setUsed(0);
912         ((CmiMsgHeaderExt *)env)->stratid = instid;
913
914         CkPackMessage(&env);
915         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));        
916         
917         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_ARRAYSEND);
918         
919         if(shouldBufferMessagesNow(instid)){
920           delayMessageSendBuffer[instid].insert(cmsg);
921           ComlibManagerPrintf("[%d] ComlibManager::ArraySend BUFFERED OUTGOING: now buffer contains %d messages\n",CkMyPe(), delayMessageSendBuffer[instid].size() );
922         } else {
923           ComlibPrintf("ComlibManager::ArraySend NOT BUFFERING inserting message into strategy %d\n",instid);
924           
925           if(dest_proc == CkMyPe()){  
926             // Directly send if object is local
927             amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
928             return;
929           } else {
930             // Send through converse level strategy if non-local
931             converseManager->insertMessage(cmsg, instid);
932           }
933           
934         }
935
936 }
937
938
939 #include "qd.h"
940 //CkpvExtern(QdState*, _qd);
941
942 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
943
944         CkAssert(pd != NULL);
945         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
946         int instid = ci->getID();
947
948         int dest_proc = onPE;
949
950         ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
951                         UsrToEnv(msg)->getTotalsize());
952
953         register envelope * env = UsrToEnv(msg);
954         if(dest_proc == CkMyPe()){
955                 _SET_USED(env, 0);
956                 CkSendMsgBranch(ep, msg, dest_proc, gid);
957                 return;
958         }
959
960         ((CmiMsgHeaderExt *)env)->stratid = instid;
961         CpvAccess(_qd)->create(1);
962
963         env->setMsgtype(ForBocMsg);
964         env->setEpIdx(ep);
965         env->setGroupNum(gid);
966         env->setSrcPe(CkMyPe());
967         env->setUsed(0);
968
969         CkPackMessage(&env);
970         CmiSetHandler(env, _charmHandlerIdx);
971
972         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_GROUPSEND); 
973
974         if(shouldBufferMessagesNow(instid)){
975           ComlibPrintf("ComlibManager::GroupSend Buffering message for %d\n",instid);
976           delayMessageSendBuffer[instid].insert(cmsg);
977         } else {
978           ComlibPrintf("ComlibManager::GroupSend inserting message into strategy %d\n",instid);
979           converseManager->insertMessage(cmsg, instid);
980         }
981
982
983 }
984
985 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
986         ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
987
988         CkAssert(pd != NULL);
989         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
990         int instid = ci->getID();
991
992         register envelope * env = UsrToEnv(m);
993         msg_prepareSend_noinline((CkArrayMessage*)m, ep, a);
994
995         env->getsetArrayIndex()= dummyArrayIndex;
996         ((CmiMsgHeaderExt *)env)->stratid = instid;
997
998         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
999
1000         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_BROADCAST, CMH_ARRAYBROADCAST);
1001         cmsg->npes = 0;
1002         cmsg->sec_id = NULL;
1003         cmsg->array_id = a;
1004
1005         multicast(cmsg, instid);
1006 }
1007
1008 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
1009                 CkArrayID a, CkSectionID &s, int opts) {
1010
1011         CkAssert(pd != NULL);
1012         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1013         int instid = ci->getID();
1014
1015         ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
1016
1017
1018         //Provide a dummy dest proc as it does not matter for mulitcast 
1019         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_SECTION_MULTICAST, CMH_ARRAYSECTIONSEND);
1020         cmsg->npes = 0;
1021         cmsg->sec_id = &s;
1022         cmsg->array_id = a;
1023         
1024
1025         msg_prepareSend_noinline((CkArrayMessage*)m, ep, a);
1026         
1027         register envelope * env = UsrToEnv(m);
1028         env->getsetArrayIndex()= dummyArrayIndex;
1029         ((CmiMsgHeaderExt *)env)->stratid = instid;
1030         
1031         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
1032         
1033         env->setUsed(0);
1034         CkPackMessage(&env);
1035         
1036         
1037         /// @TODO why are CkSectionInfo needed since it doesn't seem to have
1038         /// useful information?
1039         CkSectionInfo minfo;
1040         minfo.type = COMLIB_MULTICAST_MESSAGE;
1041         minfo.sInfo.cInfo.instId = ci->getID();
1042         //minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
1043         minfo.sInfo.cInfo.id = 0; 
1044         minfo.pe = CkMyPe();
1045         ((CkMcastBaseMsg *)env)->_cookie = minfo;    
1046         
1047         multicast(cmsg, instid);
1048 }
1049
1050 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
1051
1052         CkAssert(pd != NULL);
1053         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1054         int instid = ci->getID();
1055         CkAssert(instid!=0);
1056
1057         register envelope * env = UsrToEnv(m);
1058
1059         CpvAccess(_qd)->create(1);
1060
1061         env->setMsgtype(ForBocMsg);
1062         env->setEpIdx(ep);
1063         env->setGroupNum(g);
1064         env->setSrcPe(CkMyPe());
1065         env->setUsed(0);
1066         ((CmiMsgHeaderExt *)env)->stratid = instid;
1067
1068         CkPackMessage(&env);
1069         CmiSetHandler(env, _charmHandlerIdx);
1070
1071         //Provide a dummy dest proc as it does not matter for mulitcast 
1072         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST, CMH_GROUPBROADCAST);
1073
1074         cmsg->npes = 0;
1075         //cmsg->pelist = NULL;
1076
1077         multicast(cmsg, instid);
1078 }
1079
1080
1081 /** 
1082     Multicast the message with the specified strategy(instid).
1083     This method is used in: ArrayBroadcast, ArraySectionSend, and GroupBroadcast
1084  */
1085 void ComlibManager::multicast(CharmMessageHolder *cmsg, int instid) {
1086         CkAssert(instid != 0);
1087         register envelope * env = UsrToEnv(cmsg->getCharmMessage());
1088
1089 #if DEBUG
1090         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1091         ComlibPrintf("[%d] multicast setupComplete=%d errorMode=%d errorModeServer=%d\n", CkMyPe(), setupComplete, myEntry->errorMode, myEntry->errorModeServer);
1092 #endif
1093  
1094         env->setUsed(0);
1095         CkPackMessage(&env);
1096         
1097         if(shouldBufferMessagesNow(instid)){
1098           cmsg->saveCopyOf_sec_id();
1099           delayMessageSendBuffer[instid].insert(cmsg);
1100         } else {
1101           converseManager->insertMessage(cmsg, instid);
1102         }
1103         
1104 }
1105
1106
1107 void ComlibManager::printDiagnostics(int instid){
1108         ComlibManagerPrintf("[%d]     delayMessageSendBuffer.size()=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size());
1109 }
1110
1111
1112 void ComlibManager::printDiagnostics(){
1113   
1114
1115   std::map<ComlibInstanceHandle, std::set<CharmMessageHolder*> >::iterator iter;
1116   for(iter = delayMessageSendBuffer.begin(); iter != delayMessageSendBuffer.end(); ++iter){
1117     int instid = iter->first;
1118     int size = iter->second.size();
1119     
1120     if(size>0){
1121       CkPrintf("[%d] delayMessageSendBuffer[instid=%d] contains %d messages\n", CkMyPe(), instid, size);
1122       
1123       if(! shouldBufferMessagesNow(instid)){
1124         CkPrintf("[%d] printDiagnostics: No messages should be still in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
1125       } else {
1126         CkPrintf("[%d] printDiagnostics: Messages still ought to be delayed in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
1127       }
1128       
1129       StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1130       CkPrintf("[%d] printDiagnostics[instid=%d] setupComplete=%d errorMode=%d errorModeServer=%d discoveryMode=%d bufferOutgoing=%d\n", (int)CkMyPe(), (int)instid, (int)setupComplete, (int)myEntry->errorMode, (int)myEntry->errorModeServer, (int)myEntry->discoveryMode, (int)myEntry->bufferOutgoing);
1131     }
1132   }
1133   
1134   ConvComlibManager * conv_mgr = CkpvAccess(conv_com_ptr);
1135   
1136   conv_mgr->printDiagnostics();
1137   
1138 }
1139
1140
1141
1142 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
1143         //ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(data);
1144         data->ref();
1145         return data;
1146         //return (new ComlibDelegateData(inst->getID()));
1147 }
1148
1149 CkDelegateData* ComlibManager::DelegatePointerPup(PUP::er &p,
1150                 CkDelegateData *pd) {
1151         if (!p.isUnpacking() && pd == NULL) CkAbort("Tryed to pup a null ComlibDelegateData!\n");
1152         //CmiBool isNotNull = pd!=NULL ? CmiTrue : CmiFalse;
1153         ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(pd);
1154
1155         // call a migration constructor
1156         if (p.isUnpacking()) inst = new ComlibDelegateData((CkMigrateMessage*)0);
1157         inst->pup(p);
1158         /*
1159   p | isNotNull;
1160   if (isNotNull) {
1161     if (p.isUnpacking()) inst = new ComlibInstanceHandle();
1162     inst->pup(p);
1163   }
1164          */
1165         return inst;
1166 }
1167
1168
1169 //Collect statistics from all the processors, also gets the list of
1170 //array elements on each processor.
1171 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe) {
1172 }
1173
1174
1175 /// @TODO: eliminate AtSync and move toward anytime migration!
1176 void ComlibManager::AtSync() {
1177
1178 }
1179
1180
1181 /***************************************************************************
1182  * User section:
1183  *
1184  * Interface available to the user to interact with the ComlibManager
1185  ***************************************************************************/
1186
1187 /** Permanently associate the given proxy with comlib, to use the instance
1188     represented by cinst. All messages sent through the proxy will be forwarded
1189     by comlib. */
1190 void ComlibAssociateProxy(ComlibInstanceHandle cinst, CProxy &proxy) {
1191   if(CkNumPes() > 1){
1192         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1193         proxy.ckDelegate(cgproxy.ckLocalBranch(), new ComlibDelegateData(cinst));
1194   } else {
1195     ComlibPrintf("Doing nothing in ComlibAssociateProxy because we have only 1 pe\n"); 
1196   }
1197 }
1198
1199 /** Permanently assiciate the given proxy with comlib, to use the strategy
1200     represented by strat. All messages sent through the proxy will be forwarded
1201     by comlib. */
1202 void ComlibAssociateProxy(Strategy *strat, CProxy &proxy) {
1203         ComlibAssociateProxy(strat->getHandle(), proxy);
1204 }  
1205
1206 /* OLD DESCRIPTION! Register a strategy to the comlib framework, and return a
1207     handle to be used in the future with ComlibAssociateProxy to associate the
1208     strategy with a proxy. If one strategy is registered more than once, the
1209     handles will be different, but they has to be considered as aliases. */
1210
1211 /** Provided only for backward compatibility. A strategy is registered at the
1212     converse level as soon as it is created (see Strategy constructor). This
1213     function just retrieve a handle from the strategy itself. */
1214 ComlibInstanceHandle ComlibRegister(Strategy *strat) {
1215         return strat->getHandle();
1216 }
1217
1218 /** Call beginIteration on the ComlibManager with the instance handle for the strategy associated with the proxy. 
1219  *   If no strategy has been associated with the proxy, nothing is done in this function.
1220  */
1221 void ComlibBegin(CProxy &proxy, int iteration) {
1222   if(CkNumPes() > 1){
1223         ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
1224         if(cinst==NULL)
1225                 return;
1226         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1227         (cgproxy.ckLocalBranch())->beginIteration(cinst->getID(), iteration);
1228   } else {
1229     ComlibPrintf("Doing nothing in ComlibBegin because we have only 1 pe");  
1230   }
1231 }
1232
1233 /** Call endIteration on the ComlibManager with the instance handle for the strategy associated with the proxy. 
1234  *   If no strategy has been associated with the proxy, nothing is done in this function.
1235  */
1236 void ComlibEnd(CProxy &proxy, int iteration) {
1237   if(CkNumPes() > 1){
1238         ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
1239         if(cinst==NULL)
1240                 return;
1241         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1242         (cgproxy.ckLocalBranch())->endIteration(cinst->getID(), iteration);
1243   } else {
1244     ComlibPrintf("Doing nothing in ComlibEnd because we have only 1 pe");  
1245   }
1246 }
1247
1248 char *routerName;
1249 int sfactor=0;
1250
1251
1252 /** A mainchare, used to initialize the comlib framework at the program startup.
1253     Its main purpose is to create the ComlibManager group. */
1254 class ComlibManagerMain {
1255 public:
1256         ComlibManagerMain(CkArgMsg *msg) {
1257
1258                 if(CkMyPe() == 0 && msg !=  NULL)
1259                         CmiGetArgString(msg->argv, "+strategy", &routerName);         
1260
1261                 if(CkMyPe() == 0 && msg !=  NULL)
1262                         CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1263
1264                 CProxy_ComlibManager::ckNew();
1265         }
1266 };
1267
1268 /***************************************************************************
1269  * ComlibInstanceHandle section:
1270  *
1271  * Implementation of the functions defined in the ComlibInstanceHandle class.
1272  ***************************************************************************/
1273
1274 ComlibDelegateData::ComlibDelegateData(int instid) : CkDelegateData(), _instid(instid) {
1275         ComlibManagerPrintf("[%d] Constructing ComlibDelegateData\n", CkMyPe());
1276         ref();
1277 }
1278
1279
1280 void ComlibInitSectionID(CkSectionID &sid){
1281
1282         sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1283         sid._cookie.pe = CkMyPe();
1284
1285         sid._cookie.sInfo.cInfo.id = 0;    
1286         sid.npes = 0;
1287         sid.pelist = NULL;
1288 }
1289
1290
1291 /** For backward compatibility - for old name commlib. The function
1292     _registercomlib() is generated by the translator. */
1293 void _registercommlib(void)
1294 {
1295         static int _done = 0; 
1296         if(_done) 
1297                 return; 
1298         _done = 1;
1299         _registercomlib();
1300 }
1301
1302
1303 void ComlibNotifyMigrationDoneHandler(void *msg) {
1304         CmiFree(msg);
1305         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1306         ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1307         if(cmgr_ptr)
1308                 cmgr_ptr->AtSync();    
1309 }
1310
1311
1312
1313
1314
1315
1316 static void periodicDebugPrintStatus(void* ptr, double currWallTime){
1317   CkPrintf("[%d] periodicDebugPrintStatus()\n", CkMyPe());
1318
1319   ComlibManager *mgr = (ComlibManager*)ptr;
1320   mgr->printDiagnostics();
1321
1322   CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, ptr, 4000, CkMyPe());
1323
1324 }
1325
1326
1327
1328
1329
1330 #include "comlib.def.h"
1331
1332 /*@}*/