Moving the "critical path" history code into its own module which is included in...
[charm.git] / src / ck-cp / pathHistory.C
1 #include <charm++.h>
2 #include <iostream>
3 #include <fstream>
4 #include <string>
5 #include <map>
6 #include <set>
7 #include <vector>
8 #include <utility>
9
10 #include "PathHistory.decl.h"
11 #include "LBDatabase.h"
12 //#include "controlPoints.h"
13 #include "pathHistory.h"
14 #include "arrayRedistributor.h"
15 #include <register.h> // for _entryTable
16
17
18 /**
19  *  \addtogroup CriticalPathFramework
20  *   @{
21  *
22  */
23
24
25
26 /*readonly*/ CProxy_pathHistoryManager pathHistoryManagerProxy;
27
28 CkpvDeclare(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
29 CkpvDeclare(double, timeEntryMethodStarted);
30
31 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
32 /** A table to store all the local nodes in the parallel dependency graph */
33 CkpvDeclare(PathHistoryTableType, pathHistoryTable);
34 /** A counter that defines the new keys for the entries in the pathHistoryTable */
35 CkpvDeclare(int, pathHistoryTableLastIdx);
36 #endif
37
38
39 /// A mainchare that is used just to create a group at startup
40 class pathHistoryMain : public CBase_pathHistoryMain {
41 public:
42   pathHistoryMain(CkArgMsg* args){
43 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
44     pathHistoryManagerProxy = CProxy_pathHistoryManager::ckNew();
45 #endif
46   }
47   ~pathHistoryMain(){}
48 };
49
50
51 pathHistoryManager::pathHistoryManager(){
52
53 }
54
55
56   /** Trace perform a traversal backwards over the critical path specified as a 
57       table index for the processor upon which this is called.
58       
59       The callback cb will be called with the resulting msg after the path has 
60       been traversed to its origin.  
61   */
62  void pathHistoryManager::traceCriticalPathBack(pathInformationMsg *msg){
63 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
64    int count = CkpvAccess(pathHistoryTable).count(msg->table_idx);
65
66 #if DEBUGPRINT > 2
67    CkPrintf("Table entry %d on pe %d occurs %d times in table\n", msg->table_idx, CkMyPe(), count);
68 #endif
69    CkAssert(count==0 || count==1);
70
71     if(count > 0){ 
72       PathHistoryTableEntry & path = CkpvAccess(pathHistoryTable)[msg->table_idx];
73       int idx = path.sender_history_table_idx;
74       int pe = path.sender_pe;
75
76 #if DEBUGPRINT > 2
77       CkPrintf("Table entry %d on pe %d points to pe=%d idx=%d\n", msg->table_idx, CkMyPe(), pe, idx);
78 #endif
79
80       // Make a copy of the message for broadcasting to all PEs
81       pathInformationMsg *newmsg = new(msg->historySize+1) pathInformationMsg;
82       for(int i=0;i<msg->historySize;i++){
83         newmsg->history[i] = msg->history[i];
84       }
85       newmsg->history[msg->historySize] = path;
86       newmsg->historySize = msg->historySize+1;
87       newmsg->cb = msg->cb;
88       newmsg->table_idx = idx;
89       
90       // Keep a message for returning to the user's callback
91       pathForUser = new(msg->historySize+1) pathInformationMsg;
92       for(int i=0;i<msg->historySize;i++){
93         pathForUser->history[i] = msg->history[i];
94       }
95       pathForUser->history[msg->historySize] = path;
96       pathForUser->historySize = msg->historySize+1;
97       pathForUser->cb = msg->cb;
98       pathForUser->table_idx = idx;
99       
100       
101       if(idx > -1 && pe > -1){
102         CkAssert(pe < CkNumPes() && pe >= 0);
103         thisProxy[pe].traceCriticalPathBack(newmsg);
104       } else {
105         CkPrintf("Traced critical path back to its origin.\n");
106         CkPrintf("Broadcasting it to all PE\n");
107         thisProxy.broadcastCriticalPathResult(newmsg);
108       }
109     } else {
110       CkAbort("ERROR: Traced critical path back to a nonexistent table entry.\n");
111     }
112
113     delete msg;
114 #else
115     CkAbort("Shouldn't call pathHistoryManager::traceCriticalPathBack when critical path detection is not enabled");
116 #endif
117   }
118
119
120 void pathHistoryManager::broadcastCriticalPathResult(pathInformationMsg *msg){
121
122 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
123   CkPrintf("[%d] Received broadcast of critical path\n", CkMyPe());
124   int me = CkMyPe();
125   int intersectsLocalPE = false;
126
127   // Create user events for critical path
128
129   for(int i=msg->historySize-1;i>=0;i--){
130     if(CkMyPe() == msg->history[i].local_pe){
131       // Part of critical path is local
132       // Create user event for it
133
134       //      CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, msg->history[i].get_local_path_time(), msg-> history[i].local_arr, msg->history[i].local_ep, msg->history[i].get_start_time(), msg->history[i].get_preceding_path_time(), msg->history[i].local_pe);
135       traceUserBracketEvent(5900, msg->history[i].get_start_time(), msg->history[i].get_start_time() + msg->history[i].get_local_path_time());
136       intersectsLocalPE = true;
137     }
138
139   }
140   
141
142 #if PRUNE_CRITICAL_PATH_LOGS
143   // Tell projections tracing to only output log entries if I contain part of the critical path
144   if(! intersectsLocalPE){
145     disableTraceLogOutput();
146     CkPrintf("PE %d doesn't intersect the critical path, so its log files won't be created\n", CkMyPe() );
147   }
148 #endif  
149   
150 #if TRACE_ALL_PATH_TABLE_ENTRIES
151   // Create user events for all table entries
152   std::map< int, PathHistoryTableEntry >::iterator iter;
153   for(iter=pathHistoryTable.begin(); iter != pathHistoryTable.end(); iter++){
154     double startTime = iter->second.get_start_time();
155     double endTime = iter->second.get_start_time() + iter->second.get_local_path_time();
156     traceUserBracketEvent(5901, startTime, endTime);
157   }
158 #endif
159
160   int data=1;
161   CkCallback cb(CkIndex_pathHistoryManager::criticalPathDone(NULL),thisProxy[0]); 
162   contribute(sizeof(int), &data, CkReduction::sum_int, cb);
163
164 #endif
165
166 }
167
168 void pathHistoryManager::criticalPathDone(CkReductionMsg *msg){
169 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
170   CkPrintf("[%d] All PEs have received the critical path information. Sending critical path to user supplied callback.\n", CkMyPe());
171   pathForUser->cb.send(pathForUser);
172   pathForUser = NULL;
173 #endif
174 }
175
176
177
178
179 void initializeCriticalPath(void){
180 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
181   CkpvInitialize(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
182   CkpvInitialize(double, timeEntryMethodStarted);
183   CkpvAccess(timeEntryMethodStarted) = 0.0;
184
185
186   CkpvInitialize(PathHistoryTableType, pathHistoryTable);
187   CkpvInitialize(int, pathHistoryTableLastIdx);
188   CkpvAccess(pathHistoryTableLastIdx) = 0;
189 #endif
190 }
191
192
193
194
195
196
197
198
199
200
201 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
202 // The PathHistoryEnvelope class is disabled if USE_CRITICAL_PATH_HEADER_ARRAY isn't defined
203 void PathHistoryEnvelope::reset(){
204   totalTime = 0.0;
205   sender_history_table_idx = -1;
206 }
207
208   
209 void PathHistoryEnvelope::print() const {
210   CkPrintf("print() is not implemented\n");
211 }
212
213 /// Write a description of the path into the beginning of the provided buffer. The buffer ought to be large enough.
214
215   
216 void PathHistoryEnvelope::incrementTotalTime(double time){
217   totalTime += time;
218 }
219
220
221
222 void PathHistoryEnvelope::setDebug100(){
223   totalTime = 100.0;   
224 }
225
226
227 /// Add an entry for this path history into the table, and write the corresponding information into the outgoing envelope
228 int PathHistoryTableEntry::addToTableAndEnvelope(envelope *env){
229   // Add to table
230   int new_idx = addToTable();
231
232   // Fill in outgoing envelope
233   CkAssert(env != NULL);
234   env->pathHistory.set_sender_history_table_idx(new_idx);
235   env->pathHistory.setTime(local_path_time + preceding_path_time);
236
237   // Create a user event for projections
238   char *note = new char[4096];
239   sprintf(note, "addToTableAndEnvelope<br> ");
240   env->pathHistory.printHTMLToString(note+strlen(note));
241   traceUserSuppliedNote(note); // stores a copy of the string
242   delete[] note;
243   
244   return new_idx;
245 }
246   
247
248 /// Add an entry for this path history into the table. Returns the index in the table for it.
249 int PathHistoryTableEntry::addToTable(){
250   int new_idx = CkpvAccess(pathHistoryTableLastIdx) ++;
251   CkpvAccess(pathHistoryTable)[new_idx] = *this;
252   return new_idx;
253 }
254 #endif
255
256
257   
258 void resetThisEntryPath(void) {
259   CkpvAccess(currentlyExecutingPath).reset();
260 }
261
262
263 /// A debugging routine that outputs critical path info as user events.
264 void  saveCurrentPathAsUserEvent(char* prefix){
265   if(CkpvAccess(currentlyExecutingPath).getTotalTime() > 0.0){
266     //traceUserEvent(5020);
267
268     char *note = new char[4096];
269     sprintf(note, "%s<br> saveCurrentPathAsUserEvent()<br> ", prefix);
270     CkpvAccess(currentlyExecutingPath).printHTMLToString(note+strlen(note));
271     traceUserSuppliedNote(note); // stores a copy of the string
272     delete[] note;
273
274   } else {
275     traceUserEvent(5010);
276   }
277  
278 }
279
280
281 void setCurrentlyExecutingPathTo100(void){
282   CkpvAccess(currentlyExecutingPath).setDebug100();
283 }
284
285
286
287 /// A routine for printing out information along the critical path.
288 void traceCriticalPathBack(CkCallback cb){
289 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
290
291   pathInformationMsg *newmsg = new(0) pathInformationMsg;
292   newmsg->historySize = 0;
293   newmsg->cb = cb;
294   newmsg->table_idx = CkpvAccess(currentlyExecutingPath).sender_history_table_idx;
295   int pe = CkpvAccess(currentlyExecutingPath).sender_pe;
296   CkPrintf("Starting tracing of critical path from pe=%d table_idx=%d\n", pe,  CkpvAccess(currentlyExecutingPath).sender_history_table_idx);
297   CkAssert(pe < CkNumPes() && pe >= 0);
298   pathHistoryManagerProxy[pe].traceCriticalPathBack(newmsg);
299 #else
300
301   pathInformationMsg * pathForUser = new(0) pathInformationMsg;  
302   pathForUser->historySize = 0;                                                                                        
303   pathForUser->cb = CkCallback();                                                                                                      
304   pathForUser->table_idx = -1;      
305   cb.send(pathForUser);    
306
307 #endif
308 }
309
310
311
312 // void  printPathInMsg(void* msg){
313 //   envelope *env = UsrToEnv(msg);
314 //   env->printPath();
315 // }
316
317
318
319 /// A debugging routine that prints the number of EPs for the program, and the size of the envelope's path fields
320 void  printEPInfo(){
321 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
322   CkPrintf("printEPInfo():\n");
323   CkPrintf("There are %d EPs\n", (int)_entryTable.size());
324   for (int epIdx=0;epIdx<_entryTable.size();epIdx++)
325     CkPrintf("EP %d is %s\n", epIdx, _entryTable[epIdx]->name);
326 #endif
327 }
328
329
330
331
332 /// Save information about the critical path contained in the message that is about to execute.
333 void criticalPath_start(envelope * env){ 
334 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
335 #if DEBUG
336   CkPrintf("criticalPath_start(envelope * env) srcpe=%d sender table idx=%d  time=%lf\n", env->getSrcPe(),  env->pathHistory.get_sender_history_table_idx(), env->pathHistory.getTotalTime() );
337 #endif
338
339   CkpvAccess(currentlyExecutingPath).sender_pe = env->getSrcPe();
340   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = env->pathHistory.get_sender_history_table_idx();
341   CkpvAccess(currentlyExecutingPath).preceding_path_time = env->pathHistory.getTotalTime();
342
343   CkpvAccess(currentlyExecutingPath).sanity_check();
344   
345   CkpvAccess(currentlyExecutingPath).local_ep  = -1;
346   CkpvAccess(currentlyExecutingPath).local_arr = -1;
347
348   double now = CmiWallTimer();
349   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
350   CkpvAccess(timeEntryMethodStarted) = now;
351
352   switch(env->getMsgtype()) {
353   case ForArrayEltMsg:
354     //    CkPrintf("Critical Path Detection handling a ForArrayEltMsg\n");    
355     CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayEp();
356     CkpvAccess(currentlyExecutingPath).local_arr = env->getArrayMgrIdx();
357     
358     break;
359
360   case ForNodeBocMsg:
361     //CkPrintf("Critical Path Detection handling a ForNodeBocMsg\n");    
362     break;
363
364   case ForChareMsg:
365     //CkPrintf("Critical Path Detection handling a ForChareMsg\n");        
366     break;
367
368   case ForBocMsg:
369     //CkPrintf("Critical Path Detection handling a ForBocMsg\n");    
370     break;
371
372   case ArrayEltInitMsg:
373     // Don't do anything special with these
374     break;
375
376   default:
377     CkPrintf("Critical Path Detection can't yet handle message type %d\n", (int)env->getMsgtype());
378   }
379   
380   
381
382   saveCurrentPathAsUserEvent("criticalPath_start()<br> ");
383
384
385 #endif
386 }
387
388
389 /// Modify the envelope of a message that is being sent for critical path detection and store an entry in a table on this PE.
390 void criticalPath_send(envelope * sendingEnv){
391 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
392 #if DEBUG
393   CkPrintf("criticalPath_send(envelope * sendingEnv)\n");
394 #endif
395   double now = CmiWallTimer();
396   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), CkpvAccess(timeEntryMethodStarted), now);
397   entry.addToTableAndEnvelope(sendingEnv);
398   
399 #endif
400 }
401
402
403 /// Handle the end of the entry method in the critical path detection processes. This should create a forward dependency for the object.
404 void criticalPath_end(){
405 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
406   saveCurrentPathAsUserEvent("criticalPath_end()<br> ");
407
408   CkpvAccess(currentlyExecutingPath).reset();
409
410 #if DEBUG
411   CkPrintf("criticalPath_end()\n");
412 #endif
413
414 #endif
415 }
416
417
418
419 /// Split an entry method invocation into multiple logical tasks for the critical path analysis.
420 /// SDAG doen's break up the code in useful ways, so I'll make it add calls to this in the generated code.
421 void criticalPath_split(){
422 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
423   saveCurrentPathAsUserEvent("criticalPath_split()<br> ");
424
425   // save an entry in the table
426   double now = CmiWallTimer();
427   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), now-CkpvAccess(timeEntryMethodStarted) );
428   int tableidx = entry.addToTable();
429
430   // end the old task
431
432   
433   // start the new task
434   CkpvAccess(currentlyExecutingPath).sender_pe = CkMyPe();
435   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = tableidx;
436   CkpvAccess(currentlyExecutingPath).preceding_path_time = entry.getTotalTime();
437   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
438   CkpvAccess(timeEntryMethodStarted) = now;
439 #endif
440 }
441
442
443 #include "PathHistory.def.h"
444
445 /*! @} */
446