Changes to maintain the older scheme for isomalloc
[charm.git] / src / libs / ck-libs / armci / armci_vp.C
1 #include <vector>
2 #include "armci_impl.h"
3
4 using namespace std;
5
6 int **_armciRednLookupTable;
7
8 // FIXME: might be memory leakage in put
9 // This is the way to adapt a library's preferred start interface with the
10 // one provided by TCharm (eg. argc,argv vs void).
11 extern "C" void armciLibStart(void) {
12   int argc=CkGetArgc();
13   char **argv=CkGetArgv();
14   ARMCI_Main_cpp(argc, argv);
15 }
16
17 _ARMCI_GENERATE_POLYMORPHIC_REDUCTION(sum,ret[i]+=value[i];)
18 _ARMCI_GENERATE_POLYMORPHIC_REDUCTION(product,ret[i]*=value[i];)
19 _ARMCI_GENERATE_POLYMORPHIC_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
20 _ARMCI_GENERATE_POLYMORPHIC_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
21 _ARMCI_GENERATE_ABS_REDUCTION()
22
23 static int armciLibStart_idx = -1;
24
25 void armciNodeInit(void) {
26   CmiAssert(armciLibStart_idx == -1);
27   armciLibStart_idx = TCHARM_Register_thread_function((TCHARM_Thread_data_start_fn)armciLibStart);
28
29   // initialize the reduction table
30   _armciRednLookupTable = new int*[_ARMCI_NUM_REDN_OPS];
31   for (int ops=0; ops<_ARMCI_NUM_REDN_OPS; ops++) {
32     _armciRednLookupTable[ops] = new int[ARMCI_NUM_DATATYPES];
33   }
34
35   // Add the new reducers for ARMCI.
36   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(sum,_ARMCI_REDN_OP_SUM);
37   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(product,_ARMCI_REDN_OP_SUM);
38   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(max,_ARMCI_REDN_OP_MAX);
39   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(min,_ARMCI_REDN_OP_MIN);
40   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(absmax,_ARMCI_REDN_OP_ABSMAX);
41   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(absmin,_ARMCI_REDN_OP_ABSMIN);
42 }
43
44 // Default startup routine (can be overridden by user's own)
45 // This will be registered with TCharm's startup routine
46 // in the Node initialization function.
47 static void ArmciDefaultSetup(void) {
48   // Create the base threads on TCharm using user-defined start routine.
49   TCHARM_Create(TCHARM_Get_num_chunks(), armciLibStart_idx);
50 }
51
52 CtvDeclare(ArmciVirtualProcessor *, _armci_ptr);
53
54 // Node initialization (made by initcall of the module armci)
55 void armciProcInit(void) {
56   CtvInitialize(ArmciVirtualProcessor, _armci_ptr);
57   CtvAccess(_armci_ptr) = NULL;
58
59   // Register the library's default startup routine to TCharm
60   TCHARM_Set_fallback_setup(ArmciDefaultSetup);
61 }
62
63 ArmciVirtualProcessor::ArmciVirtualProcessor(const CProxy_TCharm &_thr_proxy)
64   : TCharmClient1D(_thr_proxy) {
65   thisProxy = this;
66   tcharmClientInit();
67   thread->semaPut(ARMCI_TCHARM_SEMAID,this);
68   memBlock = CmiIsomallocBlockListNew(thread->getThread());
69   thisProxy = CProxy_ArmciVirtualProcessor(thisArrayID);
70   addressReply = NULL;
71   // Save ourselves for the waiting ARMCI_Init
72 }
73
74 ArmciVirtualProcessor::ArmciVirtualProcessor(CkMigrateMessage *m) 
75   : TCharmClient1D(m) 
76 {
77 //  memBlock = NULL; //Paranoia-- makes sure we initialize this in pup
78   thread = NULL;
79   addressReply = NULL;
80 }
81
82 ArmciVirtualProcessor::~ArmciVirtualProcessor()
83 {
84 #if !CMK_USE_MEMPOOL_ISOMALLOC
85   CmiIsomallocBlockListDelete(memBlock);
86 #endif
87   if (addressReply) {delete addressReply;}
88 }
89
90 void ArmciVirtualProcessor::setupThreadPrivate(CthThread forThread) {
91   CtvAccessOther(forThread, _armci_ptr) = this;
92   armci_nproc = thread->getNumElements();
93 }
94
95 void ArmciVirtualProcessor::getAddresses(AddressMsg *msg) {
96   addressReply = msg;
97   thread->resume();
98 }
99
100 // implemented as a blocking put for now
101 void ArmciVirtualProcessor::put(pointer src, pointer dst,
102                                int nbytes, int dst_proc) {
103 /*  if(dst_proc == thisIndex){
104     memcpy(dst,src,nbytes);
105     return;
106   }*/
107   int hdl = hdlList.size();
108   Armci_Hdl* entry = new Armci_Hdl(ARMCI_BPUT, dst_proc, nbytes, src, dst);
109   hdlList.push_back(entry);
110
111   ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
112   memcpy(msg->data, src, nbytes);
113   thisProxy[dst_proc].putData(msg);
114 //  thisProxy[dst_proc].putData(dst,nbytes,msg->data,thisIndex,hdl);
115 }
116
117 int ArmciVirtualProcessor::nbput(pointer src, pointer dst,
118                                int nbytes, int dst_proc) {
119 /*  if(dst_proc == thisIndex){
120     memcpy(dst,src,nbytes);
121     return -1;
122   }*/
123   int hdl = hdlList.size();
124   Armci_Hdl* entry = new Armci_Hdl(ARMCI_PUT, dst_proc, nbytes, src, dst);
125   hdlList.push_back(entry);
126
127   ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
128   memcpy(msg->data, src, nbytes);
129   thisProxy[dst_proc].putData(msg);
130   
131   return hdl;
132 }
133
134 void ArmciVirtualProcessor::nbput_implicit(pointer src, pointer dst,
135                                           int nbytes, int dst_proc) {
136   int hdl = hdlList.size();
137   Armci_Hdl* entry = new Armci_Hdl(ARMCI_IPUT, dst_proc, nbytes, src, dst);
138   hdlList.push_back(entry);
139
140   ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
141   memcpy(msg->data, src, nbytes);
142   thisProxy[dst_proc].putData(msg);
143 }
144
145 void ArmciVirtualProcessor::putData(pointer dst, int nbytes, char *data,
146                                     int src_proc, int hdl) {
147   memcpy(dst, data, nbytes);
148   thisProxy[src_proc].putAck(hdl);
149 }
150
151 void ArmciVirtualProcessor::putData(ArmciMsg *m) {
152   memcpy(m->dst, m->data, m->nbytes);
153   thisProxy[m->src_proc].putAck(m->hdl);
154   delete m;
155 }
156
157 void ArmciVirtualProcessor::putAck(int hdl) {
158   if(hdl != -1) { // non-blocking 
159     hdlList[hdl]->acked = 1;  
160     if (hdlList[hdl]->wait == 1) {
161       hdlList[hdl]->wait = 0;
162       thread->resume();
163     }
164   }
165   thread->resume();
166 }
167
168 void ArmciVirtualProcessor::get(pointer src, pointer dst,
169                                int nbytes, int src_proc) {
170 /*  if(src_proc == thisIndex){
171     memcpy(dst,src,nbytes);
172     return;
173   }*/
174   thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, -1);
175   // wait for reply
176   thread->suspend();
177 }
178
179 int ArmciVirtualProcessor::nbget(pointer src, pointer dst,
180                                int nbytes, int src_proc) {
181 /*  if(src_proc == thisIndex){
182     memcpy(dst,src,nbytes);
183     return -1;
184   }*/
185
186   int hdl = hdlList.size();
187   Armci_Hdl* entry = new Armci_Hdl(ARMCI_GET, src_proc, nbytes, src, dst);
188   hdlList.push_back(entry);
189   
190   thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, hdl);
191
192   return hdl;
193 }
194
195 void ArmciVirtualProcessor::nbget_implicit(pointer src, pointer dst,
196                                            int nbytes, int src_proc) {
197   int hdl = hdlList.size();
198   Armci_Hdl* entry = new Armci_Hdl(ARMCI_IGET, src_proc, nbytes, src, dst);
199   hdlList.push_back(entry);
200   
201   thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, hdl);
202 }
203
204 void ArmciVirtualProcessor::wait(int hdl){
205   if(hdl == -1) return;
206   while (1) {
207     if(hdlList[hdl]->acked != 0)
208       break;
209     else
210       thread->suspend();
211   }
212 }
213
214 // CWL NOTE: This works only because in wait(), the while (1) loop
215 //   insists on matching the first unackowledged non-blocking call 
216 //   waitmulti is
217 //   waiting on. Out-of-order acknowledgements will wake the thread
218 //   but cause it to suspend itself again until the call is acknowledged.
219 //   Subsequent calls to wait from waitmulti will then succeed because
220 //   out-of-order acks would have set the acked flag.
221 void ArmciVirtualProcessor::waitmulti(vector<int> procs){
222   for(int i=0;i<procs.size();i++){
223     wait(procs[i]);
224   }
225 }
226
227 void ArmciVirtualProcessor::waitproc(int proc){
228   vector<int> procs;
229   for(int i=0;i<hdlList.size();i++){
230     if((hdlList[i]->acked == 0) && 
231        (hdlList[i]->proc == proc) && 
232        ((hdlList[i]->op & IMPLICIT_MASK) != 0)) {
233       hdlList[i]->wait = 1;
234       procs.push_back(i);
235     }
236   }
237   waitmulti(procs);
238 }
239
240 void ArmciVirtualProcessor::waitall(){
241   vector<int> procs;
242   for(int i=0;i<hdlList.size();i++){
243     if((hdlList[i]->acked == 0) && 
244        ((hdlList[i]->op & IMPLICIT_MASK) != 0)) {
245       hdlList[i]->wait = 1;
246       procs.push_back(i);
247     }
248   }
249   waitmulti(procs);
250 }
251
252 void ArmciVirtualProcessor::fence(int proc){
253   vector<int> procs;
254   for(int i=0;i<hdlList.size();i++){
255     if((hdlList[i]->acked == 0) && 
256        ((hdlList[i]->op & BLOCKING_MASK) != 0) && 
257        (hdlList[i]->proc == proc))
258       procs.push_back(i);
259   }
260   waitmulti(procs);
261 }
262 void ArmciVirtualProcessor::allfence(){
263   vector<int> procs;
264   for(int i=0;i<hdlList.size();i++){
265     if((hdlList[i]->acked == 0) && 
266        ((hdlList[i]->op & BLOCKING_MASK) != 0))
267       procs.push_back(i);
268   }
269   waitmulti(procs);
270 }
271 void ArmciVirtualProcessor::barrier(){
272   allfence();
273   CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
274   contribute(0,NULL,CkReduction::sum_int,cb);
275   thread->suspend();
276 }
277
278 void ArmciVirtualProcessor::resumeThread(void){
279   thread->resume();
280 }
281
282 int ArmciVirtualProcessor::test(int hdl){
283   if(hdl == -1) return 1;
284   return hdlList[hdl]->acked;
285 }
286
287 void ArmciVirtualProcessor::requestFromGet(pointer src, pointer dst, int nbytes,
288                                        int dst_proc, int hdl) {
289   ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,-1,hdl);
290   memcpy(msg->data, src, nbytes);
291   thisProxy[dst_proc].putDataFromGet(msg);
292 }
293
294 // this is essentially the same as putData except that no acknowledgement
295 // is required and the thread suspended while waiting for the data is
296 // awoken.
297 void ArmciVirtualProcessor::putDataFromGet(pointer dst, int nbytes, char *data, int hdl) {
298   memcpy(dst, data, nbytes);
299   if(hdl != -1) { // non-blocking 
300     hdlList[hdl]->acked = 1;  
301     if (hdlList[hdl]->wait == 1) {
302       hdlList[hdl]->wait = 0;
303       thread->resume();
304     }
305   }
306   thread->resume();
307 }
308
309 void ArmciVirtualProcessor::putDataFromGet(ArmciMsg *m) {
310   memcpy(m->dst, m->data, m->nbytes);
311   if(m->hdl != -1) { // non-blocking 
312     hdlList[m->hdl]->acked = 1;  
313     if (hdlList[m->hdl]->wait == 1) {
314       hdlList[m->hdl]->wait = 0;
315       thread->resume();
316     }
317   }
318   delete m;
319   thread->resume();
320 }
321
322 void ArmciVirtualProcessor::puts(pointer src_ptr, int src_stride_ar[], 
323            pointer dst_ptr, int dst_stride_ar[],
324            int count[], int stride_levels, int dst_proc){
325   int nbytes = 1;
326   for(int i=0;i<stride_levels+1;i++) 
327     nbytes *= count[i];
328   
329 /*  if(dst_proc == thisIndex){
330     buffer = new char[nbytes];
331     stridedCopy(src_ptr, buffer, src_stride_ar, count, stride_levels, 1);
332     stridedCopy(dst_ptr, buffer, dst_stride_ar, count, stride_levels, 0);
333     return;
334   }
335 */
336   int hdl = hdlList.size();
337   Armci_Hdl* entry = new Armci_Hdl(ARMCI_BPUT, dst_proc, nbytes, src_ptr, dst_ptr);
338   hdlList.push_back(entry);
339   
340   ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
341
342   memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
343   memcpy(m->count,count,sizeof(int)*(stride_levels+1));
344   stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
345   thisProxy[dst_proc].putsData(m);
346 }
347
348 int ArmciVirtualProcessor::nbputs(pointer src_ptr, int src_stride_ar[], 
349            pointer dst_ptr, int dst_stride_ar[],
350            int count[], int stride_levels, int dst_proc){
351   int nbytes = 1;
352   for(int i=0;i<stride_levels+1;i++) 
353     nbytes *= count[i];
354   
355 /*  if(dst_proc == thisIndex){
356     buffer = new char[nbytes];
357     stridedCopy(src_ptr, buffer, src_stride_ar, count, stride_levels, 1);
358     stridedCopy(dst_ptr, buffer, dst_stride_ar, count, stride_levels, 0);
359     return -1;
360   }
361   */
362   int hdl = hdlList.size();
363   Armci_Hdl* entry = new Armci_Hdl(ARMCI_PUT, dst_proc, nbytes, src_ptr, dst_ptr);
364   hdlList.push_back(entry);
365  
366   ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
367
368   memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
369   memcpy(m->count,count,sizeof(int)*(stride_levels+1));
370   stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
371   thisProxy[dst_proc].putsData(m);
372   return hdl;
373 }
374
375 void ArmciVirtualProcessor::nbputs_implicit(pointer src_ptr, 
376                                             int src_stride_ar[], 
377                                             pointer dst_ptr, 
378                                             int dst_stride_ar[],
379                                             int count[], int stride_levels, 
380                                             int dst_proc){
381   int nbytes = 1;
382   for(int i=0;i<stride_levels+1;i++) 
383     nbytes *= count[i];
384   int hdl = hdlList.size();
385   Armci_Hdl* entry = new Armci_Hdl(ARMCI_IPUT, dst_proc, nbytes, 
386                                    src_ptr, dst_ptr);
387   hdlList.push_back(entry);
388  
389   ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
390
391   memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
392   memcpy(m->count,count,sizeof(int)*(stride_levels+1));
393   stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
394   thisProxy[dst_proc].putsData(m);
395 }
396
397 void ArmciVirtualProcessor::putsData(pointer dst_ptr, int dst_stride_ar[], 
398                 int count[], int stride_levels,
399                 int nbytes, char *data, int src_proc, int hdl){
400   stridedCopy(dst_ptr, data, dst_stride_ar, count, stride_levels, 0);
401   thisProxy[src_proc].putAck(hdl);
402 }
403
404 void ArmciVirtualProcessor::putsData(ArmciStridedMsg *m){
405   stridedCopy(m->dst, m->data, m->dst_stride_ar, m->count, m->stride_levels, 0);
406   thisProxy[m->src_proc].putAck(m->hdl);
407   delete m;
408 }
409
410 void ArmciVirtualProcessor::gets(pointer src_ptr, int src_stride_ar[], 
411            pointer dst_ptr, int dst_stride_ar[],
412            int count[], int stride_levels, int src_proc){
413 /*  if(src_proc == thisIndex){
414     char *buffer;
415     int nbytes = 1;
416     for(int i=0;i<stride_levels+1;i++) 
417       nbytes *= count[i];
418     buffer = new char[nbytes];
419     stridedCopy(src_ptr, buffer, src_stride_ar, count, stride_levels, 1);
420     stridedCopy(dst_ptr, buffer, dst_stride_ar, count, stride_levels, 0);
421     delete buffer;
422     return;
423   }*/
424   thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar, 
425                                         count, stride_levels, thisIndex, -1);
426   // wait for reply
427   thread->suspend();
428 }
429
430 int ArmciVirtualProcessor::nbgets(pointer src_ptr, int src_stride_ar[], 
431            pointer dst_ptr, int dst_stride_ar[],
432            int count[], int stride_levels, int src_proc){
433   int hdl = hdlList.size();
434   int nbytes = 1;
435   for(int i=0;i<stride_levels+1;i++) 
436     nbytes *= count[i];
437 /*  if(src_proc == thisIndex){
438     char *buffer;
439     buffer = new char[nbytes];
440     stridedCopy(src_ptr, buffer, src_stride_ar, count, stride_levels, 1);
441     stridedCopy(dst_ptr, buffer, dst_stride_ar, count, stride_levels, 0);
442     delete buffer;
443     return -1;
444   }*/
445   
446   Armci_Hdl* entry = new Armci_Hdl(ARMCI_GET, src_proc, nbytes, src_ptr, dst_ptr);
447   hdlList.push_back(entry);
448
449   thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar, 
450                                         count, stride_levels, thisIndex, hdl);
451
452   return hdl;
453 }
454
455 void ArmciVirtualProcessor::nbgets_implicit(pointer src_ptr, 
456                                             int src_stride_ar[], 
457                                             pointer dst_ptr, 
458                                             int dst_stride_ar[],
459                                             int count[], int stride_levels, 
460                                             int src_proc) {
461   int hdl = hdlList.size();
462   int nbytes = 1;
463   for(int i=0;i<stride_levels+1;i++) 
464     nbytes *= count[i];
465
466   Armci_Hdl* entry = new Armci_Hdl(ARMCI_IGET, src_proc, nbytes, src_ptr, dst_ptr);
467   hdlList.push_back(entry);
468
469   thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar, 
470                                         count, stride_levels, thisIndex, hdl);
471 }
472
473 void ArmciVirtualProcessor::requestFromGets(pointer src_ptr, int src_stride_ar[], 
474            pointer dst_ptr, int dst_stride_ar[], int count[], int stride_levels, int dst_proc, int hdl){
475   int nbytes = 1;
476   for(int i=0;i<stride_levels+1;i++) 
477     nbytes *= count[i];
478   
479   ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
480
481   memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
482   memcpy(m->count,count,sizeof(int)*(stride_levels+1));
483   stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
484   thisProxy[dst_proc].putDataFromGets(m);
485 }
486 void ArmciVirtualProcessor::putDataFromGets(pointer dst_ptr, int dst_stride_ar[], 
487                 int count[], int stride_levels, int nbytes, char *data, int hdl){
488   stridedCopy(dst_ptr, data, dst_stride_ar, count, stride_levels, 0);
489   if(hdl != -1) { // non-blocking 
490     hdlList[hdl]->acked = 1;  
491     if (hdlList[hdl]->wait == 1) {
492       hdlList[hdl]->wait = 0;
493       thread->resume();
494     }
495   }
496   thread->resume();
497 }
498
499 void ArmciVirtualProcessor::putDataFromGets(ArmciStridedMsg *m){
500   stridedCopy(m->dst, m->data, m->dst_stride_ar, m->count, m->stride_levels, 0);
501   if(m->hdl != -1) { // non-blocking 
502     hdlList[m->hdl]->acked = 1;  
503     if (hdlList[m->hdl]->wait == 1) {
504       hdlList[m->hdl]->wait = 0;
505       thread->resume();
506     }
507   }
508   delete m;
509   thread->resume();
510 }
511
512 void ArmciVirtualProcessor::notify(int proc){
513   thisProxy[proc].sendNote(thisIndex);
514 }
515 void ArmciVirtualProcessor::sendNote(int proc){
516   // check if note exists
517   // if so, decrement it and see if resume thread is appropriate
518   // if not, create a new note
519   int hasNote = -1;
520   for(int i=0;i<noteList.size();i++){
521     if(noteList[i]->proc == proc){
522       hasNote = i;
523       break;
524     }
525   }
526   if(hasNote!=-1){
527     (noteList[hasNote]->notified)++;
528   } else {
529     Armci_Note* newNote = new Armci_Note(proc, 0, 1);
530     noteList.push_back(newNote);
531     hasNote = noteList.size() - 1;
532   }
533   if(noteList[hasNote]->notified >= noteList[hasNote]->waited){
534 /*
535     noteList[hasNote]->notified -= noteList[hasNote]->waited;
536     noteList[hasNote]->waited = 0;
537 */
538     thread->resume();
539   }
540 }
541 void ArmciVirtualProcessor::notify_wait(int proc){
542   // check if note exists
543   // if so, check if suspend is necessary
544   // if not, create a waited note and suspend
545   int hasNote = -1;
546   for(int i=0;i<noteList.size();i++){
547     if(noteList[i]->proc == proc){
548       hasNote = i;
549       break;
550     }
551   }
552   if(hasNote!=-1){
553     (noteList[hasNote]->waited)++;
554   } else {
555     Armci_Note* newNote = new Armci_Note(proc, 1, 0);
556     noteList.push_back(newNote);
557     hasNote = noteList.size() - 1;
558   }
559   if(noteList[hasNote]->notified < noteList[hasNote]->waited){
560     thread->suspend();
561   }
562 }
563
564 void ArmciVirtualProcessor::pup(PUP::er &p) {
565   TCharmClient1D::pup(p);
566   //Copying only address, the mempool will be pupped as part of the thread
567 #if CMK_USE_MEMPOOL_ISOMALLOC
568   pup_bytes(&p, &memBlock, sizeof(CmiIsomallocBlockList*));
569 #else
570   CmiIsomallocBlockListPup(&p, &memBlock, NULL);
571 #endif
572   p|thisProxy;
573   p|hdlList;
574   p|noteList;
575   CkPupMessage(p, (void **)&addressReply, 1);
576 }
577
578 // NOT an entry method. This is an object-interface to the API interface.
579 void ArmciVirtualProcessor::requestAddresses(pointer ptr, pointer ptr_arr[], int bytes) {
580   int thisPE = armci_me;
581   int numPE = armci_nproc;
582   // reset the reply field
583   addressReply = NULL;
584   addressPair *pair = new addressPair;
585   pair->pe = thisPE;
586   pair->ptr = ptr;
587   // do a reduction to get everyone else's data.
588   CkCallback cb(CkIndex_ArmciVirtualProcessor::mallocClient(NULL),CkArrayIndex1D(0),thisProxy);
589   contribute(sizeof(addressPair), pair, CkReduction::concat, cb);
590   // wait for the reply to arrive.
591   while(addressReply==NULL) thread->suspend();
592
593   // copy the acquired data to the user-allocated array.
594   for (int i=0; i<numPE; i++) {
595     ptr_arr[i] = addressReply->addresses[i];
596   }
597   delete addressReply;
598   addressReply = NULL;
599 }
600
601 void ArmciVirtualProcessor::stridedCopy(void *base, void *buffer_ptr,
602                   int *stride, int *count, int stride_levels, bool flatten) {
603   if (stride_levels == 0) {
604     if (flatten) {
605       memcpy(buffer_ptr, base, count[stride_levels]);
606     } else {
607       memcpy(base, buffer_ptr, count[stride_levels]);
608     }
609   } else {
610     int mystride = 1;
611     for(int i=0;i<stride_levels;i++)
612       mystride *= count[i];
613     for (int i=0; i<count[stride_levels]; i++) {
614       stridedCopy((void *)((char *)base + stride[stride_levels-1]*i), 
615                 (void *)((char *)buffer_ptr + mystride*i), stride, count, stride_levels-1, flatten);
616     }
617   }
618 }
619
620 // malloc reduction client
621 void ArmciVirtualProcessor::mallocClient(CkReductionMsg *msg) {
622   int numBlocks = msg->getSize()/sizeof(addressPair);
623   addressPair *dataBlocks = (addressPair *)msg->getData();
624   AddressMsg *addrmsg = new(numBlocks, 0) AddressMsg;
625   // constructing the ordered set of shared pointers
626   for (int i=0; i<numBlocks; i++) {
627     addrmsg->addresses[dataBlocks[i].pe] = dataBlocks[i].ptr;
628   }
629   // broadcast the results to everyone.
630   thisProxy.getAddresses(addrmsg);
631   delete msg;
632 }
633
634 // **** CAF collective operations **** 
635
636 // **CWL**
637 //   Assumptions:
638 //   1. this operation blocks until data is ready.
639 //   2. buffer pointers can be different for each Virtual Processor.
640 //   3. len represents length of buffer in bytes.
641 void ArmciVirtualProcessor::msgBcast(void *buffer, int len, int root) {
642   int me;
643   ARMCI_Myid(&me);
644   if (me == root) {
645     thisProxy.recvMsgBcast(len, (char *)buffer, root);
646   } else {
647     // copy the buffer pointer to thread object
648     collectiveTmpBufferPtr = buffer;
649     thread->suspend();
650   }
651 }
652
653 // **CWL** For now, we have to live with a double-copy
654 void ArmciVirtualProcessor::recvMsgBcast(int len, char *buffer, int root) {
655   int me;
656   ARMCI_Myid(&me);
657   if (me != root) {
658     // Copy broadcast buffer into the area of memory pointed to by
659     //   buffer specified in the original broadcast collective and then
660     //   setting the temporary thread object pointer back to NULL.
661     collectiveTmpBufferPtr = memcpy(collectiveTmpBufferPtr, buffer, len);
662     collectiveTmpBufferPtr = NULL;
663     thread->resume();
664   }
665 }
666
667 // **CWL**
668 //   Assumptions (seems true from ARMCI 1.4 implementation):
669 //   1. the root is always 0.
670 void ArmciVirtualProcessor::msgGop(void *x, int n, char *op, int type) {
671   CkReduction::reducerType reducer;
672   if (strcmp(op,"+") == 0) {
673   } else if (strcmp(op,"*") == 0) {
674   } else if (strcmp(op,"min") == 0) {
675   } else if (strcmp(op,"max") == 0) {
676   } else if (strcmp(op,"absmin") == 0) {
677   } else if (strcmp(op,"absmax") == 0) {
678   } else {
679     CkPrintf("Operator %s not supported\n",op);
680     CmiAbort("ARMCI ERROR: msgGop - Unknown operator\n");
681   }
682   switch (type) {
683   case ARMCI_INT:
684     
685     break;
686   case ARMCI_LONG:
687     break;
688   case ARMCI_LONG_LONG:
689     break;
690   case ARMCI_FLOAT:
691     break;
692   case ARMCI_DOUBLE:
693     break;
694   default:
695     CkPrintf("ARMCI Type %d not supported\n", type);
696     CmiAbort("ARMCI ERROR: msgGop - Unknown type\n");
697   }
698 }
699
700 // reduction client data - preparation for checkpointing
701 class ckptClientStruct {
702 public:
703   const char *dname;
704   ArmciVirtualProcessor *vp;
705   ckptClientStruct(const char *s, ArmciVirtualProcessor *p): dname(s), vp(p) {}
706 };
707
708 static void checkpointClient(void *param,void *msg)
709 {       
710   ckptClientStruct *client = (ckptClientStruct*)param;
711   const char *dname = client->dname;
712   ArmciVirtualProcessor *vp = client->vp;
713   vp->checkpoint(strlen(dname), dname);
714   delete client;
715 }               
716                 
717 void ArmciVirtualProcessor::startCheckpoint(const char* dname){
718   if (thisIndex==0) {
719     ckptClientStruct *clientData = new ckptClientStruct(dname, this);
720     CkCallback cb(checkpointClient, clientData);
721     contribute(0, NULL, CkReduction::sum_int, cb);
722   } else {
723     contribute(0, NULL, CkReduction::sum_int);
724   }
725   thread->suspend();
726 }
727 void ArmciVirtualProcessor::checkpoint(int len, const char* dname){
728   if (len == 0) { // memory checkpoint
729     CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
730     CkStartMemCheckpoint(cb);
731   } else {
732     char dirname[256];
733     strncpy(dirname,dname,len);
734     dirname[len]='\0';
735     CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
736     CkStartCheckpoint(dirname,cb);
737   }
738 }
739
740 #include "armci.def.h"
741