Merge remote branch 'origin/charm' into charmrun_merge
[charm.git] / src / ck-cp / pathHistory.C
1 #include <charm++.h>
2 #include <iostream>
3 #include <fstream>
4 #include <string>
5 #include <map>
6 #include <set>
7 #include <vector>
8 #include <utility>
9
10
11 #if CMK_WITH_CONTROLPOINT
12
13
14 #include "PathHistory.decl.h"
15 #include "LBDatabase.h"
16 #include "pathHistory.h"
17 //#include "controlPoints.h"
18 //#include "arrayRedistributor.h"
19 #include <register.h> // for _entryTable
20
21 #include "trace-projections.h"
22
23
24 /**
25  *  \addtogroup CriticalPathFramework
26  *   @{
27  *
28  */
29
30
31
32 /*readonly*/ CProxy_pathHistoryManager pathHistoryManagerProxy;
33
34 CkpvDeclare(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
35 CkpvDeclare(double, timeEntryMethodStarted);
36
37 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
38 /** A table to store all the local nodes in the parallel dependency graph */
39 CkpvDeclare(PathHistoryTableType, pathHistoryTable);
40 /** A counter that defines the new keys for the entries in the pathHistoryTable */
41 CkpvDeclare(int, pathHistoryTableLastIdx);
42 #endif
43
44
45 /// A mainchare that is used just to create a group at startup
46 class pathHistoryMain : public CBase_pathHistoryMain {
47 public:
48   pathHistoryMain(CkArgMsg* args){
49 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
50     pathHistoryManagerProxy = CProxy_pathHistoryManager::ckNew();
51 #endif
52   }
53   ~pathHistoryMain(){}
54 };
55
56
57 pathHistoryManager::pathHistoryManager(){
58
59 }
60
61
62   /** Trace perform a traversal backwards over the critical path specified as a 
63       table index for the processor upon which this is called.
64       
65       The callback cb will be called with the resulting msg after the path has 
66       been traversed to its origin.  
67   */
68  void pathHistoryManager::traceCriticalPathBackStepByStep(pathInformationMsg *msg){
69 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
70    int count = CkpvAccess(pathHistoryTable).count(msg->table_idx);
71
72 #if DEBUGPRINT > 2
73    CkPrintf("Table entry %d on pe %d occurs %d times in table\n", msg->table_idx, CkMyPe(), count);
74 #endif
75    CkAssert(count==0 || count==1);
76
77     if(count > 0){ 
78       PathHistoryTableEntry & path = CkpvAccess(pathHistoryTable)[msg->table_idx];
79       int idx = path.sender_history_table_idx;
80       int pe = path.sender_pe;
81
82 #if DEBUGPRINT > 2
83       CkPrintf("Table entry %d on pe %d points to pe=%d idx=%d\n", msg->table_idx, CkMyPe(), pe, idx);
84 #endif
85
86       // Make a copy of the message as we forward it along
87       pathInformationMsg *newmsg = new(msg->historySize+1) pathInformationMsg;
88       for(int i=0;i<msg->historySize;i++){
89         newmsg->history[i] = msg->history[i];
90       }
91       newmsg->history[msg->historySize] = path;
92       newmsg->historySize = msg->historySize+1;
93       newmsg->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
94       newmsg->cb = msg->cb;
95       newmsg->table_idx = idx;
96         
97       if(idx > -1 && pe > -1){
98         // Not yet at origin, keep tracing the path back
99         CkAssert(pe < CkNumPes() && pe >= 0);
100         thisProxy[pe].traceCriticalPathBackStepByStep(newmsg);
101       } else {
102         CkPrintf("Traced critical path back to its origin.\n");
103         if(msg->saveAsProjectionsUserEvents){
104
105           // Keep a message for returning to the user's callback
106           pathForUser = new(msg->historySize+1) pathInformationMsg;
107           for(int i=0;i<msg->historySize;i++){
108             pathForUser->history[i] = msg->history[i];
109           }
110           pathForUser->history[msg->historySize] = path;
111           pathForUser->historySize = msg->historySize+1;
112           pathForUser->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
113           pathForUser->cb = msg->cb;
114           pathForUser->table_idx = idx;
115           
116           CkPrintf("Broadcasting it to all PE\n");
117           thisProxy.broadcastCriticalPathProjections(newmsg);
118         } else {
119           newmsg->cb.send(newmsg);
120         }
121
122       }
123     } else {
124       CkAbort("ERROR: Traced critical path back to a nonexistent table entry.\n");
125     }
126
127     delete msg;
128 #else
129     CkAbort("Shouldn't call pathHistoryManager::traceCriticalPathBack when critical path detection is not enabled");
130 #endif
131   }
132
133
134 void pathHistoryManager::broadcastCriticalPathProjections(pathInformationMsg *msg){
135
136 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
137   CkPrintf("[%d] Received broadcast of critical path\n", CkMyPe());
138   int me = CkMyPe();
139   int intersectsLocalPE = false;
140
141   // Create user events for critical path
142
143   for(int i=msg->historySize-1;i>=0;i--){
144     if(CkMyPe() == msg->history[i].local_pe){
145       // Part of critical path is local
146       // Create user event for it
147
148       //      CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, msg->history[i].get_local_path_time(), msg-> history[i].local_arr, msg->history[i].local_ep, msg->history[i].get_start_time(), msg->history[i].get_preceding_path_time(), msg->history[i].local_pe);
149       traceUserBracketEvent(32000, msg->history[i].get_start_time(), msg->history[i].get_start_time() + msg->history[i].get_local_path_time());
150
151       intersectsLocalPE = true;
152     }
153
154   }
155
156   traceRegisterUserEvent("Critical Path", 32000);
157   
158   
159 #define PRUNE_CRITICAL_PATH_LOGS 0
160
161 #if PRUNE_CRITICAL_PATH_LOGS
162   // Tell projections tracing to only output log entries if I contain part of the critical path
163
164   enableTraceLogOutput();
165   if(! intersectsLocalPE){
166     disableTraceLogOutput();
167     CkPrintf("PE %d doesn't intersect the critical path, so its log files won't be created\n", CkMyPe() );
168   }
169 #endif
170   
171 #if TRACE_ALL_PATH_TABLE_ENTRIES
172   // Create user events for all table entries
173   std::map< int, PathHistoryTableEntry >::iterator iter;
174   for(iter=pathHistoryTable.begin(); iter != pathHistoryTable.end(); iter++){
175     double startTime = iter->second.get_start_time();
176     double endTime = iter->second.get_start_time() + iter->second.get_local_path_time();
177     traceUserBracketEvent(32001, startTime, endTime);
178   }
179 #endif
180
181   int data=1;
182   CkCallback cb(CkIndex_pathHistoryManager::criticalPathProjectionsDone(NULL),thisProxy[0]); 
183   contribute(sizeof(int), &data, CkReduction::sum_int, cb);
184
185 #endif
186
187 }
188
189 void pathHistoryManager::criticalPathProjectionsDone(CkReductionMsg *msg){
190 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
191   CkPrintf("[%d] All PEs have received the critical path information. Sending critical path to user supplied callback.\n", CkMyPe());
192   pathForUser->cb.send(pathForUser);
193   pathForUser = NULL;
194 #endif
195 }
196
197
198
199
200 /// An interface callable by the application.
201 void useThisCriticalPathForPriorities(){
202 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
203   pathHistoryManagerProxy.ckLocalBranch()->useCriticalPathForPriories();
204 #endif
205 }
206
207
208 /// Callable from inside charm++ delivery mechanisms (after envelope contains epIdx):
209 void automaticallySetMessagePriority(envelope *env){
210   #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
211
212 #if DEBUG
213   if(env->getPriobits() == 8*sizeof(int)){
214     CkPrintf("[%d] priorities for env=%p are integers\n", CkMyPe(), env);
215   } else if(env->getPriobits() == 0) {
216     CkPrintf("[%d] priorities for env=%p are not allocated in message\n", CkMyPe(), env);
217   } else {
218     CkPrintf("[%d] priorities for env=%p are not integers: %d priobits\n", CkMyPe(), env, env->getPriobits());
219   }
220 #endif
221   
222   
223   const std::map< std::pair<int,int>, int> & criticalPathForPriorityCounts = pathHistoryManagerProxy.ckLocalBranch()->getCriticalPathForPriorityCounts();
224   
225   if(criticalPathForPriorityCounts.size() > 0 && env->getPriobits() == 8*sizeof(int)) {
226     
227     switch(env->getMsgtype()) {
228     case ForArrayEltMsg:
229     case ForChareMsg:
230       {        
231         const int ep = env->getsetArrayEp();
232         const int arr = env->getArrayMgrIdx();
233         
234         const std::pair<int,int> k = std::make_pair(arr, ep);
235         const int count = criticalPathForPriorityCounts.count(k);
236         
237 #if DEBUG
238         CkPrintf("[%d] destination array,ep occurs %d times along stored critical path\n", CkMyPe(), count);
239 #endif
240         
241         if(count > 0){
242           // Set the integer priority to high
243 #if DEBUG
244           CkPrintf("Prio auto high\n");
245 #endif
246           *(int*)(env->getPrioPtr()) = -5;
247         } else {
248           // Set the integer priority to low
249 #if DEBUG
250           CkPrintf("Prio auto low: %d,%d\n", arr, ep);
251 #endif
252           *(int*)(env->getPrioPtr()) = 5;
253         }
254         
255       }
256       break;
257       
258     case ForNodeBocMsg:
259       CkPrintf("Can't Critical Path Autoprioritize a ForNodeBocMsg\n");    
260       break;
261       
262     case ForBocMsg:
263       CkPrintf("Can't Critical Path Autoprioritize a ForBocMsg\n");
264       break;
265       
266     case ArrayEltInitMsg:
267       // Don't do anything special with these
268       CkPrintf("Can't Critical Path Autoprioritize a ArrayEltInitMsg\n");
269       break;
270       
271     default:
272       CkPrintf("Can't Critical Path Autoprioritize messages of [unknown type]\n");
273       break;
274     }
275       
276   }
277
278 #endif
279 }
280
281
282
283 void pathHistoryManager::useCriticalPathForPriories(){
284 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
285
286   // Request a critical path that will be stored everywhere for future use in autotuning message priorities
287   
288   // The resulting critical path should be broadcast to saveCriticalPathForPriorities() on all PEs
289   CkCallback cb(CkIndex_pathHistoryManager::saveCriticalPathForPriorities(NULL),thisProxy); 
290   traceCriticalPathBack(cb, false);
291 #endif  
292 }
293
294
295
296 void pathHistoryManager::saveCriticalPathForPriorities(pathInformationMsg *msg){
297 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
298
299   CkPrintf("[%d] saveCriticalPathForPriorities() Receiving critical paths\n", CkMyPe());
300   fflush(stdout);
301   
302   criticalPathForPriorityCounts.clear();
303   
304   // Save a list of which entries are along the critical path
305   for(int i=msg->historySize-1;i>=0;i--){
306     
307     PathHistoryTableEntry &e = msg->history[i];
308     
309 #if DEBUG
310     if(CkMyPe() == 0){
311       CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, e.get_local_path_time(), msg-> history[i].local_arr, e.local_ep, e.get_start_time(), e.get_preceding_path_time(), e.local_pe);
312     }
313 #endif
314     
315     const std::pair<int,int> k = std::make_pair(e.local_arr, e.local_ep);
316     if(criticalPathForPriorityCounts.count(k) == 1)
317       criticalPathForPriorityCounts[k]++;
318     else
319       criticalPathForPriorityCounts[k] = 1;  
320   }
321   
322
323
324
325   // print out the list just for debugging purposes
326   if(CkMyPe() == 0){
327     std::map< std::pair<int,int>, int>::iterator iter;
328     for(iter=criticalPathForPriorityCounts.begin();iter!=criticalPathForPriorityCounts.end();++iter){
329       const std::pair<int,int> k = iter->first;
330       const int c = iter->second;
331
332       CkPrintf("[%d] On critical path EP %d,%d occurs %d times\n", CkMyPe(), k.first, k.second, c);
333
334     }
335   }
336
337 #endif
338 }
339
340
341
342
343
344
345
346
347
348
349
350 void initializeCriticalPath(void){
351 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
352   CkpvInitialize(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
353   CkpvInitialize(double, timeEntryMethodStarted);
354   CkpvAccess(timeEntryMethodStarted) = 0.0;
355
356
357   CkpvInitialize(PathHistoryTableType, pathHistoryTable);
358   CkpvInitialize(int, pathHistoryTableLastIdx);
359   CkpvAccess(pathHistoryTableLastIdx) = 0;
360 #endif
361 }
362
363
364
365
366
367
368
369
370
371
372 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
373 // The PathHistoryEnvelope class is disabled if USE_CRITICAL_PATH_HEADER_ARRAY isn't defined
374 void PathHistoryEnvelope::reset(){
375   totalTime = 0.0;
376   sender_history_table_idx = -1;
377 }
378
379   
380 void PathHistoryEnvelope::print() const {
381   CkPrintf("print() is not implemented\n");
382 }
383
384 /// Write a description of the path into the beginning of the provided buffer. The buffer ought to be large enough.
385
386   
387 void PathHistoryEnvelope::incrementTotalTime(double time){
388   totalTime += time;
389 }
390
391
392
393 void PathHistoryEnvelope::setDebug100(){
394   totalTime = 100.0;   
395 }
396
397
398
399 /// Add an entry for this path history into the table, and write the corresponding information into the outgoing envelope
400 int PathHistoryTableEntry::addToTableAndEnvelope(envelope *env){
401   // Add to table
402   int new_idx = addToTable();
403
404   // Fill in outgoing envelope
405   CkAssert(env != NULL);
406   env->pathHistory.set_sender_history_table_idx(new_idx);
407   env->pathHistory.setTime(local_path_time + preceding_path_time);
408
409 #if 0
410   // Create a user event for projections
411   char *note = new char[4096];
412   sprintf(note, "addToTableAndEnvelope<br> ");
413   env->pathHistory.printHTMLToString(note+strlen(note));
414   traceUserSuppliedNote(note); // stores a copy of the string
415   delete[] note;
416 #endif  
417
418   return new_idx;
419 }
420   
421
422 /// Add an entry for this path history into the table. Returns the index in the table for it.
423 int PathHistoryTableEntry::addToTable(){
424   int new_idx = CkpvAccess(pathHistoryTableLastIdx) ++;
425   CkpvAccess(pathHistoryTable)[new_idx] = *this;
426   return new_idx;
427 }
428 #endif
429
430
431
432   
433 void resetThisEntryPath(void) {
434   CkpvAccess(currentlyExecutingPath).reset();
435 }
436
437
438 /// A debugging routine that outputs critical path info as user events.
439 void  saveCurrentPathAsUserEvent(char* prefix){
440   if(CkpvAccess(currentlyExecutingPath).getTotalTime() > 0.0){
441     //traceUserEvent(5020);
442
443 #if 0
444     char *note = new char[4096];
445     sprintf(note, "%s<br> saveCurrentPathAsUserEvent()<br> ", prefix);
446     CkpvAccess(currentlyExecutingPath).printHTMLToString(note+strlen(note));
447     traceUserSuppliedNote(note); // stores a copy of the string
448     delete[] note;
449 #endif
450
451   } else {
452 #if 0
453     traceUserEvent(5010);
454 #endif
455   }
456  
457 }
458
459
460 void setCurrentlyExecutingPathTo100(void){
461   CkpvAccess(currentlyExecutingPath).setDebug100();
462 }
463
464
465 /// Acquire the critical path and deliver it to the user supplied callback
466 void traceCriticalPathBack(CkCallback cb, bool saveToProjectionsTraces){
467 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
468   pathInformationMsg *newmsg = new(0) pathInformationMsg;
469   newmsg->historySize = 0;
470   newmsg->cb = cb;
471   newmsg->saveAsProjectionsUserEvents = saveToProjectionsTraces;
472   newmsg->table_idx = CkpvAccess(currentlyExecutingPath).sender_history_table_idx;
473   int pe = CkpvAccess(currentlyExecutingPath).sender_pe;
474   CkPrintf("Starting tracing of critical path from pe=%d table_idx=%d\n", pe,  CkpvAccess(currentlyExecutingPath).sender_history_table_idx);
475   CkAssert(pe < CkNumPes() && pe >= 0);
476   pathHistoryManagerProxy[pe].traceCriticalPathBackStepByStep(newmsg);
477 #else
478   pathInformationMsg * pathForUser = new(0) pathInformationMsg;  
479   pathForUser->historySize = 0;                                                                                        
480   pathForUser->cb = CkCallback();                                                                                                      
481   pathForUser->table_idx = -1;      
482   cb.send(pathForUser);  
483 #endif
484 }
485
486
487
488 // void  printPathInMsg(void* msg){
489 //   envelope *env = UsrToEnv(msg);
490 //   env->printPath();
491 // }
492
493
494
495 /// A debugging routine that prints the number of EPs for the program, and the size of the envelope's path fields
496 void  printEPInfo(){
497 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
498   CkPrintf("printEPInfo():\n");
499   CkPrintf("There are %d EPs\n", (int)_entryTable.size());
500   for (int epIdx=0;epIdx<_entryTable.size();epIdx++)
501     CkPrintf("EP %d is %s\n", epIdx, _entryTable[epIdx]->name);
502 #endif
503 }
504
505
506
507
508 /// Save information about the critical path contained in the message that is about to execute.
509 void criticalPath_start(envelope * env){ 
510 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
511 #if DEBUG
512   CkPrintf("criticalPath_start(envelope * env) srcpe=%d sender table idx=%d  time=%lf\n", env->getSrcPe(),  env->pathHistory.get_sender_history_table_idx(), env->pathHistory.getTotalTime() );
513 #endif
514
515   CkpvAccess(currentlyExecutingPath).sender_pe = env->getSrcPe();
516   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = env->pathHistory.get_sender_history_table_idx();
517   CkpvAccess(currentlyExecutingPath).preceding_path_time = env->pathHistory.getTotalTime();
518
519   CkpvAccess(currentlyExecutingPath).sanity_check();
520   
521   CkpvAccess(currentlyExecutingPath).local_ep  = -1;
522   CkpvAccess(currentlyExecutingPath).local_arr = -1;
523
524   double now = CmiWallTimer();
525   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
526   CkpvAccess(timeEntryMethodStarted) = now;
527
528   switch(env->getMsgtype()) {
529   case ForArrayEltMsg:
530     //    CkPrintf("Critical Path Detection handling a ForArrayEltMsg\n");    
531     CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayEp();
532     CkpvAccess(currentlyExecutingPath).local_arr = env->getArrayMgrIdx();
533     
534     break;
535
536   case ForNodeBocMsg:
537     //CkPrintf("Critical Path Detection handling a ForNodeBocMsg\n");    
538     break;
539
540   case ForChareMsg:
541     //CkPrintf("Critical Path Detection handling a ForChareMsg\n");        
542     break;
543
544   case ForBocMsg:
545     //CkPrintf("Critical Path Detection handling a ForBocMsg\n");    
546     break;
547
548   case ArrayEltInitMsg:
549     // Don't do anything special with these
550     break;
551
552   default:
553     CkPrintf("Critical Path Detection can't yet handle message type %d\n", (int)env->getMsgtype());
554   }
555   
556   
557
558   saveCurrentPathAsUserEvent("criticalPath_start()<br> ");
559
560
561 #endif
562 }
563
564
565 /// Modify the envelope of a message that is being sent for critical path detection and store an entry in a table on this PE.
566 void criticalPath_send(envelope * sendingEnv){
567 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
568 #if DEBUG
569   CkPrintf("criticalPath_send(envelope * sendingEnv)\n");
570 #endif
571   double now = CmiWallTimer();
572   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), CkpvAccess(timeEntryMethodStarted), now);
573   entry.addToTableAndEnvelope(sendingEnv);
574   
575 #endif
576 }
577
578
579 /// Handle the end of the entry method in the critical path detection processes. This should create a forward dependency for the object.
580 void criticalPath_end(){
581 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
582   saveCurrentPathAsUserEvent("criticalPath_end()<br> ");
583
584   CkpvAccess(currentlyExecutingPath).reset();
585
586 #if DEBUG
587   CkPrintf("criticalPath_end()\n");
588 #endif
589
590 #endif
591 }
592
593
594
595 /// Split an entry method invocation into multiple logical tasks for the critical path analysis.
596 /// SDAG doen's break up the code in useful ways, so I'll make it add calls to this in the generated code.
597 void criticalPath_split(){
598 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
599   saveCurrentPathAsUserEvent("criticalPath_split()<br> ");
600
601   // save an entry in the table
602   double now = CmiWallTimer();
603   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), now-CkpvAccess(timeEntryMethodStarted) );
604   int tableidx = entry.addToTable();
605
606   // end the old task
607
608   
609   // start the new task
610   CkpvAccess(currentlyExecutingPath).sender_pe = CkMyPe();
611   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = tableidx;
612   CkpvAccess(currentlyExecutingPath).preceding_path_time = entry.getTotalTime();
613   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
614   CkpvAccess(timeEntryMethodStarted) = now;
615 #endif
616 }
617
618
619
620
621
622 #include "PathHistory.def.h"
623
624 /*! @} */
625
626 #endif