work around for buggy new xlc compiler on copper ( with aix53)
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #define exit exit /*Supress definition of exit in ampi.h*/
9 #include "ampiimpl.h"
10 #include "tcharm.h"
11 #include "ampiEvents.h" /*** for trace generation for projector *****/
12 #include "ampiProjections.h"
13
14 #define CART_TOPOL 1
15 #define AMPI_PRINT_IDLE 0
16
17 /* change this define to "x" to trace all send/recv's */
18 #define MSG_ORDER_DEBUG(x) // x /* empty */
19 /* change this define to "x" to trace user calls */
20 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
21 #define STARTUP_DEBUG(x)  // ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
22
23
24
25 //------------- startup -------------
26 static mpi_comm_worlds mpi_worlds;
27
28 int mpi_nworlds; /*Accessed by ampif*/
29 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
30
31 /* ampiReducer: AMPI's generic reducer type 
32    MPI_Op is function pointer to MPI_User_function
33    so that it can be packed into AmpiOpHeader, shipped 
34    with the reduction message, and then plugged into 
35    the ampiReducer. 
36    One little trick is the ampi::recv which receives
37    the final reduction message will see additional
38    sizeof(AmpiOpHeader) bytes in the buffer before
39    any user data.                             */
40 class AmpiComplex { 
41 public: 
42         double re, im; 
43         void operator+=(const AmpiComplex &a) {
44                 re+=a.re;
45                 im+=a.im;
46         }
47         void operator*=(const AmpiComplex &a) {
48                 double nu_re=re*a.re-im*a.im;
49                 im=re*a.im+im*a.re;
50                 re=nu_re;
51         }
52         int operator>(const AmpiComplex &a) {
53                 CkAbort("Cannot compare complex numbers with MPI_MAX");
54                 return 0;
55         }
56         int operator<(const AmpiComplex &a) {
57                 CkAbort("Cannot compare complex numbers with MPI_MIN");
58                 return 0;
59         }
60 };
61 typedef struct { float val; int idx; } FloatInt;
62 typedef struct { double val; int idx; } DoubleInt;
63 typedef struct { long val; int idx; } LongInt;
64 typedef struct { int val; int idx; } IntInt;
65 typedef struct { short val; int idx; } ShortInt;
66 typedef struct { long double val; int idx; } LongdoubleInt;
67 typedef struct { float val; float idx; } FloatFloat;
68 typedef struct { double val; double idx; } DoubleDouble;
69
70
71 #define MPI_OP_SWITCH(OPNAME) \
72   int i; \
73   switch (*datatype) { \
74   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
75   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
76   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
77   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
78   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
79   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
80   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
81   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long); } break; \
82   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
83   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
84   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
85   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
86   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CMK_TYPEDEF_INT8); } break; \
87   default: \
88     ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
89     CmiAbort("Unsupported MPI datatype for MPI Op"); \
90   };\
91
92 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
93 #define MPI_OP_IMPL(type) \
94         if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
95   MPI_OP_SWITCH(MPI_MAX)
96 #undef MPI_OP_IMPL
97 }
98
99 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
100 #define MPI_OP_IMPL(type) \
101         if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
102   MPI_OP_SWITCH(MPI_MIN)
103 #undef MPI_OP_IMPL
104 }
105
106 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
107 #define MPI_OP_IMPL(type) \
108         ((type *)inoutvec)[i] += ((type *)invec)[i];
109   MPI_OP_SWITCH(MPI_SUM)
110 #undef MPI_OP_IMPL
111 }
112
113 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
114 #define MPI_OP_IMPL(type) \
115         ((type *)inoutvec)[i] *= ((type *)invec)[i];
116   MPI_OP_SWITCH(MPI_PROD)
117 #undef MPI_OP_IMPL
118 }
119
120 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
121   int i;  
122   switch (*datatype) {
123   case MPI_INT:
124   case MPI_LOGICAL:
125     for(i=0;i<(*len);i++)
126       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
127     break;
128   default:
129     ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
130     CmiAbort("exiting");
131   }
132 }
133 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
134   int i; 
135   switch (*datatype) {
136   case MPI_INT:
137     for(i=0;i<(*len);i++)
138       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
139     break;
140   case MPI_BYTE:
141     for(i=0;i<(*len);i++)
142       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
143     break;
144   default:
145     ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
146     CmiAbort("exiting");
147   }
148 }
149 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
150   int i;  
151   switch (*datatype) {
152   case MPI_INT:
153   case MPI_LOGICAL:
154     for(i=0;i<(*len);i++)
155       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
156     break;
157   default:
158     ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
159     CmiAbort("exiting");
160   }
161 }
162 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
163   int i;  
164   switch (*datatype) {
165   case MPI_INT:
166     for(i=0;i<(*len);i++)
167       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
168     break;
169   case MPI_BYTE:
170     for(i=0;i<(*len);i++)
171       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
172     break;
173   default:
174     ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
175     CmiAbort("exiting");
176   }
177 }
178 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
179   int i;  
180   switch (*datatype) {
181   case MPI_INT:
182   case MPI_LOGICAL:
183     for(i=0;i<(*len);i++)
184       ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
185     break;
186   default:
187     ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
188     CmiAbort("exiting");
189   }
190 }
191 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
192   int i;  
193   switch (*datatype) {
194   case MPI_INT:
195     for(i=0;i<(*len);i++)
196       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
197     break;
198   case MPI_BYTE:
199     for(i=0;i<(*len);i++)
200       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
201     break;
202   default:
203     ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
204     CmiAbort("exiting");
205   }
206 }
207 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
208   int i;  
209   switch (*datatype) {
210   case MPI_FLOAT_INT:
211     for(i=0;i<(*len);i++)
212       if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].idx)
213         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
214     break;
215   case MPI_DOUBLE_INT:
216     for(i=0;i<(*len);i++)
217       if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].idx)
218         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
219     break;
220   case MPI_LONG_INT:
221     for(i=0;i<(*len);i++)
222       if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].idx)
223         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
224     break;
225   case MPI_2INT:
226     for(i=0;i<(*len);i++)
227       if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].idx)
228         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
229     break;
230   case MPI_SHORT_INT:
231     for(i=0;i<(*len);i++)
232       if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].idx)
233         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
234     break;
235   case MPI_LONG_DOUBLE_INT:
236     for(i=0;i<(*len);i++)
237       if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].idx)
238         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
239     break;
240   case MPI_2FLOAT:
241     for(i=0;i<(*len);i++)
242       if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].idx)
243         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
244     break;
245   case MPI_2DOUBLE:
246     for(i=0;i<(*len);i++)
247       if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].idx)
248         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
249     break;
250   default:
251     ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
252     CmiAbort("exiting");
253   }
254 }
255 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
256   int i;  
257   switch (*datatype) {
258   case MPI_FLOAT_INT:
259     for(i=0;i<(*len);i++)
260       if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].idx)
261         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
262     break;
263   case MPI_DOUBLE_INT:
264     for(i=0;i<(*len);i++)
265       if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].idx)
266         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
267     break;
268   case MPI_LONG_INT:
269     for(i=0;i<(*len);i++)
270       if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].idx)
271         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
272     break;
273   case MPI_2INT:
274     for(i=0;i<(*len);i++)
275       if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].idx)
276         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
277     break;
278   case MPI_SHORT_INT:
279     for(i=0;i<(*len);i++)
280       if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].idx)
281         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
282     break;
283   case MPI_LONG_DOUBLE_INT:
284     for(i=0;i<(*len);i++)
285       if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].idx)
286         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
287     break;
288   case MPI_2FLOAT:
289     for(i=0;i<(*len);i++)
290       if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].idx)
291         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
292     break;
293   case MPI_2DOUBLE:
294     for(i=0;i<(*len);i++)
295       if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].idx)
296         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
297     break;
298   default:
299     ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
300     CmiAbort("exiting");
301   }
302 }
303
304 // every msg contains a AmpiOpHeader structure before user data
305 // FIXME: non-commutative operations require messages be ordered by rank
306 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
307   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
308   MPI_Datatype dtype;
309   int szhdr, szdata, len;
310   MPI_User_function* func;
311   func = hdr->func;
312   dtype = hdr->dtype;  
313   szdata = hdr->szdata;
314   len = hdr->len;  
315   szhdr = sizeof(AmpiOpHeader);
316
317   //Assuming extent == size
318   void *ret = malloc(szhdr+szdata);
319   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
320   for(int i=1;i<nMsg;i++){
321     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
322   }
323   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
324   free(ret);
325   return retmsg;
326 }
327
328 CkReduction::reducerType AmpiReducer;
329
330 // ------------ startup support -----------
331 int _ampi_fallback_setup_count;
332 CDECL void AMPI_Setup(void);
333 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
334
335 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
336
337 /*Main routine used when missing MPI_Setup routine*/
338 CDECL void AMPI_Fallback_Main(int argc,char **argv)
339 {
340   AMPI_Main_cpp(argc,argv);
341   AMPI_Main(argc,argv);
342   FTN_NAME(MPI_MAIN,mpi_main)();
343 }
344
345 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
346 /*Startup routine used if user *doesn't* write
347   a TCHARM_User_setup routine.
348  */
349 CDECL void AMPI_Setup_Switch(void) {
350   _ampi_fallback_setup_count=0;
351   FTN_NAME(AMPI_SETUP,ampi_setup)();
352   AMPI_Setup();
353   if (_ampi_fallback_setup_count==2)
354   { //Missing AMPI_Setup in both C and Fortran:
355     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
356   }
357 }
358
359 static int nodeinit_has_been_called=0;
360 CtvDeclare(ampiParent*, ampiPtr);
361 CtvDeclare(int, ampiInitDone);
362 CtvDeclare(int, ampiFinalized);
363 CkpvDeclare(int,argvExtracted);
364 static int enableStreaming = 0;
365
366 static void ampiNodeInit(void)
367 {
368   mpi_nworlds=0;
369   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
370   {
371     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
372   }
373   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
374
375   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
376   
377   nodeinit_has_been_called=1;
378 }
379
380 #if PRINT_IDLE
381 static double totalidle=0.0, startT=0.0;
382 static int beginHandle, endHandle;
383 static void BeginIdle(void *dummy,double curWallTime)
384 {
385   startT = curWallTime;
386 }
387 static void EndIdle(void *dummy,double curWallTime)
388 {
389   totalidle += curWallTime - startT;
390 }
391 #endif
392
393 /* for fortran reduction operation table to handle mapping */
394 typedef MPI_Op  MPI_Op_Array[128];
395 CtvDeclare(int, mpi_opc);
396 CtvDeclare(MPI_Op_Array, mpi_ops);
397
398 static void ampiProcInit(void){
399   CtvInitialize(ampiParent*, ampiPtr);
400   CtvInitialize(int,ampiInitDone);
401   CtvInitialize(int,ampiFinalized);
402
403   CtvInitialize(MPI_Op_Array, mpi_ops);
404   CtvInitialize(int, mpi_opc);
405
406   CkpvInitialize(int, argvExtracted);
407   CkpvAccess(argvExtracted) = 0;
408   REGISTER_AMPI
409   initAmpiProjections();
410 #if AMPI_COMLIB  
411   char **argv=CkGetArgv();
412   if(CkpvAccess(argvExtracted)==0){
413     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
414   }
415 #endif
416 }
417
418 void AMPI_Install_Idle_Timer(){
419 #if AMPI_PRINT_IDLE
420   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
421   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
422 #endif
423 }
424
425 void AMPI_Uninstall_Idle_Timer(){
426 #if AMPI_PRINT_IDLE
427   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
428   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
429 #endif
430 }
431
432 PUPfunctionpointer(MPI_MainFn);
433
434 class MPI_threadstart_t {
435 public:
436         MPI_MainFn fn;
437         MPI_threadstart_t() {}
438         MPI_threadstart_t(MPI_MainFn fn_)
439                 :fn(fn_) {}
440         void start(void) {
441                 char **argv=CmiCopyArgs(CkGetArgv());
442                 int argc=CkGetArgc();
443                 (fn)(argc,argv);
444         }
445         void pup(PUP::er &p) {
446                 p|fn;
447         }
448 };
449 PUPmarshall(MPI_threadstart_t);
450
451 CDECL void AMPI_threadstart(void *data)
452 {
453         STARTUP_DEBUG("MPI_threadstart")
454         MPI_threadstart_t t;
455         pupFromBuf(data,t);
456         t.start();
457 }
458
459 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
460 {
461         STARTUP_DEBUG("ampiCreateMain")
462         int _nchunks=TCHARM_Get_num_chunks();
463         //Make a new threads array:
464         MPI_threadstart_t s(mainFn);
465         memBuf b; pupIntoBuf(b,s);
466         TCHARM_Create_data( _nchunks,AMPI_threadstart,
467                           b.getData(), b.getSize());
468 }
469
470 /* TCharm Semaphore ID's for AMPI startup */
471 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
472 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
473
474 static CProxy_ampiWorlds ampiWorldsGroup;
475
476 static void init_operations()
477 {
478   CtvInitialize(MPI_Op_Array, mpi_ops);
479   int i = 0;
480   MPI_Op *tab = CtvAccess(mpi_ops);
481   tab[i++] = MPI_MAX;
482   tab[i++] = MPI_MIN;
483   tab[i++] = MPI_SUM;
484   tab[i++] = MPI_PROD;
485   tab[i++] = MPI_LAND;
486   tab[i++] = MPI_BAND;
487   tab[i++] = MPI_LOR;
488   tab[i++] = MPI_BOR;
489   tab[i++] = MPI_LXOR;
490   tab[i++] = MPI_BXOR;
491   tab[i++] = MPI_MAXLOC;
492   tab[i++] = MPI_MINLOC;
493
494   CtvInitialize(int, mpi_opc);
495   CtvAccess(mpi_opc) = i;
496 }
497
498 /*
499 Called from MPI_Init, a collective initialization call:
500  creates a new AMPI array and attaches it to the current
501  set of TCHARM threads.
502 */
503 static ampi *ampiInit(char **argv)
504 {
505   if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
506   STARTUP_DEBUG("ampiInit> begin")
507
508   MPI_Comm new_world;
509   int _nchunks;
510   CkArrayOptions opts;
511   CProxy_ampiParent parent;
512   if (TCHARM_Element()==0)
513   { /* I'm responsible for building the arrays: */
514         STARTUP_DEBUG("ampiInit> creating arrays")
515
516 // FIXME: Need to serialize global communicator allocation in one place.
517         //Allocate the next communicator
518         if(mpi_nworlds == MPI_MAX_COMM_WORLDS)
519         {
520                 CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
521         }
522         int new_idx=mpi_nworlds;
523         new_world=MPI_COMM_WORLD+1+new_idx;
524
525         //Create and attach the ampiParent array
526         CkArrayID threads;
527         opts=TCHARM_Attach_start(&threads,&_nchunks);
528         parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
529         STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
530   }
531   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
532   if (TCHARM_Element()==0)
533   {
534         //Make a new ampi array
535         CkArrayID empty;
536         
537 #if AMPI_COMLIB
538         ComlibInstanceHandle ciStreaming;
539         ComlibInstanceHandle ciBcast;
540         ComlibInstanceHandle ciAllgather;
541         ComlibInstanceHandle ciAlltoall;
542         ciStreaming=CkGetComlibInstance();
543         ciBcast=CkGetComlibInstance();
544         ciAllgather=CkGetComlibInstance();
545         ciAlltoall=CkGetComlibInstance();
546 #endif
547
548         CkVec<int> _indices;
549         for(int i=0;i<_nchunks;i++) _indices.push_back(i);
550         ampiCommStruct worldComm(new_world,empty,_nchunks,_indices);
551         CProxy_ampi arr;
552 #if AMPI_COMLIB
553         arr=CProxy_ampi::ckNew(parent,worldComm,ciStreaming,ciBcast,ciAllgather,ciAlltoall,opts);
554 #else
555         //opts.setAnytimeMigration(0);
556         arr=CProxy_ampi::ckNew(parent,worldComm,opts);
557 #endif
558
559         //Broadcast info. to the mpi_worlds array
560         // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
561         ampiCommStruct newComm(new_world,arr,_nchunks);
562         if (ampiWorldsGroup.ckGetGroupID().isZero())
563                 ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
564         else
565                 ampiWorldsGroup.add(newComm);
566         STARTUP_DEBUG("ampiInit> arrays created")
567
568 #if AMPI_COMLIB
569         StreamingStrategy *sStreaming = new StreamingStrategy(1,10);
570         ciStreaming.setStrategy(sStreaming);
571         
572         BroadcastStrategy *sBcast = new BroadcastStrategy(arr.ckGetArrayID(),USE_HYPERCUBE);
573         ciBcast.setStrategy(sBcast);
574         
575         EachToManyMulticastStrategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
576         ciAllgather.setStrategy(sAllgather);
577         
578         EachToManyMulticastStrategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
579         ciAlltoall.setStrategy(sAlltoall);
580 #endif        
581   }
582
583   // Find our ampi object:
584   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
585   CtvAccess(ampiInitDone)=1;
586   CtvAccess(ampiFinalized)=0;
587   STARTUP_DEBUG("ampiInit> complete")
588 #if CMK_BLUEGENE_CHARM
589   TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
590 #endif
591
592   init_operations();     // initialize fortran reduction operation table
593
594   return ptr;
595 }
596
597 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
598 class ampiWorlds : public CBase_ampiWorlds {
599 public:
600     ampiWorlds(const ampiCommStruct &nextWorld) {
601         ampiWorldsGroup=thisgroup;
602         add(nextWorld);
603     }
604     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
605     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
606     void add(const ampiCommStruct &nextWorld) {
607         int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD+1);
608         mpi_worlds[new_idx].comm=nextWorld;
609         if (mpi_nworlds<=new_idx) mpi_nworlds=new_idx+1;
610         STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
611     }
612 };
613
614 //-------------------- ampiParent -------------------------
615 ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
616                 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
617 {
618   int barrier = 0x1234;
619   STARTUP_DEBUG("ampiParent> starting up")
620   thread=NULL;
621   worldPtr=NULL;
622   myDDT=&myDDTsto;
623   prepareCtv();
624
625   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
626         AsyncEvacuate(CmiFalse);
627 }
628
629 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
630   thread=NULL;
631   worldPtr=NULL;
632   myDDT=&myDDTsto;
633         AsyncEvacuate(CmiFalse);
634 }
635
636 void ampiParent::pup(PUP::er &p) {
637
638   ArrayElement1D::pup(p);
639 //      printf("[%d] Ampiparent being pupped \n",thisIndex);
640   p|threads;
641
642   p|worldStruct;
643   myDDT->pup(p);
644   p|splitComm;
645   p|groupComm;
646   p|groups;
647   p|ampiReqs;
648   p|RProxyCnt;
649   p|tmpRProxy;
650   p|winStructList;
651   p|infos;
652 }
653 void ampiParent::prepareCtv(void) {
654   thread=threads[thisIndex].ckLocal();
655   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
656   CtvAccessOther(thread->getThread(),ampiPtr) = this;
657   STARTUP_DEBUG("ampiParent> found TCharm")
658 }
659
660 void ampiParent::ckJustMigrated(void) {
661   ArrayElement1D::ckJustMigrated();
662   prepareCtv();
663 }
664
665 ampiParent::~ampiParent() {
666         STARTUP_DEBUG("ampiParent> destructor called");
667 }
668
669 //Children call this when they are first created or just migrated
670 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
671 {
672   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
673
674   if (s.getComm()>=MPI_COMM_WORLD)
675   { //We now have our COMM_WORLD-- register it
676     //Note that split communicators don't keep a raw pointer, so
677     //they don't need to re-register on migration.
678      if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
679      worldPtr=ptr;
680      worldStruct=s;
681
682     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
683      CkVec<int> _indices;
684      _indices.push_back(thisIndex);
685      selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
686   }
687   
688   if (!forMigration)
689   { //Register the new communicator:
690      MPI_Comm comm = s.getComm();
691      STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
692      if (comm>=MPI_COMM_WORLD) { 
693        // Pass the new ampi to the waiting ampiInit
694        thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
695      } else if (isSplit(comm)) {
696        splitChildRegister(s);
697      } else if (isGroup(comm)) {
698        groupChildRegister(s);
699      } else if (isCart(comm)) {
700        cartChildRegister(s);
701      } else if (isGraph(comm)) {
702        graphChildRegister(s);
703      } else if (isInter(comm)) {
704        interChildRegister(s);
705      } else if (isIntra(comm)) {
706        intraChildRegister(s);
707      }else
708        CkAbort("ampiParent recieved child with bad communicator");
709   }
710
711   return thread;
712 }
713
714 // reduction client data - preparation for checkpointing
715 class ckptClientStruct {
716 public:
717   char *dname;
718   ampiParent *ampiPtr;
719   ckptClientStruct(char *s, ampiParent *a): dname(s), ampiPtr(a) {}
720 };
721
722 static void checkpointClient(void *param,void *msg)
723 {
724   ckptClientStruct *client = (ckptClientStruct*)param;
725   char *dname = client->dname;
726   ampiParent *ampiPtr = client->ampiPtr;
727   ampiPtr->Checkpoint(strlen(dname), dname);
728   delete client;
729 }
730
731 void ampiParent::startCheckpoint(char* dname){
732   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
733   if (thisIndex==0) {
734         ckptClientStruct *clientData = new ckptClientStruct(dname, this);
735         CkCallback cb(checkpointClient, clientData);
736         contribute(0, NULL, CkReduction::sum_int, cb);
737   }
738   else
739         contribute(0, NULL, CkReduction::sum_int);
740   thread->stop();
741 }
742 void ampiParent::Checkpoint(int len, char* dname){
743   if (len == 0) {
744     // memory checkpoint
745     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
746     CkStartMemCheckpoint(cb);
747   }
748   else {
749     char dirname[256];
750     strncpy(dirname,dname,len);
751     dirname[len]='\0';
752     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
753     CkStartCheckpoint(dirname,cb);
754   }
755 }
756 void ampiParent::ResumeThread(void){
757   thread->resume();
758 }
759
760 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
761                              int *keyval, void* extra_state){
762         KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
763         int idx = kvlist.size();
764         kvlist.resize(idx+1);
765         kvlist[idx] = newnode;
766         *keyval = idx;
767         return 0;
768 }
769 int ampiParent::freeKeyval(int *keyval){
770         if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
771                 return -1;
772         delete kvlist[*keyval];
773         kvlist[*keyval] = NULL;
774         *keyval = MPI_KEYVAL_INVALID;
775         return 0;
776 }
777
778 int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
779         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
780                 return -1;
781         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
782         // Enlarge the keyval list:
783         while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
784         cs.getKeyvals()[keyval]=attribute_val;
785         return 0;
786 }
787 int ampiParent::kv_is_builtin(int keyval) {
788         switch(keyval) {
789         case MPI_TAG_UB: kv_builtin_storage=MPI_TAG_UB_VALUE; return 1;
790         case MPI_HOST: kv_builtin_storage=MPI_PROC_NULL; return 1;
791         case MPI_IO: kv_builtin_storage=0; return 1;
792         case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=0; return 1;
793         case AMPI_KEYVAL_MYPE: kv_builtin_storage=CkMyPe(); return 1;
794         case AMPI_KEYVAL_NUMPES: kv_builtin_storage=CkNumPes(); return 1;
795         case AMPI_KEYVAL_MYNODE: kv_builtin_storage=CkMyNode(); return 1;
796         case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=CkNumNodes(); return 1;
797         default: return 0;
798         };
799 }
800 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
801         *flag = false;
802         if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
803           *flag=true;
804           *(int *)attribute_val = kv_builtin_storage;
805           return 0;
806         }
807         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
808                 return -1; /* invalid keyval */
809         
810         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
811         if (keyval>=cs.getKeyvals().size())  
812                 return 0; /* we don't have a value yet */
813         if (cs.getKeyvals()[keyval]==0)
814                 return 0; /* we had a value, but now it's zero */
815         /* Otherwise, we have a good value */
816         *flag = true;
817         *(void **)attribute_val = cs.getKeyvals()[keyval];
818         return 0;
819 }
820 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
821         /* no way to delete an attribute: just overwrite it with 0 */
822         return putAttr(comm,keyval,0);
823 }
824
825 //----------------------- ampi -------------------------
826 void ampi::init(void) {
827   parent=NULL;
828   thread=NULL;
829   msgs=NULL;
830   resumeOnRecv=false;
831         AsyncEvacuate(CmiFalse);
832 }
833
834 ampi::ampi()
835 {
836   /* this constructor only exists so we can create an empty array during split */
837   CkAbort("Default ampi constructor should never be called");
838 }
839
840 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
841    :parentProxy(parent_)
842 {
843   init();
844
845   myComm=s; myComm.setArrayID(thisArrayID);
846   myRank=myComm.getRankForIndex(thisIndex);
847
848   findParent(false);
849
850   msgs = CmmNew();
851   nbcasts = 0;
852
853   comlibProxy = thisProxy;
854   ComlibDelegateProxy(&comlibProxy);
855   
856   seqEntries=parent->numElements;
857   oorder.init (seqEntries);
858 }
859
860 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
861                 ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
862    :parentProxy(parent_),ciStreaming(ciStreaming_),ciBcast(ciBcast_),ciAllgather(ciAllgather_),ciAlltoall(ciAlltoall_)
863 {
864   init();
865
866   myComm=s; myComm.setArrayID(thisArrayID);
867   myRank=myComm.getRankForIndex(thisIndex);
868
869   findParent(false);
870
871   msgs = CmmNew();
872   nbcasts = 0;
873
874   comlibProxy = thisProxy;
875   ComlibDelegateProxy(&comlibProxy);
876   ciStreaming.setSourcePe();
877   ciBcast.setSourcePe();
878   ciAllgather.setSourcePe();
879   ciAlltoall.setSourcePe();
880
881   seqEntries=parent->numElements;
882   oorder.init (seqEntries);
883 }
884
885 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
886 {
887   init();
888   
889   seqEntries=-1;
890 }
891
892 void ampi::ckJustMigrated(void)
893 {
894         findParent(true);
895         ArrayElement1D::ckJustMigrated();
896 }
897
898 void ampi::findParent(bool forMigration) {
899         STARTUP_DEBUG("ampi> finding my parent")
900         parent=parentProxy[thisIndex].ckLocal();
901         if (parent==NULL) CkAbort("AMPI can't find its parent!");
902         thread=parent->registerAmpi(this,myComm,forMigration);
903         if (thread==NULL) CkAbort("AMPI can't find its thread!");
904 //      printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
905 }
906
907 static void cmm_pup_ampi_message(pup_er p,void **msg) {
908         CkPupMessage(*(PUP::er *)p,msg,1);
909         if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
910 //      printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
911 }
912
913 void ampi::pup(PUP::er &p)
914 {
915   if(!p.isUserlevel())
916     ArrayElement1D::pup(p);//Pack superclass
917   p|parentProxy;
918   p|myComm;
919   p|myRank;
920   p|nbcasts;
921   p|tmpVec;
922   p|remoteProxy;
923   p|resumeOnRecv;
924   p|comlibProxy;
925   p|ciStreaming;
926   p|ciBcast;
927   p|ciAllgather;
928   p|ciAlltoall;
929
930   if(p.isUnpacking()){
931     ciStreaming.setSourcePe();
932     ciBcast.setSourcePe();
933     ciAllgather.setSourcePe();
934     ciAlltoall.setSourcePe();
935   }
936
937   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
938 //      printf("[%d] ampi index %d msgs table pointer %p\n",CkMyPe(),thisIndex,msgs);
939
940   p|seqEntries;
941   p|oorder;
942 }
943
944 ampi::~ampi()
945 {
946 /*
947   int tags[3], sts[3];
948   tags[0] = tags[1] = tags[2] = CmmWildCard;
949   AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
950   while (msg) {
951     delete msg;
952     msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
953   }
954 */
955   CmmFree(msgs);
956 }
957
958 //------------------------ Communicator Splitting ---------------------
959 class ampiSplitKey {
960 public:
961         int nextSplitComm;
962         int color; //New class of processes we'll belong to
963         int key; //To determine rank in new ordering
964         int rank; //Rank in old ordering
965         ampiSplitKey() {}
966         ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
967                 :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
968 };
969
970 /* "type" may indicate whether call is for a cartesian topology etc. */
971
972 void ampi::split(int color,int key,MPI_Comm *dest, int type)
973 {
974   if (type == CART_TOPOL) {
975         ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
976         int rootIdx=myComm.getIndexForRank(0);
977         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
978         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
979
980         thread->suspend(); //Resumed by ampiParent::cartChildRegister
981         MPI_Comm newComm=parent->getNextCart()-1;
982         *dest=newComm;
983   } else {
984         ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
985         int rootIdx=myComm.getIndexForRank(0);
986         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
987         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
988
989         thread->suspend(); //Resumed by ampiParent::splitChildRegister
990         MPI_Comm newComm=parent->getNextSplit()-1;
991         *dest=newComm;
992   }
993 }
994
995 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
996         const ampiSplitKey *a=(const ampiSplitKey *)a_;
997         const ampiSplitKey *b=(const ampiSplitKey *)b_;
998         if (a->color!=b->color) return a->color-b->color;
999         if (a->key!=b->key) return a->key-b->key;
1000         return a->rank-b->rank;
1001 }
1002
1003 void ampi::splitPhase1(CkReductionMsg *msg)
1004 {
1005         //Order the keys, which orders the ranks properly:
1006         int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1007         ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1008         if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1009         qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1010
1011         MPI_Comm newComm = -1;
1012         for(int i=0;i<nKeys;i++)
1013                 if(keys[i].nextSplitComm>newComm)
1014                         newComm = keys[i].nextSplitComm;
1015
1016         //Loop over the sorted keys, which gives us the new arrays:
1017         int lastColor=keys[0].color-1; //The color we're building an array for
1018         CProxy_ampi lastAmpi; //The array for lastColor
1019         int lastRoot=0; //C value for new rank 0 process for latest color
1020         ampiCommStruct lastComm; //Communicator info. for latest color
1021         for (int c=0;c<nKeys;c++) {
1022                 if (keys[c].color!=lastColor)
1023                 { //Hit a new color-- need to build a new communicator and array
1024                         lastColor=keys[c].color;
1025                         lastRoot=c;
1026                         CkArrayOptions opts;
1027                         opts.bindTo(parentProxy);
1028                         opts.setNumInitial(0);
1029                         CkArrayID unusedAID; ampiCommStruct unusedComm;
1030                         lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1031                         lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1032
1033                         CkVec<int> indices; //Maps rank to array indices for new arrau
1034                         for (int i=c;i<nKeys;i++) {
1035                                 if (keys[i].color!=lastColor) break; //Done with this color
1036                                 int idx=myComm.getIndexForRank(keys[i].rank);
1037                                 indices.push_back(idx);
1038                         }
1039
1040                         //FIXME: create a new communicator for each color, instead of
1041                         // (confusingly) re-using the same MPI_Comm number for each.
1042                         lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1043                 }
1044                 int newRank=c-lastRoot;
1045                 int newIdx=lastComm.getIndexForRank(newRank);
1046
1047                 //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1048                 lastAmpi[newIdx].insert(parentProxy,lastComm);
1049         }
1050
1051         delete msg;
1052 }
1053
1054 //...newly created array elements register with the parent, which calls:
1055 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1056         int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1057         if (splitComm.size()<=idx) splitComm.resize(idx+1);
1058         splitComm[idx]=new ampiCommStruct(s);
1059         thread->resume(); //Matches suspend at end of ampi::split
1060 }
1061
1062 //-----------------create communicator from group--------------
1063 // The procedure is like that of comm_split very much,
1064 // so the code is shamelessly copied from above
1065 //   1. reduction to make sure all members have called
1066 //   2. the root in the old communicator create the new array
1067 //   3. ampiParent::register is called to register new array as new comm
1068 class vecStruct {
1069 public:
1070   int nextgroup;
1071   groupStruct vec;
1072   vecStruct():nextgroup(-1){}
1073   vecStruct(int nextgroup_, groupStruct vec_)
1074     : nextgroup(nextgroup_), vec(vec_) { }
1075 };
1076
1077 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1078   int rootIdx=vec[0];
1079   tmpVec = vec;
1080   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1081   MPI_Comm nextgroup = parent->getNextGroup();
1082   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1083
1084   if(getPosOp(thisIndex,vec)>=0){
1085     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1086     MPI_Comm retcomm = parent->getNextGroup()-1;
1087     *newcomm = retcomm;
1088   }else{
1089     *newcomm = MPI_COMM_NULL;
1090   }
1091 }
1092
1093 void ampi::commCreatePhase1(CkReductionMsg *msg){
1094   MPI_Comm *nextGroupComm = (int *)msg->getData();
1095
1096   CkArrayOptions opts;
1097   opts.bindTo(parentProxy);
1098   opts.setNumInitial(0);
1099   CkArrayID unusedAID;
1100   ampiCommStruct unusedComm;
1101   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1102   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1103
1104   groupStruct indices = tmpVec;
1105   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1106   for(int i=0;i<indices.size();i++){
1107     int newIdx=indices[i];
1108     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1109   }
1110   delete msg;
1111 }
1112
1113 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1114   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1115   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1116   groupComm[idx]=new ampiCommStruct(s);
1117   thread->resume(); //Matches suspend at end of ampi::split
1118 }
1119
1120 /* Virtual topology communicator creation */
1121 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1122   int rootIdx=vec[0];
1123   tmpVec = vec;
1124   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1125
1126   MPI_Comm nextcart = parent->getNextCart();
1127   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1128
1129   if(getPosOp(thisIndex,vec)>=0){
1130     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1131      MPI_Comm retcomm = parent->getNextCart()-1;
1132      *newcomm = retcomm;
1133   }else
1134     *newcomm = MPI_COMM_NULL;
1135 }
1136
1137 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1138   MPI_Comm *nextCartComm = (int *)msg->getData();
1139
1140   CkArrayOptions opts;
1141   opts.bindTo(parentProxy);
1142   opts.setNumInitial(0);
1143   CkArrayID unusedAID;
1144   ampiCommStruct unusedComm;
1145   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1146   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1147
1148   groupStruct indices = tmpVec;
1149   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1150                                                 size(),indices);
1151   for(int i=0;i<indices.size();i++){
1152     int newIdx=indices[i];
1153     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1154   }
1155   delete msg;
1156 }
1157
1158 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1159   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1160   if (cartComm.size()<=idx) {
1161     cartComm.resize(idx+1);
1162     cartComm.length()=idx+1;
1163   }
1164   cartComm[idx]=new ampiCommStruct(s);
1165   thread->resume(); //Matches suspend at end of ampi::cartCreate
1166 }
1167
1168 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1169   int rootIdx=vec[0];
1170   tmpVec = vec;
1171   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1172                 myComm.getProxy());
1173   MPI_Comm nextgraph = parent->getNextGraph();
1174   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1175
1176   if(getPosOp(thisIndex,vec)>=0){
1177     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1178     MPI_Comm retcomm = parent->getNextGraph()-1;
1179     *newcomm = retcomm;
1180   }else
1181     *newcomm = MPI_COMM_NULL;
1182 }
1183
1184 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1185   MPI_Comm *nextGraphComm = (int *)msg->getData();
1186
1187   CkArrayOptions opts;
1188   opts.bindTo(parentProxy);
1189   opts.setNumInitial(0);
1190   CkArrayID unusedAID;
1191   ampiCommStruct unusedComm;
1192   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1193   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1194
1195   groupStruct indices = tmpVec;
1196   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1197                                                 .size(),indices);
1198   for(int i=0;i<indices.size();i++){
1199     int newIdx=indices[i];
1200     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1201   }
1202   delete msg;
1203 }
1204
1205 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1206   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1207   if (graphComm.size()<=idx) {
1208     graphComm.resize(idx+1);
1209     graphComm.length()=idx+1;
1210   }
1211   graphComm[idx]=new ampiCommStruct(s);
1212   thread->resume(); //Matches suspend at end of ampi::graphCreate
1213 }
1214
1215 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1216   if(thisIndex==root) { // not everybody gets the valid rvec
1217     tmpVec = rvec;
1218   }
1219   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1220   MPI_Comm nextinter = parent->getNextInter();
1221   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1222
1223   thread->suspend(); //Resumed by ampiParent::interChildRegister
1224   MPI_Comm newcomm=parent->getNextInter()-1;
1225   *ncomm=newcomm;
1226 }
1227
1228 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1229   MPI_Comm *nextInterComm = (int *)msg->getData();
1230
1231   groupStruct lgroup = myComm.getIndices();
1232   CkArrayOptions opts;
1233   opts.bindTo(parentProxy);
1234   opts.setNumInitial(0);
1235   CkArrayID unusedAID;
1236   ampiCommStruct unusedComm;
1237   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1238   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1239
1240   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1241   for(int i=0;i<lgroup.size();i++){
1242     int newIdx=lgroup[i];
1243     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1244   }
1245
1246   parentProxy[0].ExchangeProxy(newAmpi);
1247   delete msg;
1248 }
1249
1250 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1251   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1252   if (interComm.size()<=idx) interComm.resize(idx+1);
1253   interComm[idx]=new ampiCommStruct(s);
1254   //thread->resume(); // don't resume it yet, till parent set remote proxy
1255 }
1256
1257 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1258   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1259     groupStruct lvec = myComm.getIndices();
1260     groupStruct rvec = myComm.getRemoteIndices();
1261     int rsize = rvec.size();
1262     tmpVec = lvec;
1263     for(int i=0;i<rsize;i++)
1264       tmpVec.push_back(rvec[i]);
1265     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1266   }else{
1267     tmpVec.resize(0);
1268   }
1269
1270   int rootIdx=myComm.getIndexForRank(0);
1271   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1272   MPI_Comm nextintra = parent->getNextIntra();
1273   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1274
1275   thread->suspend(); //Resumed by ampiParent::interChildRegister
1276   MPI_Comm newcomm=parent->getNextIntra()-1;
1277   *ncomm=newcomm;
1278 }
1279
1280 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1281   if(tmpVec.size()==0) { delete msg; return; }
1282   MPI_Comm *nextIntraComm = (int *)msg->getData();
1283   CkArrayOptions opts;
1284   opts.bindTo(parentProxy);
1285   opts.setNumInitial(0);
1286   CkArrayID unusedAID;
1287   ampiCommStruct unusedComm;
1288   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1289   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1290
1291   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1292   for(int i=0;i<tmpVec.size();i++){
1293     int newIdx=tmpVec[i];
1294     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1295   }
1296   delete msg;
1297 }
1298
1299 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1300   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1301   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1302   intraComm[idx]=new ampiCommStruct(s);
1303   thread->resume(); //Matches suspend at end of ampi::split
1304 }
1305
1306 //------------------------ communication -----------------------
1307 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1308 {
1309   if (universeNo>MPI_COMM_WORLD) {
1310     int worldDex=universeNo-MPI_COMM_WORLD-1;
1311     if (worldDex>=mpi_nworlds)
1312       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1313     return mpi_worlds[worldDex].comm;
1314   }
1315   CkAbort("Bad communicator passed to universeComm2CommStruct");
1316   return mpi_worlds[0].comm; // meaningless return
1317 }
1318
1319 void ampi::block(void){
1320         thread->suspend();
1321 }
1322
1323 void ampi::yield(void){
1324         thread->schedule();
1325 }
1326
1327 void ampi::unblock(void){
1328         thread->resume();
1329 }
1330
1331 void
1332 ampi::generic(AmpiMsg* msg)
1333 {
1334 MSG_ORDER_DEBUG(
1335   CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1336         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1337 )
1338
1339 //      AmpiMsg *msgcopy = msg;
1340   if(msg->seq != -1) {
1341     int srcIdx=msg->srcIdx;
1342     int n=oorder.put(srcIdx,msg);
1343     if (n>0) { // This message was in-order
1344       inorder(msg);
1345       if (n>1) { // It enables other, previously out-of-order messages
1346         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1347           inorder(msg);
1348         }
1349       }
1350     }
1351   } else { //Cross-world or system messages are unordered
1352     inorder(msg);
1353   }
1354   
1355   if(resumeOnRecv){
1356     thread->resume();
1357   }
1358 }
1359
1360 void
1361 ampi::inorder(AmpiMsg* msg)
1362 {
1363 MSG_ORDER_DEBUG(
1364   CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1365         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1366 )
1367   int tags[3];
1368   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1369   CmmPut(msgs, 3, tags, msg);
1370 }
1371
1372 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1373         int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm)
1374 {
1375   CkDDT_DataType *ddt = getDDT()->getType(type);
1376   int len = ddt->getSize(count);
1377   int sIdx=thisIndex;
1378   int seq = -1;
1379   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1380      seq = oorder.nextOutgoing(destIdx);
1381   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1382   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1383   return msg;
1384 }
1385
1386 void
1387 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1388 {
1389   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1390 }
1391
1392 void
1393 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1394 {
1395   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1396   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy());
1397 }
1398
1399 void
1400 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1401 {
1402   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1403   memcpy(msg->data, buf, len);
1404   CProxy_ampi pa(aid);
1405   pa[idx].generic(msg);
1406 }
1407
1408 void
1409 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy)
1410 {
1411   if(rank==MPI_PROC_NULL) return;
1412   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1413   int destIdx = dest.getIndexForRank(rank);
1414   if(isInter()){
1415     sRank = parent->thisIndex;
1416     destcomm = MPI_COMM_FIRST_INTER;
1417     destIdx = dest.getIndexForRemoteRank(rank);
1418     arrproxy = remoteProxy;
1419   }
1420  MSG_ORDER_DEBUG(
1421   CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
1422  )
1423
1424   arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm));
1425
1426 #ifndef CMK_OPTIMIZE
1427   int size=0;
1428   MPI_Type_size(type,&size);
1429   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
1430 #endif
1431 }
1432
1433 int
1434 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
1435 {
1436   MPI_Comm disComm = myComm.getComm();
1437   if(s==MPI_PROC_NULL) {
1438     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
1439     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
1440     ((MPI_Status *)sts)->MPI_LENGTH = 0;
1441     return 0;
1442   }
1443   _LOG_E_END_AMPI_PROCESSING(thisIndex)
1444 #if CMK_BLUEGENE_CHARM
1445   void *curLog;         // store current log in timeline
1446   _TRACE_BG_TLINE_END(&curLog);
1447   TRACE_BG_AMPI_SUSPEND();
1448 #endif
1449
1450   if(isInter()){
1451     s = myComm.getIndexForRemoteRank(s);
1452     comm = MPI_COMM_FIRST_INTER;
1453   }
1454
1455   int tags[3];
1456   AmpiMsg *msg = 0;
1457   
1458  MSG_ORDER_DEBUG(
1459   CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
1460  )
1461
1462   resumeOnRecv=true;
1463   ampi *dis = getAmpiInstance(disComm);
1464   while(1) {
1465       //This is done to take into account the case in which an ampi 
1466       // thread has migrated while waiting for a message
1467     tags[0] = t; tags[1] = s; tags[2] = comm;
1468     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
1469     if (msg) break;
1470     dis->thread->suspend();
1471     dis = getAmpiInstance(disComm);
1472   }
1473   dis->resumeOnRecv=false;
1474   CkDDT_DataType *ddt = dis->getDDT()->getType(type);
1475   int len = ddt->getSize(count);
1476   
1477   if(sts)
1478     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1479   if (msg->length > len && msg->length-len!=sizeof(AmpiOpHeader))
1480   { /* Received more data than we were expecting */
1481     char einfo[1024];
1482     sprintf(einfo, "FATAL ERROR in rank %d MPI_Recv (tag=%d, source=%d)\n"
1483         "  Expecting only %d bytes (%d items of type %d), \n"
1484         "  but received %d bytes from rank %d\nAMPI> MPI_Send was longer than matching MPI_Recv.",
1485             thisIndex,t,s,
1486             len, count, type,
1487             msg->length, msg->srcRank);
1488     CkError(einfo);
1489     return -1;
1490   }else if(msg->length < len){ // only at rare case shall we reset count by using divide
1491     count = msg->length/(ddt->getSize(1));
1492   }
1493   if (msg->length-len==sizeof(AmpiOpHeader)) {  // reduction msg
1494     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
1495   } else {
1496     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
1497   }
1498   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
1499 #if CMK_BLUEGENE_CHARM
1500   TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", curLog);
1501 #endif
1502
1503   delete msg;
1504   return 0;
1505 }
1506
1507 void
1508 ampi::probe(int t, int s, int comm, int *sts)
1509 {
1510   int tags[3];
1511 #if CMK_BLUEGENE_CHARM
1512   void *curLog;         // store current log in timeline
1513   _TRACE_BG_TLINE_END(&curLog);
1514   TRACE_BG_AMPI_SUSPEND();
1515 #endif
1516
1517   AmpiMsg *msg = 0;
1518   resumeOnRecv=true;
1519   while(1) {
1520     tags[0] = t; tags[1] = s; tags[2] = comm;
1521     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
1522     if (msg) break;
1523     thread->suspend();
1524   }
1525   resumeOnRecv=false;
1526   if(sts)
1527     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1528 #if CMK_BLUEGENE_CHARM
1529   TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
1530 #endif
1531 }
1532
1533 int
1534 ampi::iprobe(int t, int s, int comm, int *sts)
1535 {
1536   int tags[3];
1537   AmpiMsg *msg = 0;
1538   tags[0] = t; tags[1] = s; tags[2] = comm;
1539   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
1540   if (msg) {
1541     if(sts)
1542       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1543     return 1;
1544   }
1545   thread->schedule();
1546   return 0;
1547 }
1548
1549
1550 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
1551 void
1552 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
1553 {
1554   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1555   int rootIdx=dest.getIndexForRank(root);
1556   if(rootIdx==thisIndex) {
1557 #if 0//AMPI_COMLIB
1558     ciBcast.beginIteration();
1559     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
1560 #else
1561     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
1562 #endif
1563   }
1564   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
1565   nbcasts++;
1566 }
1567
1568 void
1569 ampi::bcastraw(void* buf, int len, CkArrayID aid)
1570 {
1571   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, 0);
1572   memcpy(msg->data, buf, len);
1573   CProxy_ampi pa(aid);
1574   pa.generic(msg);
1575 }
1576
1577 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
1578                         void *attr_in, void *attr_out, int *flag){
1579   (*flag) = 0;
1580   return (MPI_SUCCESS);
1581 }
1582 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
1583                         void *attr_in, void *attr_out, int *flag){
1584   (*(void **)attr_out) = attr_in;
1585   (*flag) = 1;
1586   return (MPI_SUCCESS);
1587 }
1588 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
1589   return (MPI_SUCCESS);
1590 }
1591
1592
1593 void AmpiSeqQ::init(int numP) 
1594 {
1595   elements.init(numP);
1596 }
1597
1598 AmpiSeqQ::~AmpiSeqQ () {
1599 }
1600   
1601 void AmpiSeqQ::pup(PUP::er &p) {
1602   p|out;
1603   p|elements;
1604 }
1605
1606 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
1607 {
1608   AmpiOtherElement &el=elements[srcIdx];
1609 #ifndef CMK_OPTIMIZE
1610   if (msg->seq<el.seqIncoming)
1611     CkAbort("AMPI Logic error: received late out-of-order message!\n");
1612 #endif
1613   out.enq(msg);
1614   el.nOut++; // We have another message in the out-of-order queue
1615 }
1616
1617 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
1618 {
1619   AmpiOtherElement &el=elements[srcIdx];
1620   if (el.nOut==0) return 0; // No more out-of-order left.
1621   // Walk through our out-of-order queue, searching for our next message:
1622   for (int i=0;i<out.length();i++) {
1623     AmpiMsg *msg=out.deq();
1624     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
1625       el.seqIncoming++;
1626       el.nOut--; // We have one less message out-of-order
1627       return msg;
1628     }
1629     else
1630       out.enq(msg);
1631   }
1632   // We walked the whole queue-- ours is not there.
1633   return 0;
1634 }
1635
1636
1637 void AmpiRequestList::pup(PUP::er &p) { 
1638         CmiMemoryCheck();
1639         if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
1640                 return;
1641         }
1642
1643         p(blklen); //Allocated size of block
1644         p(len); //Number of used elements in block
1645         if(p.isUnpacking()){
1646                 makeBlock(blklen,len);
1647         }
1648         int count=0;
1649         for(int i=0;i<len;i++){
1650                 char nonnull;
1651                 if(!p.isUnpacking()){
1652                         if(block[i] == NULL){
1653                                 nonnull = 0;
1654                         }else{
1655                                 nonnull = block[i]->getType();
1656                         }
1657                 }       
1658                 p(nonnull);
1659                 if(nonnull != 0){
1660                         if(p.isUnpacking()){
1661                                 switch(nonnull){
1662                                         case 1:
1663                                                 block[i] = new PersReq;
1664                                                 break;
1665                                         case 2: 
1666                                                 block[i] = new IReq;
1667                                                 break;
1668                                         case 3: 
1669                                                 block[i] = new ATAReq;
1670                                                 break;
1671                                 }
1672                         }       
1673                         block[i]->pup(p);
1674                         count++;
1675                 }else{
1676                         block[i] = 0;
1677                 }
1678         }
1679         if(p.isDeleting()){
1680                 freeBlock();
1681         }
1682         CmiMemoryCheck();
1683 }
1684
1685 //------------------ External Interface -----------------
1686 ampiParent *getAmpiParent(void) {
1687   ampiParent *p = CtvAccess(ampiPtr);
1688 #ifndef CMK_OPTIMIZE
1689   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
1690 #endif
1691   return p;
1692 }
1693
1694 ampi *getAmpiInstance(MPI_Comm comm) {
1695   ampi *ptr=getAmpiParent()->comm2ampi(comm);
1696 #ifndef CMK_OPTIMIZE
1697   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
1698 #endif
1699   return ptr;
1700 }
1701
1702 inline static AmpiRequestList *getReqs(void) {
1703   return &(getAmpiParent()->ampiReqs);
1704 }
1705
1706 inline void checkComm(MPI_Comm comm){
1707 #ifndef CMK_OPTIMIZE
1708   getAmpiParent()->checkComm(comm);
1709 #endif
1710 }
1711
1712 inline void checkRequest(MPI_Request req){
1713 #ifndef CMK_OPTIMIZE
1714   getReqs()->checkRequest(req);
1715 #endif
1716 }
1717
1718 inline void checkRequests(int n, MPI_Request* reqs){
1719 #ifndef CMK_OPTIMIZE
1720   AmpiRequestList* reqlist = getReqs();
1721   for(int i=0;i<n;i++)
1722     reqlist->checkRequest(reqs[i]);
1723 #endif
1724 }
1725
1726 CDECL void AMPI_Migrate(void)
1727 {
1728   AMPIAPI("AMPI_Migrate");
1729 #if CMK_BLUEGENE_CHARM
1730   TRACE_BG_AMPI_SUSPEND();
1731 #endif
1732   TCHARM_Migrate();
1733 #if CMK_BLUEGENE_CHARM
1734   TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
1735 #endif
1736 }
1737
1738
1739 CDECL void AMPI_Evacuate(void)
1740 {
1741   TCHARM_Evacuate();
1742 }
1743
1744
1745
1746 CDECL void AMPI_Migrateto(int destPE)
1747 {
1748   AMPIAPI("AMPI_MigrateTo");
1749 #if CMK_BLUEGENE_CHARM
1750   TRACE_BG_AMPI_SUSPEND();
1751 #endif
1752   TCHARM_Migrate_to(destPE);
1753 #if CMK_BLUEGENE_CHARM
1754   TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
1755 #endif
1756 }
1757
1758 CDECL void AMPI_MigrateTo(int destPE)
1759 {
1760         AMPI_Migrateto(destPE);
1761 }
1762
1763 CDECL void AMPI_Async_Migrate(void)
1764 {
1765   AMPIAPI("AMPI_Async_Migrate");
1766 #if CMK_BLUEGENE_CHARM
1767   TRACE_BG_AMPI_SUSPEND();
1768 #endif
1769   TCHARM_Async_Migrate();
1770 #if CMK_BLUEGENE_CHARM
1771   TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
1772 #endif
1773 }
1774
1775 CDECL void AMPI_Allow_Migrate(void)
1776 {
1777   AMPIAPI("AMPI_Allow_Migrate");
1778 #if CMK_BLUEGENE_CHARM
1779   TRACE_BG_AMPI_SUSPEND();
1780 #endif
1781   TCHARM_Allow_Migrate();
1782 #if CMK_BLUEGENE_CHARM
1783   TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_ALLOW_MIGRATE")
1784 #endif
1785 }
1786
1787 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
1788 #if CMK_LBDB_ON
1789   AMPIAPI("AMPI_Setmigratable");
1790   ampi *ptr=getAmpiInstance(comm);
1791   ptr->setMigratable(mig);
1792 #else
1793   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
1794 #endif
1795 }
1796
1797 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
1798 {
1799   if (nodeinit_has_been_called) {
1800     AMPIAPI("AMPI_Init");
1801     char **argv;
1802     if (p_argv) argv=*p_argv;
1803     else argv=CkGetArgv();
1804     ampiInit(argv);
1805     if (p_argc) *p_argc=CmiGetArgc(argv);
1806   }
1807   else
1808   { /* Charm hasn't been started yet! */
1809     CkAbort("AMPI_Init> Charm is not initialized!");
1810   }
1811   return 0;
1812 }
1813
1814 CDECL int AMPI_Initialized(int *isInit)
1815 {
1816   if (nodeinit_has_been_called) {
1817         AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
1818         *isInit=CtvAccess(ampiInitDone);
1819   }
1820   else /* !nodeinit_has_been_called */ {
1821         *isInit=nodeinit_has_been_called;
1822   }
1823   return 0;
1824 }
1825
1826 CDECL int AMPI_Finalized(int *isFinalized)
1827 {
1828     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
1829     *isFinalized=CtvAccess(ampiFinalized);
1830     return 0;
1831 }
1832
1833 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
1834 {
1835   AMPIAPI("AMPI_Comm_rank");
1836   *rank = getAmpiInstance(comm)->getRank(comm);
1837   return 0;
1838 }
1839
1840 CDECL
1841 int AMPI_Comm_size(MPI_Comm comm, int *size)
1842 {
1843   AMPIAPI("AMPI_Comm_size");
1844   *size = getAmpiInstance(comm)->getSize(comm);
1845   return 0;
1846 }
1847
1848 CDECL
1849 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
1850 {
1851   AMPIAPI("AMPI_Comm_compare");
1852   if(comm1==comm2) *result=MPI_IDENT;
1853   else{
1854     int equal=1;
1855     CkVec<int> ind1, ind2;
1856     ind1 = getAmpiInstance(comm1)->getIndices();
1857     ind2 = getAmpiInstance(comm2)->getIndices();
1858     if(ind1.size()==ind2.size()){
1859       for(int i=0;i<ind1.size();i++)
1860         if(ind1[i] != ind2[i]) { equal=0; break; }
1861     }
1862     if(equal==1) *result=MPI_CONGRUENT;
1863     else *result=MPI_UNEQUAL;
1864   }
1865   return 0;
1866 }
1867
1868 CDECL void AMPI_Exit(int /*exitCode*/)
1869 {
1870   AMPIAPI("AMPI_Exit");
1871   TCHARM_Done();
1872 }
1873 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
1874 {
1875   AMPI_Exit(*exitCode);
1876 }
1877
1878 CDECL
1879 int AMPI_Finalize(void)
1880 {
1881   AMPIAPI("AMPI_Finalize");
1882 #if PRINT_IDLE
1883   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
1884 #endif
1885 #if AMPI_COUNTER
1886     getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
1887 #endif
1888   CtvAccess(ampiFinalized)=1;
1889 #if CMK_BLUEGENE_CHARM
1890   TRACE_BG_AMPI_SUSPEND();
1891 #endif
1892 //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
1893   AMPI_Exit(0);
1894   return 0;
1895 }
1896
1897 CDECL
1898 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
1899                         int tag, MPI_Comm comm)
1900 {
1901   AMPIAPI("AMPI_Send");
1902   ampi *ptr = getAmpiInstance(comm);
1903 #if AMPI_COMLIB
1904   if(enableStreaming){  
1905     ptr->getStreaming().beginIteration();
1906     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
1907   } else
1908 #endif
1909     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
1910 #if AMPI_COUNTER
1911   getAmpiParent()->counters.send++;
1912 #endif
1913   return 0;
1914 }
1915
1916 CDECL
1917 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
1918               MPI_Comm comm, MPI_Status *status)
1919 {
1920         AMPIAPI("AMPI_Recv");
1921         ampi *ptr = getAmpiInstance(comm);
1922
1923         if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
1924
1925 #if AMPI_COUNTER
1926   getAmpiParent()->counters.recv++;
1927 #endif
1928         return 0;
1929 }
1930
1931 CDECL
1932 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
1933 {
1934   AMPIAPI("AMPI_Probe");
1935   ampi *ptr = getAmpiInstance(comm);
1936   ptr->probe(tag,src, comm, (int*) status);
1937   return 0;
1938 }
1939
1940 CDECL
1941 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
1942 {
1943   AMPIAPI("AMPI_Iprobe");
1944   ampi *ptr = getAmpiInstance(comm);
1945   *flag = ptr->iprobe(tag,src,comm,(int*) status);
1946   return 0;
1947 }
1948
1949 CDECL
1950 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
1951                   int stag, void *rbuf, int rcount, int rtype,
1952                   int src, int rtag, MPI_Comm comm, MPI_Status *sts)
1953 {
1954   AMPIAPI("AMPI_Sendrecv");
1955   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
1956   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
1957   if (se) return se;
1958   else return re;
1959 }
1960
1961 CDECL
1962 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
1963                          int dest, int sendtag, int source, int recvtag,
1964                          MPI_Comm comm, MPI_Status *status)
1965 {
1966   AMPIAPI("AMPI_Sendrecv_replace");
1967   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
1968                       buf, count, datatype, source, recvtag, comm, status);
1969 }
1970
1971
1972 CDECL
1973 int AMPI_Barrier(MPI_Comm comm)
1974 {
1975   AMPIAPI("AMPI_Barrier");
1976   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
1977 #if CMK_BLUEGENE_CHARM
1978   void *barrierLog;             // store current log in timeline
1979   TRACE_BG_AMPI_BARRIER_START(barrierLog);
1980 #endif
1981   //HACK: Use collective operation as a barrier.
1982   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
1983 #if CMK_BLUEGENE_CHARM
1984   TRACE_BG_AMPI_BARRIER_END(barrierLog);
1985 #endif
1986 #if AMPI_COUNTER
1987   getAmpiParent()->counters.barrier++;
1988 #endif
1989   return 0;
1990 }
1991
1992 CDECL
1993 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
1994                          MPI_Comm comm)
1995 {
1996   AMPIAPI("AMPI_Bcast");
1997   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
1998   if(comm==MPI_COMM_SELF) return 0;
1999   ampi *ptr = getAmpiInstance(comm);
2000   ptr->bcast(root, buf, count, type,comm);
2001 #if AMPI_COUNTER
2002   getAmpiParent()->counters.bcast++;
2003 #endif
2004   return 0;
2005 }
2006
2007 /// This routine is called with the results of a Reduce or AllReduce
2008 const int MPI_REDUCE_SOURCE=0;
2009 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2010 void ampi::reduceResult(CkReductionMsg *msg)
2011 {
2012         MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2013   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2014              thisArrayID,thisIndex);
2015   delete msg;
2016 }
2017
2018 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2019 {
2020   int szdata = ddt->getSize(count);
2021   int szhdr = sizeof(AmpiOpHeader);
2022   AmpiOpHeader newhdr(op,type,count,szdata); 
2023   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2024   memcpy(msg->getData(),&newhdr,szhdr);
2025   ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2026   return msg;
2027 }
2028
2029 // Copy the MPI datatype "type" from inbuf to outbuf
2030 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2031   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2032   //  deserializing into the output.
2033   ampi *ptr = getAmpiInstance(comm);
2034   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2035   int len=ddt->getSize(count);
2036   char *serialized=new char[len];
2037   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2038   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2039   delete [] serialized;         // < memory leak!  // gzheng 
2040   
2041   return MPI_SUCCESS;
2042 }
2043
2044 CDECL
2045 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
2046                int root, MPI_Comm comm)
2047 {
2048   AMPIAPI("AMPI_Reduce");
2049   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
2050   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
2051   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2052   ampi *ptr = getAmpiInstance(comm);
2053   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2054
2055   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
2056   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
2057   msg->setCallback(reduceCB);
2058         MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
2059   ptr->contribute(msg);
2060   if (ptr->thisIndex == rootIdx){
2061     /*HACK: Use recv() to block until reduction data comes back*/
2062     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2063       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
2064   }
2065 #if AMPI_COUNTER
2066   getAmpiParent()->counters.reduce++;
2067 #endif
2068   return 0;
2069 }
2070
2071 CDECL
2072 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
2073                   MPI_Op op, MPI_Comm comm)
2074 {
2075   AMPIAPI("AMPI_Allreduce");
2076   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
2077   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
2078   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2079   ampi *ptr = getAmpiInstance(comm);
2080   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2081   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2082   msg->setCallback(allreduceCB);
2083   ptr->contribute(msg);
2084
2085   /*HACK: Use recv() to block until the reduction data comes back*/
2086   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2087     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
2088 #if AMPI_COUNTER
2089   getAmpiParent()->counters.allreduce++;
2090 #endif
2091   return 0;
2092 }
2093
2094 CDECL
2095 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
2096                    MPI_Op op, MPI_Comm comm, MPI_Request* request)
2097 {
2098   AMPIAPI("AMPI_Allreduce");
2099   checkRequest(*request);
2100   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
2101   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
2102   ampi *ptr = getAmpiInstance(comm);
2103   if(comm==MPI_COMM_SELF) {
2104     // Just do a local copy: no need to do a reduction:
2105     return copyDatatype(comm,type,count,inbuf,outbuf);
2106   }
2107   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2108   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2109   msg->setCallback(allreduceCB);
2110   ptr->contribute(msg);
2111
2112   // using irecv instead recv to non-block the call and get request pointer
2113   AmpiRequestList* reqs = getReqs();
2114   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
2115   *request = reqs->insert(newreq);
2116   return 0;
2117 }
2118
2119 CDECL
2120 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
2121                        MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
2122 {
2123   AMPIAPI("AMPI_Reduce_scatter");
2124   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
2125   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
2126   ampi *ptr = getAmpiInstance(comm);
2127   int size = ptr->getSize(comm);
2128   int count=0;
2129   int *displs = new int [size];
2130   int len;
2131   void *tmpbuf;
2132
2133   //under construction
2134   for(int i=0;i<size;i++){
2135     displs[i] = count;
2136     count+= recvcounts[i];
2137   }
2138   len = ptr->getDDT()->getType(datatype)->getSize(count);
2139   tmpbuf = malloc(len);
2140   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
2141   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
2142                recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
2143   free(tmpbuf);
2144   delete [] displs;     // < memory leak ! // gzheng
2145   return 0;
2146 }
2147
2148 /***** MPI_Scan algorithm (from MPICH) *******
2149    recvbuf = sendbuf;
2150    partial_scan = sendbuf;
2151    mask = 0x1;
2152    while (mask < size) {
2153       dst = rank^mask;
2154       if (dst < size) {
2155          send partial_scan to dst;
2156          recv from dst into tmp_buf;
2157          if (rank > dst) {
2158             partial_scan = tmp_buf + partial_scan;
2159             recvbuf = tmp_buf + recvbuf;
2160          }
2161          else {
2162             if (op is commutative)
2163                partial_scan = tmp_buf + partial_scan;
2164             else {
2165                tmp_buf = partial_scan + tmp_buf;
2166                partial_scan = tmp_buf;
2167             }
2168          }
2169       }
2170       mask <<= 1;
2171    }
2172  ***** MPI_Scan algorithm (from MPICH) *******/
2173
2174 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
2175   (op)(invec,inoutvec,&count,&datatype);
2176 }
2177 CDECL
2178 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
2179   AMPIAPI("AMPI_Scan");
2180   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
2181   MPI_Status sts;
2182   ampi *ptr = getAmpiInstance(comm);
2183   int size = ptr->getSize(comm);
2184   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
2185   int rank = ptr->getRank(comm);
2186   int mask = 0x1;
2187   int dst;
2188   void* tmp_buf = malloc(blklen);
2189   void* partial_scan = malloc(blklen);
2190   
2191   memcpy(recvbuf, sendbuf, blklen);
2192   memcpy(partial_scan, sendbuf, blklen);
2193   while(mask < size){
2194     dst = rank^mask;
2195     if(dst < size){
2196       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
2197                    tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
2198       if(rank > dst){
2199         (op)(tmp_buf,partial_scan,&count,&datatype);
2200         (op)(tmp_buf,recvbuf,&count,&datatype);
2201       }else {
2202         (op)(partial_scan,tmp_buf,&count,&datatype);
2203         memcpy(partial_scan,tmp_buf,blklen);
2204       }
2205       mask <<= 1;
2206     }
2207   }
2208
2209   free(tmp_buf);
2210   free(partial_scan);
2211 #if AMPI_COUNTER
2212   getAmpiParent()->counters.scan++;
2213 #endif
2214   return 0;
2215 }
2216
2217 CDECL
2218 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
2219   AMPIAPI("AMPI_Op_create");
2220   *op = function;
2221   return 0;
2222 }
2223
2224 CDECL
2225 int AMPI_Op_free(MPI_Op *op){
2226   AMPIAPI("AMPI_Op_free");
2227   *op = MPI_OP_NULL;
2228   return 0;
2229 }
2230
2231
2232 CDECL
2233 double AMPI_Wtime(void)
2234 {
2235   AMPIAPI("AMPI_Wtime");
2236 #if CMK_BLUEGENE_CHARM
2237   return BgGetTime();
2238 #else
2239   return TCHARM_Wall_timer();
2240 #endif
2241 }
2242
2243 CDECL
2244 double AMPI_Wtick(void){
2245   AMPIAPI("AMPI_Wtick");
2246   return 1e-6;
2247 }
2248
2249 int PersReq::start(){
2250   if(sndrcv == 1) { // send request
2251     ampi *aptr=getAmpiInstance(comm);
2252     aptr->send(tag, aptr->getRank(comm), buf, count, type, src, comm);
2253   }
2254   return 0;
2255 }
2256
2257 CDECL
2258 int AMPI_Start(MPI_Request *request)
2259 {
2260   AMPIAPI("AMPI_Start");
2261   checkRequest(*request);
2262   AmpiRequestList *reqs = getReqs();
2263   if(-1==(*reqs)[*request]->start()){
2264     CkAbort("MPI_Start could be used only on persistent communication requests!");
2265   }
2266   return 0;
2267 }
2268
2269 CDECL
2270 int AMPI_Startall(int count, MPI_Request *requests){
2271   AMPIAPI("AMPI_Startall");
2272   checkRequests(count,requests);
2273   AmpiRequestList *reqs = getReqs();
2274   for(int i=0;i<count;i++){
2275     if(-1==(*reqs)[requests[i]]->start())
2276       CkAbort("MPI_Start could be used only on persistent communication requests!");
2277   }
2278   return 0;
2279 }
2280
2281 /* organize the indices of requests into a vector of a vector: 
2282  * level 1 is different msg envelope matches
2283  * level 2 is (posting) ordered requests of with envelope
2284  * each time multiple completion call loop over first elem of level 1
2285  * and move the matched to the NULL request slot.   
2286  * warning: this does not work with I-Alltoall requests */
2287 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
2288   for(int i=0;i<count;i++){
2289     if(reqs[i]!=MPI_REQUEST_NULL)
2290       return 0;
2291   }
2292   return 1;
2293 }
2294 inline int matchReq(MPI_Request ia, MPI_Request ib){
2295   checkRequest(ia);  
2296   checkRequest(ib);
2297   AmpiRequestList* reqs = getReqs();
2298   AmpiRequest *a, *b;
2299   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
2300   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
2301   a=(*reqs)[ia];  b=(*reqs)[ib];
2302   if(a->tag != b->tag) return 0;
2303   if(a->src != b->src) return 0;
2304   if(a->comm != b->comm) return 0;
2305   return 1;
2306 }
2307 inline void swapInt(int& a,int& b){
2308   int tmp;
2309   tmp=a; a=b; b=tmp;
2310 }
2311 inline void sortedIndex(int n, int* arr, int* idx){
2312   int i,j;
2313   for(i=0;i<n;i++) 
2314     idx[i]=i;
2315   for (i=0; i<n-1; i++) 
2316     for (j=0; j<n-1-i; j++)
2317       if (arr[idx[j+1]] < arr[idx[j]]) 
2318         swapInt(idx[j+1],idx[j]);
2319 }
2320 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
2321   CkAssert(count!=0);
2322   int *newidx = new int [count];
2323   int flag;
2324   sortedIndex(count,arr,newidx);
2325   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
2326   CkVec<int> slot;
2327   vec->push_back(slot);
2328   (*vec)[0].push_back(newidx[0]);
2329   for(int i=1;i<count;i++){
2330     flag=0;
2331     for(int j=0;j<vec->size();j++){
2332       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
2333         ((*vec)[j]).push_back(newidx[i]);
2334         flag++;
2335       }
2336     }
2337     if(!flag){
2338       CkVec<int> newslot;
2339       newslot.push_back(newidx[i]);
2340       vec->push_back(newslot);
2341     }else{
2342       CkAssert(flag==1);
2343     }
2344   }
2345   delete [] newidx;
2346   return vec;
2347 }
2348 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
2349   printf("vec content: ");
2350   for(int i=0;i<vec.size();i++){
2351     printf("{");
2352     for(int j=0;j<(vec[i]).size();j++){
2353       printf(" %d ",arr[(vec[i])[j]]);
2354     }
2355     printf("} ");
2356   }
2357   printf("\n");
2358 }
2359
2360 int PersReq::wait(MPI_Status *sts){
2361         if(sndrcv == 2) {
2362                 if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
2363                         CkAbort("AMPI> Error in persistent request wait");
2364 #if CMK_BLUEGENE_CHARM
2365                 _TRACE_BG_TLINE_END(&event);
2366 #endif
2367         }
2368         return 0;
2369 }
2370 int IReq::wait(MPI_Status *sts){
2371         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
2372                 CkAbort("AMPI> Error in non-blocking request wait");
2373 #if CMK_BLUEGENE_CHARM
2374         _TRACE_BG_TLINE_END(&event);
2375 #endif
2376         return 0;
2377 }
2378 int ATAReq::wait(MPI_Status *sts){
2379         int i;
2380         for(i=0;i<count;i++){
2381                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
2382                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
2383                         CkAbort("AMPI> Error in alltoall request wait");
2384 #if CMK_BLUEGENE_CHARM
2385                 _TRACE_BG_TLINE_END(&myreqs[i].event);
2386 #endif
2387         }
2388 #if CMK_BLUEGENE_CHARM
2389         TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", myreqs[i].event, count);
2390         _TRACE_BG_TLINE_END(&event);
2391 #endif
2392         return 0;
2393 }
2394
2395 CDECL
2396 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
2397 {
2398   AMPIAPI("AMPI_Wait");
2399   checkRequest(*request);
2400   if(*request == MPI_REQUEST_NULL){
2401     stsempty(*sts);
2402     return 0;
2403   }
2404   AmpiRequestList* reqs = getReqs();
2405   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
2406   (*reqs)[*request]->wait(sts);
2407   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
2408     reqs->free(*request);
2409     *request = MPI_REQUEST_NULL;
2410   }
2411   return 0;
2412 }
2413
2414 CDECL
2415 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
2416 {
2417   AMPIAPI("AMPI_Waitall");
2418   if(count==0) return MPI_SUCCESS;
2419   checkRequests(count,request);
2420   int i,j,oldPe;
2421   AmpiRequestList* reqs = getReqs();
2422   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
2423   for(i=0;i<reqvec->size();i++){
2424     for(j=0;j<((*reqvec)[i]).size();j++){
2425       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
2426         stsempty(sts[((*reqvec)[i])[j]]);
2427         continue;
2428       }
2429                         oldPe = CkMyPe();
2430                         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
2431       waitReq->wait(&sts[((*reqvec)[i])[j]]);
2432                         if(oldPe != CkMyPe()){
2433 #if 1
2434                         reqs = getReqs();
2435                         reqvec  = vecIndex(count,request);
2436 #endif
2437                         }
2438     }
2439   }
2440 #if CMK_BLUEGENE_CHARM
2441   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
2442 #endif
2443   // free memory of requests
2444   for(i=0;i<count;i++){ 
2445     if(request[i] == MPI_REQUEST_NULL)
2446       continue;
2447     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
2448       reqs->free(request[i]);
2449       request[i] = MPI_REQUEST_NULL;
2450     }
2451   }
2452   delete reqvec;
2453   return 0;
2454 }
2455
2456 CDECL
2457 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
2458 {
2459   AMPIAPI("AMPI_Waitany");
2460   
2461   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
2462   checkRequests(count,request);
2463   if(areInactiveReqs(count,request)){
2464     *idx=MPI_UNDEFINED;
2465     stsempty(*sts);
2466     return MPI_SUCCESS;
2467   }
2468   int flag=0;
2469   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
2470   while(count>0){ /* keep looping until some request finishes: */
2471     for(int i=0;i<reqvec->size();i++){
2472       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
2473       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
2474         *idx = ((*reqvec)[i])[0];
2475         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
2476         return 0;
2477       }
2478     }
2479     /* no requests have finished yet-- schedule and try again */
2480     AMPI_Yield(MPI_COMM_WORLD);
2481   }
2482   *idx = MPI_UNDEFINED;
2483   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
2484         delete reqvec;
2485   return 0;
2486 }
2487
2488 CDECL
2489 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
2490                  int *array_of_indices, MPI_Status *array_of_statuses)
2491 {
2492   AMPIAPI("AMPI_Waitsome");
2493   checkRequests(incount,array_of_requests);
2494   if(areInactiveReqs(incount,array_of_requests)){
2495     *outcount=MPI_UNDEFINED;
2496     return MPI_SUCCESS;
2497   }
2498   MPI_Status sts;
2499   int i;
2500   int flag=0, realflag=0;
2501   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
2502   *outcount = 0;
2503   while(1){
2504     for(i=0;i<reqvec->size();i++){
2505       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
2506       if(flag == 1){ 
2507         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
2508         array_of_statuses[(*outcount)++]=sts;
2509         if(sts.MPI_COMM != 0)
2510           realflag=1; // there is real(non null) request
2511       }
2512     }
2513     if(realflag && outcount>0) break;
2514   }
2515         delete reqvec;
2516   return 0;
2517 }
2518
2519 CmiBool PersReq::test(MPI_Status *sts){
2520         if(sndrcv == 2)         // recv request
2521                 return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
2522         else                    // send request
2523                 return 1;
2524
2525 }
2526 void PersReq::complete(MPI_Status *sts){
2527         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
2528                 CkAbort("AMPI> Error in persistent request complete");
2529 }
2530
2531 CmiBool IReq::test(MPI_Status *sts){
2532         return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
2533 }
2534 void IReq::complete(MPI_Status *sts){
2535         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
2536                 CkAbort("AMPI> Error in non-blocking request complete");
2537 }
2538
2539 CmiBool ATAReq::test(MPI_Status *sts){
2540         int i, flag=1;
2541         for(i=0;i<count;i++){
2542                 flag *= getAmpiInstance(myreqs[i].comm)->iprobe(myreqs[i].tag, myreqs[i].src,
2543                                         myreqs[i].comm, (int*) sts);
2544         }
2545         return flag;
2546 }
2547 void ATAReq::complete(MPI_Status *sts){
2548         int i;
2549         for(i=0;i<count;i++){
2550                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
2551                                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int*)sts))
2552                         CkAbort("AMPI> Error in alltoall request complete");
2553         }
2554 }
2555
2556 CDECL
2557 int AMPI_Test(MPI_Request *request, int *flag, MPI_Status *sts)
2558 {
2559   AMPIAPI("AMPI_Test");
2560   checkRequest(*request);
2561   if(*request==MPI_REQUEST_NULL) {
2562     *flag = 1;
2563     stsempty(*sts);
2564     return 0;
2565   }
2566   AmpiRequestList* reqs = getReqs();
2567   if(1 == (*flag = (*reqs)[*request]->test(sts))){
2568     if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
2569       (*reqs)[*request]->complete(sts);
2570       reqs->free(*request);
2571       *request = MPI_REQUEST_NULL;
2572     }
2573   }
2574   return 0;
2575 }
2576
2577 CDECL
2578 int AMPI_Testany(int count, MPI_Request *request, int *index, int *flag, MPI_Status *sts){
2579   AMPIAPI("AMPI_Testany");
2580   checkRequests(count,request);
2581   if(areInactiveReqs(count,request)){
2582     *flag=1;
2583     *index=MPI_UNDEFINED;
2584     stsempty(*sts);
2585     return MPI_SUCCESS;
2586   }
2587   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
2588   *flag=0;
2589   for(int i=0;i<reqvec->size();i++){
2590     AMPI_Test(&request[((*reqvec)[i])[0]], flag, sts);
2591     if(*flag==1 && sts->MPI_COMM!=0){ // skip MPI_REQUEST_NULL
2592       *index = ((*reqvec)[i])[0];
2593       return 0;
2594     }
2595   }
2596   *index = MPI_UNDEFINED;
2597         delete reqvec;
2598   return 0;
2599 }
2600
2601 CDECL
2602 int AMPI_Testall(int count, MPI_Request *request, int *flag, MPI_Status *sts)
2603 {
2604   AMPIAPI("AMPI_Testall");
2605   if(count==0) return MPI_SUCCESS;
2606   checkRequests(count,request);
2607   int tmpflag;
2608   int i,j;
2609   AmpiRequestList* reqs = getReqs();
2610   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
2611   *flag = 1;  
2612   for(i=0;i<reqvec->size();i++){
2613     for(j=0;j<((*reqvec)[i]).size();j++){
2614       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL)
2615         continue;
2616       tmpflag = (*reqs)[request[((*reqvec)[i])[j]]]->test(&sts[((*reqvec)[i])[j]]);
2617       *flag *= tmpflag;
2618     }
2619   }
2620   if(flag) 
2621     MPI_Waitall(count,request,sts);
2622         delete reqvec;  
2623   return 0;
2624 }
2625
2626 CDECL
2627 int AMPI_Testsome(int incount, MPI_Request *array_of_requests, int *outcount,
2628                  int *array_of_indices, MPI_Status *array_of_statuses)
2629 {
2630   AMPIAPI("AMPI_Testsome");
2631   checkRequests(incount,array_of_requests);
2632   if(areInactiveReqs(incount,array_of_requests)){
2633     *outcount=MPI_UNDEFINED;
2634     return MPI_SUCCESS;
2635   }
2636   MPI_Status sts;
2637   int flag;
2638   int i;
2639   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
2640   *outcount = 0;
2641   for(i=0;i<reqvec->size();i++){
2642     AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
2643     if(flag == 1){
2644       array_of_indices[(*outcount)]=((*reqvec)[i])[0];
2645       array_of_statuses[(*outcount)++]=sts;
2646     }
2647   }
2648         delete reqvec;
2649   return 0;
2650 }
2651
2652 CDECL
2653 int AMPI_Request_free(MPI_Request *request){
2654   AMPIAPI("AMPI_Request_free");
2655   checkRequest(*request);
2656   if(*request==MPI_REQUEST_NULL) return 0;
2657   AmpiRequestList* reqs = getReqs();
2658   reqs->free(*request);
2659   return 0;
2660 }
2661
2662 CDECL
2663 int AMPI_Cancel(MPI_Request *request){
2664   AMPIAPI("AMPI_Request_free");
2665   return AMPI_Request_free(request);
2666 }
2667
2668 CDECL
2669 int AMPI_Recv_init(void *buf, int count, int type, int src, int tag,
2670                    MPI_Comm comm, MPI_Request *req)
2671 {
2672   AMPIAPI("AMPI_Recv_init");
2673   AmpiRequestList* reqs = getReqs();
2674   PersReq *newreq = new PersReq(buf,count,type,src,tag,comm,2);
2675   *req = reqs->insert(newreq);
2676   return 0;
2677 }
2678
2679 CDECL
2680 int AMPI_Send_init(void *buf, int count, int type, int dest, int tag,
2681                    MPI_Comm comm, MPI_Request *req)
2682 {
2683   AMPIAPI("AMPI_Send_init");
2684   AmpiRequestList* reqs = getReqs();
2685   PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,1);
2686   *req = reqs->insert(newreq);
2687   return 0;
2688 }
2689
2690 static CkDDT *getDDT(void) {
2691   return getAmpiParent()->myDDT;
2692 }
2693
2694 CDECL
2695 int AMPI_Type_contiguous(int count, MPI_Datatype oldtype,
2696                          MPI_Datatype *newtype)
2697 {
2698   AMPIAPI("AMPI_Type_contiguous");
2699   getDDT()->newContiguous(count, oldtype, newtype);
2700   return 0;
2701 }
2702
2703 CDECL
2704 int AMPI_Type_vector(int count, int blocklength, int stride,
2705                      MPI_Datatype oldtype, MPI_Datatype*  newtype)
2706 {
2707   AMPIAPI("AMPI_Type_vector");
2708   getDDT()->newVector(count, blocklength, stride, oldtype, newtype);
2709   return 0 ;
2710 }
2711
2712 CDECL
2713 int AMPI_Type_hvector(int count, int blocklength, MPI_Aint stride, 
2714                       MPI_Datatype oldtype, MPI_Datatype*  newtype)
2715 {
2716   AMPIAPI("AMPI_Type_hvector");
2717   getDDT()->newHVector(count, blocklength, stride, oldtype, newtype);
2718   return 0 ;
2719 }
2720
2721 CDECL
2722 int AMPI_Type_indexed(int count, int* arrBlength, int* arrDisp, 
2723                       MPI_Datatype oldtype, MPI_Datatype*  newtype)
2724 {
2725   AMPIAPI("AMPI_Type_indexed");
2726   getDDT()->newIndexed(count, arrBlength, arrDisp, oldtype, newtype);
2727   return 0 ;
2728 }
2729
2730 CDECL
2731 int AMPI_Type_hindexed(int count, int* arrBlength, MPI_Aint* arrDisp,
2732                        MPI_Datatype oldtype, MPI_Datatype*  newtype)
2733 {
2734   AMPIAPI("AMPI_Type_hindexed");
2735   getDDT()->newHIndexed(count, arrBlength, arrDisp, oldtype, newtype);
2736   return 0 ;
2737 }
2738
2739 CDECL
2740 int AMPI_Type_struct(int count, int* arrBlength, int* arrDisp, 
2741                      MPI_Datatype* oldtype, MPI_Datatype*  newtype)
2742 {
2743   AMPIAPI("AMPI_Type_struct");
2744   getDDT()->newStruct(count, arrBlength, arrDisp, oldtype, newtype);
2745   return 0 ;
2746 }
2747
2748 CDECL
2749 int AMPI_Type_commit(MPI_Datatype *datatype)
2750 {
2751   AMPIAPI("AMPI_Type_commit");
2752   return 0;
2753 }
2754
2755 CDECL
2756 int AMPI_Type_free(MPI_Datatype *datatype)
2757 {
2758   AMPIAPI("AMPI_Type_free");
2759   getDDT()->freeType(datatype);
2760   return 0;
2761 }
2762
2763
2764 CDECL
2765 int AMPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent)
2766 {
2767   AMPIAPI("AMPI_Type_extent");
2768   *extent = getDDT()->getExtent(datatype);
2769   return 0;
2770 }
2771
2772 CDECL
2773 int AMPI_Type_size(MPI_Datatype datatype, int *size)
2774 {
2775   AMPIAPI("AMPI_Type_size");
2776   *size=getDDT()->getSize(datatype);
2777   return 0;
2778 }
2779
2780 CDECL
2781 int AMPI_Isend(void *buf, int count, MPI_Datatype type, int dest,
2782               int tag, MPI_Comm comm, MPI_Request *request)
2783 {
2784   AMPIAPI("AMPI_Isend");
2785   USER_CALL_DEBUG("AMPI_Isend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
2786   ampi *ptr = getAmpiInstance(comm);
2787 #if AMPI_COMLIB
2788   if(enableStreaming){
2789     ptr->getStreaming().beginIteration();
2790     ptr->comlibsend(tag,ptr->getRank(comm),buf,count,type,dest,comm);
2791   } else
2792 #endif
2793   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm);
2794   *request = MPI_REQUEST_NULL;
2795 #if AMPI_COUNTER
2796   getAmpiParent()->counters.isend++;
2797 #endif
2798   return 0;
2799 }
2800
2801 CDECL
2802 int AMPI_Irecv(void *buf, int count, MPI_Datatype type, int src,
2803               int tag, MPI_Comm comm, MPI_Request *request)
2804 {
2805   if(src==MPI_PROC_NULL) { *request = MPI_REQUEST_NULL; return 0; }
2806   AMPIAPI("AMPI_Irecv");
2807   USER_CALL_DEBUG("AMPI_Irecv("<<type<<","<<src<<","<<tag<<","<<comm<<")");
2808   AmpiRequestList* reqs = getReqs();
2809   IReq *newreq = new IReq(buf,count,type,src,tag,comm);
2810   *request = reqs->insert(newreq);
2811 #if AMPI_COUNTER
2812   getAmpiParent()->counters.irecv++;
2813 #endif
2814   return 0;
2815 }
2816
2817 CDECL
2818 int AMPI_Ireduce(void *sendbuf, void *recvbuf, int count, int type, MPI_Op op,
2819                  int root, MPI_Comm comm, MPI_Request *request)
2820 {
2821   AMPIAPI("AMPI_Ireduce");
2822   if(op == MPI_OP_NULL) CkAbort("MPI_Ireduce called with MPI_OP_NULL!!!");
2823   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,sendbuf,recvbuf);
2824   ampi *ptr = getAmpiInstance(comm);
2825   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),sendbuf,count,type,op);
2826   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
2827   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
2828   msg->setCallback(reduceCB);
2829   ptr->contribute(msg);
2830
2831   if (ptr->thisIndex == rootIdx){
2832     // using irecv instead recv to non-block the call and get request pointer
2833     AmpiRequestList* reqs = getReqs();
2834     IReq *newreq = new IReq(recvbuf,count,type,0,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
2835     *request = reqs->insert(newreq);
2836   }
2837   return 0;
2838 }
2839
2840 CDECL
2841 int AMPI_Allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
2842                   void *recvbuf, int recvcount, MPI_Datatype recvtype,
2843                   MPI_Comm comm)
2844 {
2845   AMPIAPI("AMPI_Allgather");
2846   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgather not allowed for Inter-communicator!");
2847   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
2848   ampi *ptr = getAmpiInstance(comm);
2849   int size = ptr->getSize(comm);
2850   int i;
2851 #if AMPI_COMLIB
2852   if(comm == MPI_COMM_WORLD) {
2853       // commlib support
2854       ptr->getAllgather().beginIteration();
2855       for(i=0;i<size;i++) {
2856           ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
2857                         sendtype, i, comm, ptr->getComlibProxy());
2858       }
2859       ptr->getAllgather().endIteration();
2860   } else 
2861 #endif
2862       for(i=0;i<size;i++) {
2863           ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
2864                     sendtype, i, comm);
2865       }
2866
2867   MPI_Status status;
2868   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
2869   int itemsize = dttype->getSize(recvcount) ;
2870
2871   for(i=0;i<size;i++) {
2872     AMPI_Recv(((char*)recvbuf)+(itemsize*i), recvcount, recvtype,
2873              i, MPI_GATHER_TAG, comm, &status);
2874   }
2875 #if AMPI_COUNTER
2876   getAmpiParent()->counters.allgather++;
2877 #endif
2878   return 0;
2879 }
2880
2881 CDECL
2882 int AMPI_Iallgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
2883                     void *recvbuf, int recvcount, MPI_Datatype recvtype,
2884                     MPI_Comm comm, MPI_Request* request)
2885 {
2886   AMPIAPI("AMPI_Iallgather");
2887   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallgather not allowed for Inter-communicator!");
2888   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
2889   ampi *ptr = getAmpiInstance(comm);
2890   int size = ptr->getSize(comm);
2891   int i;
2892 #if AMPI_COMLIB
2893   if(comm == MPI_COMM_WORLD) {
2894       // commlib support
2895       ptr->getAllgather().beginIteration();
2896       for(i=0;i<size;i++) {
2897           ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
2898                         sendtype, i, comm, ptr->getComlibProxy());
2899       }
2900       ptr->getAllgather().endIteration();
2901   } else 
2902 #endif
2903       for(i=0;i<size;i++) {
2904           ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
2905                     sendtype, i, comm);
2906       }
2907
2908   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
2909   int itemsize = dttype->getSize(recvcount) ;
2910
2911   // copy+paste from MPI_Irecv
2912   AmpiRequestList* reqs = getReqs();
2913   ATAReq *newreq = new ATAReq(size);
2914   for(i=0;i<size;i++){
2915     if(newreq->addReq(((char*)recvbuf)+(itemsize*i),recvcount,recvtype,i,MPI_GATHER_TAG,comm)!=(i+1))
2916       CkAbort("MPI_Iallgather: Error adding requests into ATAReq!");
2917   }
2918   *request = reqs->insert(newreq);
2919   AMPI_DEBUG("MPI_Iallgather: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
2920
2921   return 0;
2922 }
2923
2924 CDECL
2925 int AMPI_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
2926                    void *recvbuf, int *recvcounts, int *displs,
2927                    MPI_Datatype recvtype, MPI_Comm comm)
2928 {
2929   AMPIAPI("AMPI_Allgatherv");
2930   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgatherv not allowed for Inter-communicator!");
2931   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
2932   ampi *ptr = getAmpiInstance(comm);
2933   int size = ptr->getSize(comm);
2934   int i;
2935 #if AMPI_COMLIB
2936   if(comm == MPI_COMM_WORLD) {
2937       // commlib support
2938       ptr->getAllgather().beginIteration();
2939       for(i=0;i<size;i++) {
2940           ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
2941                         sendtype, i, comm, ptr->getComlibProxy());
2942       }
2943       ptr->getAllgather().endIteration();
2944   } else
2945 #endif 
2946       for(i=0;i<size;i++) {
2947           ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
2948                     sendtype, i, comm);
2949       }
2950
2951   MPI_Status status;
2952   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
2953   int itemsize = dttype->getSize() ;
2954
2955   for(i=0;i<size;i++) {
2956     AMPI_Recv(((char*)recvbuf)+(itemsize*displs[i]), recvcounts[i], recvtype,
2957              i, MPI_GATHER_TAG, comm, &status);
2958   }
2959 #if AMPI_COUNTER
2960   getAmpiParent()->counters.allgather++;
2961 #endif
2962   return 0;
2963 }
2964
2965 CDECL
2966 int AMPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
2967                void *recvbuf, int recvcount, MPI_Datatype recvtype,
2968                int root, MPI_Comm comm)
2969 {
2970   AMPIAPI("AMPI_Gather");
2971   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gather not allowed for Inter-communicator!");
2972   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
2973   ampi *ptr = getAmpiInstance(comm);
2974   int size = ptr->getSize(comm);
2975   int i;
2976   AMPI_Send(sendbuf, sendcount, sendtype, root, MPI_GATHER_TAG, comm);
2977
2978   if(ptr->getRank(comm)==root) {
2979     MPI_Status status;
2980     CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
2981     int itemsize = dttype->getSize(recvcount) ;
2982
2983     for(i=0;i<size;i++) {
2984       AMPI_Recv(((char*)recvbuf)+(itemsize*i), recvcount, recvtype,
2985                i, MPI_GATHER_TAG, comm, &status);
2986     }
2987   }
2988 #if AMPI_COUNTER
2989   getAmpiParent()->counters.gather++;
2990 #endif
2991   return 0;
2992 }
2993
2994 CDECL
2995 int AMPI_Gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
2996                 void *recvbuf, int *recvcounts, int *displs,
2997                 MPI_Datatype recvtype, int root, MPI_Comm comm)
2998 {
2999   AMPIAPI("AMPI_Gatherv");
3000   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gatherv not allowed for Inter-communicator!");
3001   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3002   ampi *ptr = getAmpiInstance(comm);
3003   int size = ptr->getSize(comm);
3004   int i;
3005
3006   AMPI_Send(sendbuf, sendcount, sendtype, root, MPI_GATHER_TAG, comm);
3007
3008   if(ptr->getRank(comm) == root) {
3009     MPI_Status status;
3010     CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
3011     int itemsize = dttype->getSize();
3012
3013     for(i=0;i<size;i++) {
3014       AMPI_Recv(((char*)recvbuf)+(itemsize*displs[i]), recvcounts[i], recvtype,
3015                i, MPI_GATHER_TAG, comm, &status);
3016     }
3017   }
3018 #if AMPI_COUNTER
3019   getAmpiParent()->counters.gather++;
3020 #endif
3021   return 0;
3022 }
3023
3024 CDECL
3025 int AMPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3026                 void *recvbuf, int recvcount, MPI_Datatype recvtype,
3027                 int root, MPI_Comm comm)
3028 {
3029   AMPIAPI("AMPI_Scatter");
3030   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scatter not allowed for Inter-communicator!");
3031   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3032   ampi *ptr = getAmpiInstance(comm);
3033   int size = ptr->getSize(comm);
3034   int i;
3035
3036   if(ptr->getRank(comm)==root) {
3037       CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
3038       int itemsize = dttype->getSize(sendcount) ;
3039       for(i=0;i<size;i++) {
3040           ptr->send(MPI_SCATTER_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i),
3041                     sendcount, sendtype, i, comm);
3042       }
3043       //ptr->mcomlibEnd();
3044   }
3045   
3046   MPI_Status status;
3047   AMPI_Recv(recvbuf, recvcount, recvtype, root, MPI_SCATTER_TAG, comm, &status);
3048
3049 #if AMPI_COUNTER
3050   getAmpiParent()->counters.scatter++;
3051 #endif
3052   return 0;
3053 }
3054
3055 CDECL
3056 int AMPI_Scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype,
3057                  void *recvbuf, int recvcount, MPI_Datatype recvtype,
3058                  int root, MPI_Comm comm)
3059 {
3060   AMPIAPI("AMPI_Scatterv");
3061   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scatterv not allowed for Inter-communicator!");
3062   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcounts[0],sendbuf,recvbuf);
3063   ampi *ptr = getAmpiInstance(comm);
3064   int size = ptr->getSize(comm);
3065   int i;
3066
3067   if(ptr->getRank(comm) == root) {
3068       CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
3069       int itemsize = dttype->getSize() ;
3070       for(i=0;i<size;i++) {
3071           ptr->send(MPI_SCATTER_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*displs[i]),
3072                     sendcounts[i], sendtype, i, comm);
3073       }
3074       //ptr->mcomlibEnd();
3075   }
3076   
3077   MPI_Status status;
3078   AMPI_Recv(recvbuf, recvcount, recvtype, root, MPI_SCATTER_TAG, comm, &status);
3079   
3080 #if AMPI_COUNTER
3081   getAmpiParent()->counters.scatter++;
3082 #endif
3083   return 0;
3084 }
3085
3086 CDECL
3087 int AMPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3088                  void *recvbuf, int recvcount, MPI_Datatype recvtype,
3089                  MPI_Comm comm)
3090 {
3091   AMPIAPI("AMPI_Alltoall");
3092   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Alltoall not allowed for Inter-communicator!");
3093   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3094   ampi *ptr = getAmpiInstance(comm);
3095   int size = ptr->getSize(comm);
3096   CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
3097   int itemsize = dttype->getSize(sendcount) ;
3098   int i;
3099
3100 #if AMPI_COMLIB
3101   if(comm == MPI_COMM_WORLD) {
3102       // commlib support
3103       ptr->getAlltoall().beginIteration();
3104       for(i=0;i<size;i++) {
3105           ptr->delesend(MPI_ATA_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
3106                         sendtype, i, comm, ptr->getComlibProxy());
3107       }
3108       ptr->getAlltoall().endIteration();
3109   } else
3110 #endif 
3111       for(i=0;i<size;i++) {
3112           ptr->send(MPI_ATA_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
3113                     sendtype, i, comm);
3114       }
3115   
3116   MPI_Status status;
3117   dttype = ptr->getDDT()->getType(recvtype) ;
3118   itemsize = dttype->getSize(recvcount) ;
3119
3120   for(i=0;i<size;i++) {
3121     AMPI_Recv(((char*)recvbuf)+(itemsize*i), recvcount, recvtype,
3122               i, MPI_ATA_TAG, comm, &status);
3123   }
3124 #if AMPI_COUNTER
3125   getAmpiParent()->counters.alltoall++;
3126 #endif
3127   return 0;
3128 }
3129
3130 CDECL
3131 int AMPI_Ialltoall(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3132                  void *recvbuf, int recvcount, MPI_Datatype recvtype,
3133                  MPI_Comm comm, MPI_Request *request)
3134 {
3135   AMPIAPI("AMPI_Ialltoall");
3136   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Ialltoall not allowed for Inter-communicator!");
3137   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3138   ampi *ptr = getAmpiInstance(comm);
3139   AmpiRequestList* reqs = getReqs();
3140   int size = ptr->getSize(comm);
3141   int reqsSize = reqs->size();
3142   CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
3143   int itemsize = dttype->getSize(sendcount) ;
3144   int i;
3145
3146 #if AMPI_COMLIB
3147   if(comm == MPI_COMM_WORLD) {
3148       // commlib support
3149       ptr->getAlltoall().beginIteration();
3150       for(i=0;i<size;i++) {
3151           ptr->delesend(MPI_ATA_TAG+reqsSize, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
3152                         sendtype, i, comm, ptr->getComlibProxy());
3153       }
3154       ptr->getAlltoall().endIteration();
3155   } else
3156 #endif
3157     for(i=0;i<size;i++) {
3158       ptr->send(MPI_ATA_TAG+reqsSize, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
3159                 sendtype, i, comm);
3160     }
3161   
3162   // copy+paste from MPI_Irecv
3163   ATAReq *newreq = new ATAReq(size);
3164   for(i=0;i<size;i++){
3165     if(newreq->addReq(((char*)recvbuf)+(itemsize*i),recvcount,recvtype,i,MPI_ATA_TAG+reqsSize,comm)!=(i+1))
3166       CkAbort("MPI_Ialltoall: Error adding requests into ATAReq!");
3167   }
3168   *request = reqs->insert(newreq);
3169   AMPI_DEBUG("MPI_Ialltoall: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3170   return 0;
3171 }
3172
3173 CDECL
3174 int AMPI_Alltoallv(void *sendbuf, int *sendcounts, int *sdispls,
3175                   MPI_Datatype sendtype, void *recvbuf, int *recvcounts,
3176                   int *rdispls, MPI_Datatype recvtype, MPI_Comm comm)
3177 {
3178   AMPIAPI("AMPI_Alltoallv");
3179   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Alltoallv not allowed for Inter-communicator!");
3180   if(comm==MPI_COMM_SELF) return 0;
3181   ampi *ptr = getAmpiInstance(comm);
3182   int size = ptr->getSize(comm);
3183   CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
3184   int itemsize = dttype->getSize() ;
3185   int i;
3186 #if AMPI_COMLIB
3187   if(comm == MPI_COMM_WORLD) {
3188       // commlib support
3189       ptr->getAlltoall().beginIteration();
3190       for(i=0;i<size;i++) {
3191           ptr->delesend(MPI_GATHER_TAG,ptr->getRank(comm),((char*)sendbuf)+(itemsize*sdispls[i]),sendcounts[i],
3192                         sendtype, i, comm, ptr->getComlibProxy());
3193       }
3194       ptr->getAlltoall().endIteration();
3195   } else
3196 #endif
3197       for(i=0;i<size;i++) 
3198           ptr->send(MPI_GATHER_TAG,ptr->getRank(comm),((char*)sendbuf)+(itemsize*sdispls[i]),sendcounts[i],
3199                     sendtype, i, comm);
3200   
3201   MPI_Status status;
3202   dttype = ptr->getDDT()->getType(recvtype) ;
3203   itemsize = dttype->getSize() ;
3204
3205   for(i=0;i<size;i++) {
3206     AMPI_Recv(((char*)recvbuf)+(itemsize*rdispls[i]), recvcounts[i], recvtype,
3207              i, MPI_GATHER_TAG, comm, &status);
3208   }
3209 #if AMPI_COUNTER
3210   getAmpiParent()->counters.alltoall++;
3211 #endif
3212   return 0;
3213 }
3214
3215 CDECL
3216 int AMPI_Comm_dup(int comm, int *newcomm)
3217 {
3218   AMPIAPI("AMPI_Comm_dup");
3219   *newcomm = comm;
3220   return 0;
3221 }
3222
3223 CDECL
3224 int AMPI_Comm_split(int src,int color,int key,int *dest)
3225 {
3226   AMPIAPI("AMPI_Comm_split");
3227   getAmpiInstance(src)->split(color,key,dest, 0);
3228   return 0;
3229 }
3230
3231 CDECL
3232 int AMPI_Comm_free(int *comm)
3233 {
3234   AMPIAPI("AMPI_Comm_free");
3235   return 0;
3236 }
3237
3238 CDECL
3239 int AMPI_Comm_test_inter(MPI_Comm comm, int *flag){
3240   AMPIAPI("AMPI_Comm_test_inter");
3241   *flag = getAmpiParent()->isInter(comm);
3242   return 0;
3243 }
3244
3245 CDECL
3246 int AMPI_Comm_remote_size(MPI_Comm comm, int *size){
3247   AMPIAPI("AMPI_Comm_remote_size");
3248   *size = getAmpiParent()->getRemoteSize(comm);
3249   return 0;
3250 }
3251
3252 CDECL
3253 int AMPI_Comm_remote_group(MPI_Comm comm, MPI_Group *group){
3254   AMPIAPI("AMPI_Comm_remote_group");
3255   *group = getAmpiParent()->getRemoteGroup(comm);
3256   return 0;
3257 }
3258
3259 CDECL
3260 int AMPI_Intercomm_create(MPI_Comm lcomm, int lleader, MPI_Comm rcomm, int rleader, int tag, MPI_Comm *newintercomm){
3261   AMPIAPI("AMPI_Intercomm_create");
3262   ampi *ptr = getAmpiInstance(lcomm);
3263   int root = ptr->getIndexForRank(lleader);
3264   CkVec<int> rvec;
3265   int lrank;
3266   AMPI_Comm_rank(lcomm,&lrank);
3267
3268   if(lrank==lleader){
3269     int lsize, rsize;
3270     lsize = ptr->getSize(lcomm);
3271     int *larr = new int [lsize];
3272     int *rarr;
3273     CkVec<int> lvec = ptr->getIndices();
3274     MPI_Status sts;
3275
3276     // local leader exchanges groupStruct with remote leader
3277     int i;
3278     for(i=0;i<lsize;i++)
3279       larr[i] = lvec[i];
3280     AMPI_Send(&lsize,1,MPI_INT,rleader,tag,rcomm);
3281     AMPI_Recv(&rsize,1,MPI_INT,rleader,tag,rcomm,&sts);
3282
3283     rarr = new int [rsize];
3284     AMPI_Send(larr,lsize,MPI_INT,rleader,tag+1,rcomm);
3285     AMPI_Recv(rarr,rsize,MPI_INT,rleader,tag+1,rcomm,&sts);
3286     for(i=0;i<rsize;i++)
3287       rvec.push_back(rarr[i]);
3288
3289     delete [] larr;
3290     delete [] rarr;
3291
3292     if(rsize==0) CkAbort("MPI_Intercomm_create: remote size = 0! Does it really make sense to create an empty communicator?\n");
3293   }
3294   
3295   ptr->intercommCreate(rvec,root,newintercomm);
3296   return 0;
3297 }
3298
3299 CDECL
3300 int AMPI_Intercomm_merge(MPI_Comm intercomm, int high, MPI_Comm *newintracomm){
3301   AMPIAPI("AMPI_Intercomm_merge");
3302   ampi *ptr = getAmpiInstance(intercomm);
3303   int lroot, rroot, lrank, lhigh, rhigh, first;
3304   lroot = ptr->getIndexForRank(0);
3305   rroot = ptr->getIndexForRemoteRank(0);
3306   lhigh = high;
3307   lrank = ptr->getRank(intercomm);
3308
3309   if(lrank==0){
3310     MPI_Status sts;
3311     AMPI_Send(&lhigh,1,MPI_INT,0,10010,intercomm);
3312     AMPI_Recv(&rhigh,1,MPI_INT,0,10010,intercomm,&sts);
3313
3314     if((lhigh && rhigh) || (!lhigh && !rhigh)){ // same value: smaller root goes first (first=1 if local goes first)
3315       first = (lroot < rroot);
3316     }else{ // different values, then high=false goes first
3317       first = (lhigh == false);
3318     }
3319   }
3320
3321   ptr->intercommMerge(first, newintracomm);
3322   return 0;
3323 }
3324
3325 CDECL
3326 int AMPI_Abort(int comm, int errorcode)
3327 {
3328   AMPIAPI("AMPI_Abort");
3329   CkAbort("AMPI: User called MPI_Abort!\n");
3330   return errorcode;
3331 }
3332
3333 CDECL
3334 int AMPI_Get_count(MPI_Status *sts, MPI_Datatype dtype, int *count){
3335   AMPIAPI("AMPI_Get_count");
3336   CkDDT_DataType* dttype = getDDT()->getType(dtype);
3337   int itemsize = dttype->getSize() ;
3338   *count = sts->MPI_LENGTH/itemsize;
3339   return 0;
3340 }
3341
3342 CDECL
3343 int AMPI_Type_lb(MPI_Datatype dtype, MPI_Aint* displacement){
3344   AMPIAPI("AMPI_Type_lb");
3345   *displacement = getDDT()->getLB(dtype);
3346   return 0;
3347 }
3348
3349 CDECL
3350 int AMPI_Type_ub(MPI_Datatype dtype, MPI_Aint* displacement){
3351   AMPIAPI("AMPI_Type_ub");
3352   *displacement = getDDT()->getUB(dtype);
3353   return 0;
3354 }
3355
3356 CDECL
3357 int AMPI_Address(void* location, MPI_Aint *address){
3358   AMPIAPI("AMPI_Address");
3359   *address = (MPI_Aint)(unsigned long)(char *)location;
3360   return 0;
3361 }
3362
3363 CDECL
3364 int AMPI_Get_elements(MPI_Status *sts, MPI_Datatype dtype, int *count){
3365   AMPIAPI("AMPI_Get_elements");
3366   CkDDT_DataType* dttype = getDDT()->getType(dtype) ;
3367   int basesize = dttype->getBaseSize() ;
3368   if(basesize==0) basesize=dttype->getSize();
3369   *count = sts->MPI_LENGTH/basesize;
3370   return 0;
3371 }
3372
3373 CDECL
3374 int AMPI_Pack(void *inbuf, int incount, MPI_Datatype dtype, void *outbuf,
3375               int outsize, int *position, MPI_Comm comm)
3376 {
3377   AMPIAPI("AMPI_Pack");
3378   CkDDT_DataType* dttype = getDDT()->getType(dtype) ;
3379   int itemsize = dttype->getSize();
3380   dttype->serialize((char*)inbuf, ((char*)outbuf)+(*position), incount, 1);
3381   *position += (itemsize*incount);
3382   return 0;
3383 }
3384
3385 CDECL
3386 int AMPI_Unpack(void *inbuf, int insize, int *position, void *outbuf,
3387               int outcount, MPI_Datatype dtype, MPI_Comm comm)
3388 {
3389   AMPIAPI("AMPI_Unpack");
3390   CkDDT_DataType* dttype = getDDT()->getType(dtype) ;
3391   int itemsize = dttype->getSize();
3392   dttype->serialize(((char*)inbuf+(*position)), (char*)outbuf, outcount, 1);
3393   *position += (itemsize*outcount);
3394   return 0;
3395 }
3396
3397 CDECL
3398 int AMPI_Pack_size(int incount,MPI_Datatype datatype,MPI_Comm comm,int *sz)
3399 {
3400   AMPIAPI("AMPI_Pack_size");
3401   CkDDT_DataType* dttype = getDDT()->getType(datatype) ;
3402   *sz = incount*dttype->getSize() ;
3403   return 0;
3404 }
3405
3406 CDECL
3407 int AMPI_Get_processor_name(char *name, int *resultlen){
3408   AMPIAPI("AMPI_Get_processor_name");
3409   ampiParent *ptr = getAmpiParent();
3410   sprintf(name,"AMPI_VP[%d]_PE[%d]",ptr->thisIndex,ptr->getMyPe());
3411   *resultlen = strlen(name);
3412   return 0;
3413 }
3414
3415 /* Error handling */
3416 #if defined(USE_STDARG)
3417 void error_handler(MPI_Comm *, int *, ...);
3418 #else
3419 void error_handler ( MPI_Comm *, int * );
3420 #endif
3421
3422 CDECL
3423 int AMPI_Errhandler_create(MPI_Handler_function *function, MPI_Errhandler *errhandler){
3424         AMPIAPI("AMPI_Errhandler_create");
3425         return MPI_SUCCESS;
3426 }
3427
3428 CDECL
3429 int AMPI_Errhandler_set(MPI_Comm comm, MPI_Errhandler errhandler){
3430         AMPIAPI("AMPI_Errhandler_set");
3431         return MPI_SUCCESS;
3432 }
3433
3434 CDECL
3435 int AMPI_Errhandler_get(MPI_Comm comm, MPI_Errhandler *errhandler){
3436         AMPIAPI("AMPI_Errhandler_get");
3437         return MPI_SUCCESS;
3438 }
3439
3440 CDECL
3441 int AMPI_Errhandler_free(MPI_Errhandler *errhandler){
3442         AMPIAPI("AMPI_Errhandler_free");
3443         return MPI_SUCCESS;
3444 }
3445
3446 CDECL
3447 int AMPI_Error_class(int errorcode, int *errorclass){
3448         AMPIAPI("AMPI_Error_class");
3449         *errorclass = errorcode;
3450         return MPI_SUCCESS;
3451 }
3452
3453 CDECL
3454 int AMPI_Error_string(int errorcode, char *string, int *resultlen)
3455 {
3456   AMPIAPI("AMPI_Error_string");
3457   const char *ret="";
3458   switch(errorcode) {
3459   case MPI_SUCCESS:
3460      ret="Success";
3461      break;
3462   default:
3463      return 1;/*LIE: should be MPI_ERR_something */
3464   };
3465   *resultlen=strlen(ret);
3466   strcpy(string,ret);
3467   return MPI_SUCCESS;
3468 }
3469
3470
3471 /* Group operations */
3472 CDECL
3473 int AMPI_Comm_group(MPI_Comm comm, MPI_Group *group)
3474 {
3475   AMPIAPI("AMPI_Comm_Group");
3476   *group = getAmpiParent()->comm2group(comm);
3477   return 0;
3478 }
3479
3480 CDECL
3481 int AMPI_Group_union(MPI_Group group1, MPI_Group group2, MPI_Group *newgroup)
3482 {
3483   AMPIAPI("AMPI_Group_union");
3484   groupStruct vec1, vec2, newvec;
3485   ampiParent *ptr = getAmpiParent();
3486   vec1 = ptr->group2vec(group1);
3487   vec2 = ptr->group2vec(group2);
3488   newvec = unionOp(vec1,vec2);
3489   *newgroup = ptr->saveGroupStruct(newvec);
3490   return 0;
3491 }
3492
3493 CDECL
3494 int AMPI_Group_intersection(MPI_Group group1, MPI_Group group2, MPI_Group *newgroup)
3495 {
3496   AMPIAPI("AMPI_Group_intersection");
3497   groupStruct vec1, vec2, newvec;
3498   ampiParent *ptr = getAmpiParent();
3499   vec1 = ptr->group2vec(group1);
3500   vec2 = ptr->group2vec(group2);
3501   newvec = intersectOp(vec1,vec2);
3502   *newgroup = ptr->saveGroupStruct(newvec);
3503   return 0;
3504 }
3505
3506 CDECL
3507 int AMPI_Group_difference(MPI_Group group1, MPI_Group group2, MPI_Group *newgroup)
3508 {
3509   AMPIAPI("AMPI_Group_difference");
3510   groupStruct vec1, vec2, newvec;
3511   ampiParent *ptr = getAmpiParent();
3512   vec1 = ptr->group2vec(group1);
3513   vec2 = ptr->group2vec(group2);
3514   newvec = diffOp(vec1,vec2);
3515   *newgroup = ptr->saveGroupStruct(newvec);
3516   return 0;
3517 }
3518
3519 CDECL
3520 int AMPI_Group_size(MPI_Group group, int *size)
3521 {
3522   AMPIAPI("AMPI_Group_size");
3523   *size = (getAmpiParent()->group2vec(group)).size();
3524   return 0;
3525 }
3526
3527 CDECL
3528 int AMPI_Group_rank(MPI_Group group, int *rank)
3529 {
3530   AMPIAPI("AMPI_Group_rank");
3531   *rank = getAmpiParent()->getRank(group);
3532   return 0;
3533 }
3534
3535 CDECL
3536 int AMPI_Group_translate_ranks (MPI_Group group1, int n, int *ranks1, MPI_Group group2, int *ranks2)
3537 {
3538   AMPIAPI("AMPI_Group_translate_ranks");
3539   ampiParent *ptr = getAmpiParent();
3540   groupStruct vec1, vec2;
3541   vec1 = ptr->group2vec(group1);
3542   vec2 = ptr->group2vec(group2);
3543   ranks2 = translateRanksOp(n, vec1, ranks1, vec2);
3544   return 0;
3545 }
3546
3547 CDECL
3548 int AMPI_Group_compare(MPI_Group group1,MPI_Group group2, int *result)
3549 {
3550   AMPIAPI("AMPI_Group_compare");
3551   ampiParent *ptr = getAmpiParent();
3552   groupStruct vec1, vec2;
3553   vec1 = ptr->group2vec(group1);
3554   vec2 = ptr->group2vec(group2);
3555   *result = compareVecOp(vec1, vec2);
3556   return 0;
3557 }
3558
3559 CDECL
3560 int AMPI_Group_incl(MPI_Group group, int n, int *ranks, MPI_Group *newgroup)
3561 {
3562   AMPIAPI("AMPI_Group_incl");
3563   groupStruct vec, newvec;
3564   ampiParent *ptr = getAmpiParent();
3565   vec = ptr->group2vec(group);
3566   newvec = inclOp(n,ranks,vec);
3567   *newgroup = ptr->saveGroupStruct(newvec);
3568   return 0;
3569 }
3570 CDECL
3571 int AMPI_Group_excl(MPI_Group group, int n, int *ranks, MPI_Group *newgroup)
3572 {
3573   AMPIAPI("AMPI_Group_excl");
3574   groupStruct vec, newvec;
3575   ampiParent *ptr = getAmpiParent();
3576   vec = ptr->group2vec(group);
3577   newvec = exclOp(n,ranks,vec);
3578   *newgroup = ptr->saveGroupStruct(newvec);
3579 //outputOp(vec); outputOp(newvec);
3580   return 0;
3581 }
3582 CDECL
3583 int AMPI_Group_range_incl(MPI_Group group, int n, int ranges[][3], MPI_Group *newgroup)
3584 {
3585   AMPIAPI("AMPI_Group_range_incl");
3586   groupStruct vec, newvec;
3587   ampiParent *ptr = getAmpiParent();
3588   vec = ptr->group2vec(group);
3589   newvec = rangeInclOp(n,ranges,vec);
3590   *newgroup = ptr->saveGroupStruct(newvec);
3591   return 0;
3592 }
3593 CDECL
3594 int AMPI_Group_range_excl(MPI_Group group, int n, int ranges[][3], MPI_Group *newgroup)
3595 {
3596   AMPIAPI("AMPI_Group_range_excl");
3597   groupStruct vec, newvec;
3598   ampiParent *ptr = getAmpiParent();
3599   vec = ptr->group2vec(group);
3600   newvec = rangeExclOp(n,ranges,vec);
3601   *newgroup = ptr->saveGroupStruct(newvec);
3602   return 0;
3603 }
3604 CDECL
3605 int AMPI_Group_free(MPI_Group *group)
3606 {
3607   AMPIAPI("AMPI_Group_free");
3608   return 0;
3609 }
3610 CDECL
3611 int AMPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm* newcomm)
3612 {
3613   AMPIAPI("AMPI_Comm_create");
3614   groupStruct vec = getAmpiParent()->group2vec(group);
3615   if(vec.size()==0) CkAbort("AMPI> Abort: Does it really make sense to create an empty communicator?");
3616   getAmpiInstance(comm)->commCreate(vec, newcomm);
3617   return 0;
3618 }
3619
3620 /* Charm++ Extentions to MPI standard: */
3621 CDECL
3622 void AMPI_Checkpoint(char *dname)
3623 {
3624   AMPIAPI("AMPI_Checkpoint");
3625   AMPI_Barrier(MPI_COMM_WORLD);
3626   getAmpiParent()->startCheckpoint(dname);
3627 }
3628
3629 CDECL
3630 void AMPI_MemCheckpoint()
3631 {
3632 #if CMK_MEM_CHECKPOINT
3633   AMPIAPI("AMPI_Checkpoint");
3634   AMPI_Barrier(MPI_COMM_WORLD);
3635   getAmpiParent()->startCheckpoint("");
3636 #else
3637   CmiPrintf("Error: In memory checkpoint/restart is not on! \n");
3638   CmiAbort("Error: recompile Charm++ with CMK_MEM_CHECKPOINT. \n");
3639 #endif
3640 }
3641
3642 CDECL
3643 void AMPI_Print(char *str)
3644 {
3645   AMPIAPI("AMPI_Print");
3646   ampiParent *ptr = getAmpiParent();
3647   CkPrintf("[%d] %s\n", ptr->thisIndex, str);
3648 }
3649
3650 CDECL
3651 int AMPI_Register(void *d, MPI_PupFn f)
3652 {
3653   AMPIAPI("AMPI_Register");
3654   return TCHARM_Register(d,f);
3655 }
3656
3657 CDECL
3658 void *MPI_Get_userdata(int idx)
3659 {
3660   AMPIAPI("AMPI_Get_userdata");
3661   return TCHARM_Get_userdata(idx);
3662 }
3663
3664 CDECL
3665 void AMPI_Register_main(MPI_MainFn mainFn,const char *name)
3666 {
3667   AMPIAPI("AMPI_Register_main");
3668   if (TCHARM_Element()==0)
3669   { // I'm responsible for building the TCHARM threads:
3670     ampiCreateMain(mainFn,name,strlen(name));
3671   }
3672 }
3673 FDECL
3674 void FTN_NAME(MPI_REGISTER_MAIN,mpi_register_main)
3675   (MPI_MainFn mainFn,const char *name,int nameLen)
3676 {
3677   AMPIAPI("AMPI_register_main");
3678   if (TCHARM_Element()==0)
3679   { // I'm responsible for building the TCHARM threads:
3680     ampiCreateMain(mainFn,name,nameLen);
3681   }
3682 }
3683
3684 CDECL
3685 int AMPI_Keyval_create(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn, int *keyval, void* extra_state){
3686   AMPIAPI("AMPI_Keyval_create");
3687   return getAmpiParent()->createKeyval(copy_fn,delete_fn,keyval,extra_state);
3688 }
3689
3690 CDECL
3691 int AMPI_Keyval_free(int *keyval){
3692   AMPIAPI("AMPI_Keyval_free");
3693   return getAmpiParent()->freeKeyval(keyval);
3694 }
3695
3696 CDECL
3697 int AMPI_Attr_put(MPI_Comm comm, int keyval, void* attribute_val){
3698   AMPIAPI("AMPI_Attr_put");
3699   return getAmpiParent()->putAttr(comm,keyval,attribute_val);
3700 }
3701