Fixing more bugs in the bracketed strategies including: 1) destination and source...
[charm.git] / src / ck-com / ComlibManager.C
1 /**
2    @addtogroup CharmComlib
3    @{
4    @file
5    Implementation of the functions in ComlibManager.h and handler for message
6    transportation.
7 */
8
9 #include "ComlibManager.h"
10 #include "comlib.h"
11 #include "ck.h"
12 #include "envelope.h"
13
14
15 // We only want to print debug information for a single strategy. Otherwise we'll get incredibly confused
16 #undef ComlibManagerPrintf
17 //#define ComlibManagerPrintf  if(instid==1)ComlibPrintf
18 #define ComlibManagerPrintf  ComlibPrintf
19
20 #define getmax(a,b) ((a)>(b)?(a):(b))
21
22 CkpvExtern(int, RecvdummyHandle);
23  
24 CkpvDeclare(CkGroupID, cmgrID);
25
26 /***************************************************************************
27  * Handlers section:
28  *
29  * all the handlers used by the ComlibManager to coordinate the work, and
30  * propagate the messages from one processor to another
31  ***************************************************************************/
32
33 //Handler to receive array messages
34 CkpvDeclare(int, RecvmsgHandle);
35
36 void recv_array_msg(void *msg){
37
38   //    ComlibPrintf("%d:In recv_msg\n", CkMyPe());
39
40         if(msg == NULL)
41                 return;
42
43         register envelope* env = (envelope *)msg;
44         env->setUsed(0);
45         env->getsetArrayHops()=1; 
46         CkUnpackMessage(&env);
47
48         int srcPe = env->getSrcPe();
49         int sid = ((CmiMsgHeaderExt *) env)->stratid;
50
51         //      ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), sid, env->getTotalsize(), srcPe);
52
53         RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
54
55         CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
56         
57         a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
58
59         //      ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
60         return;
61 }
62
63
64
65 /**
66    A debugging routine that will periodically print out the status of the message queues. 
67    The Definition is at bottom of this file.
68  */
69 static void periodicDebugPrintStatus(void* ptr, double currWallTime);
70
71
72
73
74
75 /***************************************************************************
76  * Initialization section:
77  *
78  * Routines used by Comlib to initialize itself and all the strategies on all
79  * the processors. Used only at the beginning of the program.
80  ***************************************************************************/
81
82 // // initialized at startup before the main chare main methods are called:
83 // void initCharmComlibManager(){
84 //   CkPrintf("[%d] initCharmComlibManager()\n", CkMyPe());
85 //   fflush(stdout);
86 // }
87
88
89
90 ComlibManager::ComlibManager(){
91         init();
92 }
93
94 void ComlibManager::init(){
95
96    CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, (void*)this, 4000, CkMyPe());
97
98
99   if(CkNumPes() == 1 ){
100     ComlibPrintf("Doing nothing in ComlibManager::init() because we are running on 1 pe.\n");
101   } else {
102
103         if (CkMyRank() == 0) {
104                 PUPable_reg(CharmMessageHolder);
105         }
106
107         numStatsReceived = 0;
108         curComlibController = 0;
109         clibIteration = 0;
110
111         CkpvInitialize(comRectHashType *, com_rect_ptr); 
112         CkpvAccess(com_rect_ptr)= new comRectHashType;
113
114         CkpvInitialize(int, RecvmsgHandle);
115         CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
116
117         bcast_pelist = new int [CkNumPes()];
118         for(int brcount = 0; brcount < CkNumPes(); brcount++)
119                 bcast_pelist[brcount] = brcount;
120
121         section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
122
123         CkpvInitialize(CkGroupID, cmgrID);
124         CkpvAccess(cmgrID) = thisgroup;
125
126         dummyArrayIndex.nInts = 0;
127
128         CkAssert(CkpvInitialized(conv_com_object));
129         converseManager = &CkpvAccess(conv_com_object);
130
131         setupComplete = 0;
132
133         CkpvInitialize(int, migrationDoneHandlerID);
134         CkpvAccess(migrationDoneHandlerID) = 
135                 CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
136
137         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
138         cgproxy[curComlibController].barrier();
139   }
140 }
141
142 //First barrier makes sure that the communication library group 
143 //has been created on all processors
144 void ComlibManager::barrier(){
145         static int bcount = 0;
146         ComlibPrintf("barrier %d\n", bcount);
147         if(CkMyPe() == 0) {
148                 bcount ++;
149                 if(bcount == CkNumPes()){
150                         bcount = 0;
151                         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
152                         cgproxy.resumeFromSetupBarrier();
153                 }
154         }
155 }
156
157
158
159 /**
160    Due to the possibility of race conditions in the initialization of charm, this
161    barrier prevents comlib from being activated before all group branches are created.
162    This function completes the initialization of the charm layer of comlib.
163
164    In this function we also call ComlibDoneCreating for the user (basically
165    triggering the broadcast of the strategies created in Main::Main. Here the
166    Main::Main has for sure executed, otherwise we will not have received
167    confirmation by all other processors.
168    
169    
170  */
171 void ComlibManager::resumeFromSetupBarrier(){
172         ComlibPrintf("[%d] resumeFromSetupBarrier Charm group ComlibManager setup finished\n", CkMyPe());
173
174         setupComplete = 1;
175         ComlibDoneCreating();
176         ComlibPrintf("[%d] resumeFromSetupBarrier calling ComlibDoneCreating to tell converse layer strategies to set themselves up\n", CkMyPe());
177         sendBufferedMessagesAllStrategies();
178
179 }
180
181 /***************************************************************************
182  Determine whether the delegated messages should be buffered until the 
183  strategy has recovered from any error conditions and startup. Once the 
184  buffers can be flushed, ComlibManager::sendBufferedMessages() will be called
185 ***************************************************************************/
186 bool ComlibManager::shouldBufferMessagesNow(int instid){
187   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
188   return (!setupComplete) || myEntry->getErrorMode() == ERROR_MODE || myEntry->getErrorMode() == CONFIRM_MODE || myEntry->bufferOutgoing;
189 }
190
191
192 /***************************************************************************
193  Calls ComlibManager::sendBufferedMessages for each strategy.
194 ***************************************************************************/
195 void ComlibManager::sendBufferedMessagesAllStrategies(){
196   int nstrats = converseManager->getNumStrats();
197   for(int i=0;i<nstrats;i++){
198     sendBufferedMessages(i);
199   }
200 }
201
202
203 /***************************************************************************
204    Send all the buffered messages once startup has completed, and we have 
205    recovered from all errors.
206 ***************************************************************************/
207 void ComlibManager::sendBufferedMessages(int instid, int step){
208   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
209   
210   if(shouldBufferMessagesNow(instid)){
211     ComlibPrintf("[%d] sendBufferedMessages is not flushing buffered messages for strategy %d because shouldBufferMessagesNow()==true step %d\n", CkMyPe(), instid, step);  
212   } else if(delayMessageSendBuffer[instid].size() == 0){
213     ComlibPrintf("[%d] sendBufferedMessages: no bufferedmessages to send for strategy %d step %d\n", CkMyPe(), instid, step);  
214   } else{
215     ComlibPrintf("[%d] sendBufferedMessages Sending %d buffered messages for instid=%d step %d\n", CkMyPe(), delayMessageSendBuffer[instid].size(), instid, step);
216  
217     for (std::set<CharmMessageHolder*>::iterator iter = delayMessageSendBuffer[instid].begin(); iter != delayMessageSendBuffer[instid].end(); ++iter) {
218       CharmMessageHolder* cmsg = *iter;
219           
220       switch(cmsg->type){
221             
222       case CMH_ARRAYSEND:
223         CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
224         CkpvAccess(conv_com_object).doneInserting(instid);
225         break;
226             
227       case CMH_GROUPSEND:
228         CkAbort("CMH_GROUPSEND unimplemented");
229         break;
230             
231       case CMH_ARRAYBROADCAST:
232       case CMH_ARRAYSECTIONSEND: 
233       case CMH_GROUPBROADCAST:
234         // Multicast/broadcast to an array or a section:
235         cmsg->sec_id = cmsg->copy_of_sec_id;
236         CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
237         CkpvAccess(conv_com_object).doneInserting(instid);
238         break;
239             
240       default:
241         CkAbort("Unknown cmsg->type was found in buffer of delayed messages\n");
242       }
243           
244     }
245
246     delayMessageSendBuffer[instid].clear();
247   }
248
249 }
250
251
252
253 /***************************************************************************
254  * Routine for bracketed strategies, for detecting count errors after 
255  * objects migrate.
256  * 
257  * This function must only be called after all messages from the previous 
258  * iteration have been delivered. This is the application's responsibility to 
259  * ensure. The iteration values provided must increase monotonically, 
260  * and must be greater than STARTUP_ITERATION.
261  *
262  ***************************************************************************/
263 /// Called when the array/group element starts sending messages
264 void ComlibManager::beginIteration(int instid, int iteration){
265         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
266                             
267         ComlibManagerPrintf("[%d] beginIteration iter=%d lastKnownIteration=%d  %s %s %s\n", CkMyPe(), iteration, myEntry->lastKnownIteration, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
268
269         CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
270         if(CkMyPe()==0)
271           CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER ||  myEntry->getErrorModeServer() == ERROR_MODE_SERVER);
272
273         if(iteration > myEntry->lastKnownIteration){
274               ComlibManagerPrintf("[%d] beginIteration Starting Next Iteration ( # %d )\n", CkMyPe(), iteration);
275
276               myEntry->lastKnownIteration = iteration;
277               myEntry->nBeginItr = 1; // we are the first time to be called this iteration
278               myEntry->nEndItr = 0;
279               myEntry->nProcSync = 0;
280               myEntry->totalEndCounted = 0;
281               myEntry->nEndSaved = 0;
282                       
283         } else if(iteration == myEntry->lastKnownIteration){
284                 ComlibManagerPrintf("[%d] beginIteration continuing iteration # %d\n", CkMyPe(), iteration);
285                 myEntry->nBeginItr++;
286         } else {
287           CkPrintf("[%d] ERROR: ComlibManager::beginIteration iteration=%d < myEntry->lastKnownIteration=%d", iteration, myEntry->lastKnownIteration);
288           CkAbort("[%d] ERROR: ComlibManager::beginIteration iteration < myEntry->lastKnownIteration");
289         }
290         
291         
292         // We need to check for error conditions here as well as EndIteration.
293         // This will ensure that if we are the processor that detects this error, 
294         // we won't deliver at least this message until the strategy is fixed
295         if (myEntry->nBeginItr > myEntry->numElements) {
296           ComlibManagerPrintf("[%d] beginIteration BUFFERING OUTGOING because nBeginItr=%d > numElements=%d\n",CkMyPe(), myEntry->nBeginItr, myEntry->numElements);
297           myEntry->bufferOutgoing = 1;
298         }
299         
300 }
301
302
303 /** Called by user program when each element has finished sending its messages.
304
305     If no errors are detected, ConvComlibManager::doneInserting() is called on the 
306     underlying converse strategy. If an error is detected, then ConvComlibManager::doneInserting
307     is NOT called. This likely causes the the underlying converse strategy to buffer the 
308     messages until we recover from the error mode, although we have buffered at least some of
309     the messages, so the user program cannot run ahead and start the next iteration.
310
311 */
312
313 void ComlibManager::endIteration(int instid, int step){ 
314         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
315         
316         // migration is not allowed between begin and end of iteration, 
317         // so each element must call end iteration on the same processor 
318         // as it called begin iteration
319         CkAssert(myEntry->nEndItr <= myEntry->nBeginItr);
320         CkAssert(step == myEntry->lastKnownIteration);
321
322         CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
323         if(CkMyPe()==0)
324           CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER ||  myEntry->getErrorModeServer() == ERROR_MODE_SERVER);
325
326         myEntry->nEndItr++;
327         
328         ComlibManagerPrintf("[%d] endIteration called\n",CkMyPe());
329         
330         
331         if (myEntry->bufferOutgoing) {
332           // If migration was detected in ComlibManager::beginIteration and hence messages have been buffered:
333           CkAssert(delayMessageSendBuffer[instid].size() > 0);
334           CProxy_ComlibManager myProxy(thisgroup);
335           myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
336         } 
337         else if(myEntry->nEndItr == myEntry->numElements) {
338           // If all the objects have called beginIteration and endIteration and no errors were detected
339           CkAssert(converseManager->isReady(instid));
340           converseManager->doneInserting(instid);
341         }
342         
343 }
344
345
346 /** Start recovery of errors. 
347     This entry method calls itself repeatedly if the underlying 
348     converse strategy is not yet ready.
349
350     If the PE is already in error mode, then only the difference 
351     in the counts will be contributed.
352 */
353 void ComlibManager::bracketedStartErrorRecoveryProcess(int instid, int step){  
354   CProxy_ComlibManager myProxy(thisgroup);
355   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);     
356   CkAssert(step >= myEntry->lastKnownIteration);
357
358
359   if(converseManager->isReady(instid)){
360     ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess(instid=%d step=%d) %s %s %s\n", CkMyPe(), instid, step, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
361
362     CkAssert(myEntry->strategy != NULL);
363     CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
364           
365     if (!myEntry->strategy->isBracketed()) {
366       CkPrintf("[%d] endIteration called unecessarily for a non-bracketed strategy\n", CkMyPe());
367       return;
368     }
369        
370     if (myEntry->getErrorMode() == NORMAL_MODE) {
371       ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess()\n", CkMyPe());
372       myEntry->nEndSaved = myEntry->nEndItr;
373       myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
374       myEntry->setErrorMode(ERROR_MODE);
375       bracketedStartDiscovery(instid);
376     } else {
377       // Send the updated count
378       int update = myEntry->nEndItr - myEntry->nEndSaved;
379       if (update > 0) {
380         //      ComlibManagerPrintf("bracketedStartErrorRecoveryProcess sending update to bracketedReceiveCount\n");
381         CProxy_ComlibManager myProxy(thisgroup);
382         myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
383         myEntry->nEndSaved = myEntry->nEndItr;
384       }
385       
386     }
387   } else {
388     ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess() REENQUEUE\n", CkMyPe() );
389     // Re-enqueue myself because we can't start error recovery process until converse strategy is ready
390     myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
391   }
392 }
393
394
395 /// Invoked on all processors to inform that an error has been detected.
396 void ComlibManager::bracketedErrorDetected(int instid, int step) {
397         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
398         
399         bracketedCatchUpPE(instid,step);
400         CkAssert(step == myEntry->lastKnownIteration);
401         CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
402
403         ComlibManagerPrintf("[%d] bracketedErrorDetected()\n", CkMyPe());
404
405         if (myEntry->getErrorMode() == NORMAL_MODE) {
406           // save the value we are sending to bracketedReceiveCount
407           myEntry->nEndSaved = myEntry->nEndItr; // save the value we are sending to bracketedReceiveCount
408           CProxy_ComlibManager myProxy(thisgroup);
409           myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
410           bracketedStartDiscovery(instid);
411           myEntry->setErrorMode(ERROR_MODE);
412
413         } else { // ERROR_MODE
414           // If we have an update count, send it
415           int update = myEntry->nEndItr - myEntry->nEndSaved;
416           ComlibManagerPrintf("bracketedErrorDetected update=%d\n", update);
417           if (update > 0) {
418             ComlibManagerPrintf("bracketedErrorDetected sending update to bracketedReceiveCount\n");
419             CProxy_ComlibManager myProxy(thisgroup);
420             myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
421             myEntry->nEndSaved = myEntry->nEndItr;
422           }
423         }
424         
425 }
426
427 /// Invoked on all processors. After processor 0 has a count match, it sends out
428 /// a broadcast on this entry method to get a confirmation from all others.
429 void ComlibManager::bracketedConfirmCount(int instid, int step) {
430         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
431         CkAssert(myEntry->getErrorMode() == ERROR_MODE);
432         myEntry->setErrorMode(CONFIRM_MODE);
433         CProxy_ComlibManager myProxy(thisgroup);
434         ComlibManagerPrintf("[%d] bracketedConfirmCount\n", CkMyPe());
435         myProxy[0].bracketedCountConfirmed(instid, myEntry->nEndSaved, step);
436 }
437
438 /// Invoked on processor 0 upon a request to confirm the count. If the count is
439 /// correct a "NewPeList" is sent out, otherwise the error mode is returned with
440 /// "ErrorDetected"
441 void ComlibManager::bracketedCountConfirmed(int instid, int count, int step) {
442         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
443         CkAssert(CkMyPe() == 0);
444         // Advance PE0 to current step if we had no local objects
445         bracketedCatchUpPE(instid, step);
446         CkAssert(myEntry->getErrorModeServer() == CONFIRM_MODE_SERVER);
447         CkAssert(step == myEntry->lastKnownIteration);
448
449         myEntry->total += count;
450         
451         ComlibManagerPrintf("[%d] bracketedCountConfirmed\n", CkMyPe());
452         
453         if (++myEntry->peConfirmCounter == CkNumPes()) {
454                 myEntry->peConfirmCounter = 0;
455
456                 CkAssert(myEntry->total == myEntry->totalEndCounted);
457                   
458                 CProxy_ComlibManager(thisgroup).bracketedReceiveNewCount(instid, step);
459                 myEntry->setErrorModeServer(ERROR_FIXED_MODE_SERVER);
460                   
461                 myEntry->total = 0;
462         }
463
464 }
465
466
467
468 /** Update the state for a PE that was likely lacking any local objects.
469     If no local objects exist, noone will call begin iteration, and hence,
470     the lastKnownIteration value will be old. We need to advance to the 
471     current iteration before we can proceed.
472 */
473 void ComlibManager::bracketedCatchUpPE(int instid, int step){
474   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
475   CkAssert(step >= myEntry->lastKnownIteration);
476   if(step > myEntry->lastKnownIteration){
477     
478     myEntry->total = 0;
479     myEntry->lastKnownIteration = step;
480     myEntry->nBeginItr = 0;
481     myEntry->nEndItr = 0;
482     myEntry->nProcSync = 0;
483     myEntry->totalEndCounted = 0;
484     myEntry->nEndSaved = 0;
485     myEntry->setErrorMode(NORMAL_MODE);
486     myEntry->setDiscoveryMode(NORMAL_DISCOVERY_MODE);
487
488     if(CkMyPe()==0){
489       CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER || myEntry->getErrorModeServer() == ERROR_FIXED_MODE_SERVER);
490       myEntry->setErrorModeServer(NORMAL_MODE_SERVER);
491     }    
492     
493   }
494 }
495
496 /// Invoked on processor 0 when a processor sends a count of how many elements
497 /// have already deposited. This number is incremental, and it refers to the
498 /// previous one sent by that processor. Processor 0 stores temporarily all the
499 /// numbers it receives. When a match happens, processor 0 switches to "confirm"
500 /// mode and send out a request for confirmation to all other processors.
501 /// The final parameter, "step", is used so that if  no objects exist on a processor,
502 /// then the counts sent by the processor will be tagged with an old timestep, and can 
503 /// safely be ignored. If no objects are on a PE, then beginIteration will never
504 /// be called there.
505 void ComlibManager::bracketedReceiveCount(int instid, int pe, int count, int isFirst, int step) {
506         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
507         ComlibPrintf("[%d] bracketedReceiveCount begins step=%d lastKnownIteration=%d totalEndCounted=%d count=%d\n", CkMyPe(), step, myEntry->lastKnownIteration, myEntry->totalEndCounted, count);
508         CkAssert(step >= myEntry->lastKnownIteration);
509
510         CkAssert(CkMyPe() == 0);
511         CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER || myEntry->getErrorModeServer() == ERROR_MODE_SERVER );
512         
513         
514         // Advance PE0 to current step if we had no local objects
515         bracketedCatchUpPE(instid, step);
516
517         // Encountering a message from an old step
518         CkAssert(step == myEntry->lastKnownIteration);
519
520         myEntry->totalEndCounted += count;
521
522
523         
524         myEntry->nProcSync += isFirst; // isFirst is 1 the first time a processor send a message,
525         CkAssert(myEntry->nProcSync <= CkNumPes());
526         
527         if (myEntry->getErrorModeServer() == NORMAL_MODE_SERVER) { 
528                 // first time this is called
529                 CkAssert(myEntry->nProcSync == 1);
530                 CProxy_ComlibManager(thisgroup).bracketedErrorDetected(instid, step);
531                 myEntry->setErrorModeServer(ERROR_MODE_SERVER);
532                 ComlibManagerPrintf("[%d] bracketedReceiveCount first time\n", CkMyPe());
533         }
534
535
536
537         CharmStrategy* s = dynamic_cast<CharmStrategy*>(myEntry->strategy);
538         ComlibArrayInfo ainfo = s->ainfo;
539         int totalsrc =  ainfo.getTotalSrc() ;
540           
541         if(myEntry->nProcSync == CkNumPes() && myEntry->totalEndCounted == totalsrc) {
542           // ok, we received notifications from all PEs and all objects have called endIteration
543           myEntry->setErrorModeServer(CONFIRM_MODE_SERVER); 
544           ComlibManagerPrintf("[%d] bracketedReceiveCount errorModeServer is now CONFIRM_MODE calling bracketedConfirmCount totalsrc=%d\n", CkMyPe(), (int)totalsrc);
545           CProxy_ComlibManager(thisgroup).bracketedConfirmCount(instid, step);
546                         
547         }
548                 
549 }
550
551
552 /// Invoked on all processors. After the count has been checked and it matches
553 /// the number of emements involved in the bracketed operation, processor 0
554 /// sends a broadcast to acknowledge the success. 
555 /// The strategy is disabled until a new Pe list is received.
556 void ComlibManager::bracketedReceiveNewCount(int instid, int step) {
557         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
558         CkAssert(myEntry->getErrorMode() == CONFIRM_MODE);
559
560         myEntry->setErrorMode(ERROR_FIXED_MODE);
561         
562         myEntry->nEndItr -= myEntry->nEndSaved;
563         myEntry->nBeginItr -= myEntry->nEndSaved;
564
565         myEntry->nEndSaved = 0;
566
567         bracketedFinalBarrier(instid, step);
568 }
569
570
571
572
573 /// Invoked on all processors. When all processors have discovered all elements
574 /// involved in the operation, processor 0 uses this method to broadcast the
575 /// entire processor list to all. Currently the discovery process HAS to go
576 /// together with the counting process, otherwise the strategy is updated at an
577 /// undetermined state. In future it may be useful to implement the discovery
578 /// process alone.
579 void ComlibManager::bracketedReceiveNewPeList(int instid, int step, int *count) {
580         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
581         CkAssert(myEntry->getDiscoveryMode() == STARTED_DISCOVERY_MODE);
582         myEntry->setDiscoveryMode(FINISHED_DISCOVERY_MODE);
583
584         myEntry->strategy->bracketedUpdatePeKnowledge(count);
585         
586         ComlibManagerPrintf("[%d] bracketedReceiveNewPeList Updating numElements\n", CkMyPe());
587         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
588         CkAssert((unsigned long)myInfo > 0x1000);
589         myEntry->numElements = myInfo->getLocalSrc();
590         
591         ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n",CkMyPe(), instid, delayMessageSendBuffer[instid].size() );
592         ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n", CkMyPe(), instid, delayMessageSendBuffer[instid].size());
593                 
594         bracketedFinalBarrier(instid, step);
595 }
596
597
598 /** Start a barrier phase where all processes will enter it once both 
599     the counting and discovery processes complete.
600
601     ComlibManager::bracketedReceiveNewPeList calls this method. 
602     ComlibManager::bracketedReceiveNewPeList is called as an array broadcast to thisProxy, 
603     so every PE will call this method.
604  */
605 void ComlibManager::bracketedFinalBarrier(int instid, int step) {
606   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
607   ComlibManagerPrintf("[%d] ComlibManager::bracketedFinalBarrier %s %s %s\n", CkMyPe(), myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
608
609   
610   if (myEntry->getDiscoveryMode() == FINISHED_DISCOVERY_MODE &&  myEntry->getErrorMode() == ERROR_FIXED_MODE) {       
611     myEntry->setDiscoveryMode(NORMAL_DISCOVERY_MODE);
612     myEntry->setErrorMode(NORMAL_MODE);
613
614     // Update destination and source element lists for use in the next step
615     ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
616     myInfo->useNewSourceAndDestinations();      
617     
618     CProxy_ComlibManager myProxy(thisgroup);
619     myProxy[0].bracketedReleaseCount(instid, step);
620   }
621 }
622
623
624 /** 
625     Once all PEs report here, we will allow them to release the buffered messages from this iteration
626  */
627 void ComlibManager::bracketedReleaseCount(int instid, int step) {
628
629     StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
630     ComlibPrintf("[%d] ComlibManager::bracketedReleaseCount myEntry->numBufferReleaseReady was %d\n", CkMyPe(), myEntry->numBufferReleaseReady);
631
632     CkAssert(CkMyPe() == 0);
633     CkAssert(myEntry->getErrorModeServer() == ERROR_FIXED_MODE_SERVER);
634     
635     myEntry->numBufferReleaseReady++;
636     if(myEntry->numBufferReleaseReady == CkNumPes()) {
637       myEntry->setErrorModeServer(NORMAL_MODE_SERVER);
638       CProxy_ComlibManager(thisgroup).bracketedReleaseBufferedMessages(instid, step);
639       myEntry->numBufferReleaseReady = 0;
640     }
641 }
642
643 /** 
644     Release any buffered messages.
645  */
646 void ComlibManager::bracketedReleaseBufferedMessages(int instid, int step) {
647   ComlibManagerPrintf("[%d] ComlibManager::bracketedReleaseBufferedMessages step=%d\n", CkMyPe(), step);
648   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
649
650   CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER);
651   CkAssert(myEntry->getErrorMode() == NORMAL_MODE);
652   CkAssert(myEntry->getDiscoveryMode() == NORMAL_DISCOVERY_MODE);
653   
654   myEntry->bufferOutgoing = 0;
655   sendBufferedMessages(instid, step);
656   
657   converseManager->doneInserting(instid);
658 }
659
660
661
662
663
664
665 /** Called when an error is discovered. Such errors occur when it is observed
666     that an array element has migrated to a new PE. Specifically if a strategy
667     on some PE determines that it has received messages for more array elements
668     than the strategy knew were local, then some array element must have 
669     migrated to the PE. The strategy instance detecting the error will call this
670     method to initiate a global update operation that determines the locations
671     of all array elements.
672
673     This is called on each PE by the bracketedErrorDetected() method. 
674
675     Each array element previously located on this PE is examined to determine if
676     it is still here. If the array element has migrated away, then the
677     bracketedDiscover() method is called on the new PE.
678
679 */
680 void ComlibManager::bracketedStartDiscovery(int instid) {
681         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
682         CkAssert(myEntry->getDiscoveryMode() == NORMAL_DISCOVERY_MODE);
683         myEntry->setDiscoveryMode(STARTED_DISCOVERY_MODE);
684         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
685         const CProxy_ComlibManager myProxy(thisgroup);
686
687         ComlibManagerPrintf("[%d] bracketedStartDiscovery\n", CkMyPe());
688
689         int countSrc = 0;
690         int countDest = 0;
691
692         if (myInfo->isSourceArray()) {
693
694           const CkVec<CkArrayIndexMax> & srcElements = myInfo->getSourceElements();
695           const int nelem = srcElements.size();
696           const CkArrayID aid = myInfo->getSourceArrayID(); 
697           const CkArray *a = (CkArray*)_localBranch(aid);
698
699           for (int i=0; i<nelem; ++i) {
700             int pe = a->lastKnown(srcElements[i]);
701             if (pe == CkMyPe()) {
702               countSrc++;
703               myInfo->addNewLocalSource(srcElements[i]);
704             }
705             else {
706               myProxy[pe].bracketedDiscover(instid, aid, srcElements[i], true);
707             }
708           }
709
710         }
711
712         if (myInfo->isDestinationArray()) {
713 //        CkAssert(myInfo->newDestinationListSize() == 0);
714
715           const CkVec<CkArrayIndexMax> & destElements = myInfo->getDestinationElements();
716           const int nelem = destElements.size();
717           const CkArrayID aid = myInfo->getDestinationArrayID();
718           const CkArray *a = (CkArray*)_localBranch(aid);
719
720           for (int i=0; i<nelem; ++i) {
721             int pe = a->lastKnown(destElements[i]);
722             if (pe == CkMyPe()) {
723               countDest++;
724               myInfo->addNewLocalDestination(destElements[i]);
725             }
726             else {
727               ComlibPrintf("[%d] destination element %d is no longer local\n", CkMyPe(), (int)destElements[i].data()[0]);
728               myProxy[pe].bracketedDiscover(instid, aid, destElements[i], false);
729             }
730           }
731         }
732
733         // Report the number of elements that are now local to this PE (if >0).
734         // The new owner PEs will report the counts for those objects that are no longer local to this PE
735         if (countSrc > 0 || countDest > 0) {
736           myProxy[0].bracketedContributeDiscovery(instid, CkMyPe(), countSrc, countDest, myEntry->lastKnownIteration);
737         }
738         
739 }
740
741
742
743 /** Determine the PE of a given array index. This will be called for any 
744     array element by the previous owner PE.
745
746     If the index is local to the processor, then record this in the local 
747     strategy. Also send a message to PE 0 informing PE 0 that the array
748     element was located.
749
750     If the array element is not found locally, call this method on the last 
751     known location for the element.
752
753 */
754 void ComlibManager::bracketedDiscover(int instid, CkArrayID aid, CkArrayIndexMax &idx, int isSrc) {
755         ComlibManagerPrintf("[%d] bracketedDiscover\n", CkMyPe());
756         CkArray *a = (CkArray *)_localBranch(aid);
757         int pe = a->lastKnown(idx);
758         CProxy_ComlibManager myProxy(thisgroup);
759         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
760
761         if (pe == CkMyPe()) {
762                 // Object was found locally
763          
764                 // notify PE 0
765                 myProxy[0].bracketedContributeDiscovery(instid, pe, isSrc?1:0, isSrc?0:1, myEntry->lastKnownIteration);
766           
767
768                 // Find local strategy instance
769
770                 // ComlibArrayInfo *myInfo = &(dynamic_cast<CharmStrategy*>(getStrategy(instid))->ainfo);
771                 CkAssert((unsigned long)myEntry->strategy > 0x1000);
772                 ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
773                 CkAssert((unsigned long)myInfo > 0x1000);
774
775                 if (isSrc) {
776                         // Add the element as a source element for the strategy
777                         ComlibManagerPrintf("[%d] bracketedDiscover addSource\n", CkMyPe());
778                         CkAssert((unsigned long)myInfo > 0x1000);
779                         myInfo->addNewLocalSource(idx);
780
781                         ComlibManagerPrintf("[%d] bracketedDiscover updating numElements\n", CkMyPe());
782                         myEntry->numElements = myInfo->getLocalSrc();           
783                 }
784                 else {
785                         // Add the element as a Destination element for the strategy
786                         ComlibManagerPrintf("[%d] bracketedDiscover addNewDestination\n", CkMyPe());
787                         myInfo->addNewLocalDestination(idx);
788                 }
789         } else {
790                 ComlibManagerPrintf("Keep On Forwarding*********************\n");
791                 // forward to next potential processor
792                 myProxy[pe].bracketedDiscover(instid, aid, idx, isSrc);
793         }
794 }
795
796
797
798 /** On PE 0, record the notifications from all PEs about the actual locations of each
799     array element. Count the number of elements discovered. Once the new location of 
800     every elements has been discovered, broadcast the new locations to all PEs by 
801     invoking bracketedReceiveNewPeList.
802 */
803 void ComlibManager::bracketedContributeDiscovery(int instid, int pe, int nsrc, int ndest, int step) {
804         CkAssert(CkMyPe() == 0);
805         ComlibManagerPrintf("[%d] bracketedContributeDiscovery pe=%d nsrc=%d ndest=%d step=%d\n", CkMyPe(), pe, nsrc, ndest, step);
806         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
807         if (myEntry->peList == 0) {
808                 myEntry->peList = new int[CkNumPes()+2];
809                 // peList[CkNumPes()] keeps the sum of all source objects discovered
810                 // peList[CkNumPes()+1] keeps the sum of all destination objects discovered
811                 for (int i=0; i<CkNumPes()+2; ++i) myEntry->peList[i]=0;
812                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery zeroing new peList\n", CkMyPe());
813         }
814         myEntry->peList[CkNumPes()] += nsrc;
815         myEntry->peList[CkNumPes()+1] += ndest;
816         // update the count for the sender processor. peList[i] is:
817         // 0 if proc "i" has no objects,
818         // 1 if proc "i" has only source objects,
819         // 2 if proc "i" has only destination objects,
820         // 3 if proc "i" has both source and destination objects
821         // the following code maintains the property of peList!
822         if (nsrc > 0) myEntry->peList[pe] |= 1;
823         if (ndest > 0) myEntry->peList[pe] |= 2;
824
825         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
826         CkAssert((unsigned long)myInfo > 0x1000);
827
828         
829 //      ComlibManagerPrintf("[%d] bracketedContributeDiscovery myEntry->peList[CkNumPes()]=%d  myInfo->getTotalSrc()=%d\n", CkMyPe(), myEntry->peList[CkNumPes()], myInfo->getTotalSrc());
830 //      ComlibManagerPrintf("[%d] bracketedContributeDiscovery myEntry->peList[CkNumPes()+1]=%d  myInfo->getTotalDest()=%d\n", CkMyPe(), myEntry->peList[CkNumPes()+1], myInfo->getTotalDest());
831 //              
832         if (myEntry->peList[CkNumPes()] == myInfo->getTotalSrc() &&
833                         myEntry->peList[CkNumPes()+1] == myInfo->getTotalDest()) {
834                 // discovery process finished, broadcast the new pe list
835
836                 CProxy_ComlibManager myProxy(thisgroup);
837                 
838                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery calling bracketedReceiveNewPeList %d/%d, %d/%d\n",       
839                                 CkMyPe(), 
840                                 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(), 
841                                 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
842                 
843                 printPeList("bracketedContributeDiscovery peList=", myEntry->peList);
844                 myProxy.bracketedReceiveNewPeList(instid, step, myEntry->peList);
845                 delete myEntry->peList;
846                 myEntry->peList = NULL;
847         } else {
848                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery NOT calling bracketedReceiveNewPeList %d/%d, %d/%d\n", 
849                                 CkMyPe(), 
850                                 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(), 
851                                 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
852         }
853
854 }
855
856
857 void ComlibManager::printPeList(const char* note, int *count)
858 {
859         char *buf = (char*)malloc(1024*64);
860         sprintf(buf, "[%d] %s ", CkMyPe(), note);
861         for(int i=0;i<CkNumPes();i++){
862                 switch (count[i]){
863                 case 0:
864                         sprintf(buf+strlen(buf), " %d:no ", i, count[i]);
865                         break;
866                 case 1:
867                         sprintf(buf+strlen(buf), " %d:source ", i, count[i]);
868                         break;
869                 case 2:
870                         sprintf(buf+strlen(buf), " %d:dest ", i, count[i]);
871                         break;
872                 case 3:
873                         sprintf(buf+strlen(buf), " %d:both ", i, count[i]);
874                         break;
875                 }
876         }
877         
878         sprintf(buf+strlen(buf), ", all source objects discovered =  %d, all destination objects discovered = %d",  count[CkNumPes()] ,  count[CkNumPes()+1] );
879                 
880         ComlibPrintf("%s\n", buf);
881 }
882
883 /***************************************************************************
884  * Delegation framework section:
885  *
886  * Reimplementation of the main routines needed for the delegation framework:
887  * this routines will be called when a message is sent through a proxy which has
888  * been associated with comlib.
889  ***************************************************************************/
890
891 extern int _charmHandlerIdx;
892 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid);
893
894
895 /** 
896     Handle messages sent via ArraySend. These are single point to point messages 
897     to array elements.
898
899     This method should not optimize direct sends in the case where buffering occurs
900
901  */
902 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
903                 const CkArrayIndexMax &idx, CkArrayID a){
904
905         CkAssert(pd != NULL);
906         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
907         int instid = ci->getID();
908         CkAssert(instid != 0);
909
910         // Reading from two hash tables is a big overhead
911         CkArray *amgr = CkArrayID::CkLocalBranch(a);
912         int dest_proc = amgr->lastKnown(idx);
913
914         register envelope * env = UsrToEnv(msg);
915         msg_prepareSend_noinline((CkArrayMessage*)msg, ep, a);
916
917         env->getsetArrayIndex()=idx;
918         env->setUsed(0);
919         ((CmiMsgHeaderExt *)env)->stratid = instid;
920
921         CkPackMessage(&env);
922         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));        
923         
924         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_ARRAYSEND);
925         
926         if(shouldBufferMessagesNow(instid)){
927           delayMessageSendBuffer[instid].insert(cmsg);
928           StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
929           int step = myEntry->lastKnownIteration;
930           ComlibManagerPrintf("[%d] ComlibManager::ArraySend BUFFERED OUTGOING: now buffer contains %d messages step=%d\n",CkMyPe(), delayMessageSendBuffer[instid].size(), step);
931         } else {
932           ComlibPrintf("ComlibManager::ArraySend NOT BUFFERING inserting message into strategy %d\n",instid);
933           
934           if(dest_proc == CkMyPe()){  
935             // Directly send if object is local
936             amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
937             return;
938           } else {
939             // Send through converse level strategy if non-local
940             converseManager->insertMessage(cmsg, instid);
941           }
942           
943         }
944
945 }
946
947
948 #include "qd.h"
949 //CkpvExtern(QdState*, _qd);
950
951 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
952
953         CkAssert(pd != NULL);
954         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
955         int instid = ci->getID();
956
957         int dest_proc = onPE;
958
959         ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
960                         UsrToEnv(msg)->getTotalsize());
961
962         register envelope * env = UsrToEnv(msg);
963         if(dest_proc == CkMyPe()){
964                 _SET_USED(env, 0);
965                 CkSendMsgBranch(ep, msg, dest_proc, gid);
966                 return;
967         }
968
969         ((CmiMsgHeaderExt *)env)->stratid = instid;
970         CpvAccess(_qd)->create(1);
971
972         env->setMsgtype(ForBocMsg);
973         env->setEpIdx(ep);
974         env->setGroupNum(gid);
975         env->setSrcPe(CkMyPe());
976         env->setUsed(0);
977
978         CkPackMessage(&env);
979         CmiSetHandler(env, _charmHandlerIdx);
980
981         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_GROUPSEND); 
982
983         if(shouldBufferMessagesNow(instid)){
984           StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
985           int step = myEntry->lastKnownIteration;
986           ComlibPrintf("ComlibManager::GroupSend Buffering message for %d step=%d\n",instid, step);
987           delayMessageSendBuffer[instid].insert(cmsg);
988         } else {
989           ComlibPrintf("ComlibManager::GroupSend inserting message into strategy %d\n",instid);
990           converseManager->insertMessage(cmsg, instid);
991         }
992
993
994 }
995
996 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
997         ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
998
999         CkAssert(pd != NULL);
1000         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1001         int instid = ci->getID();
1002
1003         register envelope * env = UsrToEnv(m);
1004         msg_prepareSend_noinline((CkArrayMessage*)m, ep, a);
1005
1006         env->getsetArrayIndex()= dummyArrayIndex;
1007         ((CmiMsgHeaderExt *)env)->stratid = instid;
1008
1009         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
1010
1011         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_BROADCAST, CMH_ARRAYBROADCAST);
1012         cmsg->npes = 0;
1013         cmsg->sec_id = NULL;
1014         cmsg->array_id = a;
1015
1016         multicast(cmsg, instid);
1017 }
1018
1019 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
1020                 CkArrayID a, CkSectionID &s, int opts) {
1021
1022         CkAssert(pd != NULL);
1023         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1024         int instid = ci->getID();
1025
1026         ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
1027
1028
1029         //Provide a dummy dest proc as it does not matter for mulitcast 
1030         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_SECTION_MULTICAST, CMH_ARRAYSECTIONSEND);
1031         cmsg->npes = 0;
1032         cmsg->sec_id = &s;
1033         cmsg->array_id = a;
1034         
1035
1036         msg_prepareSend_noinline((CkArrayMessage*)m, ep, a);
1037         
1038         register envelope * env = UsrToEnv(m);
1039         env->getsetArrayIndex()= dummyArrayIndex;
1040         ((CmiMsgHeaderExt *)env)->stratid = instid;
1041         
1042         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
1043         
1044         env->setUsed(0);
1045         CkPackMessage(&env);
1046         
1047
1048         CkSectionInfo minfo;
1049         minfo.type = COMLIB_MULTICAST_MESSAGE;
1050         minfo.sInfo.cInfo.instId = ci->getID();
1051         //minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
1052         minfo.sInfo.cInfo.id = 0; 
1053         minfo.pe = CkMyPe();
1054         ((CkMcastBaseMsg *)env)->_cookie = minfo;    
1055         
1056         multicast(cmsg, instid);
1057 }
1058
1059 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
1060
1061         CkAssert(pd != NULL);
1062         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1063         int instid = ci->getID();
1064         CkAssert(instid!=0);
1065
1066         register envelope * env = UsrToEnv(m);
1067
1068         CpvAccess(_qd)->create(1);
1069
1070         env->setMsgtype(ForBocMsg);
1071         env->setEpIdx(ep);
1072         env->setGroupNum(g);
1073         env->setSrcPe(CkMyPe());
1074         env->setUsed(0);
1075         ((CmiMsgHeaderExt *)env)->stratid = instid;
1076
1077         CkPackMessage(&env);
1078         CmiSetHandler(env, _charmHandlerIdx);
1079
1080         //Provide a dummy dest proc as it does not matter for mulitcast 
1081         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST, CMH_GROUPBROADCAST);
1082
1083         cmsg->npes = 0;
1084         //cmsg->pelist = NULL;
1085
1086         multicast(cmsg, instid);
1087 }
1088
1089
1090 /** 
1091     Multicast the message with the specified strategy(instid).
1092     This method is used in: ArrayBroadcast, ArraySectionSend, and GroupBroadcast
1093  */
1094 void ComlibManager::multicast(CharmMessageHolder *cmsg, int instid) {
1095         CkAssert(instid != 0);
1096         register envelope * env = UsrToEnv(cmsg->getCharmMessage());
1097
1098 #if DEBUG
1099         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1100         ComlibPrintf("[%d] multicast setupComplete=%d %s %s\n", CkMyPe(), setupComplete, myEntry->getErrorModeString(), myEntry->getErrorModeServerString());
1101 #endif
1102  
1103         env->setUsed(0);
1104         CkPackMessage(&env);
1105         
1106         if(shouldBufferMessagesNow(instid)){
1107           cmsg->saveCopyOf_sec_id();
1108           StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1109           int step = myEntry->lastKnownIteration;
1110           ComlibPrintf("[%d] ComlibManager::multicast Buffering message for %d lastKnownIteration=%d\n", CkMyPe(),instid, step);
1111           delayMessageSendBuffer[instid].insert(cmsg);
1112         } else {
1113           converseManager->insertMessage(cmsg, instid);
1114         }
1115         
1116 }
1117
1118
1119 void ComlibManager::printDiagnostics(int instid){
1120   CkPrintf("[%d]     delayMessageSendBuffer.size()=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size());
1121 }
1122
1123
1124 void ComlibManager::printDiagnostics(){
1125
1126
1127   std::map<ComlibInstanceHandle, std::set<CharmMessageHolder*> >::iterator iter;
1128   for(iter = delayMessageSendBuffer.begin(); iter != delayMessageSendBuffer.end(); ++iter){
1129     int instid = iter->first;
1130     int size = iter->second.size();
1131     
1132     if(size>0 || true){
1133       CkPrintf("[%d] delayMessageSendBuffer[instid=%d] contains %d messages\n", CkMyPe(), instid, size);
1134       
1135       if(! shouldBufferMessagesNow(instid)){
1136         CkPrintf("[%d] printDiagnostics: No messages should be still in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
1137       } else {
1138         CkPrintf("[%d] printDiagnostics: Messages still ought to be delayed in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
1139       }
1140       
1141       StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1142       CkPrintf("[%d] printDiagnostics[instid=%d] setupComplete=%d %s %s %s bufferOutgoing=%d\n", (int)CkMyPe(), (int)instid, (int)setupComplete, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString(), (int)myEntry->bufferOutgoing);
1143
1144
1145
1146     }
1147   }
1148   
1149   CkpvAccess(conv_com_object).printDiagnostics();
1150   
1151 }
1152
1153
1154
1155 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
1156         //ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(data);
1157         data->ref();
1158         return data;
1159         //return (new ComlibDelegateData(inst->getID()));
1160 }
1161
1162 CkDelegateData* ComlibManager::DelegatePointerPup(PUP::er &p,
1163                 CkDelegateData *pd) {
1164         if (!p.isUnpacking() && pd == NULL) CkAbort("Tryed to pup a null ComlibDelegateData!\n");
1165         //CmiBool isNotNull = pd!=NULL ? CmiTrue : CmiFalse;
1166         ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(pd);
1167
1168         // call a migration constructor
1169         if (p.isUnpacking()) inst = new ComlibDelegateData((CkMigrateMessage*)0);
1170         inst->pup(p);
1171         /*
1172   p | isNotNull;
1173   if (isNotNull) {
1174     if (p.isUnpacking()) inst = new ComlibInstanceHandle();
1175     inst->pup(p);
1176   }
1177          */
1178         return inst;
1179 }
1180
1181
1182 //Collect statistics from all the processors, also gets the list of
1183 //array elements on each processor.
1184 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe) {
1185 }
1186
1187
1188 /// @TODO: eliminate AtSync and move toward anytime migration!
1189 void ComlibManager::AtSync() {
1190
1191 }
1192
1193
1194 /***************************************************************************
1195  * User section:
1196  *
1197  * Interface available to the user to interact with the ComlibManager
1198  ***************************************************************************/
1199
1200 /** Permanently associate the given proxy with comlib, to use the instance
1201     represented by cinst. All messages sent through the proxy will be forwarded
1202     by comlib. */
1203 void ComlibAssociateProxy(ComlibInstanceHandle cinst, CProxy &proxy) {
1204   if(CkNumPes() > 1){
1205         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1206         proxy.ckDelegate(cgproxy.ckLocalBranch(), new ComlibDelegateData(cinst));
1207   } else {
1208     ComlibPrintf("Doing nothing in ComlibAssociateProxy because we have only 1 pe\n"); 
1209   }
1210 }
1211
1212 /** Permanently assiciate the given proxy with comlib, to use the strategy
1213     represented by strat. All messages sent through the proxy will be forwarded
1214     by comlib. */
1215 void ComlibAssociateProxy(Strategy *strat, CProxy &proxy) {
1216         ComlibAssociateProxy(strat->getHandle(), proxy);
1217 }  
1218
1219 /* OLD DESCRIPTION! Register a strategy to the comlib framework, and return a
1220     handle to be used in the future with ComlibAssociateProxy to associate the
1221     strategy with a proxy. If one strategy is registered more than once, the
1222     handles will be different, but they has to be considered as aliases. */
1223
1224 /** Provided only for backward compatibility. A strategy is registered at the
1225     converse level as soon as it is created (see Strategy constructor). This
1226     function just retrieve a handle from the strategy itself. */
1227 ComlibInstanceHandle ComlibRegister(Strategy *strat) {
1228         return strat->getHandle();
1229 }
1230
1231 /** Call beginIteration on the ComlibManager with the instance handle for the strategy associated with the proxy. 
1232  *   If no strategy has been associated with the proxy, nothing is done in this function.
1233  */
1234 void ComlibBegin(CProxy &proxy, int iteration) {
1235   if(CkNumPes() > 1){
1236         ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
1237         if(cinst==NULL)
1238                 return;
1239         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1240         (cgproxy.ckLocalBranch())->beginIteration(cinst->getID(), iteration);
1241   } else {
1242     ComlibPrintf("Doing nothing in ComlibBegin because we have only 1 pe");  
1243   }
1244 }
1245
1246 /** Call endIteration on the ComlibManager with the instance handle for the strategy associated with the proxy. 
1247  *   If no strategy has been associated with the proxy, nothing is done in this function.
1248  */
1249 void ComlibEnd(CProxy &proxy, int iteration) {
1250   if(CkNumPes() > 1){
1251         ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
1252         if(cinst==NULL)
1253                 return;
1254         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1255         (cgproxy.ckLocalBranch())->endIteration(cinst->getID(), iteration);
1256   } else {
1257     ComlibPrintf("Doing nothing in ComlibEnd because we have only 1 pe");  
1258   }
1259 }
1260
1261 char *routerName;
1262 int sfactor=0;
1263
1264
1265 /** A mainchare, used to initialize the comlib framework at the program startup.
1266     Its main purpose is to create the ComlibManager group. */
1267 class ComlibManagerMain {
1268 public:
1269         ComlibManagerMain(CkArgMsg *msg) {
1270
1271                 if(CkMyPe() == 0 && msg !=  NULL)
1272                         CmiGetArgString(msg->argv, "+strategy", &routerName);         
1273
1274                 if(CkMyPe() == 0 && msg !=  NULL)
1275                         CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1276
1277                 CProxy_ComlibManager::ckNew();
1278         }
1279 };
1280
1281 /***************************************************************************
1282  * ComlibInstanceHandle section:
1283  *
1284  * Implementation of the functions defined in the ComlibInstanceHandle class.
1285  ***************************************************************************/
1286
1287 ComlibDelegateData::ComlibDelegateData(int instid) : CkDelegateData(), _instid(instid) {
1288         ComlibManagerPrintf("[%d] Constructing ComlibDelegateData\n", CkMyPe());
1289         ref();
1290 }
1291
1292
1293 void ComlibInitSectionID(CkSectionID &sid){
1294
1295         sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1296         sid._cookie.pe = CkMyPe();
1297
1298         sid._cookie.sInfo.cInfo.id = 0;    
1299         sid.npes = 0;
1300         sid.pelist = NULL;
1301 }
1302
1303
1304 /** For backward compatibility - for old name commlib. The function
1305     _registercomlib() is generated by the translator. */
1306 void _registercommlib(void)
1307 {
1308         static int _done = 0; 
1309         if(_done) 
1310                 return; 
1311         _done = 1;
1312         _registercomlib();
1313 }
1314
1315
1316 void ComlibNotifyMigrationDoneHandler(void *msg) {
1317         CmiFree(msg);
1318         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1319         ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1320         if(cmgr_ptr)
1321                 cmgr_ptr->AtSync();    
1322 }
1323
1324
1325
1326
1327
1328
1329 static void periodicDebugPrintStatus(void* ptr, double currWallTime){
1330   CkPrintf("[%d] periodicDebugPrintStatus()\n", CkMyPe());
1331
1332   ComlibManager *mgr = (ComlibManager*)ptr;
1333   mgr->printDiagnostics();
1334
1335   CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, ptr, 4000, CkMyPe());
1336
1337 }
1338
1339
1340
1341
1342
1343 #include "comlib.def.h"
1344
1345 /*@}*/