Adding support for automatic message prioritization.
[charm.git] / src / ck-cp / pathHistory.C
1 #include <charm++.h>
2 #include <iostream>
3 #include <fstream>
4 #include <string>
5 #include <map>
6 #include <set>
7 #include <vector>
8 #include <utility>
9
10 #include "PathHistory.decl.h"
11 #include "LBDatabase.h"
12 #include "pathHistory.h"
13 //#include "controlPoints.h"
14 //#include "arrayRedistributor.h"
15 #include <register.h> // for _entryTable
16
17 #include "trace-projections.h"
18
19
20 /**
21  *  \addtogroup CriticalPathFramework
22  *   @{
23  *
24  */
25
26
27
28 /*readonly*/ CProxy_pathHistoryManager pathHistoryManagerProxy;
29
30 CkpvDeclare(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
31 CkpvDeclare(double, timeEntryMethodStarted);
32
33 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
34 /** A table to store all the local nodes in the parallel dependency graph */
35 CkpvDeclare(PathHistoryTableType, pathHistoryTable);
36 /** A counter that defines the new keys for the entries in the pathHistoryTable */
37 CkpvDeclare(int, pathHistoryTableLastIdx);
38 #endif
39
40
41 /// A mainchare that is used just to create a group at startup
42 class pathHistoryMain : public CBase_pathHistoryMain {
43 public:
44   pathHistoryMain(CkArgMsg* args){
45 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
46     pathHistoryManagerProxy = CProxy_pathHistoryManager::ckNew();
47 #endif
48   }
49   ~pathHistoryMain(){}
50 };
51
52
53 pathHistoryManager::pathHistoryManager(){
54
55 }
56
57
58   /** Trace perform a traversal backwards over the critical path specified as a 
59       table index for the processor upon which this is called.
60       
61       The callback cb will be called with the resulting msg after the path has 
62       been traversed to its origin.  
63   */
64  void pathHistoryManager::traceCriticalPathBackStepByStep(pathInformationMsg *msg){
65 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
66    int count = CkpvAccess(pathHistoryTable).count(msg->table_idx);
67
68 #if DEBUGPRINT > 2
69    CkPrintf("Table entry %d on pe %d occurs %d times in table\n", msg->table_idx, CkMyPe(), count);
70 #endif
71    CkAssert(count==0 || count==1);
72
73     if(count > 0){ 
74       PathHistoryTableEntry & path = CkpvAccess(pathHistoryTable)[msg->table_idx];
75       int idx = path.sender_history_table_idx;
76       int pe = path.sender_pe;
77
78 #if DEBUGPRINT > 2
79       CkPrintf("Table entry %d on pe %d points to pe=%d idx=%d\n", msg->table_idx, CkMyPe(), pe, idx);
80 #endif
81
82       // Make a copy of the message as we forward it along
83       pathInformationMsg *newmsg = new(msg->historySize+1) pathInformationMsg;
84       for(int i=0;i<msg->historySize;i++){
85         newmsg->history[i] = msg->history[i];
86       }
87       newmsg->history[msg->historySize] = path;
88       newmsg->historySize = msg->historySize+1;
89       newmsg->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
90       newmsg->cb = msg->cb;
91       newmsg->table_idx = idx;
92         
93       if(idx > -1 && pe > -1){
94         // Not yet at origin, keep tracing the path back
95         CkAssert(pe < CkNumPes() && pe >= 0);
96         thisProxy[pe].traceCriticalPathBackStepByStep(newmsg);
97       } else {
98         CkPrintf("Traced critical path back to its origin.\n");
99         if(msg->saveAsProjectionsUserEvents){
100
101           // Keep a message for returning to the user's callback
102           pathForUser = new(msg->historySize+1) pathInformationMsg;
103           for(int i=0;i<msg->historySize;i++){
104             pathForUser->history[i] = msg->history[i];
105           }
106           pathForUser->history[msg->historySize] = path;
107           pathForUser->historySize = msg->historySize+1;
108           pathForUser->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
109           pathForUser->cb = msg->cb;
110           pathForUser->table_idx = idx;
111           
112           CkPrintf("Broadcasting it to all PE\n");
113           thisProxy.broadcastCriticalPathProjections(newmsg);
114         } else {
115           newmsg->cb.send(newmsg);
116         }
117
118       }
119     } else {
120       CkAbort("ERROR: Traced critical path back to a nonexistent table entry.\n");
121     }
122
123     delete msg;
124 #else
125     CkAbort("Shouldn't call pathHistoryManager::traceCriticalPathBack when critical path detection is not enabled");
126 #endif
127   }
128
129
130 void pathHistoryManager::broadcastCriticalPathProjections(pathInformationMsg *msg){
131
132 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
133   CkPrintf("[%d] Received broadcast of critical path\n", CkMyPe());
134   int me = CkMyPe();
135   int intersectsLocalPE = false;
136
137   // Create user events for critical path
138
139   for(int i=msg->historySize-1;i>=0;i--){
140     if(CkMyPe() == msg->history[i].local_pe){
141       // Part of critical path is local
142       // Create user event for it
143
144       //      CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, msg->history[i].get_local_path_time(), msg-> history[i].local_arr, msg->history[i].local_ep, msg->history[i].get_start_time(), msg->history[i].get_preceding_path_time(), msg->history[i].local_pe);
145       traceUserBracketEvent(32000, msg->history[i].get_start_time(), msg->history[i].get_start_time() + msg->history[i].get_local_path_time());
146
147       intersectsLocalPE = true;
148     }
149
150   }
151
152   traceRegisterUserEvent("Critical Path", 32000);
153   
154   
155 #define PRUNE_CRITICAL_PATH_LOGS 0
156
157 #if PRUNE_CRITICAL_PATH_LOGS
158   // Tell projections tracing to only output log entries if I contain part of the critical path
159
160   enableTraceLogOutput();
161   if(! intersectsLocalPE){
162     disableTraceLogOutput();
163     CkPrintf("PE %d doesn't intersect the critical path, so its log files won't be created\n", CkMyPe() );
164   }
165 #endif
166   
167 #if TRACE_ALL_PATH_TABLE_ENTRIES
168   // Create user events for all table entries
169   std::map< int, PathHistoryTableEntry >::iterator iter;
170   for(iter=pathHistoryTable.begin(); iter != pathHistoryTable.end(); iter++){
171     double startTime = iter->second.get_start_time();
172     double endTime = iter->second.get_start_time() + iter->second.get_local_path_time();
173     traceUserBracketEvent(32001, startTime, endTime);
174   }
175 #endif
176
177   int data=1;
178   CkCallback cb(CkIndex_pathHistoryManager::criticalPathProjectionsDone(NULL),thisProxy[0]); 
179   contribute(sizeof(int), &data, CkReduction::sum_int, cb);
180
181 #endif
182
183 }
184
185 void pathHistoryManager::criticalPathProjectionsDone(CkReductionMsg *msg){
186 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
187   CkPrintf("[%d] All PEs have received the critical path information. Sending critical path to user supplied callback.\n", CkMyPe());
188   pathForUser->cb.send(pathForUser);
189   pathForUser = NULL;
190 #endif
191 }
192
193
194
195
196 /// An interface callable by the application.
197 void useThisCriticalPathForPriorities(){
198   pathHistoryManagerProxy.ckLocalBranch()->useCriticalPathForPriories();
199 }
200
201
202 /// Callable from inside charm++ delivery mechanisms (after envelope contains epIdx):
203 void automaticallySetMessagePriority(envelope *env){
204   
205 #if DEBUG
206   if(env->getPriobits() == 8*sizeof(int)){
207     CkPrintf("[%d] priorities for env=%p are integers\n", CkMyPe(), env);
208   } else if(env->getPriobits() == 0) {
209     CkPrintf("[%d] priorities for env=%p are not allocated in message\n", CkMyPe(), env);
210   } else {
211     CkPrintf("[%d] priorities for env=%p are not integers: %d priobits\n", CkMyPe(), env, env->getPriobits());
212   }
213 #endif
214   
215   
216   const std::map< std::pair<int,int>, int> & criticalPathForPriorityCounts = pathHistoryManagerProxy.ckLocalBranch()->getCriticalPathForPriorityCounts();
217   
218   if(criticalPathForPriorityCounts.size() > 0 && env->getPriobits() == 8*sizeof(int)) {
219     
220     switch(env->getMsgtype()) {
221     case ForArrayEltMsg:
222       {        
223         const int ep = env->getsetArrayEp();
224         const int arr = env->getArrayMgrIdx();
225         
226         const std::pair<int,int> k = std::make_pair(arr, ep);
227         const int count = criticalPathForPriorityCounts.count(k);
228         
229 #if DEBUG
230         CkPrintf("[%d] destination array,ep occurs %d times along stored critical path\n", CkMyPe(), count);
231 #endif
232         
233         if(count > 0 && env->getPriobits() == 8*sizeof(int)){
234           // Set the integer priority to high
235           CkPrintf("Prio auto high\n");
236           *(int*)(env->getPrioPtr()) = -5;
237         }  else if ( env->getPriobits() == 8*sizeof(int)){
238           // Set the integer priority to low
239           CkPrintf("Prio auto low: %d,%d\n", arr, ep);
240           *(int*)(env->getPrioPtr()) = 5;
241         }
242         
243       }
244       break;
245       
246     case ForNodeBocMsg:
247       CkPrintf("Can't Critical Path Autoprioritize a ForNodeBocMsg\n");    
248       break;
249       
250     case ForChareMsg:
251       CkPrintf("Can't Critical Path Autoprioritize a ForChareMsg\n");
252       break;
253       
254     case ForBocMsg:
255       CkPrintf("Can't Critical Path Autoprioritize a ForBocMsg\n");
256       break;
257       
258     case ArrayEltInitMsg:
259       // Don't do anything special with these
260       CkPrintf("Can't Critical Path Autoprioritize a ArrayEltInitMsg\n");
261       break;
262       
263     default:
264       CkPrintf("Can't Critical Path Autoprioritize messages of [unknown type]\n");
265       break;
266     }
267       
268   }
269 }
270
271
272
273 void pathHistoryManager::useCriticalPathForPriories(){
274   // Request a critical path that will be stored everywhere for future use in autotuning message priorities
275   
276   // The resulting critical path should be broadcast to saveCriticalPathForPriorities() on all PEs
277   CkCallback cb(CkIndex_pathHistoryManager::saveCriticalPathForPriorities(NULL),thisProxy); 
278   traceCriticalPathBack(cb, false);
279   
280 }
281
282
283
284 void pathHistoryManager::saveCriticalPathForPriorities(pathInformationMsg *msg){
285 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
286
287   CkPrintf("[%d] saveCriticalPathForPriorities() Receiving critical paths\n", CkMyPe());
288   fflush(stdout);
289   
290   criticalPathForPriorityCounts.clear();
291   
292   // Save a list of which entries are along the critical path
293   for(int i=msg->historySize-1;i>=0;i--){
294     
295     PathHistoryTableEntry &e = msg->history[i];
296     
297 #if DEBUG
298     if(CkMyPe() == 0){
299       CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, e.get_local_path_time(), msg-> history[i].local_arr, e.local_ep, e.get_start_time(), e.get_preceding_path_time(), e.local_pe);
300     }
301 #endif
302     
303     const std::pair<int,int> k = std::make_pair(e.local_arr, e.local_ep);
304     if(criticalPathForPriorityCounts.count(k) == 1)
305       criticalPathForPriorityCounts[k]++;
306     else
307       criticalPathForPriorityCounts[k] = 1;  
308   }
309   
310
311
312
313   // print out the list just for debugging purposes
314   if(CkMyPe() == 0){
315     std::map< std::pair<int,int>, int>::iterator iter;
316     for(iter=criticalPathForPriorityCounts.begin();iter!=criticalPathForPriorityCounts.end();++iter){
317       const std::pair<int,int> k = iter->first;
318       const int c = iter->second;
319
320       CkPrintf("[%d] On critical path EP %d,%d occurrs %d times\n", CkMyPe(), k.first, k.second, c);
321
322     }
323   }
324
325 #endif
326 }
327
328
329
330
331
332
333
334
335
336
337
338 void initializeCriticalPath(void){
339 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
340   CkpvInitialize(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
341   CkpvInitialize(double, timeEntryMethodStarted);
342   CkpvAccess(timeEntryMethodStarted) = 0.0;
343
344
345   CkpvInitialize(PathHistoryTableType, pathHistoryTable);
346   CkpvInitialize(int, pathHistoryTableLastIdx);
347   CkpvAccess(pathHistoryTableLastIdx) = 0;
348 #endif
349 }
350
351
352
353
354
355
356
357
358
359
360 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
361 // The PathHistoryEnvelope class is disabled if USE_CRITICAL_PATH_HEADER_ARRAY isn't defined
362 void PathHistoryEnvelope::reset(){
363   totalTime = 0.0;
364   sender_history_table_idx = -1;
365 }
366
367   
368 void PathHistoryEnvelope::print() const {
369   CkPrintf("print() is not implemented\n");
370 }
371
372 /// Write a description of the path into the beginning of the provided buffer. The buffer ought to be large enough.
373
374   
375 void PathHistoryEnvelope::incrementTotalTime(double time){
376   totalTime += time;
377 }
378
379
380
381 void PathHistoryEnvelope::setDebug100(){
382   totalTime = 100.0;   
383 }
384
385
386
387 /// Add an entry for this path history into the table, and write the corresponding information into the outgoing envelope
388 int PathHistoryTableEntry::addToTableAndEnvelope(envelope *env){
389   // Add to table
390   int new_idx = addToTable();
391
392   // Fill in outgoing envelope
393   CkAssert(env != NULL);
394   env->pathHistory.set_sender_history_table_idx(new_idx);
395   env->pathHistory.setTime(local_path_time + preceding_path_time);
396
397 #if 0
398   // Create a user event for projections
399   char *note = new char[4096];
400   sprintf(note, "addToTableAndEnvelope<br> ");
401   env->pathHistory.printHTMLToString(note+strlen(note));
402   traceUserSuppliedNote(note); // stores a copy of the string
403   delete[] note;
404 #endif  
405
406   return new_idx;
407 }
408   
409
410 /// Add an entry for this path history into the table. Returns the index in the table for it.
411 int PathHistoryTableEntry::addToTable(){
412   int new_idx = CkpvAccess(pathHistoryTableLastIdx) ++;
413   CkpvAccess(pathHistoryTable)[new_idx] = *this;
414   return new_idx;
415 }
416 #endif
417
418
419
420   
421 void resetThisEntryPath(void) {
422   CkpvAccess(currentlyExecutingPath).reset();
423 }
424
425
426 /// A debugging routine that outputs critical path info as user events.
427 void  saveCurrentPathAsUserEvent(char* prefix){
428   if(CkpvAccess(currentlyExecutingPath).getTotalTime() > 0.0){
429     //traceUserEvent(5020);
430
431 #if 0
432     char *note = new char[4096];
433     sprintf(note, "%s<br> saveCurrentPathAsUserEvent()<br> ", prefix);
434     CkpvAccess(currentlyExecutingPath).printHTMLToString(note+strlen(note));
435     traceUserSuppliedNote(note); // stores a copy of the string
436     delete[] note;
437 #endif
438
439   } else {
440 #if 0
441     traceUserEvent(5010);
442 #endif
443   }
444  
445 }
446
447
448 void setCurrentlyExecutingPathTo100(void){
449   CkpvAccess(currentlyExecutingPath).setDebug100();
450 }
451
452
453 /// Acquire the critical path and deliver it to the user supplied callback
454 void traceCriticalPathBack(CkCallback cb, bool saveToProjectionsTraces){
455 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
456   pathInformationMsg *newmsg = new(0) pathInformationMsg;
457   newmsg->historySize = 0;
458   newmsg->cb = cb;
459   newmsg->saveAsProjectionsUserEvents = saveToProjectionsTraces;
460   newmsg->table_idx = CkpvAccess(currentlyExecutingPath).sender_history_table_idx;
461   int pe = CkpvAccess(currentlyExecutingPath).sender_pe;
462   CkPrintf("Starting tracing of critical path from pe=%d table_idx=%d\n", pe,  CkpvAccess(currentlyExecutingPath).sender_history_table_idx);
463   CkAssert(pe < CkNumPes() && pe >= 0);
464   pathHistoryManagerProxy[pe].traceCriticalPathBackStepByStep(newmsg);
465 #else
466   pathInformationMsg * pathForUser = new(0) pathInformationMsg;  
467   pathForUser->historySize = 0;                                                                                        
468   pathForUser->cb = CkCallback();                                                                                                      
469   pathForUser->table_idx = -1;      
470   cb.send(pathForUser);  
471 #endif
472 }
473
474
475
476 // void  printPathInMsg(void* msg){
477 //   envelope *env = UsrToEnv(msg);
478 //   env->printPath();
479 // }
480
481
482
483 /// A debugging routine that prints the number of EPs for the program, and the size of the envelope's path fields
484 void  printEPInfo(){
485 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
486   CkPrintf("printEPInfo():\n");
487   CkPrintf("There are %d EPs\n", (int)_entryTable.size());
488   for (int epIdx=0;epIdx<_entryTable.size();epIdx++)
489     CkPrintf("EP %d is %s\n", epIdx, _entryTable[epIdx]->name);
490 #endif
491 }
492
493
494
495
496 /// Save information about the critical path contained in the message that is about to execute.
497 void criticalPath_start(envelope * env){ 
498 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
499 #if DEBUG
500   CkPrintf("criticalPath_start(envelope * env) srcpe=%d sender table idx=%d  time=%lf\n", env->getSrcPe(),  env->pathHistory.get_sender_history_table_idx(), env->pathHistory.getTotalTime() );
501 #endif
502
503   CkpvAccess(currentlyExecutingPath).sender_pe = env->getSrcPe();
504   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = env->pathHistory.get_sender_history_table_idx();
505   CkpvAccess(currentlyExecutingPath).preceding_path_time = env->pathHistory.getTotalTime();
506
507   CkpvAccess(currentlyExecutingPath).sanity_check();
508   
509   CkpvAccess(currentlyExecutingPath).local_ep  = -1;
510   CkpvAccess(currentlyExecutingPath).local_arr = -1;
511
512   double now = CmiWallTimer();
513   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
514   CkpvAccess(timeEntryMethodStarted) = now;
515
516   switch(env->getMsgtype()) {
517   case ForArrayEltMsg:
518     //    CkPrintf("Critical Path Detection handling a ForArrayEltMsg\n");    
519     CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayEp();
520     CkpvAccess(currentlyExecutingPath).local_arr = env->getArrayMgrIdx();
521     
522     break;
523
524   case ForNodeBocMsg:
525     //CkPrintf("Critical Path Detection handling a ForNodeBocMsg\n");    
526     break;
527
528   case ForChareMsg:
529     //CkPrintf("Critical Path Detection handling a ForChareMsg\n");        
530     break;
531
532   case ForBocMsg:
533     //CkPrintf("Critical Path Detection handling a ForBocMsg\n");    
534     break;
535
536   case ArrayEltInitMsg:
537     // Don't do anything special with these
538     break;
539
540   default:
541     CkPrintf("Critical Path Detection can't yet handle message type %d\n", (int)env->getMsgtype());
542   }
543   
544   
545
546   saveCurrentPathAsUserEvent("criticalPath_start()<br> ");
547
548
549 #endif
550 }
551
552
553 /// Modify the envelope of a message that is being sent for critical path detection and store an entry in a table on this PE.
554 void criticalPath_send(envelope * sendingEnv){
555 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
556 #if DEBUG
557   CkPrintf("criticalPath_send(envelope * sendingEnv)\n");
558 #endif
559   double now = CmiWallTimer();
560   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), CkpvAccess(timeEntryMethodStarted), now);
561   entry.addToTableAndEnvelope(sendingEnv);
562   
563 #endif
564 }
565
566
567 /// Handle the end of the entry method in the critical path detection processes. This should create a forward dependency for the object.
568 void criticalPath_end(){
569 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
570   saveCurrentPathAsUserEvent("criticalPath_end()<br> ");
571
572   CkpvAccess(currentlyExecutingPath).reset();
573
574 #if DEBUG
575   CkPrintf("criticalPath_end()\n");
576 #endif
577
578 #endif
579 }
580
581
582
583 /// Split an entry method invocation into multiple logical tasks for the critical path analysis.
584 /// SDAG doen's break up the code in useful ways, so I'll make it add calls to this in the generated code.
585 void criticalPath_split(){
586 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
587   saveCurrentPathAsUserEvent("criticalPath_split()<br> ");
588
589   // save an entry in the table
590   double now = CmiWallTimer();
591   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), now-CkpvAccess(timeEntryMethodStarted) );
592   int tableidx = entry.addToTable();
593
594   // end the old task
595
596   
597   // start the new task
598   CkpvAccess(currentlyExecutingPath).sender_pe = CkMyPe();
599   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = tableidx;
600   CkpvAccess(currentlyExecutingPath).preceding_path_time = entry.getTotalTime();
601   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
602   CkpvAccess(timeEntryMethodStarted) = now;
603 #endif
604 }
605
606
607
608
609
610 #include "PathHistory.def.h"
611
612 /*! @} */
613