0b988de1fd5fd0ccd07752154c223dc67880d9af
[charm.git] / src / ck-com / ComlibManager.C
1 /**
2    @addtogroup CharmComlib
3    @{
4    @file
5    Implementation of the functions in ComlibManager.h and handler for message
6    transportation.
7 */
8
9 #include "ComlibManager.h"
10 #include "comlib.h"
11 #include "ck.h"
12 #include "envelope.h"
13
14
15 // We only want to print debug information for a single strategy. Otherwise we'll get incredibly confused
16 #undef ComlibManagerPrintf
17 //#define ComlibManagerPrintf  if(instid==1)ComlibPrintf
18 #define ComlibManagerPrintf  ComlibPrintf
19
20 #define getmax(a,b) ((a)>(b)?(a):(b))
21
22 CkpvExtern(int, RecvdummyHandle);
23  
24 CkpvDeclare(CkGroupID, cmgrID);
25
26 /***************************************************************************
27  * Handlers section:
28  *
29  * all the handlers used by the ComlibManager to coordinate the work, and
30  * propagate the messages from one processor to another
31  ***************************************************************************/
32
33 //Handler to receive array messages
34 CkpvDeclare(int, RecvmsgHandle);
35
36 void recv_array_msg(void *msg){
37
38   //    ComlibPrintf("%d:In recv_msg\n", CkMyPe());
39
40         if(msg == NULL)
41                 return;
42
43         register envelope* env = (envelope *)msg;
44         env->setUsed(0);
45         env->getsetArrayHops()=1; 
46         CkUnpackMessage(&env);
47
48         int srcPe = env->getSrcPe();
49         int sid = ((CmiMsgHeaderExt *) env)->stratid;
50
51         //      ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), sid, env->getTotalsize(), srcPe);
52
53         RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
54
55         CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
56         
57         a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
58
59         //      ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
60         return;
61 }
62
63
64
65 /**
66    A debugging routine that will periodically print out the status of the message queues. 
67    The Definition is at bottom of this file.
68  */
69 static void periodicDebugPrintStatus(void* ptr, double currWallTime);
70
71
72
73
74
75 /***************************************************************************
76  * Initialization section:
77  *
78  * Routines used by Comlib to initialize itself and all the strategies on all
79  * the processors. Used only at the beginning of the program.
80  ***************************************************************************/
81
82 // // initialized at startup before the main chare main methods are called:
83 // void initCharmComlibManager(){
84 //   CkPrintf("[%d] initCharmComlibManager()\n", CkMyPe());
85 //   fflush(stdout);
86 // }
87
88
89
90 ComlibManager::ComlibManager(){
91         init();
92 }
93
94 void ComlibManager::init(){
95
96
97   //  CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, (void*)this, 4000, CkMyPe());
98
99
100   if(CkNumPes() == 1 ){
101     ComlibPrintf("Doing nothing in ComlibManager::init() because we are running on 1 pe.\n");
102   } else {
103
104         if (CkMyRank() == 0) {
105                 PUPable_reg(CharmMessageHolder);
106         }
107
108         numStatsReceived = 0;
109         curComlibController = 0;
110         clibIteration = 0;
111
112         CkpvInitialize(comRectHashType *, com_rect_ptr); 
113         CkpvAccess(com_rect_ptr)= new comRectHashType;
114
115         CkpvInitialize(int, RecvmsgHandle);
116         CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
117
118         bcast_pelist = new int [CkNumPes()];
119         for(int brcount = 0; brcount < CkNumPes(); brcount++)
120                 bcast_pelist[brcount] = brcount;
121
122         section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
123
124         CkpvInitialize(CkGroupID, cmgrID);
125         CkpvAccess(cmgrID) = thisgroup;
126
127         dummyArrayIndex.nInts = 0;
128
129         CkAssert(CkpvInitialized(conv_com_object));
130         converseManager = &CkpvAccess(conv_com_object);
131
132         setupComplete = 0;
133
134         CkpvInitialize(int, migrationDoneHandlerID);
135         CkpvAccess(migrationDoneHandlerID) = 
136                 CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
137
138         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
139         cgproxy[curComlibController].barrier();
140   }
141 }
142
143 //First barrier makes sure that the communication library group 
144 //has been created on all processors
145 void ComlibManager::barrier(){
146         static int bcount = 0;
147         ComlibPrintf("barrier %d\n", bcount);
148         if(CkMyPe() == 0) {
149                 bcount ++;
150                 if(bcount == CkNumPes()){
151                         bcount = 0;
152                         //barrierReached = 1;
153                         //barrier2Reached = 0;
154
155                         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
156                         cgproxy.resumeFromSetupBarrier();
157                 }
158         }
159 }
160
161
162
163 /**
164    Due to the possibility of race conditions in the initialization of charm, this
165    barrier prevents comlib from being activated before all group branches are created.
166    This function completes the initialization of the charm layer of comlib.
167
168    In this function we also call ComlibDoneCreating for the user (basically
169    triggering the broadcast of the strategies created in Main::Main. Here the
170    Main::Main has for sure executed, otherwise we will not have received
171    confirmation by all other processors.
172  */
173 void ComlibManager::resumeFromSetupBarrier(){
174         ComlibPrintf("[%d] resumeFromSetupBarrier Charm group ComlibManager setup finished\n", CkMyPe());
175
176         setupComplete = 1;
177         ComlibDoneCreating();
178         sendBufferedMessagesAllStrategies();
179 }
180
181
182 /***************************************************************************
183  Determine whether the delegated messages should be buffered until the 
184  strategy has recovered from any error conditions and startup. Once the 
185  buffers can be flushed, ComlibManager::sendBufferedMessages() will be called
186 ***************************************************************************/
187 bool ComlibManager::shouldBufferMessagesNow(int instid){
188   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
189   return (!setupComplete) || myEntry->errorMode == ERROR_MODE || myEntry->errorMode == CONFIRM_MODE || myEntry->bufferOutgoing;
190 }
191
192
193 /***************************************************************************
194  Calls ComlibManager::sendBufferedMessages for each strategy.
195 ***************************************************************************/
196 void ComlibManager::sendBufferedMessagesAllStrategies(){
197   int nstrats = converseManager->getNumStrats();
198   for(int i=0;i<nstrats;i++){
199     sendBufferedMessages(i);
200   }
201 }
202
203
204 /***************************************************************************
205    Send all the buffered messages once startup has completed, and we have 
206    recovered from any errors have recovered.
207 ***************************************************************************/
208 void ComlibManager::sendBufferedMessages(int instid){
209   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
210
211   if(! shouldBufferMessagesNow(instid) && delayMessageSendBuffer[instid].size() > 0){
212     ComlibManagerPrintf("[%d] sendBufferedMessages Sending %d buffered messages for instid=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size(), instid);
213
214
215     for (std::set<CharmMessageHolder*>::iterator iter = delayMessageSendBuffer[instid].begin(); iter != delayMessageSendBuffer[instid].end(); ++iter) {
216       CharmMessageHolder* cmsg = *iter;
217           
218       switch(cmsg->type){
219             
220       case CMH_ARRAYSEND:
221         //          CkAbort("CMH_ARRAYSEND unimplemented");
222         CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
223         CkpvAccess(conv_com_object).doneInserting(instid);
224         break;
225             
226       case CMH_GROUPSEND:
227         CkAbort("CMH_GROUPSEND unimplemented");
228         break;
229             
230       case CMH_ARRAYBROADCAST:
231       case CMH_ARRAYSECTIONSEND: 
232       case CMH_GROUPBROADCAST:
233         // Multicast/broadcast to an array or a section:
234         cmsg->sec_id = cmsg->copy_of_sec_id;
235         CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
236         CkpvAccess(conv_com_object).doneInserting(instid);
237         break;
238             
239       default:
240         CkAbort("Unknown cmsg->type was found in buffer of delayed messages\n");
241       }
242           
243     }
244
245     delayMessageSendBuffer[instid].clear();
246   } 
247   else {
248     ComlibManagerPrintf("sendBufferedMessages is not flushing buffered messages for strategy %d because myEntry->errorMode=%d && setupComplete = %d && myEntry->discoveryMode = %d && myEntry->bufferOutgoing = %d\n", instid, (int)myEntry->errorMode, (int)setupComplete, (int)myEntry->discoveryMode, (int)myEntry->bufferOutgoing);  
249   }
250
251
252
253 }
254
255
256
257 /***************************************************************************
258  * Bracketed section:
259  *
260  * Routines for bracketed strategies, to call the brackets begin and end, and to
261  * restore count errors after some objects migrate.
262  * 
263  * This function must only be called after all messages from the previous 
264  * iteration have been delivered. This is the application's responsibility to 
265  * ensure. The iteration values provided must increase monotonically.
266  ***************************************************************************/
267
268 /// Called when the array/group element starts sending messages
269 void ComlibManager::beginIteration(int instid, int iteration){
270         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
271                             
272         ComlibManagerPrintf("[%d] beginIteration iter=%d lastKnownIteration=%d  %s %s %s\n", CkMyPe(), iteration, myEntry->lastKnownIteration, myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
273
274
275         if(iteration > myEntry->lastKnownIteration){
276               // Verify & update errorModeServer:
277               if(CkMyPe()==0){
278                 CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || 
279                          myEntry->errorModeServer == STARTUP_MODE_SERVER ||  
280                          myEntry->errorModeServer == ERROR_FIXED_MODE_SERVER);
281                 myEntry->errorModeServer = NORMAL_MODE_SERVER;
282               } else {
283                 CkAssert(myEntry->errorModeServer == NON_SERVER_MODE_SERVER || 
284                          myEntry->errorModeServer == STARTUP_MODE_SERVER);
285                 myEntry->errorModeServer = NON_SERVER_MODE_SERVER;
286               }
287               // Verify & update errorMode:
288               CkAssert(myEntry->errorMode == ERROR_FIXED_MODE || myEntry->errorMode == NORMAL_MODE  );
289               myEntry->errorMode = NORMAL_MODE;   
290
291               myEntry->lastKnownIteration = iteration;
292               myEntry->nBeginItr = 1; // we are the first time to be called this iteration
293               myEntry->nEndItr = 0;
294               myEntry->nProcSync = 0;
295               myEntry->totalEndCounted = 0;
296               myEntry->discoveryMode = NORMAL_DISCOVERY_MODE;
297               myEntry->nEndSaved = 0;
298               
299               
300               ComlibManagerPrintf("[%d] beginIteration Starting Next Iteration ( # %d )\n", CkMyPe(), iteration);
301         } else {
302                 myEntry->nBeginItr++;
303                 ComlibManagerPrintf("[%d] beginIteration continuing iteration # %d\n", CkMyPe(), iteration);
304         }
305         
306         
307         // We need to check for error conditions here as well as EndIteration.
308         // This will ensure that if we are the processor that detects this error, 
309         // we won't deliver at least this message until the strategy is fixed
310         if (myEntry->nBeginItr > myEntry->numElements) {
311           ComlibManagerPrintf("[%d] beginIteration BUFFERING OUTGOING\n",CkMyPe());                     
312           myEntry->bufferOutgoing = 1;
313         }
314         
315 }
316
317
318 /** Called by user program when each element has finished sending its messages.
319
320     If no errors are detected, ConvComlibManager::doneInserting() is called on the 
321     underlying converse strategy. If an error is detected, then ConvComlibManager::doneInserting
322     is NOT called. This likely causes the the underlying converse strategy to buffer the 
323     messages until we recover from the error mode, although we have buffered at least some of
324     the messages, so the user program cannot run ahead and start the next iteration.
325
326 */
327
328 void ComlibManager::endIteration(int instid, int step){ 
329         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
330         
331         // migration is not allowed between begin and end of iteration, 
332         // so each element must call end iteration on the same processor 
333         // as it called begin iteration
334         CkAssert(myEntry->nEndItr <= myEntry->nBeginItr);
335         CkAssert(step == myEntry->lastKnownIteration);
336
337
338         myEntry->nEndItr++;
339         
340         ComlibManagerPrintf("[%d] endIteration called\n",CkMyPe());
341         
342         
343         if (myEntry->bufferOutgoing) {
344           // If migration was detected in ComlibManager::beginIteration and hence messages have been buffered:
345           CkAssert(delayMessageSendBuffer[instid].size() > 0);
346           CProxy_ComlibManager myProxy(thisgroup);
347           myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
348         } 
349         else if(myEntry->nEndItr == myEntry->numElements) {
350           // If all the objects have called beginIteration and endIteration and no errors were detected
351           CkAssert(converseManager->isReady(instid));
352           converseManager->doneInserting(instid);
353         }
354         
355 }
356
357
358 /** Start recovery of errors. 
359     This entry method calls itself repeatedly if the underlying 
360     converse strategy is not yet ready.
361
362     If the PE is already in error mode, then only the difference 
363     in the counts will be contributed.
364 */
365 void ComlibManager::bracketedStartErrorRecoveryProcess(int instid, int step){  
366   CProxy_ComlibManager myProxy(thisgroup);
367   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);     
368
369   
370
371   if(converseManager->isReady(instid)){
372     ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess() %s %s %s\n", CkMyPe(), myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
373
374     CkAssert(myEntry->strategy != NULL);
375     CkAssert(myEntry->errorMode == NORMAL_MODE || myEntry->errorMode == ERROR_MODE);
376           
377     if (!myEntry->strategy->isBracketed()) {
378       CkPrintf("[%d] endIteration called unecessarily for a non-bracketed strategy\n", CkMyPe());
379       return;
380     }
381        
382     if (myEntry->errorMode == NORMAL_MODE) {
383       ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess()\n", CkMyPe());
384       myEntry->nEndSaved = myEntry->nEndItr;
385       myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
386       myEntry->errorMode = ERROR_MODE;
387       bracketedStartDiscovery(instid);
388     } else {
389       // Send the updated count
390       int update = myEntry->nEndItr - myEntry->nEndSaved;
391       if (update > 0) {
392         //      ComlibManagerPrintf("bracketedStartErrorRecoveryProcess sending update to bracketedReceiveCount\n");
393         CProxy_ComlibManager myProxy(thisgroup);
394         myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
395         myEntry->nEndSaved = myEntry->nEndItr;
396       }
397       
398     }
399   } else {
400     ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess() REENQUEUE\n", CkMyPe() );
401     // Re-enqueue myself because we can't start error recovery process until converse strategy is ready
402     myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
403   }
404 }
405
406
407 /// Invoked on all processors to inform that an error has been detected.
408 void ComlibManager::bracketedErrorDetected(int instid, int step) {
409         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
410         
411         bracketedCatchUpPE(instid,step);
412         CkAssert(step == myEntry->lastKnownIteration);
413         CkAssert(myEntry->errorMode == NORMAL_MODE || myEntry->errorMode == ERROR_MODE);
414
415         ComlibManagerPrintf("[%d] bracketedErrorDetected()\n", CkMyPe());
416
417         if (myEntry->errorMode == NORMAL_MODE) {
418           // save the value we are sending to bracketedReceiveCount
419           myEntry->nEndSaved = myEntry->nEndItr; // save the value we are sending to bracketedReceiveCount
420           CProxy_ComlibManager myProxy(thisgroup);
421           myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
422           bracketedStartDiscovery(instid);
423           myEntry->errorMode = ERROR_MODE;
424
425         } else { // ERROR_MODE
426           // If we have an update count, send it
427           int update = myEntry->nEndItr - myEntry->nEndSaved;
428           ComlibManagerPrintf("bracketedErrorDetected update=%d\n", update);
429           if (update > 0) {
430             ComlibManagerPrintf("bracketedErrorDetected sending update to bracketedReceiveCount\n");
431             CProxy_ComlibManager myProxy(thisgroup);
432             myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
433             myEntry->nEndSaved = myEntry->nEndItr;
434           }
435         }
436         
437 }
438
439 /// Invoked on all processors. After processor 0 has a count match, it sends out
440 /// a broadcast on this entry method to get a confirmation from all others.
441 void ComlibManager::bracketedConfirmCount(int instid) {
442         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
443         CkAssert(myEntry->errorMode == ERROR_MODE);
444         myEntry->errorMode = CONFIRM_MODE;
445         CProxy_ComlibManager myProxy(thisgroup);
446         ComlibManagerPrintf("[%d] bracketedConfirmCount\n", CkMyPe());
447         myProxy[0].bracketedCountConfirmed(instid, myEntry->nEndSaved, myEntry->lastKnownIteration);
448 }
449
450 /// Invoked on processor 0 upon a request to confirm the count. If the count is
451 /// correct a "NewPeList" is sent out, otherwise the error mode is returned with
452 /// "ErrorDetected"
453 void ComlibManager::bracketedCountConfirmed(int instid, int count, int step) {
454         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
455         CkAssert(CkMyPe() == 0);
456         // Advance PE0 to current step if we had no local objects
457         bracketedCatchUpPE(instid, step);
458         CkAssert(myEntry->errorModeServer == CONFIRM_MODE_SERVER);
459         CkAssert(step == myEntry->lastKnownIteration);
460
461         myEntry->total += count;
462         
463         ComlibManagerPrintf("[%d] bracketedCountConfirmed\n", CkMyPe());
464         
465         if (++myEntry->peConfirmCounter == CkNumPes()) {
466                 myEntry->peConfirmCounter = 0;
467
468                 CkAssert(myEntry->total == myEntry->totalEndCounted);
469                   
470                 CProxy_ComlibManager(thisgroup).bracketedReceiveNewCount(instid);
471                 myEntry->errorModeServer = ERROR_FIXED_MODE_SERVER;
472                   
473                 myEntry->total = 0;     
474         }
475
476 }
477
478
479
480 /** Update the state for a PE that was likely lacking any local objects.
481     If no local objects exist, noone will call begin iteration, and hence,
482     the lastKnownIteration value will be old. We need to advance to the 
483     current iteration before we can proceed.
484 */
485 void ComlibManager::bracketedCatchUpPE(int instid, int step){
486   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
487   CkAssert(step >= myEntry->lastKnownIteration);
488   if(step > myEntry->lastKnownIteration){
489     
490     myEntry->total = 0;
491     myEntry->lastKnownIteration = step;
492     myEntry->nBeginItr = 0;
493     myEntry->nEndItr = 0;
494     myEntry->nProcSync = 0;
495     myEntry->totalEndCounted = 0;
496     myEntry->nEndSaved = 0;
497     myEntry->errorMode = NORMAL_MODE;
498     myEntry->discoveryMode = NORMAL_DISCOVERY_MODE;
499
500     if(CkMyPe()==0){
501       CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || myEntry->errorModeServer == ERROR_FIXED_MODE_SERVER);
502       ComlibManagerPrintf("[%d] NORMAL_MODE_SERVER *******************AAAAAAAAAAAA**********************\n", CkMyPe());
503       myEntry->errorModeServer = NORMAL_MODE_SERVER;
504     } else {
505       CkAssert(myEntry->errorModeServer == NON_SERVER_MODE_SERVER || myEntry->errorModeServer == STARTUP_MODE_SERVER);
506       myEntry->errorModeServer == NON_SERVER_MODE_SERVER;
507     }
508     
509     
510   }
511 }
512
513 /// Invoked on processor 0 when a processor sends a count of how many elements
514 /// have already deposited. This number is incremental, and it refers to the
515 /// previous one sent by that processor. Processor 0 stores temporarily all the
516 /// numbers it receives. When a match happens, processor 0 switches to "confirm"
517 /// mode and send out a request for confirmation to all other processors.
518 /// The final parameter, "step", is used so that if  no objects exist on a processor,
519 /// then the counts sent by the processor will be tagged with an old timestep, and can 
520 /// safely be ignored. If no objects are on a PE, then beginIteration will never
521 /// be called there.
522 void ComlibManager::bracketedReceiveCount(int instid, int pe, int count, int isFirst, int step) {
523         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
524         CkAssert(CkMyPe() == 0);
525         CkAssert(myEntry->errorModeServer == NORMAL_MODE_SERVER || myEntry->errorModeServer == ERROR_MODE_SERVER );
526
527         ComlibManagerPrintf("[%d] bracketedReceiveCount step=%d \n", CkMyPe(), step);
528
529         
530         // Advance PE0 to current step if we had no local objects
531         bracketedCatchUpPE(instid, step);
532
533         // Encountering a message from an old step
534         CkAssert(step == myEntry->lastKnownIteration);
535
536         myEntry->totalEndCounted += count;
537
538         ComlibManagerPrintf("[%d] bracketedReceiveCount step=%d totalEndCounted=%d count=%d\n", CkMyPe(), step, myEntry->totalEndCounted, count);
539
540         
541         myEntry->nProcSync += isFirst; // isFirst is 1 the first time a processor send a message,
542         CkAssert(myEntry->nProcSync <= CkNumPes());
543         
544         if (myEntry->errorModeServer == NORMAL_MODE_SERVER) { 
545                 // first time this is called
546                 CkAssert(myEntry->nProcSync == 1);
547                 CProxy_ComlibManager(thisgroup).bracketedErrorDetected(instid, step);
548                 myEntry->errorModeServer = ERROR_MODE_SERVER;
549                 ComlibManagerPrintf("[%d] bracketedReceiveCount first time\n", CkMyPe());
550         } else { // ERROR_MODE
551         
552           CharmStrategy* s = dynamic_cast<CharmStrategy*>(myEntry->strategy);
553           ComlibArrayInfo ainfo = s->ainfo;
554           int totalsrc =  ainfo.getTotalSrc() ;
555           
556           if(myEntry->nProcSync == CkNumPes() && myEntry->totalEndCounted == totalsrc) {
557             // ok, we received notifications from all PEs and all objects have called endIteration
558             myEntry->errorModeServer = CONFIRM_MODE_SERVER; 
559             ComlibManagerPrintf("[%d] bracketedReceiveCount errorModeServer is now CONFIRM_MODE calling bracketedConfirmCount totalsrc=%d\n", CkMyPe(), (int)totalsrc);
560             CProxy_ComlibManager(thisgroup).bracketedConfirmCount(instid);
561                         
562           }
563         }
564                 
565 }
566
567
568 /// Invoked on all processors. After the count has been checked and it matches
569 /// the number of emements involved in the bracketed operation, processor 0
570 /// sends a broadcast to acknowledge the success. 
571 /// The strategy is disabled until a new Pe list is received.
572 void ComlibManager::bracketedReceiveNewCount(int instid) {
573         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
574         CkAssert(myEntry->errorMode == CONFIRM_MODE);
575
576         myEntry->errorMode = ERROR_FIXED_MODE;
577         
578         myEntry->nEndItr -= myEntry->nEndSaved;
579         myEntry->nBeginItr -= myEntry->nEndSaved;
580
581         myEntry->nEndSaved = 0;
582
583         bracketedFinalBarrier(instid);
584 }
585
586
587
588
589 /// Invoked on all processors. When all processors have discovered all elements
590 /// involved in the operation, processor 0 uses this method to broadcast the
591 /// entire processor list to all. Currently the discovery process HAS to go
592 /// together with the counting process, otherwise the strategy is updated at an
593 /// undetermined state. In future it may be useful to implement the discovery
594 /// process alone.
595 void ComlibManager::bracketedReceiveNewPeList(int instid, int *count) {
596         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
597         CkAssert(myEntry->discoveryMode == STARTED_DISCOVERY_MODE);
598         myEntry->discoveryMode = FINISHED_DISCOVERY_MODE;
599
600         myEntry->strategy->bracketedUpdatePeKnowledge(count);
601         
602         ComlibManagerPrintf("[%d] bracketedReceiveNewPeList Updating numElements\n", CkMyPe());
603         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
604         CkAssert((unsigned long)myInfo > 0x1000);
605         myEntry->numElements = myInfo->getLocalSrc();
606         
607         ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n",CkMyPe(), instid, delayMessageSendBuffer[instid].size() );
608         ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n", CkMyPe(), instid, delayMessageSendBuffer[instid].size());
609                 
610         bracketedFinalBarrier(instid);
611 }
612
613
614 /** Start a barrier phase where all processes will enter it once both 
615     the counting and discovery processes complete.
616
617     ComlibManager::bracketedReceiveNewPeList calls this method. 
618     ComlibManager::bracketedReceiveNewPeList is called as an array broadcast to thisProxy, 
619     so every PE will call this method.
620  */
621 void ComlibManager::bracketedFinalBarrier(int instid) {
622   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
623   ComlibManagerPrintf("[%d] ComlibManager::bracketedFinalBarrier %s %s %s\n", CkMyPe(), myEntry->errorModeString(),  myEntry->errorModeServerString(),  myEntry->discoveryModeString() );
624
625
626   if (myEntry->discoveryMode == FINISHED_DISCOVERY_MODE &&  myEntry->errorMode == ERROR_FIXED_MODE) {    
627     CProxy_ComlibManager myProxy(thisgroup);
628     myProxy[0].bracketedReleaseCount(instid);
629   }
630 }
631
632
633 /** 
634     Once all PEs report here, we will allow them to release the buffered messages from this iteration
635  */
636 void ComlibManager::bracketedReleaseCount(int instid) {
637     ComlibManagerPrintf("[%d] ComlibManager::bracketedReleaseCount\n", CkMyPe());
638
639     StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
640     CkAssert(CkMyPe() == 0);
641     CkAssert(myEntry->errorModeServer == ERROR_FIXED_MODE_SERVER);
642     
643     myEntry->numBufferReleaseReady++;
644     if(myEntry->numBufferReleaseReady == CkNumPes()) {
645       myEntry->errorModeServer = NORMAL_MODE_SERVER;
646       CProxy_ComlibManager(thisgroup).bracketedReleaseBufferedMessages(instid);
647       myEntry->numBufferReleaseReady = 0;
648     }
649 }
650
651 /** 
652     Release any buffered messages.
653  */
654 void ComlibManager::bracketedReleaseBufferedMessages(int instid) {
655   ComlibManagerPrintf("[%d] ComlibManager::bracketedReleaseBufferedMessages\n", CkMyPe());
656
657   StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
658   
659   myEntry->bufferOutgoing = 0;
660   sendBufferedMessages(instid);
661   
662   converseManager->doneInserting(instid);
663 }
664
665
666
667
668
669
670 /** Called when an error is discovered. Such errors occur when it is observed
671     that an array element has migrated to a new PE. Specifically if a strategy
672     on some PE determines that it has received messages for more array elements
673     than the strategy knew were local, then some array element must have 
674     migrated to the PE. The strategy instance detecting the error will call this
675     method to initiate a global update operation that determines the locations
676     of all array elements.
677
678     This is called on each PE by the bracketedErrorDetected() method. 
679
680     Each array element previously located on this PE is examined to determine if
681     it is still here. If the array element has migrated away, then the
682     bracketedDiscover() method is called on the new PE.
683
684 */
685 void ComlibManager::bracketedStartDiscovery(int instid) {
686         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
687         CkAssert(myEntry->discoveryMode == NORMAL_DISCOVERY_MODE);
688         myEntry->discoveryMode = STARTED_DISCOVERY_MODE;
689         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
690                                 
691         ComlibManagerPrintf("[%d] bracketedStartDiscovery\n", CkMyPe());
692
693         int countSrc = 0;
694         int countDest = 0;
695         CkArrayID aid;
696         CkArrayIndexMax *idx;
697         int nelem;
698         CkArray *a;
699         CProxy_ComlibManager myProxy(thisgroup);
700
701         if (myInfo->isSourceArray()) {
702                 myInfo->getSourceArray(aid, idx, nelem);
703                 myInfo->resetSource();
704                 a = (CkArray*)_localBranch(aid);
705                 for (int i=0; i<nelem; ++i) {
706                         int pe = a->lastKnown(idx[i]);
707 //                      ComlibManagerPrintf("[%d] bracketedStartDiscovery src Index %d was lastKnown at pe %d\n", CkMyPe(), idx[i].data()[0], pe);
708                         if (pe == CkMyPe()) {
709                                 countSrc++;
710                                 myInfo->addSource(idx[i]);
711                         }
712                         else {
713                                 myProxy[pe].bracketedDiscover(instid, aid, idx[i], true);
714                         }
715                 }
716                 delete[] idx;
717         }
718
719         if (myInfo->isDestinationArray()) {
720                 myInfo->getDestinationArray(aid, idx, nelem);
721                 myInfo->resetDestination();
722                 a = (CkArray*)_localBranch(aid);
723                 for (int i=0; i<nelem; ++i) {
724                         int pe = a->lastKnown(idx[i]);
725 //                      ComlibManagerPrintf("[%d] bracketedStartDiscovery dest Index %d was lastKnown at pe %d\n", CkMyPe(), idx[i].data()[0], pe);
726                         if (pe == CkMyPe()) {
727                                 countDest++;
728                                 myInfo->addDestination(idx[i]);
729                         }
730                         else {
731                                 myProxy[pe].bracketedDiscover(instid, aid, idx[i], false);
732                         }
733                 }
734                 delete[] idx;
735         }
736
737         if (countSrc > 0 || countDest > 0) {
738                 // contribute only if we have something
739                 myProxy[0].bracketedContributeDiscovery(instid, CkMyPe(), countSrc, countDest, myEntry->lastKnownIteration);
740         }
741
742 }
743
744
745
746 /** Determine the PE of a given array index. This will be called for any 
747     array element by the previous owner PE.
748
749     If the index is local to the processor, then record this in the local 
750     strategy. Also send a message to PE 0 informing PE 0 that the array
751     element was located.
752
753     If the array element is not found locally, call this method on the last 
754     known location for the element.
755
756 */
757 void ComlibManager::bracketedDiscover(int instid, CkArrayID aid, CkArrayIndexMax &idx, int isSrc) {
758         ComlibManagerPrintf("[%d] bracketedDiscover\n", CkMyPe());
759         CkArray *a = (CkArray *)_localBranch(aid);
760         int pe = a->lastKnown(idx);
761         CProxy_ComlibManager myProxy(thisgroup);
762         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
763
764         if (pe == CkMyPe()) {
765                 // Object was found locally
766          
767                 // notify PE 0
768                 myProxy[0].bracketedContributeDiscovery(instid, pe, isSrc?1:0, isSrc?0:1, myEntry->lastKnownIteration);
769           
770
771                 // Find local strategy instance
772
773                 // ComlibArrayInfo *myInfo = &(dynamic_cast<CharmStrategy*>(getStrategy(instid))->ainfo);
774                 CkAssert((unsigned long)myEntry->strategy > 0x1000);
775                 ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
776                 CkAssert((unsigned long)myInfo > 0x1000);
777
778                 if (isSrc) {
779                         // Add the element as a source element for the strategy
780                         ComlibManagerPrintf("[%d] bracketedDiscover addSource\n", CkMyPe());
781                         CkAssert((unsigned long)myInfo > 0x1000);
782                         myInfo->addSource(idx);
783
784                         ComlibManagerPrintf("[%d] bracketedDiscover updating numElements\n", CkMyPe());
785                         myEntry->numElements = myInfo->getLocalSrc();           
786                 }
787                 else {
788                         // Add the element as a Destination element for the strategy
789                         ComlibManagerPrintf("[%d] bracketedDiscover addDestination\n", CkMyPe());
790                         myInfo->addDestination(idx);
791                 }
792         } else {
793                 ComlibManagerPrintf("Keep On Forwarding*********************\n");
794                 // forward to next potential processor
795                 myProxy[pe].bracketedDiscover(instid, aid, idx, isSrc);
796         }
797 }
798
799
800
801 /** On PE 0, record the notifications from all PEs about the actual locations of each
802     array element. Count the number of elements discovered. Once the new location of 
803     every elements has been discovered, broadcast the new locations to all PEs by 
804     invoking bracketedReceiveNewPeList.
805 */
806 void ComlibManager::bracketedContributeDiscovery(int instid, int pe, int nsrc, int ndest, int step) {
807         CkAssert(CkMyPe() == 0);
808         ComlibManagerPrintf("[%d] bracketedContributeDiscovery pe=%d nsrc=%d ndest=%d step=%d\n", CkMyPe(), pe, nsrc, ndest, step);
809         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
810         if (myEntry->peList == 0) {
811                 myEntry->peList = new int[CkNumPes()+2];
812                 // peList[CkNumPes()] keeps the sum of all source objects discovered
813                 // peList[CkNumPes()+1] keeps the sum of all destination objects discovered
814                 for (int i=0; i<CkNumPes()+2; ++i) myEntry->peList[i]=0;
815                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery zeroing new peList\n", CkMyPe());
816         }
817         myEntry->peList[CkNumPes()] += nsrc;
818         myEntry->peList[CkNumPes()+1] += ndest;
819         // update the count for the sender processor. peList[i] is:
820         // 0 if proc "i" has no objects,
821         // 1 if proc "i" has only source objects,
822         // 2 if proc "i" has only destination objects,
823         // 3 if proc "i" has both source and destination objects
824         // the following code maintains the property of peList!
825         if (nsrc > 0) myEntry->peList[pe] |= 1;
826         if (ndest > 0) myEntry->peList[pe] |= 2;
827
828         ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
829         CkAssert((unsigned long)myInfo > 0x1000);
830
831         
832 //      ComlibManagerPrintf("[%d] bracketedContributeDiscovery myEntry->peList[CkNumPes()]=%d  myInfo->getTotalSrc()=%d\n", CkMyPe(), myEntry->peList[CkNumPes()], myInfo->getTotalSrc());
833 //      ComlibManagerPrintf("[%d] bracketedContributeDiscovery myEntry->peList[CkNumPes()+1]=%d  myInfo->getTotalDest()=%d\n", CkMyPe(), myEntry->peList[CkNumPes()+1], myInfo->getTotalDest());
834 //              
835         if (myEntry->peList[CkNumPes()] == myInfo->getTotalSrc() &&
836                         myEntry->peList[CkNumPes()+1] == myInfo->getTotalDest()) {
837                 // discovery process finished, broadcast the new pe list
838
839                 CProxy_ComlibManager myProxy(thisgroup);
840                 
841                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery calling bracketedReceiveNewPeList %d/%d, %d/%d\n",       
842                                 CkMyPe(), 
843                                 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(), 
844                                 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
845                 
846                 printPeList("bracketedContributeDiscovery peList=", myEntry->peList);
847                 myProxy.bracketedReceiveNewPeList(instid, myEntry->peList);
848                 delete myEntry->peList;
849                 myEntry->peList = NULL;
850         } else {
851                 ComlibManagerPrintf("[%d] bracketedContributeDiscovery NOT calling bracketedReceiveNewPeList %d/%d, %d/%d\n", 
852                                 CkMyPe(), 
853                                 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(), 
854                                 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
855         }
856
857 }
858
859
860 void ComlibManager::printPeList(const char* note, int *count)
861 {
862         char *buf = (char*)malloc(1024*64);
863         sprintf(buf, "[%d] %s ", CkMyPe(), note);
864         for(int i=0;i<CkNumPes();i++){
865                 switch (count[i]){
866                 case 0:
867                         sprintf(buf+strlen(buf), " %d:no ", i, count[i]);
868                         break;
869                 case 1:
870                         sprintf(buf+strlen(buf), " %d:source ", i, count[i]);
871                         break;
872                 case 2:
873                         sprintf(buf+strlen(buf), " %d:dest ", i, count[i]);
874                         break;
875                 case 3:
876                         sprintf(buf+strlen(buf), " %d:both ", i, count[i]);
877                         break;
878                 }
879         }
880         
881         sprintf(buf+strlen(buf), ", all source objects discovered =  %d, all destination objects discovered = %d",  count[CkNumPes()] ,  count[CkNumPes()+1] );
882                 
883         ComlibPrintf("%s\n", buf);
884 }
885
886 /***************************************************************************
887  * Delegation framework section:
888  *
889  * Reimplementation of the main routines needed for the delegation framework:
890  * this routines will be called when a message is sent through a proxy which has
891  * been associated with comlib.
892  ***************************************************************************/
893
894 extern int _charmHandlerIdx;
895 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid);
896
897
898 /** 
899     Handle messages sent via ArraySend. These are single point to point messages 
900     to array elements.
901
902     This method should not optimize direct sends in the case where buffering occurs
903
904  */
905 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
906                 const CkArrayIndexMax &idx, CkArrayID a){
907
908         CkAssert(pd != NULL);
909         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
910         int instid = ci->getID();
911         CkAssert(instid != 0);
912
913         // Reading from two hash tables is a big overhead
914         CkArray *amgr = CkArrayID::CkLocalBranch(a);
915         int dest_proc = amgr->lastKnown(idx);
916
917         register envelope * env = UsrToEnv(msg);
918         msg_prepareSend_noinline((CkArrayMessage*)msg, ep, a);
919
920         env->getsetArrayIndex()=idx;
921         env->setUsed(0);
922         ((CmiMsgHeaderExt *)env)->stratid = instid;
923
924         CkPackMessage(&env);
925         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));        
926         
927         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_ARRAYSEND);
928         
929         if(shouldBufferMessagesNow(instid)){
930           delayMessageSendBuffer[instid].insert(cmsg);
931           ComlibManagerPrintf("[%d] ComlibManager::ArraySend BUFFERED OUTGOING: now buffer contains %d messages\n",CkMyPe(), delayMessageSendBuffer[instid].size() );
932         } else {
933           ComlibPrintf("ComlibManager::ArraySend NOT BUFFERING inserting message into strategy %d\n",instid);
934           
935           if(dest_proc == CkMyPe()){  
936             // Directly send if object is local
937             amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
938             return;
939           } else {
940             // Send through converse level strategy if non-local
941             converseManager->insertMessage(cmsg, instid);
942           }
943           
944         }
945
946 }
947
948
949 #include "qd.h"
950 //CkpvExtern(QdState*, _qd);
951
952 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
953
954         CkAssert(pd != NULL);
955         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
956         int instid = ci->getID();
957
958         int dest_proc = onPE;
959
960         ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
961                         UsrToEnv(msg)->getTotalsize());
962
963         register envelope * env = UsrToEnv(msg);
964         if(dest_proc == CkMyPe()){
965                 _SET_USED(env, 0);
966                 CkSendMsgBranch(ep, msg, dest_proc, gid);
967                 return;
968         }
969
970         ((CmiMsgHeaderExt *)env)->stratid = instid;
971         CpvAccess(_qd)->create(1);
972
973         env->setMsgtype(ForBocMsg);
974         env->setEpIdx(ep);
975         env->setGroupNum(gid);
976         env->setSrcPe(CkMyPe());
977         env->setUsed(0);
978
979         CkPackMessage(&env);
980         CmiSetHandler(env, _charmHandlerIdx);
981
982         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_GROUPSEND); 
983
984         if(shouldBufferMessagesNow(instid)){
985           ComlibPrintf("ComlibManager::GroupSend Buffering message for %d\n",instid);
986           delayMessageSendBuffer[instid].insert(cmsg);
987         } else {
988           ComlibPrintf("ComlibManager::GroupSend inserting message into strategy %d\n",instid);
989           converseManager->insertMessage(cmsg, instid);
990         }
991
992
993 }
994
995 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
996         ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
997
998         CkAssert(pd != NULL);
999         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1000         int instid = ci->getID();
1001
1002         register envelope * env = UsrToEnv(m);
1003         msg_prepareSend_noinline((CkArrayMessage*)m, ep, a);
1004
1005         env->getsetArrayIndex()= dummyArrayIndex;
1006         ((CmiMsgHeaderExt *)env)->stratid = instid;
1007
1008         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
1009
1010         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_BROADCAST, CMH_ARRAYBROADCAST);
1011         cmsg->npes = 0;
1012         cmsg->sec_id = NULL;
1013         cmsg->array_id = a;
1014
1015         multicast(cmsg, instid);
1016 }
1017
1018 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
1019                 CkArrayID a, CkSectionID &s, int opts) {
1020
1021         CkAssert(pd != NULL);
1022         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1023         int instid = ci->getID();
1024
1025         ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
1026
1027
1028         //Provide a dummy dest proc as it does not matter for mulitcast 
1029         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_SECTION_MULTICAST, CMH_ARRAYSECTIONSEND);
1030         cmsg->npes = 0;
1031         cmsg->sec_id = &s;
1032         cmsg->array_id = a;
1033         
1034
1035         msg_prepareSend_noinline((CkArrayMessage*)m, ep, a);
1036         
1037         register envelope * env = UsrToEnv(m);
1038         env->getsetArrayIndex()= dummyArrayIndex;
1039         ((CmiMsgHeaderExt *)env)->stratid = instid;
1040         
1041         CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
1042         
1043         env->setUsed(0);
1044         CkPackMessage(&env);
1045         
1046         
1047         /// @TODO why are CkSectionInfo needed since it doesn't seem to have
1048         /// useful information?
1049         CkSectionInfo minfo;
1050         minfo.type = COMLIB_MULTICAST_MESSAGE;
1051         minfo.sInfo.cInfo.instId = ci->getID();
1052         //minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
1053         minfo.sInfo.cInfo.id = 0; 
1054         minfo.pe = CkMyPe();
1055         ((CkMcastBaseMsg *)env)->_cookie = minfo;    
1056         
1057         multicast(cmsg, instid);
1058 }
1059
1060 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
1061
1062         CkAssert(pd != NULL);
1063         ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
1064         int instid = ci->getID();
1065         CkAssert(instid!=0);
1066
1067         register envelope * env = UsrToEnv(m);
1068
1069         CpvAccess(_qd)->create(1);
1070
1071         env->setMsgtype(ForBocMsg);
1072         env->setEpIdx(ep);
1073         env->setGroupNum(g);
1074         env->setSrcPe(CkMyPe());
1075         env->setUsed(0);
1076         ((CmiMsgHeaderExt *)env)->stratid = instid;
1077
1078         CkPackMessage(&env);
1079         CmiSetHandler(env, _charmHandlerIdx);
1080
1081         //Provide a dummy dest proc as it does not matter for mulitcast 
1082         CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST, CMH_GROUPBROADCAST);
1083
1084         cmsg->npes = 0;
1085         //cmsg->pelist = NULL;
1086
1087         multicast(cmsg, instid);
1088 }
1089
1090
1091 /** 
1092     Multicast the message with the specified strategy(instid).
1093     This method is used in: ArrayBroadcast, ArraySectionSend, and GroupBroadcast
1094  */
1095 void ComlibManager::multicast(CharmMessageHolder *cmsg, int instid) {
1096         CkAssert(instid != 0);
1097         register envelope * env = UsrToEnv(cmsg->getCharmMessage());
1098
1099 #if DEBUG
1100         StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1101         ComlibPrintf("[%d] multicast setupComplete=%d errorMode=%d errorModeServer=%d\n", CkMyPe(), setupComplete, myEntry->errorMode, myEntry->errorModeServer);
1102 #endif
1103  
1104         env->setUsed(0);
1105         CkPackMessage(&env);
1106         
1107         if(shouldBufferMessagesNow(instid)){
1108           cmsg->saveCopyOf_sec_id();
1109           delayMessageSendBuffer[instid].insert(cmsg);
1110         } else {
1111           converseManager->insertMessage(cmsg, instid);
1112         }
1113         
1114 }
1115
1116
1117 void ComlibManager::printDiagnostics(int instid){
1118         ComlibManagerPrintf("[%d]     delayMessageSendBuffer.size()=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size());
1119 }
1120
1121
1122 void ComlibManager::printDiagnostics(){
1123   
1124
1125   std::map<ComlibInstanceHandle, std::set<CharmMessageHolder*> >::iterator iter;
1126   for(iter = delayMessageSendBuffer.begin(); iter != delayMessageSendBuffer.end(); ++iter){
1127     int instid = iter->first;
1128     int size = iter->second.size();
1129     
1130     if(size>0){
1131       CkPrintf("[%d] delayMessageSendBuffer[instid=%d] contains %d messages\n", CkMyPe(), instid, size);
1132       
1133       if(! shouldBufferMessagesNow(instid)){
1134         CkPrintf("[%d] printDiagnostics: No messages should be still in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
1135       } else {
1136         CkPrintf("[%d] printDiagnostics: Messages still ought to be delayed in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
1137       }
1138       
1139       StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
1140       CkPrintf("[%d] printDiagnostics[instid=%d] setupComplete=%d errorMode=%d errorModeServer=%d discoveryMode=%d bufferOutgoing=%d\n", (int)CkMyPe(), (int)instid, (int)setupComplete, (int)myEntry->errorMode, (int)myEntry->errorModeServer, (int)myEntry->discoveryMode, (int)myEntry->bufferOutgoing);
1141     }
1142   }
1143   
1144   CkpvAccess(conv_com_object).printDiagnostics();
1145   
1146 }
1147
1148
1149
1150 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
1151         //ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(data);
1152         data->ref();
1153         return data;
1154         //return (new ComlibDelegateData(inst->getID()));
1155 }
1156
1157 CkDelegateData* ComlibManager::DelegatePointerPup(PUP::er &p,
1158                 CkDelegateData *pd) {
1159         if (!p.isUnpacking() && pd == NULL) CkAbort("Tryed to pup a null ComlibDelegateData!\n");
1160         //CmiBool isNotNull = pd!=NULL ? CmiTrue : CmiFalse;
1161         ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(pd);
1162
1163         // call a migration constructor
1164         if (p.isUnpacking()) inst = new ComlibDelegateData((CkMigrateMessage*)0);
1165         inst->pup(p);
1166         /*
1167   p | isNotNull;
1168   if (isNotNull) {
1169     if (p.isUnpacking()) inst = new ComlibInstanceHandle();
1170     inst->pup(p);
1171   }
1172          */
1173         return inst;
1174 }
1175
1176
1177 //Collect statistics from all the processors, also gets the list of
1178 //array elements on each processor.
1179 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe) {
1180 }
1181
1182
1183 /// @TODO: eliminate AtSync and move toward anytime migration!
1184 void ComlibManager::AtSync() {
1185
1186 }
1187
1188
1189 /***************************************************************************
1190  * User section:
1191  *
1192  * Interface available to the user to interact with the ComlibManager
1193  ***************************************************************************/
1194
1195 /** Permanently associate the given proxy with comlib, to use the instance
1196     represented by cinst. All messages sent through the proxy will be forwarded
1197     by comlib. */
1198 void ComlibAssociateProxy(ComlibInstanceHandle cinst, CProxy &proxy) {
1199   if(CkNumPes() > 1){
1200         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1201         proxy.ckDelegate(cgproxy.ckLocalBranch(), new ComlibDelegateData(cinst));
1202   } else {
1203     ComlibPrintf("Doing nothing in ComlibAssociateProxy because we have only 1 pe\n"); 
1204   }
1205 }
1206
1207 /** Permanently assiciate the given proxy with comlib, to use the strategy
1208     represented by strat. All messages sent through the proxy will be forwarded
1209     by comlib. */
1210 void ComlibAssociateProxy(Strategy *strat, CProxy &proxy) {
1211         ComlibAssociateProxy(strat->getHandle(), proxy);
1212 }  
1213
1214 /* OLD DESCRIPTION! Register a strategy to the comlib framework, and return a
1215     handle to be used in the future with ComlibAssociateProxy to associate the
1216     strategy with a proxy. If one strategy is registered more than once, the
1217     handles will be different, but they has to be considered as aliases. */
1218
1219 /** Provided only for backward compatibility. A strategy is registered at the
1220     converse level as soon as it is created (see Strategy constructor). This
1221     function just retrieve a handle from the strategy itself. */
1222 ComlibInstanceHandle ComlibRegister(Strategy *strat) {
1223         return strat->getHandle();
1224 }
1225
1226 /** Call beginIteration on the ComlibManager with the instance handle for the strategy associated with the proxy. 
1227  *   If no strategy has been associated with the proxy, nothing is done in this function.
1228  */
1229 void ComlibBegin(CProxy &proxy, int iteration) {
1230   if(CkNumPes() > 1){
1231         ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
1232         if(cinst==NULL)
1233                 return;
1234         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1235         (cgproxy.ckLocalBranch())->beginIteration(cinst->getID(), iteration);
1236   } else {
1237     ComlibPrintf("Doing nothing in ComlibBegin because we have only 1 pe");  
1238   }
1239 }
1240
1241 /** Call endIteration on the ComlibManager with the instance handle for the strategy associated with the proxy. 
1242  *   If no strategy has been associated with the proxy, nothing is done in this function.
1243  */
1244 void ComlibEnd(CProxy &proxy, int iteration) {
1245   if(CkNumPes() > 1){
1246         ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
1247         if(cinst==NULL)
1248                 return;
1249         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1250         (cgproxy.ckLocalBranch())->endIteration(cinst->getID(), iteration);
1251   } else {
1252     ComlibPrintf("Doing nothing in ComlibEnd because we have only 1 pe");  
1253   }
1254 }
1255
1256 char *routerName;
1257 int sfactor=0;
1258
1259
1260 /** A mainchare, used to initialize the comlib framework at the program startup.
1261     Its main purpose is to create the ComlibManager group. */
1262 class ComlibManagerMain {
1263 public:
1264         ComlibManagerMain(CkArgMsg *msg) {
1265
1266                 if(CkMyPe() == 0 && msg !=  NULL)
1267                         CmiGetArgString(msg->argv, "+strategy", &routerName);         
1268
1269                 if(CkMyPe() == 0 && msg !=  NULL)
1270                         CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1271
1272                 CProxy_ComlibManager::ckNew();
1273         }
1274 };
1275
1276 /***************************************************************************
1277  * ComlibInstanceHandle section:
1278  *
1279  * Implementation of the functions defined in the ComlibInstanceHandle class.
1280  ***************************************************************************/
1281
1282 ComlibDelegateData::ComlibDelegateData(int instid) : CkDelegateData(), _instid(instid) {
1283         ComlibManagerPrintf("[%d] Constructing ComlibDelegateData\n", CkMyPe());
1284         ref();
1285 }
1286
1287
1288 void ComlibInitSectionID(CkSectionID &sid){
1289
1290         sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1291         sid._cookie.pe = CkMyPe();
1292
1293         sid._cookie.sInfo.cInfo.id = 0;    
1294         sid.npes = 0;
1295         sid.pelist = NULL;
1296 }
1297
1298
1299 /** For backward compatibility - for old name commlib. The function
1300     _registercomlib() is generated by the translator. */
1301 void _registercommlib(void)
1302 {
1303         static int _done = 0; 
1304         if(_done) 
1305                 return; 
1306         _done = 1;
1307         _registercomlib();
1308 }
1309
1310
1311 void ComlibNotifyMigrationDoneHandler(void *msg) {
1312         CmiFree(msg);
1313         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1314         ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1315         if(cmgr_ptr)
1316                 cmgr_ptr->AtSync();    
1317 }
1318
1319
1320
1321
1322
1323
1324 static void periodicDebugPrintStatus(void* ptr, double currWallTime){
1325   // CkPrintf("[%d] periodicDebugPrintStatus()\n", CkMyPe());
1326
1327   ComlibManager *mgr = (ComlibManager*)ptr;
1328   mgr->printDiagnostics();
1329
1330   CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, ptr, 4000, CkMyPe());
1331
1332 }
1333
1334
1335
1336
1337
1338 #include "comlib.def.h"
1339
1340 /*@}*/