Adding fortran interfaces to control point framework
[charm.git] / src / ck-cp / pathHistory.C
1 #include <charm++.h>
2 #include <iostream>
3 #include <fstream>
4 #include <string>
5 #include <map>
6 #include <set>
7 #include <vector>
8 #include <utility>
9
10 #include "PathHistory.decl.h"
11 #include "LBDatabase.h"
12 #include "pathHistory.h"
13 //#include "controlPoints.h"
14 //#include "arrayRedistributor.h"
15 #include <register.h> // for _entryTable
16
17 #include "trace-projections.h"
18
19
20 /**
21  *  \addtogroup CriticalPathFramework
22  *   @{
23  *
24  */
25
26
27
28 /*readonly*/ CProxy_pathHistoryManager pathHistoryManagerProxy;
29
30 CkpvDeclare(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
31 CkpvDeclare(double, timeEntryMethodStarted);
32
33 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
34 /** A table to store all the local nodes in the parallel dependency graph */
35 CkpvDeclare(PathHistoryTableType, pathHistoryTable);
36 /** A counter that defines the new keys for the entries in the pathHistoryTable */
37 CkpvDeclare(int, pathHistoryTableLastIdx);
38 #endif
39
40
41 /// A mainchare that is used just to create a group at startup
42 class pathHistoryMain : public CBase_pathHistoryMain {
43 public:
44   pathHistoryMain(CkArgMsg* args){
45 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
46     pathHistoryManagerProxy = CProxy_pathHistoryManager::ckNew();
47 #endif
48   }
49   ~pathHistoryMain(){}
50 };
51
52
53 pathHistoryManager::pathHistoryManager(){
54
55 }
56
57
58   /** Trace perform a traversal backwards over the critical path specified as a 
59       table index for the processor upon which this is called.
60       
61       The callback cb will be called with the resulting msg after the path has 
62       been traversed to its origin.  
63   */
64  void pathHistoryManager::traceCriticalPathBackStepByStep(pathInformationMsg *msg){
65 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
66    int count = CkpvAccess(pathHistoryTable).count(msg->table_idx);
67
68 #if DEBUGPRINT > 2
69    CkPrintf("Table entry %d on pe %d occurs %d times in table\n", msg->table_idx, CkMyPe(), count);
70 #endif
71    CkAssert(count==0 || count==1);
72
73     if(count > 0){ 
74       PathHistoryTableEntry & path = CkpvAccess(pathHistoryTable)[msg->table_idx];
75       int idx = path.sender_history_table_idx;
76       int pe = path.sender_pe;
77
78 #if DEBUGPRINT > 2
79       CkPrintf("Table entry %d on pe %d points to pe=%d idx=%d\n", msg->table_idx, CkMyPe(), pe, idx);
80 #endif
81
82       // Make a copy of the message as we forward it along
83       pathInformationMsg *newmsg = new(msg->historySize+1) pathInformationMsg;
84       for(int i=0;i<msg->historySize;i++){
85         newmsg->history[i] = msg->history[i];
86       }
87       newmsg->history[msg->historySize] = path;
88       newmsg->historySize = msg->historySize+1;
89       newmsg->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
90       newmsg->cb = msg->cb;
91       newmsg->table_idx = idx;
92         
93       if(idx > -1 && pe > -1){
94         // Not yet at origin, keep tracing the path back
95         CkAssert(pe < CkNumPes() && pe >= 0);
96         thisProxy[pe].traceCriticalPathBackStepByStep(newmsg);
97       } else {
98         CkPrintf("Traced critical path back to its origin.\n");
99         if(msg->saveAsProjectionsUserEvents){
100
101           // Keep a message for returning to the user's callback
102           pathForUser = new(msg->historySize+1) pathInformationMsg;
103           for(int i=0;i<msg->historySize;i++){
104             pathForUser->history[i] = msg->history[i];
105           }
106           pathForUser->history[msg->historySize] = path;
107           pathForUser->historySize = msg->historySize+1;
108           pathForUser->saveAsProjectionsUserEvents = msg->saveAsProjectionsUserEvents;
109           pathForUser->cb = msg->cb;
110           pathForUser->table_idx = idx;
111           
112           CkPrintf("Broadcasting it to all PE\n");
113           thisProxy.broadcastCriticalPathProjections(newmsg);
114         } else {
115           newmsg->cb.send(newmsg);
116         }
117
118       }
119     } else {
120       CkAbort("ERROR: Traced critical path back to a nonexistent table entry.\n");
121     }
122
123     delete msg;
124 #else
125     CkAbort("Shouldn't call pathHistoryManager::traceCriticalPathBack when critical path detection is not enabled");
126 #endif
127   }
128
129
130 void pathHistoryManager::broadcastCriticalPathProjections(pathInformationMsg *msg){
131
132 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
133   CkPrintf("[%d] Received broadcast of critical path\n", CkMyPe());
134   int me = CkMyPe();
135   int intersectsLocalPE = false;
136
137   // Create user events for critical path
138
139   for(int i=msg->historySize-1;i>=0;i--){
140     if(CkMyPe() == msg->history[i].local_pe){
141       // Part of critical path is local
142       // Create user event for it
143
144       //      CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, msg->history[i].get_local_path_time(), msg-> history[i].local_arr, msg->history[i].local_ep, msg->history[i].get_start_time(), msg->history[i].get_preceding_path_time(), msg->history[i].local_pe);
145       traceUserBracketEvent(32000, msg->history[i].get_start_time(), msg->history[i].get_start_time() + msg->history[i].get_local_path_time());
146
147       intersectsLocalPE = true;
148     }
149
150   }
151
152   traceRegisterUserEvent("Critical Path", 32000);
153   
154   
155 #define PRUNE_CRITICAL_PATH_LOGS 0
156
157 #if PRUNE_CRITICAL_PATH_LOGS
158   // Tell projections tracing to only output log entries if I contain part of the critical path
159
160   enableTraceLogOutput();
161   if(! intersectsLocalPE){
162     disableTraceLogOutput();
163     CkPrintf("PE %d doesn't intersect the critical path, so its log files won't be created\n", CkMyPe() );
164   }
165 #endif
166   
167 #if TRACE_ALL_PATH_TABLE_ENTRIES
168   // Create user events for all table entries
169   std::map< int, PathHistoryTableEntry >::iterator iter;
170   for(iter=pathHistoryTable.begin(); iter != pathHistoryTable.end(); iter++){
171     double startTime = iter->second.get_start_time();
172     double endTime = iter->second.get_start_time() + iter->second.get_local_path_time();
173     traceUserBracketEvent(32001, startTime, endTime);
174   }
175 #endif
176
177   int data=1;
178   CkCallback cb(CkIndex_pathHistoryManager::criticalPathProjectionsDone(NULL),thisProxy[0]); 
179   contribute(sizeof(int), &data, CkReduction::sum_int, cb);
180
181 #endif
182
183 }
184
185 void pathHistoryManager::criticalPathProjectionsDone(CkReductionMsg *msg){
186 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
187   CkPrintf("[%d] All PEs have received the critical path information. Sending critical path to user supplied callback.\n", CkMyPe());
188   pathForUser->cb.send(pathForUser);
189   pathForUser = NULL;
190 #endif
191 }
192
193
194
195
196 /// An interface callable by the application.
197 void useThisCriticalPathForPriorities(){
198 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
199   pathHistoryManagerProxy.ckLocalBranch()->useCriticalPathForPriories();
200 #endif
201 }
202
203
204 /// Callable from inside charm++ delivery mechanisms (after envelope contains epIdx):
205 void automaticallySetMessagePriority(envelope *env){
206   #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
207
208 #if DEBUG
209   if(env->getPriobits() == 8*sizeof(int)){
210     CkPrintf("[%d] priorities for env=%p are integers\n", CkMyPe(), env);
211   } else if(env->getPriobits() == 0) {
212     CkPrintf("[%d] priorities for env=%p are not allocated in message\n", CkMyPe(), env);
213   } else {
214     CkPrintf("[%d] priorities for env=%p are not integers: %d priobits\n", CkMyPe(), env, env->getPriobits());
215   }
216 #endif
217   
218   
219   const std::map< std::pair<int,int>, int> & criticalPathForPriorityCounts = pathHistoryManagerProxy.ckLocalBranch()->getCriticalPathForPriorityCounts();
220   
221   if(criticalPathForPriorityCounts.size() > 0 && env->getPriobits() == 8*sizeof(int)) {
222     
223     switch(env->getMsgtype()) {
224     case ForArrayEltMsg:
225     case ForChareMsg:
226       {        
227         const int ep = env->getsetArrayEp();
228         const int arr = env->getArrayMgrIdx();
229         
230         const std::pair<int,int> k = std::make_pair(arr, ep);
231         const int count = criticalPathForPriorityCounts.count(k);
232         
233 #if DEBUG
234         CkPrintf("[%d] destination array,ep occurs %d times along stored critical path\n", CkMyPe(), count);
235 #endif
236         
237         if(count > 0){
238           // Set the integer priority to high
239 #if DEBUG
240           CkPrintf("Prio auto high\n");
241 #endif
242           *(int*)(env->getPrioPtr()) = -5;
243         } else {
244           // Set the integer priority to low
245 #if DEBUG
246           CkPrintf("Prio auto low: %d,%d\n", arr, ep);
247 #endif
248           *(int*)(env->getPrioPtr()) = 5;
249         }
250         
251       }
252       break;
253       
254     case ForNodeBocMsg:
255       CkPrintf("Can't Critical Path Autoprioritize a ForNodeBocMsg\n");    
256       break;
257       
258     case ForBocMsg:
259       CkPrintf("Can't Critical Path Autoprioritize a ForBocMsg\n");
260       break;
261       
262     case ArrayEltInitMsg:
263       // Don't do anything special with these
264       CkPrintf("Can't Critical Path Autoprioritize a ArrayEltInitMsg\n");
265       break;
266       
267     default:
268       CkPrintf("Can't Critical Path Autoprioritize messages of [unknown type]\n");
269       break;
270     }
271       
272   }
273
274 #endif
275 }
276
277
278
279 void pathHistoryManager::useCriticalPathForPriories(){
280 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
281
282   // Request a critical path that will be stored everywhere for future use in autotuning message priorities
283   
284   // The resulting critical path should be broadcast to saveCriticalPathForPriorities() on all PEs
285   CkCallback cb(CkIndex_pathHistoryManager::saveCriticalPathForPriorities(NULL),thisProxy); 
286   traceCriticalPathBack(cb, false);
287 #endif  
288 }
289
290
291
292 void pathHistoryManager::saveCriticalPathForPriorities(pathInformationMsg *msg){
293 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
294
295   CkPrintf("[%d] saveCriticalPathForPriorities() Receiving critical paths\n", CkMyPe());
296   fflush(stdout);
297   
298   criticalPathForPriorityCounts.clear();
299   
300   // Save a list of which entries are along the critical path
301   for(int i=msg->historySize-1;i>=0;i--){
302     
303     PathHistoryTableEntry &e = msg->history[i];
304     
305 #if DEBUG
306     if(CkMyPe() == 0){
307       CkPrintf("\t[%d] Path Step %d: local_path_time=%lf arr=%d ep=%d starttime=%lf preceding path time=%lf pe=%d\n",CkMyPe(), i, e.get_local_path_time(), msg-> history[i].local_arr, e.local_ep, e.get_start_time(), e.get_preceding_path_time(), e.local_pe);
308     }
309 #endif
310     
311     const std::pair<int,int> k = std::make_pair(e.local_arr, e.local_ep);
312     if(criticalPathForPriorityCounts.count(k) == 1)
313       criticalPathForPriorityCounts[k]++;
314     else
315       criticalPathForPriorityCounts[k] = 1;  
316   }
317   
318
319
320
321   // print out the list just for debugging purposes
322   if(CkMyPe() == 0){
323     std::map< std::pair<int,int>, int>::iterator iter;
324     for(iter=criticalPathForPriorityCounts.begin();iter!=criticalPathForPriorityCounts.end();++iter){
325       const std::pair<int,int> k = iter->first;
326       const int c = iter->second;
327
328       CkPrintf("[%d] On critical path EP %d,%d occurs %d times\n", CkMyPe(), k.first, k.second, c);
329
330     }
331   }
332
333 #endif
334 }
335
336
337
338
339
340
341
342
343
344
345
346 void initializeCriticalPath(void){
347 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
348   CkpvInitialize(MergeablePathHistory, currentlyExecutingPath); // The maximal incoming path for the node
349   CkpvInitialize(double, timeEntryMethodStarted);
350   CkpvAccess(timeEntryMethodStarted) = 0.0;
351
352
353   CkpvInitialize(PathHistoryTableType, pathHistoryTable);
354   CkpvInitialize(int, pathHistoryTableLastIdx);
355   CkpvAccess(pathHistoryTableLastIdx) = 0;
356 #endif
357 }
358
359
360
361
362
363
364
365
366
367
368 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
369 // The PathHistoryEnvelope class is disabled if USE_CRITICAL_PATH_HEADER_ARRAY isn't defined
370 void PathHistoryEnvelope::reset(){
371   totalTime = 0.0;
372   sender_history_table_idx = -1;
373 }
374
375   
376 void PathHistoryEnvelope::print() const {
377   CkPrintf("print() is not implemented\n");
378 }
379
380 /// Write a description of the path into the beginning of the provided buffer. The buffer ought to be large enough.
381
382   
383 void PathHistoryEnvelope::incrementTotalTime(double time){
384   totalTime += time;
385 }
386
387
388
389 void PathHistoryEnvelope::setDebug100(){
390   totalTime = 100.0;   
391 }
392
393
394
395 /// Add an entry for this path history into the table, and write the corresponding information into the outgoing envelope
396 int PathHistoryTableEntry::addToTableAndEnvelope(envelope *env){
397   // Add to table
398   int new_idx = addToTable();
399
400   // Fill in outgoing envelope
401   CkAssert(env != NULL);
402   env->pathHistory.set_sender_history_table_idx(new_idx);
403   env->pathHistory.setTime(local_path_time + preceding_path_time);
404
405 #if 0
406   // Create a user event for projections
407   char *note = new char[4096];
408   sprintf(note, "addToTableAndEnvelope<br> ");
409   env->pathHistory.printHTMLToString(note+strlen(note));
410   traceUserSuppliedNote(note); // stores a copy of the string
411   delete[] note;
412 #endif  
413
414   return new_idx;
415 }
416   
417
418 /// Add an entry for this path history into the table. Returns the index in the table for it.
419 int PathHistoryTableEntry::addToTable(){
420   int new_idx = CkpvAccess(pathHistoryTableLastIdx) ++;
421   CkpvAccess(pathHistoryTable)[new_idx] = *this;
422   return new_idx;
423 }
424 #endif
425
426
427
428   
429 void resetThisEntryPath(void) {
430   CkpvAccess(currentlyExecutingPath).reset();
431 }
432
433
434 /// A debugging routine that outputs critical path info as user events.
435 void  saveCurrentPathAsUserEvent(char* prefix){
436   if(CkpvAccess(currentlyExecutingPath).getTotalTime() > 0.0){
437     //traceUserEvent(5020);
438
439 #if 0
440     char *note = new char[4096];
441     sprintf(note, "%s<br> saveCurrentPathAsUserEvent()<br> ", prefix);
442     CkpvAccess(currentlyExecutingPath).printHTMLToString(note+strlen(note));
443     traceUserSuppliedNote(note); // stores a copy of the string
444     delete[] note;
445 #endif
446
447   } else {
448 #if 0
449     traceUserEvent(5010);
450 #endif
451   }
452  
453 }
454
455
456 void setCurrentlyExecutingPathTo100(void){
457   CkpvAccess(currentlyExecutingPath).setDebug100();
458 }
459
460
461 /// Acquire the critical path and deliver it to the user supplied callback
462 void traceCriticalPathBack(CkCallback cb, bool saveToProjectionsTraces){
463 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
464   pathInformationMsg *newmsg = new(0) pathInformationMsg;
465   newmsg->historySize = 0;
466   newmsg->cb = cb;
467   newmsg->saveAsProjectionsUserEvents = saveToProjectionsTraces;
468   newmsg->table_idx = CkpvAccess(currentlyExecutingPath).sender_history_table_idx;
469   int pe = CkpvAccess(currentlyExecutingPath).sender_pe;
470   CkPrintf("Starting tracing of critical path from pe=%d table_idx=%d\n", pe,  CkpvAccess(currentlyExecutingPath).sender_history_table_idx);
471   CkAssert(pe < CkNumPes() && pe >= 0);
472   pathHistoryManagerProxy[pe].traceCriticalPathBackStepByStep(newmsg);
473 #else
474   pathInformationMsg * pathForUser = new(0) pathInformationMsg;  
475   pathForUser->historySize = 0;                                                                                        
476   pathForUser->cb = CkCallback();                                                                                                      
477   pathForUser->table_idx = -1;      
478   cb.send(pathForUser);  
479 #endif
480 }
481
482
483
484 // void  printPathInMsg(void* msg){
485 //   envelope *env = UsrToEnv(msg);
486 //   env->printPath();
487 // }
488
489
490
491 /// A debugging routine that prints the number of EPs for the program, and the size of the envelope's path fields
492 void  printEPInfo(){
493 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
494   CkPrintf("printEPInfo():\n");
495   CkPrintf("There are %d EPs\n", (int)_entryTable.size());
496   for (int epIdx=0;epIdx<_entryTable.size();epIdx++)
497     CkPrintf("EP %d is %s\n", epIdx, _entryTable[epIdx]->name);
498 #endif
499 }
500
501
502
503
504 /// Save information about the critical path contained in the message that is about to execute.
505 void criticalPath_start(envelope * env){ 
506 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
507 #if DEBUG
508   CkPrintf("criticalPath_start(envelope * env) srcpe=%d sender table idx=%d  time=%lf\n", env->getSrcPe(),  env->pathHistory.get_sender_history_table_idx(), env->pathHistory.getTotalTime() );
509 #endif
510
511   CkpvAccess(currentlyExecutingPath).sender_pe = env->getSrcPe();
512   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = env->pathHistory.get_sender_history_table_idx();
513   CkpvAccess(currentlyExecutingPath).preceding_path_time = env->pathHistory.getTotalTime();
514
515   CkpvAccess(currentlyExecutingPath).sanity_check();
516   
517   CkpvAccess(currentlyExecutingPath).local_ep  = -1;
518   CkpvAccess(currentlyExecutingPath).local_arr = -1;
519
520   double now = CmiWallTimer();
521   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
522   CkpvAccess(timeEntryMethodStarted) = now;
523
524   switch(env->getMsgtype()) {
525   case ForArrayEltMsg:
526     //    CkPrintf("Critical Path Detection handling a ForArrayEltMsg\n");    
527     CkpvAccess(currentlyExecutingPath).local_ep = env->getsetArrayEp();
528     CkpvAccess(currentlyExecutingPath).local_arr = env->getArrayMgrIdx();
529     
530     break;
531
532   case ForNodeBocMsg:
533     //CkPrintf("Critical Path Detection handling a ForNodeBocMsg\n");    
534     break;
535
536   case ForChareMsg:
537     //CkPrintf("Critical Path Detection handling a ForChareMsg\n");        
538     break;
539
540   case ForBocMsg:
541     //CkPrintf("Critical Path Detection handling a ForBocMsg\n");    
542     break;
543
544   case ArrayEltInitMsg:
545     // Don't do anything special with these
546     break;
547
548   default:
549     CkPrintf("Critical Path Detection can't yet handle message type %d\n", (int)env->getMsgtype());
550   }
551   
552   
553
554   saveCurrentPathAsUserEvent("criticalPath_start()<br> ");
555
556
557 #endif
558 }
559
560
561 /// Modify the envelope of a message that is being sent for critical path detection and store an entry in a table on this PE.
562 void criticalPath_send(envelope * sendingEnv){
563 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
564 #if DEBUG
565   CkPrintf("criticalPath_send(envelope * sendingEnv)\n");
566 #endif
567   double now = CmiWallTimer();
568   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), CkpvAccess(timeEntryMethodStarted), now);
569   entry.addToTableAndEnvelope(sendingEnv);
570   
571 #endif
572 }
573
574
575 /// Handle the end of the entry method in the critical path detection processes. This should create a forward dependency for the object.
576 void criticalPath_end(){
577 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
578   saveCurrentPathAsUserEvent("criticalPath_end()<br> ");
579
580   CkpvAccess(currentlyExecutingPath).reset();
581
582 #if DEBUG
583   CkPrintf("criticalPath_end()\n");
584 #endif
585
586 #endif
587 }
588
589
590
591 /// Split an entry method invocation into multiple logical tasks for the critical path analysis.
592 /// SDAG doen's break up the code in useful ways, so I'll make it add calls to this in the generated code.
593 void criticalPath_split(){
594 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
595   saveCurrentPathAsUserEvent("criticalPath_split()<br> ");
596
597   // save an entry in the table
598   double now = CmiWallTimer();
599   PathHistoryTableEntry entry(CkpvAccess(currentlyExecutingPath), now-CkpvAccess(timeEntryMethodStarted) );
600   int tableidx = entry.addToTable();
601
602   // end the old task
603
604   
605   // start the new task
606   CkpvAccess(currentlyExecutingPath).sender_pe = CkMyPe();
607   CkpvAccess(currentlyExecutingPath).sender_history_table_idx = tableidx;
608   CkpvAccess(currentlyExecutingPath).preceding_path_time = entry.getTotalTime();
609   CkpvAccess(currentlyExecutingPath).timeEntryMethodStarted = now;
610   CkpvAccess(timeEntryMethodStarted) = now;
611 #endif
612 }
613
614
615
616
617
618 #include "PathHistory.def.h"
619
620 /*! @} */
621