fa596284e1e749ab5306675313a54bf1b4e873d7
[charm.git] / src / ck-cp / pathHistory.C
1 #include <charm++.h>
2 #include <iostream>
3 #include <fstream>
4 #include <string>
5 #include <map>
6 #include <set>
7 #include <vector>
8 #include <utility>
9
10
11 #if CMK_WITH_CONTROLPOINT
12
13
14 #include "PathHistory.decl.h"
15 #include "LBDatabase.h"
16 #include "pathHistory.h"
17 //#include "controlPoints.h"
18 //#include "arrayRedistributor.h"
19 #include <register.h> // for _entryTable
20
21 #include "trace-projections.h"
22
23
24 /**
25  *  \addtogroup CriticalPathFramework
26  *   @{
27  *
28  */
29
30
31
32 /*readonly*/ CProxy_pathHistoryManager pathHistoryManagerProxy;
33
34 CkpvDeclare(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
35 CkpvDeclare(double, timeEntryMethodStarted);
36
37 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
38 /** A table to store all the local nodes in the parallel dependency graph */
39 CkpvDeclare(PathHistoryTableType, pathHistoryTable);
40 /** A counter that defines the new keys for the entries in the pathHistoryTable */
41 CkpvDeclare(int, pathHistoryTableLastIdx);
42 #endif
43
44
45 /// A mainchare that is used just to create a group at startup
46 class pathHistoryMain : public CBase_pathHistoryMain {
47 public:
48   pathHistoryMain(CkArgMsg* args){
49 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
50     pathHistoryManagerProxy = CProxy_pathHistoryManager::ckNew();
51 #endif
52     delete args;
53   }
54   ~pathHistoryMain(){}
55 };
56
57
58 pathHistoryManager::pathHistoryManager(){
59
60 }
61
62
63   /** Trace perform a traversal backwards over the critical path specified as a 
64       table index for the processor upon which this is called.
65       
66       The callback cb will be called with the resulting msg after the path has 
67       been traversed to its origin.  
68   */
69  void pathHistoryManager::traceCriticalPathBackStepByStep(pathInformationMsg *msg){
70 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
71    int count = CkpvAccess(pathHistoryTable).count(msg->table_idx);
72
73 #if DEBUGPRINT > 2
74    CkPrintf("Table entry %d on pe %d occurs %d times in table\n", msg->table_idx, CkMyPe(), count);
75 #endif
76    CkAssert(count==0 || count==1);
77
78     if(count > 0){ 
79       PathHistoryTableEntry & path = CkpvAccess(pathHistoryTable)[msg->table_idx];
80       int idx = path.sender_history_table_idx;
81       int pe = path.sender_pe;
82
83 #if DEBUGPRINT > 2
84       CkPrintf("Table entry %d on pe %d points to pe=%d idx=%d\n", msg->table_idx, CkMyPe(), pe, idx);
85 #endif
86
87       // Make a copy of the message as we forward it along
88       pathInformationMsg *newmsg = new(msg->historySize+1) pathInformationMsg;
89       for(int i=0;i<msg->historySize;i++){
90         newmsg->history[i] = msg->history[i];
91       }
92       newmsg->history[msg->historySize] = path;
93       newmsg->historySize = msg->historySize+1;
94       newmsg->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
95       newmsg->cb = msg->cb;
96       newmsg->table_idx = idx;
97         
98       if(idx > -1 && pe > -1){
99         // Not yet at origin, keep tracing the path back
100         CkAssert(pe < CkNumPes() && pe >= 0);
101         thisProxy[pe].traceCriticalPathBackStepByStep(newmsg);
102       } else {
103         CkPrintf("Traced critical path back to its origin.\n");
104         if(msg->saveAsProjectionsUserEvents){
105
106           // Keep a message for returning to the user's callback
107           pathForUser = new(msg->historySize+1) pathInformationMsg;
108           for(int i=0;i<msg->historySize;i++){
109             pathForUser->history[i] = msg->history[i];
110           }
111           pathForUser->history[msg->historySize] = path;
112           pathForUser->historySize = msg->historySize+1;
113           pathForUser->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
114           pathForUser->cb = msg->cb;
115           pathForUser->table_idx = idx;
116           
117           CkPrintf("Broadcasting it to all PE\n");
118           thisProxy.broadcastCriticalPathProjections(newmsg);
119         } else {
120           newmsg->cb.send(newmsg);
121         }
122
123       }
124     } else {
125       CkAbort("ERROR: Traced critical path back to a nonexistent table entry.\n");
126     }
127
128     delete msg;
129 #else
130     CkAbort("Shouldn't call pathHistoryManager::traceCriticalPathBack when critical path detection is not enabled");
131 #endif
132   }
133
134
135 void pathHistoryManager::broadcastCriticalPathProjections(pathInformationMsg *msg){
136
137 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
138   CkPrintf("[%d] Received broadcast of critical path\n", CkMyPe());
139   int me = CkMyPe();
140   int intersectsLocalPE = false;
141
142   // Create user events for critical path
143
144   for(int i=msg->historySize-1;i>=0;i--){
145     if(CkMyPe() == msg->history[i].local_pe){
146       // Part of critical path is local
147       // Create user event for it
148
149       //      CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, msg->history[i].get_local_path_time(), msg-> history[i].local_arr, msg->history[i].local_ep, msg->history[i].get_start_time(), msg->history[i].get_preceding_path_time(), msg->history[i].local_pe);
150       traceUserBracketEvent(32000, msg->history[i].get_start_time(), msg->history[i].get_start_time() + msg->history[i].get_local_path_time());
151
152       intersectsLocalPE = true;
153     }
154
155   }
156
157   traceRegisterUserEvent("Critical Path", 32000);
158   
159   
160 #define PRUNE_CRITICAL_PATH_LOGS 0
161
162 #if PRUNE_CRITICAL_PATH_LOGS
163   // Tell projections tracing to only output log entries if I contain part of the critical path
164
165   enableTraceLogOutput();
166   if(! intersectsLocalPE){
167     disableTraceLogOutput();
168     CkPrintf("PE %d doesn't intersect the critical path, so its log files won't be created\n", CkMyPe() );
169   }
170 #endif
171   
172 #if TRACE_ALL_PATH_TABLE_ENTRIES
173   // Create user events for all table entries
174   std::map< int, PathHistoryTableEntry >::iterator iter;
175   for(iter=pathHistoryTable.begin(); iter != pathHistoryTable.end(); iter++){
176     double startTime = iter->second.get_start_time();
177     double endTime = iter->second.get_start_time() + iter->second.get_local_path_time();
178     traceUserBracketEvent(32001, startTime, endTime);
179   }
180 #endif
181
182   int data=1;
183   CkCallback cb(CkIndex_pathHistoryManager::criticalPathProjectionsDone(NULL),thisProxy[0]); 
184   contribute(sizeof(int), &data, CkReduction::sum_int, cb);
185
186 #endif
187
188 }
189
190 void pathHistoryManager::criticalPathProjectionsDone(CkReductionMsg *msg){
191 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
192   CkPrintf("[%d] All PEs have received the critical path information. Sending critical path to user supplied callback.\n", CkMyPe());
193   pathForUser->cb.send(pathForUser);
194   pathForUser = NULL;
195 #endif
196 }
197
198
199
200
201 /// An interface callable by the application.
202 void useThisCriticalPathForPriorities(){
203 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
204   pathHistoryManagerProxy.ckLocalBranch()->useCriticalPathForPriories();
205 #endif
206 }
207
208
209 /// Callable from inside charm++ delivery mechanisms (after envelope contains epIdx):
210 void automaticallySetMessagePriority(envelope *env){
211   #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
212
213 #if DEBUG
214   if(env->getPriobits() == 8*sizeof(int)){
215     CkPrintf("[%d] priorities for env=%p are integers\n", CkMyPe(), env);
216   } else if(env->getPriobits() == 0) {
217     CkPrintf("[%d] priorities for env=%p are not allocated in message\n", CkMyPe(), env);
218   } else {
219     CkPrintf("[%d] priorities for env=%p are not integers: %d priobits\n", CkMyPe(), env, env->getPriobits());
220   }
221 #endif
222   
223   
224   const std::map< std::pair<int,int>, int> & criticalPathForPriorityCounts = pathHistoryManagerProxy.ckLocalBranch()->getCriticalPathForPriorityCounts();
225   
226   if(criticalPathForPriorityCounts.size() > 0 && env->getPriobits() == 8*sizeof(int)) {
227     
228     switch(env->getMsgtype()) {
229     case ForArrayEltMsg:
230     case ForIDedObjMsg:
231     case ForChareMsg:
232       {        
233         const int ep = env->getsetArrayEp();
234         const int arr = env->getArrayMgrIdx();
235         
236         const std::pair<int,int> k = std::make_pair(arr, ep);
237         const int count = criticalPathForPriorityCounts.count(k);
238         
239 #if DEBUG
240         CkPrintf("[%d] destination array,ep occurs %d times along stored critical path\n", CkMyPe(), count);
241 #endif
242         
243         if(count > 0){
244           // Set the integer priority to high
245 #if DEBUG
246           CkPrintf("Prio auto high\n");
247 #endif
248           *(int*)(env->getPrioPtr()) = -5;
249         } else {
250           // Set the integer priority to low
251 #if DEBUG
252           CkPrintf("Prio auto low: %d,%d\n", arr, ep);
253 #endif
254           *(int*)(env->getPrioPtr()) = 5;
255         }
256         
257       }
258       break;
259       
260     case ForNodeBocMsg:
261       CkPrintf("Can't Critical Path Autoprioritize a ForNodeBocMsg\n");    
262       break;
263       
264     case ForBocMsg:
265       CkPrintf("Can't Critical Path Autoprioritize a ForBocMsg\n");
266       break;
267       
268     case ArrayEltInitMsg:
269       // Don't do anything special with these
270       CkPrintf("Can't Critical Path Autoprioritize a ArrayEltInitMsg\n");
271       break;
272       
273     default:
274       CkPrintf("Can't Critical Path Autoprioritize messages of [unknown type]\n");
275       break;
276     }
277       
278   }
279
280 #endif
281 }
282
283
284
285 void pathHistoryManager::useCriticalPathForPriories(){
286 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
287
288   // Request a critical path that will be stored everywhere for future use in autotuning message priorities
289   
290   // The resulting critical path should be broadcast to saveCriticalPathForPriorities() on all PEs
291   CkCallback cb(CkIndex_pathHistoryManager::saveCriticalPathForPriorities(NULL),thisProxy); 
292   traceCriticalPathBack(cb, false);
293 #endif  
294 }
295
296
297
298 void pathHistoryManager::saveCriticalPathForPriorities(pathInformationMsg *msg){
299 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
300
301   CkPrintf("[%d] saveCriticalPathForPriorities() Receiving critical paths\n", CkMyPe());
302   fflush(stdout);
303   
304   criticalPathForPriorityCounts.clear();
305   
306   // Save a list of which entries are along the critical path
307   for(int i=msg->historySize-1;i>=0;i--){
308     
309     PathHistoryTableEntry &e = msg->history[i];
310     
311 #if DEBUG
312     if(CkMyPe() == 0){
313       CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, e.get_local_path_time(), msg-> history[i].local_arr, e.local_ep, e.get_start_time(), e.get_preceding_path_time(), e.local_pe);
314     }
315 #endif
316     
317     const std::pair<int,int> k = std::make_pair(e.local_arr, e.local_ep);
318     if(criticalPathForPriorityCounts.count(k) == 1)
319       criticalPathForPriorityCounts[k]++;
320     else
321       criticalPathForPriorityCounts[k] = 1;  
322   }
323   
324
325
326
327   // print out the list just for debugging purposes
328   if(CkMyPe() == 0){
329     std::map< std::pair<int,int>, int>::iterator iter;
330     for(iter=criticalPathForPriorityCounts.begin();iter!=criticalPathForPriorityCounts.end();++iter){
331       const std::pair<int,int> k = iter->first;
332       const int c = iter->second;
333
334       CkPrintf("[%d] On critical path EP %d,%d occurs %d times\n", CkMyPe(), k.first, k.second, c);
335
336     }
337   }
338
339 #endif
340 }
341
342
343
344
345
346
347
348
349
350
351
352 void initializeCriticalPath(void){
353 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
354   CkpvInitialize(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
355   CkpvInitialize(double, timeEntryMethodStarted);
356   CkpvAccess(timeEntryMethodStarted) = 0.0;
357
358
359   CkpvInitialize(PathHistoryTableType, pathHistoryTable);
360   CkpvInitialize(int, pathHistoryTableLastIdx);
361   CkpvAccess(pathHistoryTableLastIdx) = 0;
362 #endif
363 }
364
365
366
367
368
369
370
371
372
373
374 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
375 // The PathHistoryEnvelope class is disabled if USE_CRITICAL_PATH_HEADER_ARRAY isn't defined
376 void PathHistoryEnvelope::reset(){
377   totalTime = 0.0;
378   sender_history_table_idx = -1;
379 }
380
381   
382 void PathHistoryEnvelope::print() const {
383   CkPrintf("print() is not implemented\n");
384 }
385
386 /// Write a description of the path into the beginning of the provided buffer. The buffer ought to be large enough.
387
388   
389 void PathHistoryEnvelope::incrementTotalTime(double time){
390   totalTime += time;
391 }
392
393
394
395 void PathHistoryEnvelope::setDebug100(){
396   totalTime = 100.0;   
397 }
398
399
400
401 /// Add an entry for this path history into the table, and write the corresponding information into the outgoing envelope
402 int PathHistoryTableEntry::addToTableAndEnvelope(envelope *env){
403   // Add to table
404   int new_idx = addToTable();
405
406   // Fill in outgoing envelope
407   CkAssert(env != NULL);
408   env->pathHistory.set_sender_history_table_idx(new_idx);
409   env->pathHistory.setTime(local_path_time + preceding_path_time);
410
411 #if 0
412   // Create a user event for projections
413   char *note = new char[4096];
414   sprintf(note, "addToTableAndEnvelope<br> ");
415   env->pathHistory.printHTMLToString(note+strlen(note));
416   traceUserSuppliedNote(note); // stores a copy of the string
417   delete[] note;
418 #endif  
419
420   return new_idx;
421 }
422   
423
424 /// Add an entry for this path history into the table. Returns the index in the table for it.
425 int PathHistoryTableEntry::addToTable(){
426   int new_idx = CkpvAccess(pathHistoryTableLastIdx) ++;
427   CkpvAccess(pathHistoryTable)[new_idx] = *this;
428   return new_idx;
429 }
430 #endif
431
432
433
434   
435 void resetThisEntryPath(void) {
436   CkpvAccess(currentlyExecutingPath).reset();
437 }
438
439
440 /// A debugging routine that outputs critical path info as user events.
441 void  saveCurrentPathAsUserEvent(const char* prefix){
442   if(CkpvAccess(currentlyExecutingPath).getTotalTime() > 0.0){
443     //traceUserEvent(5020);
444
445 #if 0
446     char *note = new char[4096];
447     sprintf(note, "%s<br> saveCurrentPathAsUserEvent()<br> ", prefix);
448     CkpvAccess(currentlyExecutingPath).printHTMLToString(note+strlen(note));
449     traceUserSuppliedNote(note); // stores a copy of the string
450     delete[] note;
451 #endif
452
453   } else {
454 #if 0
455     traceUserEvent(5010);
456 #endif
457   }
458  
459 }
460
461
462 void setCurrentlyExecutingPathTo100(void){
463   CkpvAccess(currentlyExecutingPath).setDebug100();
464 }
465
466
467 /// Acquire the critical path and deliver it to the user supplied callback
468 void traceCriticalPathBack(CkCallback cb, bool saveToProjectionsTraces){
469 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
470   pathInformationMsg *newmsg = new(0) pathInformationMsg;
471   newmsg->historySize = 0;
472   newmsg->cb = cb;
473   newmsg->saveAsProjectionsUserEvents = saveToProjectionsTraces;
474   newmsg->table_idx = CkpvAccess(currentlyExecutingPath).sender_history_table_idx;
475   int pe = CkpvAccess(currentlyExecutingPath).sender_pe;
476   CkPrintf("Starting tracing of critical path from pe=%d table_idx=%d\n", pe,  CkpvAccess(currentlyExecutingPath).sender_history_table_idx);
477   CkAssert(pe < CkNumPes() && pe >= 0);
478   pathHistoryManagerProxy[pe].traceCriticalPathBackStepByStep(newmsg);
479 #else
480   pathInformationMsg * pathForUser = new(0) pathInformationMsg;  
481   pathForUser->historySize = 0;                                                                                        
482   pathForUser->cb = CkCallback();                                                                                                      
483   pathForUser->table_idx = -1;      
484   cb.send(pathForUser);  
485 #endif
486 }
487
488
489
490 // void  printPathInMsg(void* msg){
491 //   envelope *env = UsrToEnv(msg);
492 //   env->printPath();
493 // }
494
495
496
497 /// A debugging routine that prints the number of EPs for the program, and the size of the envelope's path fields
498 void  printEPInfo(){
499 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
500   CkPrintf("printEPInfo():\n");
501   CkPrintf("There are %d EPs\n", (int)_entryTable.size());
502   for (int epIdx=0;epIdx<_entryTable.size();epIdx++)
503     CkPrintf("EP %d is %s\n", epIdx, _entryTable[epIdx]->name);
504 #endif
505 }
506
507
508
509
510 /// Save information about the critical path contained in the message that is about to execute.
511 void criticalPath_start(envelope * env){ 
512 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
513 #if DEBUG
514   CkPrintf("criticalPath_start(envelope * env) srcpe=%d sender table idx=%d  time=%lf\n", env->getSrcPe(),  env->pathHistory.get_sender_history_table_idx(), env->pathHistory.getTotalTime() );
515 #endif
516
517   CkpvAccess(currentlyExecutingPath).sender_pe = env->getSrcPe();
518   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = env->pathHistory.get_sender_history_table_idx();
519   CkpvAccess(currentlyExecutingPath).preceding_path_time = env->pathHistory.getTotalTime();
520
521   CkpvAccess(currentlyExecutingPath).sanity_check();
522   
523   CkpvAccess(currentlyExecutingPath).local_ep  = -1;
524   CkpvAccess(currentlyExecutingPath).local_arr = -1;
525
526   double now = CmiWallTimer();
527   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
528   CkpvAccess(timeEntryMethodStarted) = now;
529
530   switch(env->getMsgtype()) {
531   case ForArrayEltMsg:
532     //    CkPrintf("Critical Path Detection handling a ForArrayEltMsg\n");    
533     CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayEp();
534     CkpvAccess(currentlyExecutingPath).local_arr = env->getArrayMgrIdx();
535     
536     break;
537
538   case ForNodeBocMsg:
539     //CkPrintf("Critical Path Detection handling a ForNodeBocMsg\n");    
540     break;
541
542   case ForChareMsg:
543     //CkPrintf("Critical Path Detection handling a ForChareMsg\n");        
544     break;
545
546   case ForBocMsg:
547     //CkPrintf("Critical Path Detection handling a ForBocMsg\n");    
548     break;
549
550   case ArrayEltInitMsg:
551     // Don't do anything special with these
552     break;
553
554   default:
555     CkPrintf("Critical Path Detection can't yet handle message type %d\n", (int)env->getMsgtype());
556   }
557   
558   
559
560   saveCurrentPathAsUserEvent("criticalPath_start()<br> ");
561
562
563 #endif
564 }
565
566
567 /// Modify the envelope of a message that is being sent for critical path detection and store an entry in a table on this PE.
568 void criticalPath_send(envelope * sendingEnv){
569 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
570 #if DEBUG
571   CkPrintf("criticalPath_send(envelope * sendingEnv)\n");
572 #endif
573   double now = CmiWallTimer();
574   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), CkpvAccess(timeEntryMethodStarted), now);
575   entry.addToTableAndEnvelope(sendingEnv);
576   
577 #endif
578 }
579
580
581 /// Handle the end of the entry method in the critical path detection processes. This should create a forward dependency for the object.
582 void criticalPath_end(){
583 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
584   saveCurrentPathAsUserEvent("criticalPath_end()<br> ");
585
586   CkpvAccess(currentlyExecutingPath).reset();
587
588 #if DEBUG
589   CkPrintf("criticalPath_end()\n");
590 #endif
591
592 #endif
593 }
594
595
596
597 /// Split an entry method invocation into multiple logical tasks for the critical path analysis.
598 /// SDAG doen's break up the code in useful ways, so I'll make it add calls to this in the generated code.
599 void criticalPath_split(){
600 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
601   saveCurrentPathAsUserEvent("criticalPath_split()<br> ");
602
603   // save an entry in the table
604   double now = CmiWallTimer();
605   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), now-CkpvAccess(timeEntryMethodStarted) );
606   int tableidx = entry.addToTable();
607
608   // end the old task
609
610   
611   // start the new task
612   CkpvAccess(currentlyExecutingPath).sender_pe = CkMyPe();
613   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = tableidx;
614   CkpvAccess(currentlyExecutingPath).preceding_path_time = entry.getTotalTime();
615   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
616   CkpvAccess(timeEntryMethodStarted) = now;
617 #endif
618 }
619
620
621
622
623
624 #include "PathHistory.def.h"
625
626 /*! @} */
627
628 #endif