Merge branch 'charm' of charmgit:charm into harshitha/adaptive_lb
[charm.git] / src / libs / ck-libs / cache / CkCache.h
1 #ifndef __CACHEMANAGER_H__
2 #define __CACHEMANAGER_H__
3
4 #include <sys/types.h>
5 #include <vector>
6 #include <map>
7 #include <set>
8 #include "charm++.h"
9 #include "envelope.h"
10
11 #if COSMO_STATS > 0
12 #include <fstream>
13 #endif
14
15 /** NodeCacheEntry represents the entry for a remote 
16 node that is requested by the chares 
17 on a processor.
18 It stores the index of the remote chare from 
19 which node is to be requested and the local
20 chares that request it.***/
21
22 // template now
23 //typedef CmiUInt8 CkCacheKey;
24
25 typedef struct _CkCacheUserData {
26   CmiUInt8 d0;
27   CmiUInt8 d1;
28 } CkCacheUserData;
29
30
31 template<class CkCacheKey> class CkCacheEntryType;
32 template<class CkCacheKey> class CkCacheRequestorData;
33 template<class CkCacheKey> class CkCacheEntry;
34
35 #include "CkCache.decl.h"
36
37 class CkCacheStatistics {
38   CmiUInt8 dataArrived;
39   CmiUInt8 dataTotalArrived;
40   CmiUInt8 dataMisses;
41   CmiUInt8 dataLocal;
42   CmiUInt8 dataError;
43   CmiUInt8 totalDataRequested;
44   CmiUInt8 maxData;
45   int index;
46
47   CkCacheStatistics() : dataArrived(0), dataTotalArrived(0),
48     dataMisses(0), dataLocal(0), dataError(0),
49     totalDataRequested(0), maxData(0), index(-1) { }
50
51  public:
52   CkCacheStatistics(CmiUInt8 pa, CmiUInt8 pta, CmiUInt8 pm,
53           CmiUInt8 pl, CmiUInt8 pe, CmiUInt8 tpr,
54           CmiUInt8 mp, int i) :
55     dataArrived(pa), dataTotalArrived(pta), dataMisses(pm),
56     dataLocal(pl), dataError(pe), totalDataRequested(tpr),
57     maxData(mp), index(i) { }
58
59   void printTo(CkOStream &os) {
60     os << "  Cache: " << dataTotalArrived << " data arrived (corresponding to ";
61     os << dataArrived << " messages), " << dataLocal << " from local Chares" << endl;
62     if (dataError > 0) {
63       os << "Cache: ======>>>> ERROR: " << dataError << " data messages arrived without being requested!! <<<<======" << endl;
64     }
65     os << "  Cache: " << dataMisses << " misses during computation" << endl;
66     os << "  Cache: Maximum of " << maxData << " data stored at a time in processor " << index << endl;
67     os << "  Cache: local Chares made " << totalDataRequested << " requests" << endl;
68   }
69
70   static CkReduction::reducerType sum;
71
72   static CkReductionMsg *sumFn(int nMsg, CkReductionMsg **msgs) {
73     CkCacheStatistics ret;
74     ret.maxData = 0;
75     for (int i=0; i<nMsg; ++i) {
76       CkAssert(msgs[i]->getSize() == sizeof(CkCacheStatistics));
77       CkCacheStatistics *data = (CkCacheStatistics *)msgs[i]->getData();
78       ret.dataArrived += data->dataArrived;
79       ret.dataTotalArrived += data->dataTotalArrived;
80       ret.dataMisses += data->dataMisses;
81       ret.dataLocal += data->dataLocal;
82       ret.totalDataRequested += data->totalDataRequested;
83       if (data->maxData > ret.maxData) {
84         ret.maxData = data->maxData;
85         ret.index = data->index;
86       }
87     }
88     return CkReductionMsg::buildNew(sizeof(CkCacheStatistics), &ret);
89   }
90 };
91
92 template<class CkCacheKey> 
93 class CkCacheRequestMsg : public CMessage_CkCacheRequestMsg<CkCacheKey> {
94  public:
95   CkCacheKey key;
96   int replyTo;
97   CkCacheRequestMsg(CkCacheKey k, int reply) : key(k), replyTo(reply) { }
98 };
99
100 template<class CkCacheKey>
101 class CkCacheFillMsg : public CMessage_CkCacheFillMsg<CkCacheKey> {
102 public:
103   CkCacheKey key;
104   char *data;
105   CkCacheFillMsg (CkCacheKey k) : key(k) {}
106 };
107
108
109 template<class CkCacheKey>
110 class CkCacheRequestorData {
111 public:
112   CkCacheUserData userData;
113   typedef void (*CkCacheCallback)(CkArrayID, CkArrayIndex&, CkCacheKey, CkCacheUserData &, void*, int);
114   CkCacheCallback fn;
115   CkArrayID requestorID;
116   CkArrayIndex requestorIdx;
117
118   CkCacheRequestorData(CProxyElement_ArrayElement &el, CkCacheCallback f, CkCacheUserData &data) {
119     userData = data;
120     requestorID = el.ckGetArrayID();
121     requestorIdx = el.ckGetIndex();
122     fn = f;
123   }
124   
125   void deliver(CkCacheKey key, void *data, int chunk) {
126     fn(requestorID, requestorIdx, key, userData, data, chunk);
127   }
128 };
129
130 template<class CkCacheKey>
131 class CkCacheEntryType {
132 public:
133   virtual void * request(CkArrayIndex&, CkCacheKey) = 0;
134   virtual void * unpack(CkCacheFillMsg<CkCacheKey> *, int, CkArrayIndex &) = 0;
135   virtual void writeback(CkArrayIndex&, CkCacheKey, void *) = 0;
136   virtual void free(void *) = 0;
137   virtual int size(void *) = 0;
138 };
139
140 template<class CkCacheKey>
141 class CkCacheEntry {
142 public:
143   CkCacheKey key;
144   CkArrayIndex home;
145   CkCacheEntryType<CkCacheKey> *type;
146   std::vector< CkCacheRequestorData<CkCacheKey> > requestorVec;
147
148   void *data;
149   
150   bool requestSent;
151   bool replyRecvd;
152   bool writtenBack;
153 #if COSMO_STATS > 1
154   /// total number of requests to this cache entry
155   int totalRequests;
156   /// total number of requests that missed this entry, if the request is
157   /// to another TreePiece in the local processor we never miss
158   int misses;
159 #endif
160   CkCacheEntry(CkCacheKey key, CkArrayIndex &home, CkCacheEntryType<CkCacheKey> *type) {
161     replyRecvd = false;
162     requestSent = false;
163     writtenBack = false;
164     data = NULL;
165     this->key = key;
166     this->home = home;
167     this->type = type;
168     #if COSMO_STATS > 1
169     totalRequests=0;
170     misses=0;
171 #endif
172   }
173
174   ~CkCacheEntry() {
175     CkAssert(requestorVec.empty());
176     if (!writtenBack) writeback();
177     type->free(data);
178   }
179
180   inline void writeback() {
181     type->writeback(home, key, data);
182     writtenBack = true;
183   }
184 };
185
186 class CkCacheArrayCounter : public CkLocIterator {
187 public:
188   int count;
189   CkHashtableT<CkArrayIndex, int> registered;
190   CkCacheArrayCounter() : count(0) { }
191   void addLocation(CkLocation &loc) {
192     registered.put(loc.getIndex()) = ++count;
193   }
194   void reset() {
195     count = 0;
196     registered.empty();
197   }
198 };
199
200 template<class CkCacheKey>
201 class CkCacheManager : public CBase_CkCacheManager<CkCacheKey> {
202
203   /***********************************************************************
204    * Variables definitions
205    ***********************************************************************/
206   
207   /// Number of chunks in which the cache is splitted
208   int numChunks;
209   /// Number of chunks that have already been completely acknowledged
210   int finishedChunks;
211   
212   /// A list of all the elements that are present in the local processor
213   /// for the current iteration
214   CkCacheArrayCounter localChares;
215   /// A list of all the elements that are present in the local processor
216   /// for the current iteration with respect to writeback
217   CkCacheArrayCounter localCharesWB;
218   /// number of chares that have checked in for the next iteration
219   int syncdChares;
220
221   /// The number of arrays this Manager serves without support for writeback
222   int numLocMgr;
223   /// The group ids of the location managers of the arrays this Manager serves
224   /// with support for writeback
225   CkGroupID *locMgr;
226   /// The number of arrays this Manager serves with support for writeback
227   int numLocMgrWB;
228   /// The group ids of the location managers of the arrays this Manager serves
229   /// with support for writeback
230   CkGroupID *locMgrWB;
231
232 #if COSMO_STATS > 0
233   /// particles arrived from remote processors, this counts only the entries in the cache
234   CmiUInt8 dataArrived;
235   /// particles arrived from remote processors, this counts the real
236   /// number of particles arrived
237   CmiUInt8 dataTotalArrived;
238   /// particles missed while walking the tree for computation
239   CmiUInt8 dataMisses;
240   /// particles that have been imported from local TreePieces
241   CmiUInt8 dataLocal;
242   /// particles arrived which were never requested, basically errors
243   CmiUInt8 dataError;
244   /** counts the total number of particles requested by all
245     the chares on the processor***/
246   CmiUInt8 totalDataRequested;
247   /// maximum number of nodes stored at some point in the cache
248   CmiUInt8 maxData;
249 #endif
250
251   /// weights of the chunks in which the tree is divided, the cache will
252   /// update the chunk division based on these values
253   CmiUInt8 *chunkWeight;
254
255   /// Maximum number of allowed data stored
256   CmiUInt8 maxSize;
257   
258   /// number of acknowledgements awaited before deleting the chunk
259   int *chunkAck;
260   /// number of acknowledgements awaited before writing back the chunk
261   int *chunkAckWB;
262
263   /// hash table containing all the entries currently in the cache
264   std::map<CkCacheKey,CkCacheEntry<CkCacheKey>*> *cacheTable;
265   int storedData;
266
267   /// list of all the outstanding requests. The second field is the chunk for
268   /// which this request is outstanding
269   std::map<CkCacheKey,int> outStandingRequests;
270     
271   /***********************************************************************
272    * Methods definitions
273    ***********************************************************************/
274
275  public:
276   
277   CkCacheManager(int size, CkGroupID gid);
278   CkCacheManager(int size, int n, CkGroupID *gid);
279   CkCacheManager(int size, int n, CkGroupID *gid, int nWB, CkGroupID *gidWB);
280   CkCacheManager(CkMigrateMessage *m): CBase_CkCacheManager<CkCacheKey>(m) { init(); }
281   ~CkCacheManager() {}
282   void pup(PUP::er &p);
283  private:
284   void init();
285  public:
286
287   void * requestData(CkCacheKey what, CkArrayIndex &toWhom, int chunk, CkCacheEntryType<CkCacheKey> *type, CkCacheRequestorData<CkCacheKey> &req);
288   void * requestDataNoFetch(CkCacheKey key, int chunk);
289   CkCacheEntry<CkCacheKey> * requestCacheEntryNoFetch(CkCacheKey key, int chunk);
290   void recvData(CkCacheFillMsg<CkCacheKey> *msg);
291   void recvData(CkCacheKey key, CkArrayIndex &from, CkCacheEntryType<CkCacheKey> *type, int chunk, void *data);
292
293   void cacheSync(int &numChunks, CkArrayIndex &chareIdx, int &localIdx);
294
295   /** Called from the TreePieces to acknowledge that a particular chunk
296       can be written back to the original senders */
297   void writebackChunk(int num);
298   /** Called from the TreePieces to acknowledge that a particular chunk
299       has been completely used, and can be deleted */
300   void finishedChunk(int num, CmiUInt8 weight);
301   /** Called from the TreePieces to acknowledge that they have completely
302       finished their computation */
303
304   /** Collect the statistics for the latest iteration */
305   void collectStatistics(CkCallback& cb);
306   std::map<CkCacheKey,CkCacheEntry<CkCacheKey>*> *getCache();
307
308 };
309
310   // from CkCache.C
311
312   template<class CkCacheKey>
313   CkCacheManager<CkCacheKey>::CkCacheManager(int size, CkGroupID gid) {
314     init();
315     numLocMgr = 1;
316     numLocMgrWB = 0;
317     locMgr = new CkGroupID[1];
318     locMgr[0] = gid;
319     maxSize = (CmiUInt8)size * 1024 * 1024;
320   }
321
322   template<class CkCacheKey>
323   CkCacheManager<CkCacheKey>::CkCacheManager(int size, int n, CkGroupID *gid) {
324     init();
325     numLocMgr = n;
326     numLocMgrWB = 0;
327     locMgr = new CkGroupID[n];
328     for (int i=0; i<n; ++i) locMgr[i] = gid[i];
329     maxSize = (CmiUInt8)size * 1024 * 1024;
330   }
331
332   template<class CkCacheKey>
333   CkCacheManager<CkCacheKey>::CkCacheManager(int size, int n, CkGroupID *gid, int nWB, CkGroupID *gidWB) {
334     init();
335     numLocMgr = n;
336     locMgr = new CkGroupID[n];
337     for (int i=0; i<n; ++i) locMgr[i] = gid[i];
338     numLocMgrWB = nWB;
339     locMgrWB = new CkGroupID[nWB];
340     for (int i=0; i<n; ++i) locMgrWB[i] = gidWB[i];
341     maxSize = (CmiUInt8)size * 1024 * 1024;
342   }
343
344   template<class CkCacheKey>
345   void CkCacheManager<CkCacheKey>::init() {
346     numChunks = 0;
347     numLocMgr = 0;
348     locMgr = NULL;
349     maxSize = 0;
350     syncdChares = 0;
351     cacheTable = NULL;
352     chunkAck = NULL;
353     chunkWeight = NULL;
354     storedData = 0;
355 #if COSMO_STATS > 0
356     dataArrived = 0;
357     dataTotalArrived = 0;
358     dataMisses = 0;
359     dataLocal = 0;
360     totalDataRequested = 0;
361 #endif
362   }
363
364   template<class CkCacheKey>
365   void CkCacheManager<CkCacheKey>::pup(PUP::er &p) {
366     CBase_CkCacheManager<CkCacheKey>::pup(p);
367     p | numLocMgr;
368     if (p.isUnpacking()) locMgr = new CkGroupID[numLocMgr];
369     PUParray(p,locMgr,numLocMgr);
370     p | numLocMgrWB;
371     if (p.isUnpacking()) locMgrWB = new CkGroupID[numLocMgrWB];
372     PUParray(p,locMgrWB,numLocMgrWB);
373     p | maxSize;
374   }
375
376   template<class CkCacheKey>
377   void * CkCacheManager<CkCacheKey>::requestData(CkCacheKey what, CkArrayIndex &_toWhom, int chunk, CkCacheEntryType<CkCacheKey> *type, CkCacheRequestorData<CkCacheKey> &req)
378   {
379     typename std::map<CkCacheKey, CkCacheEntry<CkCacheKey>* >::iterator  p;
380     CkArrayIndex toWhom(_toWhom);
381     CkAssert(chunkAck[chunk] > 0);
382     p = cacheTable[chunk].find(what);
383     CkCacheEntry<CkCacheKey> *e;
384 #if COSMO_STATS > 0
385     totalDataRequested++;
386 #endif
387     if (p != cacheTable[chunk].end()) {
388       e = p->second;
389       CkAssert(e->home == toWhom);
390       //CkAssert(e->begin == begin);
391       //CkAssert(e->end == end);
392 #if COSMO_STATS > 1
393       e->totalRequests++;
394 #endif
395       if (e->data != NULL) {
396         return e->data;
397       }
398       if (!e->requestSent) {// || _nocache) {
399         e->requestSent = true;
400         if ((e->data = type->request(toWhom, what)) != NULL) {
401           e->replyRecvd = true;
402           return e->data;
403         }
404       }
405     } else {
406       e = new CkCacheEntry<CkCacheKey>(what, toWhom, type);
407 #if COSMO_STATS > 1
408       e->totalRequests++;
409 #endif
410       cacheTable[chunk][what] = e;
411       e->requestSent = true;
412       if ((e->data = type->request(toWhom, what)) != NULL) {
413         e->replyRecvd = true;
414         return e->data;
415       }
416     }
417
418     e->requestorVec.push_back(req);
419     outStandingRequests[what] = chunk;
420 #if COSMO_STATS > 1
421     e->misses++;
422 #endif
423     return NULL;
424   }
425
426   template<class CkCacheKey>
427   void * CkCacheManager<CkCacheKey>::requestDataNoFetch(CkCacheKey key, int chunk) {
428     typename std::map<CkCacheKey,CkCacheEntry<CkCacheKey> *>::iterator p = cacheTable[chunk].find(key);
429     if (p != cacheTable[chunk].end()) {
430       return p->second->data;
431     }
432     return NULL;
433   }
434   
435   template<class CkCacheKey>
436   CkCacheEntry<CkCacheKey> * CkCacheManager<CkCacheKey>::requestCacheEntryNoFetch(CkCacheKey key, int chunk) {
437     typename std::map<CkCacheKey,CkCacheEntry<CkCacheKey> *>::iterator p = cacheTable[chunk].find(key);
438     if (p != cacheTable[chunk].end()) {
439       return p->second;
440     }
441     return NULL;
442   }
443   
444   template<class CkCacheKey>
445   std::map<CkCacheKey,CkCacheEntry<CkCacheKey>*> *CkCacheManager<CkCacheKey>::getCache(){
446     return cacheTable;
447   }
448
449   template<class CkCacheKey>
450   void CkCacheManager<CkCacheKey>::recvData(CkCacheFillMsg<CkCacheKey> *msg) {
451     CkCacheKey key = msg->key;
452     typename std::map<CkCacheKey,int>::iterator pchunk = outStandingRequests.find(key);
453     CkAssert(pchunk != outStandingRequests.end());
454     int chunk = pchunk->second;
455     CkAssert(chunk >= 0 && chunk < numChunks);
456     CkAssert(chunkAck[chunk] > 0);
457     outStandingRequests.erase(pchunk);
458     
459     typename std::map<CkCacheKey,CkCacheEntry<CkCacheKey>*>::iterator p;
460     p = cacheTable[chunk].find(key);
461     CkAssert(p != cacheTable[chunk].end());
462     CkCacheEntry<CkCacheKey> *e = p->second;
463     e->data = e->type->unpack(msg, chunk, e->home);
464     storedData += e->type->size(e->data);
465     
466     typename std::vector<CkCacheRequestorData<CkCacheKey> >::iterator caller;
467     for (caller = e->requestorVec.begin(); caller != e->requestorVec.end(); caller++) {
468       caller->deliver(key, e->data, chunk);
469     }
470     e->requestorVec.clear();
471   }
472   
473   template<class CkCacheKey>
474   void CkCacheManager<CkCacheKey>::recvData(CkCacheKey key, CkArrayIndex &from, CkCacheEntryType<CkCacheKey> *type, int chunk, void *data) {
475     typename std::map<CkCacheKey,CkCacheEntry<CkCacheKey>*>::iterator p = cacheTable[chunk].find(key);
476     CkCacheEntry<CkCacheKey> *e;
477     if (p == cacheTable[chunk].end()) {
478       e = new CkCacheEntry<CkCacheKey>(key, from, type);
479       cacheTable[chunk][key] = e;
480     } else {
481       e = p->second;
482       storedData -= e->type->size(e->data);
483       e->type->writeback(e->home, e->key, e->data);
484     }
485     e->replyRecvd = true;
486     e->data = data;
487     storedData += e->type->size(data);
488     
489     typename std::vector<CkCacheRequestorData<CkCacheKey> >::iterator caller;
490     for (caller = e->requestorVec.begin(); caller != e->requestorVec.end(); caller++) {
491       caller->deliver(key, e->data, chunk);
492     }
493     e->requestorVec.clear();
494   }
495
496   template<class CkCacheKey>
497   void CkCacheManager<CkCacheKey>::cacheSync(int &_numChunks, CkArrayIndex &chareIdx, int &localIdx) {
498     finishedChunks = 0;
499     if (syncdChares > 0) {
500       _numChunks = numChunks;
501       //CkPrintf("Cache %d: sync following\n",thisgroup.idx);
502     } else {
503       syncdChares = 1;
504       //CkPrintf("Cache %d: sync\n",thisgroup.idx);
505
506       localChares.reset();
507       localCharesWB.reset();
508       for (int i=0; i<numLocMgr; ++i) {
509         CkLocMgr *mgr = (CkLocMgr *)CkLocalBranch(locMgr[i]);
510         mgr->iterate(localChares);
511       }
512       for (int i=0; i<numLocMgrWB; ++i) {
513         CkLocMgr *mgr = (CkLocMgr *)CkLocalBranch(locMgrWB[i]);
514         mgr->iterate(localChares);
515         mgr->iterate(localCharesWB);
516       }
517
518 #if COSMO_STATS > 0
519       dataArrived = 0;
520       dataTotalArrived = 0;
521       dataMisses = 0;
522       dataLocal = 0;
523       totalDataRequested = 0;
524       maxData = 0;
525 #endif
526
527       for (int chunk=0; chunk<numChunks; ++chunk) {
528         CkAssert(cacheTable[chunk].empty());
529         CkAssert(chunkAck[chunk]==0);
530         CkAssert(chunkAckWB[chunk]==0);
531       }
532       CkAssert(outStandingRequests.empty());
533       storedData = 0;
534
535       if (numChunks != _numChunks) {
536         if(numChunks != 0) {
537           delete []cacheTable;
538           delete []chunkAck;
539           delete []chunkAckWB;
540           delete []chunkWeight;
541         }
542           
543         numChunks = _numChunks;
544         cacheTable = new std::map<CkCacheKey,CkCacheEntry<CkCacheKey>*>[numChunks];
545         chunkAck = new int[numChunks];
546         chunkAckWB = new int[numChunks];
547         chunkWeight = new CmiUInt8[numChunks];
548       }
549       for (int i=0; i<numChunks; ++i) {
550         chunkAck[i] = localChares.count;
551         chunkAckWB[i] = localCharesWB.count;
552         chunkWeight[i] = 0;
553       }
554       
555 #if COSMO_STATS > 0
556       CmiResetMaxMemory();
557 #endif
558     }
559
560     localIdx = localChares.registered.get(chareIdx);
561     CkAssert(localIdx != 0);
562   }
563
564   template<class CkCacheKey>
565   void CkCacheManager<CkCacheKey>::writebackChunk(int chunk) {
566     CkAssert(chunkAckWB[chunk] > 0);
567     if (--chunkAckWB[chunk] == 0) {
568       // we can safely write back the chunk to the senders
569       // at this point no more changes to the data can be made until next fetch
570
571       typename std::map<CkCacheKey,CkCacheEntry<CkCacheKey>*>::iterator iter;
572       for (iter = cacheTable[chunk].begin(); iter != cacheTable[chunk].end(); iter++) {
573         CkCacheEntry<CkCacheKey> *e = iter->second;
574         e->writeback();
575       }
576
577     }
578   }
579
580   template<class CkCacheKey>
581   void CkCacheManager<CkCacheKey>::finishedChunk(int chunk, CmiUInt8 weight) {
582     CkAssert(chunkAck[chunk] > 0);
583     chunkWeight[chunk] += weight;
584     //CkPrintf("Cache %d: finishedChunk %d\n",thisgroup.idx,chunkAck[chunk]);
585     if (--chunkAck[chunk] == 0) {
586       // we can safely delete the chunk from the cache
587       
588       // TODO: if chunks are held back due to restrictions, here is a
589       // good position to release them
590
591 #if COSMO_STATS > 0
592       if (maxData < storedData) maxData = storedData;
593 #endif
594
595       typename std::map<CkCacheKey,CkCacheEntry<CkCacheKey>*>::iterator iter;
596       for (iter = cacheTable[chunk].begin(); iter != cacheTable[chunk].end(); iter++) {
597         CkCacheEntry<CkCacheKey> *e = iter->second;
598         storedData -= e->type->size(e->data);
599         
600         // TODO: Store communication pattern here
601
602         delete e;
603       }
604       cacheTable[chunk].clear();
605       if (++finishedChunks == numChunks) {
606         finishedChunks = 0;
607         syncdChares = 0;
608       }
609     }
610   }
611   
612   template<class CkCacheKey>
613   void CkCacheManager<CkCacheKey>::collectStatistics(CkCallback &cb) {
614 #if COSMO_STATS > 0
615     CkCacheStatistics cs(dataArrived, dataTotalArrived,
616         dataMisses, dataLocal, dataError, totalDataRequested,
617         maxData, CkMyPe());
618     contribute(sizeof(CkCacheStatistics), &cs, CkCacheStatistics::sum, cb);
619 #else
620     CkAbort("Invalid call, only valid if COSMO_STATS is defined");
621 #endif
622   }
623
624 #define CK_TEMPLATES_ONLY
625 #include "CkCache.def.h"
626 #undef CK_TEMPLATES_ONLY
627
628 #endif