Critical path header changes for the pics merge
[charm.git] / src / ck-cp / pathHistory.C
1 #include <charm++.h>
2 #include <iostream>
3 #include <fstream>
4 #include <string>
5 #include <map>
6 #include <set>
7 #include <vector>
8 #include <utility>
9
10 //#define DEBUG  1
11 #include "PathHistory.decl.h"
12 #include "LBDatabase.h"
13 #include "pathHistory.h"
14 #include <register.h> 
15
16 #include "trace-projections.h"
17
18 /**
19  *  \addtogroup CriticalPathFramework
20  *   @{
21  *
22  */
23
24 /*readonly*/ CProxy_pathHistoryManager pathHistoryManagerProxy;
25
26 CkpvDeclare(int, traceLastHop);
27
28 CkpvDeclare(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
29 CkpvDeclare(double, timeEntryMethodStarted);
30
31 /** A table to store all the local nodes in the parallel dependency graph */
32 CkpvDeclare(PathHistoryTableType, pathHistoryTable);
33 /** A counter that defines the new keys for the entries in the pathHistoryTable */
34 CkpvDeclare(int, pathHistoryTableLastIdx);
35
36
37 /// A mainchare that is used just to create a group at startup
38 class pathHistoryMain : public CBase_pathHistoryMain {
39 public:
40   pathHistoryMain(CkArgMsg* args){
41     pathHistoryManagerProxy = CProxy_pathHistoryManager::ckNew();
42     delete args;
43   }
44   ~pathHistoryMain(){}
45 };
46
47 pathHistoryManager::pathHistoryManager(){ }
48
49 /** Trace perform a traversal backwards over the critical path specified as a 
50  * table index for the processor upon which this is called.
51  *
52  * The callback cb will be called with the resulting msg after the path has 
53  * been traversed to its origin.  
54  **/
55
56 void pathHistoryManager::traceCriticalPathBackStepByStep(pathInformationMsg *msg){
57    int count = CkpvAccess(pathHistoryTable).count(msg->table_idx);
58
59 #if DEBUGPRINT > 2
60    CkPrintf("Table entry %d on pe %d occurs %d times in table\n", msg->table_idx, CkMyPe(), count);
61 #endif
62    CkAssert(count==0 || count==1);
63
64     if(count > 0){ 
65       PathHistoryTableEntry & path = CkpvAccess(pathHistoryTable)[msg->table_idx];
66       int idx = path.sender_history_table_idx;
67       int pe = path.sender_pe;
68 //#if DEBUGPRINT > 2
69 #if DEBUG
70       CkPrintf("Table entry %d on pe %d points to pe=%d idx=%d history size %d \n", msg->table_idx, CkMyPe(), pe, idx, msg->historySize);
71 #endif
72
73       // Make a copy of the message as we forward it along
74       pathInformationMsg *newmsg = new(msg->historySize+1) pathInformationMsg;
75       for(int i=0;i<msg->historySize;i++){
76           newmsg->history[i] = msg->history[i];
77       }
78       newmsg->history[msg->historySize] = path;
79       newmsg->historySize = msg->historySize+1;
80       newmsg->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
81       newmsg->cb = msg->cb;
82       newmsg->hops = msg->hops -1;
83       newmsg->table_idx = idx;
84         
85       if(msg->hops > 0 ){
86           // Not yet at origin, keep tracing the path back
87           CkAssert(pe < CkNumPes() && pe >= 0);
88           thisProxy[pe].traceCriticalPathBackStepByStep(newmsg);
89       } else {
90           if(msg->saveAsProjectionsUserEvents){
91
92               // Keep a message for returning to the user's callback
93               pathForUser = new(msg->historySize+1) pathInformationMsg;
94               for(int i=0;i<msg->historySize;i++){
95                   pathForUser->history[i] = msg->history[i];
96               }
97               pathForUser->history[msg->historySize] = path;
98               pathForUser->historySize = msg->historySize+1;
99               pathForUser->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
100               pathForUser->cb = msg->cb;
101               pathForUser->table_idx = idx;
102
103               CkPrintf("Broadcasting it to all PE\n");
104               thisProxy.broadcastCriticalPathProjections(newmsg);
105           } else {
106               newmsg->cb.send(newmsg);
107           }
108
109       }
110     } else {
111         CkAbort("ERROR: Traced critical path back to a nonexistent table entry.\n");
112     }
113
114     delete msg;
115   }
116
117
118 void pathHistoryManager::broadcastCriticalPathProjections(pathInformationMsg *msg){
119
120   CkPrintf("[%d] Received broadcast of critical path\n", CkMyPe());
121   int me = CkMyPe();
122   int intersectsLocalPE = false;
123
124   // Create user events for critical path
125
126   for(int i=msg->historySize-1;i>=0;i--){
127     if(CkMyPe() == msg->history[i].local_pe){
128       // Part of critical path is local
129       // Create user event for it
130
131       traceUserBracketEvent(32000, msg->history[i].get_start_time(), msg->history[i].get_start_time() + msg->history[i].get_local_path_time());
132
133       intersectsLocalPE = true;
134     }
135
136   }
137
138   traceRegisterUserEvent("Critical Path", 32000);
139   
140   
141 #define PRUNE_CRITICAL_PATH_LOGS 0
142
143 #if PRUNE_CRITICAL_PATH_LOGS
144   // Tell projections tracing to only output log entries if I contain part of the critical path
145
146   enableTraceLogOutput();
147   if(! intersectsLocalPE){
148     disableTraceLogOutput();
149     CkPrintf("PE %d doesn't intersect the critical path, so its log files won't be created\n", CkMyPe() );
150   }
151 #endif
152   
153 #if TRACE_ALL_PATH_TABLE_ENTRIES
154   // Create user events for all table entries
155   std::map< int, PathHistoryTableEntry >::iterator iter;
156   for(iter=pathHistoryTable.begin(); iter != pathHistoryTable.end(); iter++){
157     double startTime = iter->second.get_start_time();
158     double endTime = iter->second.get_start_time() + iter->second.get_local_path_time();
159     traceUserBracketEvent(32001, startTime, endTime);
160   }
161 #endif
162
163   int data=1;
164   CkCallback cb(CkIndex_pathHistoryManager::criticalPathProjectionsDone(NULL),thisProxy[0]); 
165   contribute(sizeof(int), &data, CkReduction::sum_int, cb);
166
167
168 }
169
170 void pathHistoryManager::criticalPathProjectionsDone(CkReductionMsg *msg){
171   CkPrintf("[%d] All PEs have received the critical path information. Sending critical path to user supplied callback.\n", CkMyPe());
172   pathForUser->cb.send(pathForUser);
173   pathForUser = NULL;
174 }
175
176 /// An interface callable by the application.
177 void useThisCriticalPathForPriorities(){
178   pathHistoryManagerProxy.ckLocalBranch()->useCriticalPathForPriories();
179 }
180
181
182 /// Callable from inside charm++ delivery mechanisms (after envelope contains epIdx):
183 void automaticallySetMessagePriority(envelope *env){
184     int ep = env->getEpIdx();
185     if (ep==CkIndex_CkArray::recvBroadcast(0))
186         ep = env->getsetArrayBcastEp();
187 #if DEBUG 
188     CkPrintf("----------- ep = %d  %s \n", ep, _entryTable[ep]->name); 
189     if(env->getPriobits() == 8*sizeof(int)){
190     CkPrintf("[%d] priorities for env=%p are integers\n", CkMyPe(), env);
191   } else if(env->getPriobits() == 0) {
192     CkPrintf("[%d] priorities for env=%p are not allocated in message\n", CkMyPe(), env);
193   } else {
194     CkPrintf("[%d] priorities for env=%p are not integers: %d priobits\n", CkMyPe(), env, env->getPriobits());
195   }
196 #endif
197   
198  
199   const std::map< int, int> & criticalPathForPriorityCounts = pathHistoryManagerProxy.ckLocalBranch()->getCriticalPathForPriorityCounts();
200
201   if(criticalPathForPriorityCounts.size() > 0 && env->getPriobits() == 8*sizeof(int)) {
202     switch(env->getMsgtype()) {
203     case ForArrayEltMsg:
204     case ForIDedObjMsg:
205     case ForChareMsg:
206     case ForNodeBocMsg:
207     case ForBocMsg:
208     case ArrayEltInitMsg:
209         {        
210           const int arr = env->getArrayMgrIdx();
211           const int count = criticalPathForPriorityCounts.count(ep);
212 #if DEBUG
213           CkPrintf("[%d] destination array,ep occurs %d times along stored critical path\n", CkMyPe(), count);
214 #endif
215         
216           if(count > 0){
217               // Set the integer priority to high
218 #if DEBUG 
219               CkPrintf("Prio auto high %d  %s \n", ep, _entryTable[ep]->name);
220 #endif
221               *(int*)(env->getPrioPtr()) = -5;
222           } else {
223               // Set the integer priority to low
224 #if DEBUG 
225               CkPrintf("Prio auto low: %d,%d\n", arr, ep);
226 #endif
227               *(int*)(env->getPrioPtr()) = 0;
228           }
229
230       }
231       break;
232       
233     default:
234       //CkPrintf("Can't Critical Path Autoprioritize messages of [unknown type]\n");
235       break;
236     }
237       
238   }
239 }
240
241 void pathHistoryManager::useCriticalPathForPriories(){
242   // Request a critical path that will be stored everywhere for future use in autotuning message priorities
243   // The resulting critical path should be broadcast to saveCriticalPathForPriorities() on all PEs
244   CkCallback cb(CkIndex_pathHistoryManager::saveCriticalPathForPriorities(NULL),thisProxy); 
245   traceCriticalPathBack(cb, false);
246 }
247
248 void pathHistoryManager::saveCriticalPathForPriorities(pathInformationMsg *msg){
249   //CkPrintf("[%d] saveCriticalPathForPriorities() Receiving critical paths\n", CkMyPe());
250   fflush(stdout);
251   
252   criticalPathForPriorityCounts.clear();
253   
254   // Save a list of which entries are along the critical path
255   for(int i=msg->historySize-1;i>=0;i--){
256     
257     PathHistoryTableEntry &e = msg->history[i];
258     
259 //#if 1 
260 #if DEBUG
261     if(CkMyPe() == 0){
262         char name[100];
263         if(e.local_ep >=0)
264             strcpy(name, _entryTable[e.local_ep]->name);
265         else
266             strcpy(name, "unknow");
267
268       CkPrintf("\t[%d] Path Step %d: local_path_time=%lf ep=%d starttime=%lf preceding path time=%lf pe=%d name=%s\n",CkMyPe(), i, e.get_local_path_time(), e.local_ep, e.get_start_time(), e.get_preceding_path_time(), e.local_pe, name);
269     }
270 #endif
271     
272     if(criticalPathForPriorityCounts.count(e.local_ep) == 1)
273       criticalPathForPriorityCounts[e.local_ep]++;
274     else
275       criticalPathForPriorityCounts[e.local_ep] = 1;  
276   }
277
278   // print out the list just for debugging purposes
279   if(CkMyPe() == 0){
280     std::map< int, int>::iterator iter;
281     for(iter=criticalPathForPriorityCounts.begin();iter!=criticalPathForPriorityCounts.end();++iter){
282       int epidx = iter->first;
283       const int c = iter->second;
284
285 #if DEBUG
286       CkPrintf("[%d] On critical path EP %d occurs %d times\n", CkMyPe(), epidx, c);
287 #endif
288
289     }
290   }
291 }
292
293 /// Add an entry for this path history into the table, and write the corresponding information into the outgoing envelope
294 int PathHistoryTableEntry::addToTableAndEnvelope(envelope *env){
295   // Add to table
296 #if USE_CRITICAL_PATH_HEADER_ARRAY
297   int new_idx = addToTable();
298
299   // Fill in outgoing envelope
300   CkAssert(env != NULL);
301   env->pathHistory.set_sender_history_table_idx(new_idx);
302   env->pathHistory.setTime(local_path_time + preceding_path_time);
303 #if DEBUG
304   if(local_path_time > 0.1)
305       CkPrintf("------########## %d generating new msg env   critical path length %f:%f:%f app time %f id:%d\n", CkMyPe(), local_path_time , preceding_path_time, local_path_time+preceding_path_time, CkWallTimer(), new_idx);
306 #endif
307   return new_idx;
308 #endif
309 }
310
311 /// Add an entry for this path history into the table. Returns the index in the table for it.
312 int PathHistoryTableEntry::addToTable(){
313   int new_idx = CkpvAccess(pathHistoryTableLastIdx) ++;
314   CkpvAccess(pathHistoryTable)[new_idx] = *this;
315 #if DEBUG
316   CkPrintf("-------- add to entry  %d   ----- %d  %d %d  \n", new_idx, local_ep,  local_pe, sender_history_table_idx); 
317 #endif
318   return new_idx;
319 }
320
321 void initializeCriticalPath(void){
322   CkpvInitialize(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
323   CkpvInitialize(double, timeEntryMethodStarted);
324   CkpvAccess(timeEntryMethodStarted) = 0.0;
325   CkpvInitialize(PathHistoryTableType, pathHistoryTable);
326   CkpvInitialize(int, pathHistoryTableLastIdx);
327   CkpvAccess(pathHistoryTableLastIdx) = 0;
328   CkpvInitialize(int, traceLastHop);
329   CkpvAccess(traceLastHop) = 0;
330 }
331
332
333 void resetThisEntryPath(void) {
334   CkpvAccess(currentlyExecutingPath).reset();
335 }
336
337
338 /// A debugging routine that outputs critical path info as user events.
339 void  saveCurrentPathAsUserEvent(const char* prefix){
340   if(CkpvAccess(currentlyExecutingPath).getTotalTime() > 0.0){
341     //traceUserEvent(5020);
342
343 #if 0
344     char *note = new char[4096];
345     sprintf(note, "%s<br> saveCurrentPathAsUserEvent()<br> ", prefix);
346     CkpvAccess(currentlyExecutingPath).printHTMLToString(note+strlen(note));
347     traceUserSuppliedNote(note); // stores a copy of the string
348     delete[] note;
349 #endif
350
351   } else {
352 #if 0
353     traceUserEvent(5010);
354 #endif
355   }
356  
357 }
358
359
360 void setCurrentlyExecutingPathTo100(void){
361     CkpvAccess(currentlyExecutingPath).setDebug100();
362 }
363
364 /// Acquire the critical path and deliver it to the user supplied callback
365 void traceCriticalPathBack(CkCallback cb, bool saveToProjectionsTraces){
366     pathInformationMsg *newmsg = new(0) pathInformationMsg;
367     newmsg->historySize = 0;
368     newmsg->cb = cb;
369     newmsg->hops = CkpvAccess(currentlyExecutingPath).hops - CkpvAccess(traceLastHop) -1;
370     CkpvAccess(traceLastHop) = CkpvAccess(currentlyExecutingPath).hops;
371     newmsg->saveAsProjectionsUserEvents = saveToProjectionsTraces;
372     newmsg->table_idx = CkpvAccess(currentlyExecutingPath).sender_history_table_idx;
373     int pe = CkpvAccess(currentlyExecutingPath).sender_pe;
374     //CkPrintf("Starting tracing of critical path  current PE : %d, current entry %d to pe=%d table_idx=%d\n", CkMyPe(), pe,  CkpvAccess(currentlyExecutingPath).local_ep, CkpvAccess(currentlyExecutingPath).sender_history_table_idx);
375     CkAssert(pe < CkNumPes() && pe >= 0);
376     pathHistoryManagerProxy[pe].traceCriticalPathBackStepByStep(newmsg);
377 }
378
379 /// A debugging routine that prints the number of EPs for the program, and the size of the envelope's path fields
380 void  printEPInfo(){
381   CkPrintf("printEPInfo():\n");
382   CkPrintf("There are %d EPs\n", (int)_entryTable.size());
383   for (int epIdx=0;epIdx<_entryTable.size();epIdx++)
384     CkPrintf("EP %d is %s\n", epIdx, _entryTable[epIdx]->name);
385 }
386
387 #if USE_CRITICAL_PATH_HEADER_ARRAY
388
389 void criticalPath_setep(int epIdx){ 
390     CkpvAccess(currentlyExecutingPath).local_ep  = epIdx;
391 #if DEBUG
392     if(epIdx >= 0)
393         CkPrintf(" setting current method name %d %s",epIdx,  _entryTable[epIdx]->name);  
394     CkPrintf("\n");
395 #endif
396 }
397
398 /// Save information about the critical path contained in the message that is about to execute.
399 void criticalPath_start(envelope * env){ 
400   CkpvAccess(currentlyExecutingPath).sender_pe = env->getSrcPe();
401   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = env->pathHistory.get_sender_history_table_idx();
402   CkpvAccess(currentlyExecutingPath).preceding_path_time = env->pathHistory.getTotalTime();
403   CkpvAccess(currentlyExecutingPath).hops =  env->pathHistory.getHops() + 1;
404
405   CkpvAccess(currentlyExecutingPath).sanity_check();
406   
407   CkpvAccess(currentlyExecutingPath).local_ep  = env->getEpIdx();
408
409   double now = CmiWallTimer();
410   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
411   CkpvAccess(timeEntryMethodStarted) = now;
412
413   switch(env->getMsgtype()) {
414   case ForArrayEltMsg:
415   case ArrayEltInitMsg:
416     CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayEp();
417     break;
418
419   case ForNodeBocMsg:
420     break;
421
422   case ForBocMsg:
423     //CkPrintf("Critical Path Detection handling a ForBocMsg\n");    
424     break;
425
426   case ForChareMsg:
427     //CkPrintf("Critical Path Detection handling a ForChareMsg\n");        
428     break;
429
430   default:
431     break;
432     //CkPrintf("Critical Path Detection can't yet handle message type %d\n", (int)env->getMsgtype());
433   }
434
435   if(CkpvAccess(currentlyExecutingPath).local_ep == CkIndex_CkArray::recvBroadcast(0))
436       CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayBcastEp();
437
438 #if DEBUG
439   CkPrintf("criticalPath_start(envelope * env) srcpe=%d sender table idx=%d  time=%lf current ep=%d ", env->getSrcPe(),  env->pathHistory.get_sender_history_table_idx(), env->pathHistory.getTotalTime(), env->getEpIdx() );
440   if(env->getEpIdx() >= 0)
441       CkPrintf(" current method name %d  %s", CkpvAccess(currentlyExecutingPath).local_ep, _entryTable[CkpvAccess(currentlyExecutingPath).local_ep]->name);  
442   CkPrintf("\n");
443 #endif
444
445   saveCurrentPathAsUserEvent("criticalPath_start()<br> ");
446 }
447
448
449 /// Modify the envelope of a message that is being sent for critical path detection and store an entry in a table on this PE.
450 void criticalPath_send(envelope * sendingEnv){
451 #if DEBUG
452   CkPrintf("criticalPath_send(envelope * sendingEnv)\n");
453 #endif
454   double now = CmiWallTimer();
455   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), CkpvAccess(timeEntryMethodStarted), now);
456   entry.addToTableAndEnvelope(sendingEnv);
457   sendingEnv->pathHistory.setHops(CkpvAccess(currentlyExecutingPath).hops);
458   automaticallySetMessagePriority(sendingEnv);
459 }
460
461 /// Handle the end of the entry method in the critical path detection processes. This should create a forward dependency for the object.
462 void criticalPath_end(){
463   saveCurrentPathAsUserEvent("criticalPath_end()<br> ");
464
465   CkpvAccess(currentlyExecutingPath).reset();
466
467 #if DEBUG
468   CkPrintf("criticalPath_end()\n");
469 #endif
470 }
471
472 #endif
473
474 /// Split an entry method invocation into multiple logical tasks for the critical path analysis.
475 /// SDAG doen's break up the code in useful ways, so I'll make it add calls to this in the generated code.
476 void criticalPath_split(){
477   saveCurrentPathAsUserEvent("criticalPath_split()<br> ");
478
479   // save an entry in the table
480   double now = CmiWallTimer();
481   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), now-CkpvAccess(timeEntryMethodStarted) );
482   int tableidx = entry.addToTable();
483   // end the old task
484   // start the new task
485   CkpvAccess(currentlyExecutingPath).sender_pe = CkMyPe();
486   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = tableidx;
487   CkpvAccess(currentlyExecutingPath).preceding_path_time = entry.getTotalTime();
488   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
489   CkpvAccess(timeEntryMethodStarted) = now;
490 }
491
492 MergeablePathHistory* saveCurrentPath()
493 {
494     MergeablePathHistory *savedPath = new MergeablePathHistory(CkpvAccess(currentlyExecutingPath));
495     return savedPath;
496 }
497
498
499 void mergePathHistory(MergeablePathHistory* tmp)
500 {
501     CkpvAccess(currentlyExecutingPath).updateMax(*tmp);
502 }
503
504 #include "PathHistory.def.h"
505
506 /*! @} */