Adding new capability to automatically set message priorities based on critical path...
[charm.git] / src / ck-cp / pathHistory.C
1 #include <charm++.h>
2 #include <iostream>
3 #include <fstream>
4 #include <string>
5 #include <map>
6 #include <set>
7 #include <vector>
8 #include <utility>
9
10 #include "PathHistory.decl.h"
11 #include "LBDatabase.h"
12 #include "pathHistory.h"
13 //#include "controlPoints.h"
14 //#include "arrayRedistributor.h"
15 #include <register.h> // for _entryTable
16
17
18 /**
19  *  \addtogroup CriticalPathFramework
20  *   @{
21  *
22  */
23
24
25
26 /*readonly*/ CProxy_pathHistoryManager pathHistoryManagerProxy;
27
28 CkpvDeclare(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
29 CkpvDeclare(double, timeEntryMethodStarted);
30
31 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
32 /** A table to store all the local nodes in the parallel dependency graph */
33 CkpvDeclare(PathHistoryTableType, pathHistoryTable);
34 /** A counter that defines the new keys for the entries in the pathHistoryTable */
35 CkpvDeclare(int, pathHistoryTableLastIdx);
36 #endif
37
38
39 /// A mainchare that is used just to create a group at startup
40 class pathHistoryMain : public CBase_pathHistoryMain {
41 public:
42   pathHistoryMain(CkArgMsg* args){
43 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
44     pathHistoryManagerProxy = CProxy_pathHistoryManager::ckNew();
45 #endif
46   }
47   ~pathHistoryMain(){}
48 };
49
50
51 pathHistoryManager::pathHistoryManager(){
52
53 }
54
55
56   /** Trace perform a traversal backwards over the critical path specified as a 
57       table index for the processor upon which this is called.
58       
59       The callback cb will be called with the resulting msg after the path has 
60       been traversed to its origin.  
61   */
62  void pathHistoryManager::traceCriticalPathBackStepByStep(pathInformationMsg *msg){
63 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
64    int count = CkpvAccess(pathHistoryTable).count(msg->table_idx);
65
66 #if DEBUGPRINT > 2
67    CkPrintf("Table entry %d on pe %d occurs %d times in table\n", msg->table_idx, CkMyPe(), count);
68 #endif
69    CkAssert(count==0 || count==1);
70
71     if(count > 0){ 
72       PathHistoryTableEntry & path = CkpvAccess(pathHistoryTable)[msg->table_idx];
73       int idx = path.sender_history_table_idx;
74       int pe = path.sender_pe;
75
76 #if DEBUGPRINT > 2
77       CkPrintf("Table entry %d on pe %d points to pe=%d idx=%d\n", msg->table_idx, CkMyPe(), pe, idx);
78 #endif
79
80       // Make a copy of the message as we forward it along
81       pathInformationMsg *newmsg = new(msg->historySize+1) pathInformationMsg;
82       for(int i=0;i<msg->historySize;i++){
83         newmsg->history[i] = msg->history[i];
84       }
85       newmsg->history[msg->historySize] = path;
86       newmsg->historySize = msg->historySize+1;
87       newmsg->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
88       newmsg->cb = msg->cb;
89       newmsg->table_idx = idx;
90         
91       if(idx > -1 && pe > -1){
92         // Not yet at origin, keep tracing the path back
93         CkAssert(pe < CkNumPes() && pe >= 0);
94         thisProxy[pe].traceCriticalPathBackStepByStep(newmsg);
95       } else {
96         CkPrintf("Traced critical path back to its origin.\n");
97         if(msg->saveAsProjectionsUserEvents){
98
99           // Keep a message for returning to the user's callback
100           pathForUser = new(msg->historySize+1) pathInformationMsg;
101           for(int i=0;i<msg->historySize;i++){
102             pathForUser->history[i] = msg->history[i];
103           }
104           pathForUser->history[msg->historySize] = path;
105           pathForUser->historySize = msg->historySize+1;
106           pathForUser->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
107           pathForUser->cb = msg->cb;
108           pathForUser->table_idx = idx;
109           
110           CkPrintf("Broadcasting it to all PE\n");
111           thisProxy.broadcastCriticalPathProjections(newmsg);
112         } else {
113           newmsg->cb.send(newmsg);
114         }
115
116       }
117     } else {
118       CkAbort("ERROR: Traced critical path back to a nonexistent table entry.\n");
119     }
120
121     delete msg;
122 #else
123     CkAbort("Shouldn't call pathHistoryManager::traceCriticalPathBack when critical path detection is not enabled");
124 #endif
125   }
126
127
128 void pathHistoryManager::broadcastCriticalPathProjections(pathInformationMsg *msg){
129
130 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
131   CkPrintf("[%d] Received broadcast of critical path\n", CkMyPe());
132   int me = CkMyPe();
133   int intersectsLocalPE = false;
134
135   // Create user events for critical path
136
137   for(int i=msg->historySize-1;i>=0;i--){
138     if(CkMyPe() == msg->history[i].local_pe){
139       // Part of critical path is local
140       // Create user event for it
141
142       //      CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, msg->history[i].get_local_path_time(), msg-> history[i].local_arr, msg->history[i].local_ep, msg->history[i].get_start_time(), msg->history[i].get_preceding_path_time(), msg->history[i].local_pe);
143       traceUserBracketEvent(32000, msg->history[i].get_start_time(), msg->history[i].get_start_time() + msg->history[i].get_local_path_time());
144       intersectsLocalPE = true;
145     }
146
147   }
148   
149
150 #if PRUNE_CRITICAL_PATH_LOGS
151   // Tell projections tracing to only output log entries if I contain part of the critical path
152   enableTraceLogOutput();
153   if(! intersectsLocalPE){
154     disableTraceLogOutput();
155     CkPrintf("PE %d doesn't intersect the critical path, so its log files won't be created\n", CkMyPe() );
156   }
157 #endif
158   
159 #if TRACE_ALL_PATH_TABLE_ENTRIES
160   // Create user events for all table entries
161   std::map< int, PathHistoryTableEntry >::iterator iter;
162   for(iter=pathHistoryTable.begin(); iter != pathHistoryTable.end(); iter++){
163     double startTime = iter->second.get_start_time();
164     double endTime = iter->second.get_start_time() + iter->second.get_local_path_time();
165     traceUserBracketEvent(32001, startTime, endTime);
166   }
167 #endif
168
169   int data=1;
170   CkCallback cb(CkIndex_pathHistoryManager::criticalPathProjectionsDone(NULL),thisProxy[0]); 
171   contribute(sizeof(int), &data, CkReduction::sum_int, cb);
172
173 #endif
174
175 }
176
177 void pathHistoryManager::criticalPathProjectionsDone(CkReductionMsg *msg){
178 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
179   CkPrintf("[%d] All PEs have received the critical path information. Sending critical path to user supplied callback.\n", CkMyPe());
180   pathForUser->cb.send(pathForUser);
181   pathForUser = NULL;
182 #endif
183 }
184
185
186
187
188 /// An interface callable by the application.
189 void useThisCriticalPathForPriorities(){
190   pathHistoryManagerProxy.ckLocalBranch()->useCriticalPathForPriories();
191 }
192
193
194 /// Callable from inside charm++ delivery mechanisms (after envelope contains epIdx):
195 void automaticallySetMessagePriority(envelope *env){
196
197   if(env->getPriobits() == 8*sizeof(int)){
198     CkPrintf("[%d] priorities for env=%p are integers\n", CkMyPe(), env);
199   } else if(env->getPriobits() == 0) {
200     CkPrintf("[%d] priorities for env=%p are not allocated in message\n", CkMyPe(), env);
201   } else {
202     CkPrintf("[%d] priorities for env=%p are not integers: %d priobits\n", CkMyPe(), env, env->getPriobits());
203   }
204
205   const int ep(env->getEpIdx());
206   const int arr(env->getArrayMgrIdx());
207
208   const std::pair<int,int> k = std::make_pair(arr, ep);
209
210   const std::map< std::pair<int,int>, int> & criticalPathForPriorityCounts = pathHistoryManagerProxy.ckLocalBranch()->getCriticalPathForPriorityCounts();
211   const int count = criticalPathForPriorityCounts.count(k);
212   CkPrintf("[%d] destination array,ep occurs %d times along stored critical path\n", CkMyPe(), count);
213
214   if(count > 0 && env->getPriobits() == 8*sizeof(int)){
215     // Set the integer priority to high
216     *(int*)(env->getPrioPtr()) = -5;
217   }  else if ( env->getPriobits() == 8*sizeof(int)){
218     // Set the integer priority to low
219     *(int*)(env->getPrioPtr()) = 5;
220   }
221   
222 }
223
224
225
226 void pathHistoryManager::useCriticalPathForPriories(){
227   // Request a critical path that will be stored everywhere for future use in autotuning message priorities
228  
229   // The resulting critical path should be broadcast to saveCriticalPathForPriorities() on all PEs
230   CkCallback cb(CkIndex_pathHistoryManager::saveCriticalPathForPriorities(NULL),thisProxy); 
231   traceCriticalPathBack(cb, false);
232   
233 }
234
235
236
237 void pathHistoryManager::saveCriticalPathForPriorities(pathInformationMsg *msg){
238 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
239
240   CkPrintf("[%d] saveCriticalPathForPriorities() Receiving critical paths\n", CkMyPe());
241   fflush(stdout);
242   
243   criticalPathForPriorityCounts.clear();
244
245   // Save a list of which entries are along the critical path
246   for(int i=msg->historySize-1;i>=0;i--){
247
248     PathHistoryTableEntry &e = msg->history[i];
249
250 #if DEBUG
251   if(CkMyPe() == 0){
252     CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, e.get_local_path_time(), msg-> history[i].local_arr, e.local_ep, e.get_start_time(), e.get_preceding_path_time(), e.local_pe);
253   }
254 #endif
255       
256   const std::pair<int,int> k = std::make_pair(e.local_arr, e.local_ep);
257     if(criticalPathForPriorityCounts.count(k) == 1)
258       criticalPathForPriorityCounts[k]++;
259     else
260       criticalPathForPriorityCounts[k] = 1;  
261   }
262
263
264   // print out the list just for debugging purposes
265   if(CkMyPe() == 0){
266     std::map< std::pair<int,int>, int>::iterator iter;
267     for(iter=criticalPathForPriorityCounts.begin();iter!=criticalPathForPriorityCounts.end();++iter){
268       const std::pair<int,int> k = iter->first;
269       const int c = iter->second;
270
271       CkPrintf("[%d] Found on critical path EP %d,%d occuring %d times\n", CkMyPe(), k.first, k.second, c);
272
273     }
274     
275   }
276 #endif
277 }
278
279
280
281
282
283
284
285
286
287
288
289 void initializeCriticalPath(void){
290 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
291   CkpvInitialize(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
292   CkpvInitialize(double, timeEntryMethodStarted);
293   CkpvAccess(timeEntryMethodStarted) = 0.0;
294
295
296   CkpvInitialize(PathHistoryTableType, pathHistoryTable);
297   CkpvInitialize(int, pathHistoryTableLastIdx);
298   CkpvAccess(pathHistoryTableLastIdx) = 0;
299 #endif
300 }
301
302
303
304
305
306
307
308
309
310
311 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
312 // The PathHistoryEnvelope class is disabled if USE_CRITICAL_PATH_HEADER_ARRAY isn't defined
313 void PathHistoryEnvelope::reset(){
314   totalTime = 0.0;
315   sender_history_table_idx = -1;
316 }
317
318   
319 void PathHistoryEnvelope::print() const {
320   CkPrintf("print() is not implemented\n");
321 }
322
323 /// Write a description of the path into the beginning of the provided buffer. The buffer ought to be large enough.
324
325   
326 void PathHistoryEnvelope::incrementTotalTime(double time){
327   totalTime += time;
328 }
329
330
331
332 void PathHistoryEnvelope::setDebug100(){
333   totalTime = 100.0;   
334 }
335
336
337
338 /// Add an entry for this path history into the table, and write the corresponding information into the outgoing envelope
339 int PathHistoryTableEntry::addToTableAndEnvelope(envelope *env){
340   // Add to table
341   int new_idx = addToTable();
342
343   // Fill in outgoing envelope
344   CkAssert(env != NULL);
345   env->pathHistory.set_sender_history_table_idx(new_idx);
346   env->pathHistory.setTime(local_path_time + preceding_path_time);
347
348   // Create a user event for projections
349   char *note = new char[4096];
350   sprintf(note, "addToTableAndEnvelope<br> ");
351   env->pathHistory.printHTMLToString(note+strlen(note));
352   traceUserSuppliedNote(note); // stores a copy of the string
353   delete[] note;
354   
355   return new_idx;
356 }
357   
358
359 /// Add an entry for this path history into the table. Returns the index in the table for it.
360 int PathHistoryTableEntry::addToTable(){
361   int new_idx = CkpvAccess(pathHistoryTableLastIdx) ++;
362   CkpvAccess(pathHistoryTable)[new_idx] = *this;
363   return new_idx;
364 }
365 #endif
366
367
368
369   
370 void resetThisEntryPath(void) {
371   CkpvAccess(currentlyExecutingPath).reset();
372 }
373
374
375 /// A debugging routine that outputs critical path info as user events.
376 void  saveCurrentPathAsUserEvent(char* prefix){
377   if(CkpvAccess(currentlyExecutingPath).getTotalTime() > 0.0){
378     //traceUserEvent(5020);
379
380     char *note = new char[4096];
381     sprintf(note, "%s<br> saveCurrentPathAsUserEvent()<br> ", prefix);
382     CkpvAccess(currentlyExecutingPath).printHTMLToString(note+strlen(note));
383     traceUserSuppliedNote(note); // stores a copy of the string
384     delete[] note;
385
386   } else {
387     traceUserEvent(5010);
388   }
389  
390 }
391
392
393 void setCurrentlyExecutingPathTo100(void){
394   CkpvAccess(currentlyExecutingPath).setDebug100();
395 }
396
397
398 /// Acquire the critical path and deliver it to the user supplied callback
399 void traceCriticalPathBack(CkCallback cb, bool saveToProjectionsTraces){
400 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
401   pathInformationMsg *newmsg = new(0) pathInformationMsg;
402   newmsg->historySize = 0;
403   newmsg->cb = cb;
404   newmsg->saveAsProjectionsUserEvents = saveToProjectionsTraces;
405   newmsg->table_idx = CkpvAccess(currentlyExecutingPath).sender_history_table_idx;
406   int pe = CkpvAccess(currentlyExecutingPath).sender_pe;
407   CkPrintf("Starting tracing of critical path from pe=%d table_idx=%d\n", pe,  CkpvAccess(currentlyExecutingPath).sender_history_table_idx);
408   CkAssert(pe < CkNumPes() && pe >= 0);
409   pathHistoryManagerProxy[pe].traceCriticalPathBackStepByStep(newmsg);
410 #else
411   pathInformationMsg * pathForUser = new(0) pathInformationMsg;  
412   pathForUser->historySize = 0;                                                                                        
413   pathForUser->cb = CkCallback();                                                                                                      
414   pathForUser->table_idx = -1;      
415   cb.send(pathForUser);  
416 #endif
417 }
418
419
420
421 // void  printPathInMsg(void* msg){
422 //   envelope *env = UsrToEnv(msg);
423 //   env->printPath();
424 // }
425
426
427
428 /// A debugging routine that prints the number of EPs for the program, and the size of the envelope's path fields
429 void  printEPInfo(){
430 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
431   CkPrintf("printEPInfo():\n");
432   CkPrintf("There are %d EPs\n", (int)_entryTable.size());
433   for (int epIdx=0;epIdx<_entryTable.size();epIdx++)
434     CkPrintf("EP %d is %s\n", epIdx, _entryTable[epIdx]->name);
435 #endif
436 }
437
438
439
440
441 /// Save information about the critical path contained in the message that is about to execute.
442 void criticalPath_start(envelope * env){ 
443 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
444 #if DEBUG
445   CkPrintf("criticalPath_start(envelope * env) srcpe=%d sender table idx=%d  time=%lf\n", env->getSrcPe(),  env->pathHistory.get_sender_history_table_idx(), env->pathHistory.getTotalTime() );
446 #endif
447
448   CkpvAccess(currentlyExecutingPath).sender_pe = env->getSrcPe();
449   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = env->pathHistory.get_sender_history_table_idx();
450   CkpvAccess(currentlyExecutingPath).preceding_path_time = env->pathHistory.getTotalTime();
451
452   CkpvAccess(currentlyExecutingPath).sanity_check();
453   
454   CkpvAccess(currentlyExecutingPath).local_ep  = -1;
455   CkpvAccess(currentlyExecutingPath).local_arr = -1;
456
457   double now = CmiWallTimer();
458   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
459   CkpvAccess(timeEntryMethodStarted) = now;
460
461   switch(env->getMsgtype()) {
462   case ForArrayEltMsg:
463     //    CkPrintf("Critical Path Detection handling a ForArrayEltMsg\n");    
464     CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayEp();
465     CkpvAccess(currentlyExecutingPath).local_arr = env->getArrayMgrIdx();
466     
467     break;
468
469   case ForNodeBocMsg:
470     //CkPrintf("Critical Path Detection handling a ForNodeBocMsg\n");    
471     break;
472
473   case ForChareMsg:
474     //CkPrintf("Critical Path Detection handling a ForChareMsg\n");        
475     break;
476
477   case ForBocMsg:
478     //CkPrintf("Critical Path Detection handling a ForBocMsg\n");    
479     break;
480
481   case ArrayEltInitMsg:
482     // Don't do anything special with these
483     break;
484
485   default:
486     CkPrintf("Critical Path Detection can't yet handle message type %d\n", (int)env->getMsgtype());
487   }
488   
489   
490
491   saveCurrentPathAsUserEvent("criticalPath_start()<br> ");
492
493
494 #endif
495 }
496
497
498 /// Modify the envelope of a message that is being sent for critical path detection and store an entry in a table on this PE.
499 void criticalPath_send(envelope * sendingEnv){
500 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
501 #if DEBUG
502   CkPrintf("criticalPath_send(envelope * sendingEnv)\n");
503 #endif
504   double now = CmiWallTimer();
505   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), CkpvAccess(timeEntryMethodStarted), now);
506   entry.addToTableAndEnvelope(sendingEnv);
507   
508 #endif
509 }
510
511
512 /// Handle the end of the entry method in the critical path detection processes. This should create a forward dependency for the object.
513 void criticalPath_end(){
514 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
515   saveCurrentPathAsUserEvent("criticalPath_end()<br> ");
516
517   CkpvAccess(currentlyExecutingPath).reset();
518
519 #if DEBUG
520   CkPrintf("criticalPath_end()\n");
521 #endif
522
523 #endif
524 }
525
526
527
528 /// Split an entry method invocation into multiple logical tasks for the critical path analysis.
529 /// SDAG doen's break up the code in useful ways, so I'll make it add calls to this in the generated code.
530 void criticalPath_split(){
531 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
532   saveCurrentPathAsUserEvent("criticalPath_split()<br> ");
533
534   // save an entry in the table
535   double now = CmiWallTimer();
536   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), now-CkpvAccess(timeEntryMethodStarted) );
537   int tableidx = entry.addToTable();
538
539   // end the old task
540
541   
542   // start the new task
543   CkpvAccess(currentlyExecutingPath).sender_pe = CkMyPe();
544   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = tableidx;
545   CkpvAccess(currentlyExecutingPath).preceding_path_time = entry.getTotalTime();
546   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
547   CkpvAccess(timeEntryMethodStarted) = now;
548 #endif
549 }
550
551
552
553
554
555 #include "PathHistory.def.h"
556
557 /*! @} */
558