Fixed to compile and run on origin2000 and origin-pthreads.
[charm.git] / src / ck-core / ckarray.C
1 #include "charm++.h"
2 #include "register.h"
3 #include "ckarray.h"
4 #include "ck.h"
5 #include "CkArray.def.h"
6 #include "init.h"
7
8 #if CMK_LBDB_ON
9 #include "LBDatabase.h"
10 #endif // CMK_LBDB_ON
11
12 CkGroupID _RRMapID;
13
14 void *
15 ArrayMigrateMessage::alloc(int msgnum,int size,int *array,int priobits)
16 {
17   int totalsize;
18   totalsize = size + array[0]*sizeof(char) + 8;
19   // CkPrintf("Allocating %d %d %d\n",msgnum,totalsize,priobits);
20   ArrayMigrateMessage *newMsg = (ArrayMigrateMessage *)
21     CkAllocMsg(msgnum,totalsize,priobits);
22   // CkPrintf("Allocated %d\n",newMsg);
23   newMsg->elementData = (char *)newMsg + ALIGN8(size);
24   return (void *) newMsg;
25 }
26   
27 void *
28 ArrayMigrateMessage::pack(ArrayMigrateMessage* in)
29 {
30   /*
31   CkPrintf("%d:Packing %d %d %d\n",CkMyPe(),in->from,in->index,in->elementSize);
32   */
33   in->elementData = (void*)((char*)in->elementData-(char *)&(in->elementData));
34   return (void*) in;
35 }
36
37 ArrayMigrateMessage* 
38 ArrayMigrateMessage::unpack(void *in)
39 {
40   ArrayMigrateMessage *me = new (in) ArrayMigrateMessage;
41   /*
42   CkPrintf("PE %d Unpacking me=%d from=%d index=%d elementSize=%d\n",
43     CkMyPe(),me,me->from,me->index,me->elementSize);
44   */
45   me->elementData = (char *)&(me->elementData) + (size_t)me->elementData;
46   return me;
47 }
48
49 CkGroupID Array1D::CreateArray(int numElements,
50                                CkGroupID mapID,
51                                ChareIndexType elementChare,
52                                EntryIndexType elementConstructor,
53                                EntryIndexType elementMigrator)
54 {
55   CkGroupID group;
56
57   ArrayCreateMessage *msg = new ArrayCreateMessage;
58
59   msg->numElements = numElements;
60   msg->mapID = mapID;
61   msg->elementChareType = elementChare;
62   msg->elementConstType = elementConstructor;
63   msg->elementMigrateType = elementMigrator;
64 #if CMK_LBDB_ON
65   msg->loadbalancer = lbdb;
66 #endif
67   group = CProxy_Array1D::ckNew(msg);
68
69   return group;
70 }
71
72 Array1D::Array1D(ArrayCreateMessage *msg)
73 {
74   numElements = msg->numElements;
75   elementChareType = msg->elementChareType;
76   elementConstType = msg->elementConstType;
77   elementMigrateType = msg->elementMigrateType;
78
79 #if CMK_LBDB_ON
80   the_lbdb = CProxy_LBDatabase(msg->loadbalancer).ckLocalBranch();
81   if (the_lbdb == 0)
82     CkPrintf("[%d] LBDatabase not created?\n",CkMyPe());
83
84   //  int iii=0;
85   //  CkPrintf("%d Hi from Array1D[%d]\n",iii++,CkMyPe());
86
87   // Register myself as an array manager
88   LDOMid myId;
89   myId.id = (int)thisgroup;
90
91   LDCallbacks myCallbacks;
92   myCallbacks.migrate = staticMigrate;
93   myCallbacks.setStats = staticSetStats;
94   myCallbacks.queryEstLoad = staticQueryLoad;
95   
96   myHandle = the_lbdb->RegisterOM(myId,this,myCallbacks);
97 #endif
98
99   ArrayMapRegisterMessage *mapMsg = new ArrayMapRegisterMessage;
100   mapMsg->numElements = numElements;
101   mapMsg->arrayID = thishandle;
102   mapMsg->groupID = thisgroup;
103
104   bufferedForElement = new PtrQ();
105   bufferedMigrated = new PtrQ();
106   map = 0;
107
108   ArrayMap *mapPtr = (ArrayMap *)CkLocalBranch(msg->mapID);
109
110   if(mapPtr==0) {
111     CProxy_ArrayMap pmap(msg->mapID);
112     pmap.registerArray(mapMsg, CkMyPe());
113   } else {
114     mapPtr->registerArray(mapMsg);
115   }
116
117   delete msg;
118 }
119
120 void Array1D::RecvMapID(ArrayMap *mPtr, int mHandle)
121 {
122   map = mPtr;
123   mapHandle = mHandle;
124
125   elementIDs = new ElementIDs[numElements];
126   _MEMCHECK(elementIDs);
127   elementIDsReported = 0;
128   numLocalElements=0;
129   int i;
130
131 #if CMK_LBDB_ON
132   // Tell the lbdb that I'm registering objects, until I'm done
133   // registering them.
134   the_lbdb->RegisteringObjects(myHandle);
135 #endif
136
137   for(i=0; i < numElements; i++)
138   {
139     elementIDs[i].originalPE = elementIDs[i].pe = map->procNum(mapHandle, i);
140     elementIDs[i].curHop = 0;
141     if (elementIDs[i].pe != CkMyPe())
142     {
143       elementIDs[i].state = at;
144       elementIDs[i].element = NULL;
145     }
146     else
147     {
148       elementIDs[i].state = creating;
149       numLocalElements++;
150
151       ArrayElementCreateMessage *msg = new ArrayElementCreateMessage;
152       
153       msg->numElements = numElements;
154       msg->arrayID = thishandle;
155       msg->groupID = thisgroup;
156       msg->arrayPtr = this;
157       msg->index = i;
158       CkCreateChare(elementChareType, elementConstType, msg, 0, CkMyPe());
159     }
160   }
161 #if CMK_LBDB_ON
162   if (numLocalElements==0)
163     the_lbdb->DoneRegisteringObjects(myHandle);
164 #endif
165   CProxy_Array1D arr(thisgroup);
166   ArrayMessage *amsg;
167   while((amsg = (ArrayMessage *) bufferedForElement->deq())) {
168     arr.RecvForElement(amsg, CkMyPe());
169   }
170   delete bufferedForElement;
171   ArrayMigrateMessage *mmsg;
172   while((mmsg = (ArrayMigrateMessage *) bufferedMigrated->deq())) {
173     arr.RecvMigratedElement(mmsg, CkMyPe());
174   }
175   delete bufferedMigrated;
176 }
177
178 void Array1D::RecvElementID(int index, ArrayElement *elem,
179                             CkChareID handle, Bool use_local_barrier)
180 {
181   elementIDs[index].state = here;
182   elementIDs[index].element = elem;
183   elementIDs[index].elementHandle = handle;
184   elementIDsReported++;
185
186   //  if (elementIDsReported == numLocalElements)
187   //    CkPrintf("PE %d all elements reported in\n",CkMyPe());
188
189 #if CMK_LBDB_ON
190   // Register the object with the load balancer
191   LDObjid elemID;
192   elemID.id[0] = index;
193   elemID.id[1] = elemID.id[2] = elemID.id[3] = 0;
194
195   elementIDs[index].ldHandle = the_lbdb->RegisterObj(myHandle,elemID,0,1);
196
197   if (use_local_barrier)
198     RegisterElementForSync(index);
199   else
200     elementIDs[index].uses_barrier = False;
201
202   if (elementIDsReported == numLocalElements)
203     the_lbdb->DoneRegisteringObjects(myHandle);
204     
205 #endif
206
207 }
208
209 static int serial_num = 0;
210
211 void Array1D::send(ArrayMessage *msg, int index, EntryIndexType ei)
212 {
213   msg->destIndex = index;
214   msg->entryIndex = ei;
215   msg->hopCount = 0;
216   msg->serial_num = 1000*serial_num+CkMyPe();
217   serial_num++;
218
219   if (elementIDs[index].state == here) {
220 #if 0
221     CPrintf("PE %d sending local message to index %d\n",CMyPe(),index);
222 #endif
223     CProxy_Array1D arr(thisgroup);
224     arr.RecvForElement(msg, CkMyPe());
225   } else if (elementIDs[index].state == moving_to) {
226     // CkPrintf("PE %d sending message to migrating index %d on PE %d\n",
227       // CkMyPe(),index,elementIDs[index].pe);
228     CProxy_Array1D arr(thisgroup);
229     arr.RecvForElement(msg, elementIDs[index].pe);
230   } else if (elementIDs[index].state == arriving) {
231     // CkPrintf("PE %d sending message for index %d to myself\n",
232       // CkMyPe(),index);
233     CProxy_Array1D arr(thisgroup);
234     arr.RecvForElement(msg, CkMyPe());
235  } else if (elementIDs[index].state == at) {
236 #if 0
237     CPrintf("PE %d AT message to index %d on original PE %d\n",
238             CMyPe(),elementIDs[index].state,index,
239             elementIDs[index].pe);
240 #endif
241     CProxy_Array1D arr(thisgroup);
242     arr.RecvForElement(msg, elementIDs[index].pe);
243  } else {
244     // CkPrintf("PE %d sending message to index %d on original PE %d\n",
245       // CkMyPe(),index,elementIDs[index].originalPE);
246     CProxy_Array1D arr(thisgroup);
247     arr.RecvForElement(msg, elementIDs[index].originalPE);
248   }
249 }
250
251 void Array1D::broadcast(ArrayMessage *msg, EntryIndexType ei)
252 {
253   CkPrintf("Broadcast not implemented\n");
254   delete msg;
255 }
256
257 void Array1D::RecvForElement(ArrayMessage *msg)
258 {
259   /*
260   CkPrintf("PE %d RecvForElement sending to index %d\n",CkMyPe(),msg->destIndex);
261   */
262   if(!map) {
263     bufferedForElement->enq((void *)msg);
264     return;
265   }
266   msg->hopCount++;
267   if (elementIDs[msg->destIndex].state == here) {
268     // CkPrintf("PE %d DELIVERING index %d RecvForElement state %d\n",
269     // CkMyPe(),msg->destIndex,elementIDs[msg->destIndex].state);
270
271     register int epIdx = msg->entryIndex;
272     CkChareID handle = elementIDs[msg->destIndex].elementHandle;
273     register void *obj = handle.objPtr;
274
275 #if CMK_LBDB_ON
276     const int index = msg->destIndex;
277     the_lbdb->ObjectStart(elementIDs[index].ldHandle);
278     // Can't use msg after call(): The user may delete it!
279     _entryTable[epIdx]->call(msg, obj);
280     the_lbdb->ObjectStop(elementIDs[index].ldHandle);
281 #else
282     _entryTable[epIdx]->call(msg, obj);
283 #endif
284
285  } else if (elementIDs[msg->destIndex].state == at) {
286     // CkPrintf("PE %d Sending to SELF index %d RecvForElement state %d\n",
287       // CkMyPe(),msg->destIndex,elementIDs[msg->destIndex].state);
288     CProxy_Array1D arr(thisgroup);
289     arr.RecvForElement(msg, elementIDs[msg->destIndex].pe);
290   } else {
291     // CkPrintf("PE %d Sending to SELF index %d RecvForElement state %d\n",
292       // CkMyPe(),msg->destIndex,elementIDs[msg->destIndex].state);
293     CProxy_Array1D arr(thisgroup);
294     arr.RecvForElement(msg, elementIDs[msg->destIndex].originalPE);
295   }
296 }
297
298 void Array1D::migrateMe(int index, int where)
299 {
300   int bufSize = elementIDs[index].element->packsize();
301
302   ArrayMigrateMessage *msg = new (&bufSize, 0) ArrayMigrateMessage;
303
304   msg->index = index;
305   msg->from = CkMyPe();
306   msg->elementSize = bufSize;
307   msg->hopCount = elementIDs[index].curHop + 1;
308 #if CMK_LBDB_ON
309   msg->uses_barrier = elementIDs[index].uses_barrier;
310 #endif
311
312   elementIDs[index].element->pack(msg->elementData);
313   elementIDs[index].state = moving_to;
314   elementIDs[index].pe = where;
315
316 #if CMK_LBDB_ON
317   the_lbdb->UnregisterObj(elementIDs[index].ldHandle);
318   if (elementIDs[index].uses_barrier)
319     the_lbdb->RemoveLocalBarrierClient(elementIDs[index].barrierHandle);
320 #endif
321   numLocalElements--;
322   elementIDsReported--;
323
324   CProxy_Array1D arr(thisgroup);
325   arr.RecvMigratedElement(msg, where);
326 }
327
328 void Array1D::RecvMigratedElement(ArrayMigrateMessage *msg)
329 {
330   if(!map) {
331     bufferedMigrated->enq(msg);
332     return;
333   }
334   int index =msg->index;
335
336   elementIDs[index].state = arriving;
337   elementIDs[index].pe = CkMyPe();
338   elementIDs[index].curHop = msg->hopCount;
339   elementIDs[index].cameFrom = msg->from;
340   elementIDs[index].migrateMsg = msg;
341
342 #if CMK_LBDB_ON
343   elementIDs[index].uses_barrier = msg->uses_barrier;
344 #endif
345    
346   ArrayElementMigrateMessage *new_msg = new ArrayElementMigrateMessage;
347
348   new_msg->index = index;
349   new_msg->numElements = numElements;
350   new_msg->arrayID = thishandle;
351   new_msg->groupID = thisgroup;
352   new_msg->arrayPtr = this;
353   new_msg->packData = msg->elementData;
354   
355   CkCreateChare(elementChareType, elementMigrateType, new_msg, 0, CkMyPe());
356
357
358 }
359
360 void Array1D::RecvMigratedElementID(int index, ArrayElement *elem,
361                                     CkChareID handle)
362 {
363   // CkPrintf("PE %d index %d receiving migrated element handle %d\n",
364     // CkMyPe(),index,handle);
365   elementIDs[index].state = here;
366   elementIDs[index].element = elem;
367   elementIDs[index].elementHandle = handle;
368   delete elementIDs[index].migrateMsg;
369   elementIDs[index].migrateMsg = NULL;
370
371   ArrayElementAckMessage *ack_msg = new ArrayElementAckMessage;
372
373   ack_msg->hopCount = elementIDs[index].curHop;
374   ack_msg->index = index;
375   ack_msg->arrivedAt = elementIDs[index].pe;
376   ack_msg->handle = elementIDs[index].elementHandle;
377   ack_msg->deleteElement = 1;
378
379   CProxy_Array1D arr(thisgroup);
380   arr.AckMigratedElement(ack_msg, elementIDs[index].cameFrom);
381   
382   if (elementIDs[index].cameFrom != elementIDs[index].originalPE) {
383     ack_msg = new ArrayElementAckMessage;
384
385     ack_msg->hopCount = elementIDs[index].curHop;
386     ack_msg->index = index;
387     ack_msg->arrivedAt = elementIDs[index].pe;
388     ack_msg->handle = elementIDs[index].elementHandle;
389     ack_msg->deleteElement = 0;
390
391     arr.AckMigratedElement(ack_msg, elementIDs[index].originalPE);
392   }
393   numLocalElements++;
394   elementIDsReported++;
395
396 #if CMK_LBDB_ON
397   // Register the object with the load balancer
398   LDObjid elemID;
399   elemID.id[0] = index;
400   elemID.id[1] = elemID.id[2] = elemID.id[3] = 0;
401
402   elementIDs[index].ldHandle = the_lbdb->RegisterObj(myHandle,elemID,0,1);
403
404   if (elementIDs[index].uses_barrier)
405     RegisterElementForSync(index);
406
407   the_lbdb->Migrated(elementIDs[index].ldHandle);
408 #endif
409
410 }
411
412 void Array1D::AckMigratedElement(ArrayElementAckMessage *msg)
413 {
414   int index = msg->index;
415
416   // CkPrintf("PE %d Message acknowledged hop=%d curHop=%d\n",
417     // CkMyPe(),msg->hopCount,elementIDs[index].curHop);
418
419   if (msg->hopCount > elementIDs[index].curHop) {
420     if (msg->deleteElement) {
421       ArrayElementExitMessage *exitmsg = new ArrayElementExitMessage;
422       // CkPrintf("I want to delete the element %d\n",index);
423       CProxy_ArrayElement elem(elementIDs[index].elementHandle);
424       elem.exit(exitmsg);
425     }
426     elementIDs[index].pe = msg->arrivedAt;
427     elementIDs[index].state = at;
428     elementIDs[index].elementHandle = msg->handle;
429   } else if (msg->hopCount <= elementIDs[index].curHop) {
430     // CkPrintf("PE %d STALE Message acknowledged hop=%d curHop=%d\n",
431       // CkMyPe(),msg->hopCount,elementIDs[index].curHop);
432     
433   }
434   delete msg;
435 }
436
437 #if CMK_LBDB_ON
438
439 void Array1D::staticMigrate(LDObjHandle _h, int _dest)
440 {
441   (static_cast<Array1D*>(_h.omhandle.user_ptr))->Migrate(_h,_dest);
442 }
443
444 void Array1D::staticSetStats(LDOMHandle _h, int _state)
445 {
446   (static_cast<Array1D*>(_h.user_ptr))->SetStats(_h,_state);   
447 }
448
449 void Array1D::staticQueryLoad(LDOMHandle _h)
450 {
451   (static_cast<Array1D*>(_h.user_ptr))->QueryLoad(_h);
452 }
453
454 void Array1D::Migrate(LDObjHandle _h, int _dest)
455 {
456   int id = _h.id.id[0];
457   if (elementIDs[id].state != here)
458     CkPrintf("%s(%d)[%d]: Migrate error, element not present\n",
459              __FILE__,__LINE__,CkMyPe());
460   else
461     elementIDs[id].element->migrate(_dest);
462   
463 }
464
465 void Array1D::SetStats(LDOMHandle _h, int _state)
466 {
467   CkPrintf("%s(%d)[%d]: SetStats request received\n",
468            __FILE__,__LINE__,CkMyPe());
469 }
470
471 void Array1D::QueryLoad(LDOMHandle _h)
472 {
473   CkPrintf("%s(%d)[%d]: QueryLoad request received\n",
474            __FILE__,__LINE__,CkMyPe());
475 }
476
477 void Array1D::RegisterElementForSync(int index)
478 {
479   CkPrintf("[%d] Registering element %d for barrier\n",CkMyPe(),index);
480   if (elementIDsReported == 1) { // This is the first element reported
481     // If this is a sync array, register a sync callback so I can
482     // inform the db when I start registering objects 
483     the_lbdb->AddLocalBarrierReceiver(staticRecvAtSync,
484                                       static_cast<void*>(this));
485   }
486     
487   elementIDs[index].uses_barrier = True;  
488   elementIDs[index].barrierData.me = this;
489   elementIDs[index].barrierData.index = index;
490
491   elementIDs[index].barrierHandle = the_lbdb->
492     AddLocalBarrierClient(staticResumeFromSync,
493                           static_cast<void*>(&elementIDs[index].barrierData));
494
495 }
496
497 void Array1D::staticRecvAtSync(void* data)
498 {
499   static_cast<Array1D*>(data)->RecvAtSync();
500 }
501
502 void Array1D::RecvAtSync()
503 {
504   // If all of our elements leave, there won't be anything to
505   // call DoneRegisteringObjects();
506   the_lbdb->RegisteringObjects(myHandle);
507 }
508
509 void Array1D::staticResumeFromSync(void* data)
510 {
511   ElementIDs::BarrierClientData* barrierData = 
512     static_cast<ElementIDs::BarrierClientData*>(data);
513   (barrierData->me)->ResumeFromSync(barrierData->index);
514 }
515
516 void Array1D::ResumeFromSync(int index)
517 {
518   the_lbdb->DoneRegisteringObjects(myHandle);
519
520   if (elementIDs[index].state == here)
521     elementIDs[index].element->ResumeFromSync();
522   else {
523     CkPrintf("!!! I'm supposed to resume an element, but it has left !!!\n");
524   }
525 }
526
527 void Array1D::AtSync(int index)
528 {
529   the_lbdb->AtLocalBarrier(elementIDs[index].barrierHandle);
530 }
531 #endif // CMK_LBDB_ON
532
533 ArrayElement::ArrayElement(ArrayElementCreateMessage *msg)
534 {
535   numElements = msg->numElements;
536   arrayChareID = msg->arrayID;
537   arrayGroupID = msg->groupID;
538   thisArray = msg->arrayPtr;
539   thisAID._setAid(thisArray->ckGetGroupId());
540   thisAID._elem = (-1);
541   thisAID._setChare(0);
542   thisIndex = msg->index;
543 }
544
545 ArrayElement::ArrayElement(ArrayElementMigrateMessage *msg)
546 {
547   numElements = msg->numElements;
548   arrayChareID = msg->arrayID;
549   arrayGroupID = msg->groupID;
550   thisArray = msg->arrayPtr;
551   thisAID._setAid(thisArray->ckGetGroupId());
552   thisAID._elem = (-1);
553   thisAID._setChare(0);
554   thisIndex = msg->index;
555 }
556
557 void ArrayElement::finishConstruction(Bool use_local_barrier)
558 {
559   thisArray->RecvElementID(thisIndex, this, thishandle, use_local_barrier);
560 }
561
562 void ArrayElement::finishMigration(void)
563 {
564   // CkPrintf("Finish Migration registering %d,%d\n",thisIndex,thishandle);
565   thisArray->RecvMigratedElementID(thisIndex, this, thishandle);
566 }
567
568 void ArrayElement::migrate(int where)
569 {
570   // CkPrintf("Migrating element %d to %d\n",thisIndex,where);
571   if (where != CkMyPe())
572     thisArray->migrateMe(thisIndex,where);
573 /*
574   else 
575     CkPrintf("PE %d I won't migrating element %d to myself\n", where,thisIndex);
576 */
577 }
578
579 void ArrayElement::AtSync(void)
580 {
581   CkPrintf("Element %d at sync\n",thisIndex);
582 #if CMK_LBDB_ON
583   thisArray->AtSync(thisIndex);
584 #endif
585 }
586
587 void ArrayElement::exit(ArrayElementExitMessage *msg)
588 {
589   delete msg;
590   // CkPrintf("ArrayElement::exit exiting %d\n",thisIndex);
591   delete this;
592 }
593
594 #if 0
595 ArrayMap::ArrayMap(ArrayMapCreateMessage *msg)
596 {
597   // CkPrintf("PE %d creating ArrayMap\n",CkMyPe());
598   arrayChareID = msg->arrayID;
599   arrayGroupID = msg->groupID;
600   array = CProxy_Array1D::ckLocalBranch(arrayGroupID);
601   numElements = msg->numElements;
602
603   delete msg;
604 }
605
606 void ArrayMap::finishConstruction(void)
607 {
608   array->RecvMapID(this, thishandle, thisgroup);
609 }
610 #endif
611
612 RRMap::RRMap(void)
613 {
614   // CkPrintf("PE %d creating RRMap for %d elements\n",CkMyPe(),numElements);
615   arrayVec = new PtrVec();
616 }
617
618 int RRMap::procNum(int /*arrayHdl*/, int element)
619 {
620   return ((element+1) % CkNumPes());
621 }
622
623 void RRMap::registerArray(ArrayMapRegisterMessage *msg)
624 {
625   int hdl = arrayVec->length();
626   arrayVec->insert(hdl, (void *)(msg->numElements));
627   Array1D* array = (Array1D *) CkLocalBranch(msg->groupID);
628   delete msg;
629   array->RecvMapID(this, hdl);
630 }
631