AMPI/CUDA: Synchronous and asynchronous invocation, some cleanup
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1 #define exit exit /*Supress definition of exit in ampi.h*/
2 #include "ampiimpl.h"
3 #include "tcharm.h"
4 #include "ampiEvents.h" /*** for trace generation for projector *****/
5 #include "ampiProjections.h"
6
7 #define CART_TOPOL 1
8 #define AMPI_PRINT_IDLE 0
9
10 /* change this define to "x" to trace all send/recv's */
11 #define MSG_ORDER_DEBUG(x)  //x /* empty */
12 /* change this define to "x" to trace user calls */
13 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
14 #define STARTUP_DEBUG(x)  //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
15 #define FUNCCALL_DEBUG(x) //x /* empty */
16
17 static CkDDT *getDDT(void) {
18   return getAmpiParent()->myDDT;
19 }
20
21 //------------- startup -------------
22 static mpi_comm_worlds mpi_worlds;
23
24 int _mpi_nworlds; /*Accessed by ampif*/
25 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
26
27 /* ampiReducer: AMPI's generic reducer type 
28    MPI_Op is function pointer to MPI_User_function
29    so that it can be packed into AmpiOpHeader, shipped 
30    with the reduction message, and then plugged into 
31    the ampiReducer. 
32    One little trick is the ampi::recv which receives
33    the final reduction message will see additional
34    sizeof(AmpiOpHeader) bytes in the buffer before
35    any user data.                             */
36 class AmpiComplex { 
37 public: 
38         double re, im; 
39         void operator+=(const AmpiComplex &a) {
40                 re+=a.re;
41                 im+=a.im;
42         }
43         void operator*=(const AmpiComplex &a) {
44                 double nu_re=re*a.re-im*a.im;
45                 im=re*a.im+im*a.re;
46                 re=nu_re;
47         }
48         int operator>(const AmpiComplex &a) {
49                 CkAbort("Cannot compare complex numbers with MPI_MAX");
50                 return 0;
51         }
52         int operator<(const AmpiComplex &a) {
53                 CkAbort("Cannot compare complex numbers with MPI_MIN");
54                 return 0;
55         }
56 };
57 typedef struct { float val; int idx; } FloatInt;
58 typedef struct { double val; int idx; } DoubleInt;
59 typedef struct { long val; int idx; } LongInt;
60 typedef struct { int val; int idx; } IntInt;
61 typedef struct { short val; int idx; } ShortInt;
62 typedef struct { long double val; int idx; } LongdoubleInt;
63 typedef struct { float val; float idx; } FloatFloat;
64 typedef struct { double val; double idx; } DoubleDouble;
65
66
67 #define MPI_OP_SWITCH(OPNAME) \
68   int i; \
69   switch (*datatype) { \
70   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
71   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
72   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
73   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
74   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
75   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
76   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
77   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
78   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
79   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
80   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
81   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
82   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
83   default: \
84     ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
85     CmiAbort("Unsupported MPI datatype for MPI Op"); \
86   };\
87
88 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
89 #define MPI_OP_IMPL(type) \
90         if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
91   MPI_OP_SWITCH(MPI_MAX)
92 #undef MPI_OP_IMPL
93 }
94
95 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
96 #define MPI_OP_IMPL(type) \
97         if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
98   MPI_OP_SWITCH(MPI_MIN)
99 #undef MPI_OP_IMPL
100 }
101
102 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
103 #define MPI_OP_IMPL(type) \
104         ((type *)inoutvec)[i] += ((type *)invec)[i];
105   MPI_OP_SWITCH(MPI_SUM)
106 #undef MPI_OP_IMPL
107 }
108
109 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
110 #define MPI_OP_IMPL(type) \
111         ((type *)inoutvec)[i] *= ((type *)invec)[i];
112   MPI_OP_SWITCH(MPI_PROD)
113 #undef MPI_OP_IMPL
114 }
115
116 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
117   int i;  
118   switch (*datatype) {
119   case MPI_INT:
120   case MPI_LOGICAL:
121     for(i=0;i<(*len);i++)
122       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
123     break;
124   default:
125     ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
126     CmiAbort("exiting");
127   }
128 }
129 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
130   int i; 
131   switch (*datatype) {
132   case MPI_INT:
133     for(i=0;i<(*len);i++)
134       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
135     break;
136   case MPI_BYTE:
137     for(i=0;i<(*len);i++)
138       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
139     break;
140   default:
141     ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
142     CmiAbort("exiting");
143   }
144 }
145 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
146   int i;  
147   switch (*datatype) {
148   case MPI_INT:
149   case MPI_LOGICAL:
150     for(i=0;i<(*len);i++)
151       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
152     break;
153   default:
154     ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
155     CmiAbort("exiting");
156   }
157 }
158 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
159   int i;  
160   switch (*datatype) {
161   case MPI_INT:
162     for(i=0;i<(*len);i++)
163       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
164     break;
165   case MPI_BYTE:
166     for(i=0;i<(*len);i++)
167       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
168     break;
169   default:
170     ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
171     CmiAbort("exiting");
172   }
173 }
174 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
175   int i;  
176   switch (*datatype) {
177   case MPI_INT:
178   case MPI_LOGICAL:
179     for(i=0;i<(*len);i++)
180       ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
181     break;
182   default:
183     ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
184     CmiAbort("exiting");
185   }
186 }
187 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
188   int i;  
189   switch (*datatype) {
190   case MPI_INT:
191     for(i=0;i<(*len);i++)
192       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
193     break;
194   case MPI_BYTE:
195     for(i=0;i<(*len);i++)
196       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
197     break;
198         case MPI_UNSIGNED:
199     for(i=0;i<(*len);i++)
200       ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
201     break;
202   default:
203     ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
204     CmiAbort("exiting");
205   }
206 }
207
208 #ifndef MIN
209 #define MIN(a,b) (a < b ? a : b)
210 #endif
211
212 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
213   int i;  
214
215   switch (*datatype) {
216   case MPI_FLOAT_INT:
217     for(i=0;i<(*len);i++)
218       if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
219         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
220       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
221         ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
222     break;
223   case MPI_DOUBLE_INT:
224     for(i=0;i<(*len);i++)
225       if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
226         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
227       else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
228         ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
229
230     break;
231   case MPI_LONG_INT:
232     for(i=0;i<(*len);i++)
233       if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
234         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
235       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
236         ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
237     break;
238   case MPI_2INT:
239     for(i=0;i<(*len);i++)
240       if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
241         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
242       else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
243         ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
244     break;
245   case MPI_SHORT_INT:
246     for(i=0;i<(*len);i++)
247       if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
248         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
249       else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
250         ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
251     break;
252   case MPI_LONG_DOUBLE_INT:
253     for(i=0;i<(*len);i++)
254       if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
255         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
256       else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
257         ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
258     break;
259   case MPI_2FLOAT:
260     for(i=0;i<(*len);i++)
261       if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
262         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
263       else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
264         ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
265     break;
266   case MPI_2DOUBLE:
267     for(i=0;i<(*len);i++)
268       if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
269         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
270       else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
271         ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
272     break;
273   default:
274     ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
275     CmiAbort("exiting");
276   }
277 }
278 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
279   int i;  
280   switch (*datatype) {
281   case MPI_FLOAT_INT:
282     for(i=0;i<(*len);i++)
283       if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
284         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
285       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
286         ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
287     break;
288   case MPI_DOUBLE_INT:
289     for(i=0;i<(*len);i++)
290       if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
291         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
292       else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
293         ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
294     break;
295   case MPI_LONG_INT:
296     for(i=0;i<(*len);i++)
297       if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
298         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
299       else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
300         ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
301     break;
302   case MPI_2INT:
303     for(i=0;i<(*len);i++)
304       if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
305         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
306       else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
307         ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
308     break;
309   case MPI_SHORT_INT:
310     for(i=0;i<(*len);i++)
311       if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
312         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
313       else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
314         ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
315     break;
316   case MPI_LONG_DOUBLE_INT:
317     for(i=0;i<(*len);i++)
318       if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
319         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
320       else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
321         ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
322     break;
323   case MPI_2FLOAT:
324     for(i=0;i<(*len);i++)
325       if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
326         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
327       else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
328         ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
329     break;
330   case MPI_2DOUBLE:
331     for(i=0;i<(*len);i++)
332       if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
333         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
334       else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
335         ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
336     break;
337   default:
338     ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
339     CmiAbort("exiting");
340   }
341 }
342
343 // every msg contains a AmpiOpHeader structure before user data
344 // FIXME: non-commutative operations require messages be ordered by rank
345 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
346   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
347   MPI_Datatype dtype;
348   int szhdr, szdata, len;
349   MPI_User_function* func;
350   func = hdr->func;
351   dtype = hdr->dtype;  
352   szdata = hdr->szdata;
353   len = hdr->len;  
354   szhdr = sizeof(AmpiOpHeader);
355
356   //Assuming extent == size
357   void *ret = malloc(szhdr+szdata);
358   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
359   for(int i=1;i<nMsg;i++){
360     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
361   }
362   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
363   free(ret);
364   return retmsg;
365 }
366
367 CkReduction::reducerType AmpiReducer;
368
369 class Builtin_kvs{
370  public:
371   int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
372   Builtin_kvs(){
373     tag_ub = MPI_TAG_UB_VALUE; 
374     host = MPI_PROC_NULL;
375     io = 0;
376     wtime_is_global = 0;
377     keyval_mype = CkMyPe();
378     keyval_numpes = CkNumPes();
379     keyval_mynode = CkMyNode();
380     keyval_numnodes = CkNumNodes();
381   }
382 };
383
384 // ------------ startup support -----------
385 int _ampi_fallback_setup_count;
386 CDECL void AMPI_Setup(void);
387 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
388
389 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
390
391 /*Main routine used when missing MPI_Setup routine*/
392 CDECL void AMPI_Fallback_Main(int argc,char **argv)
393 {
394   AMPI_Main_cpp(argc,argv);
395   AMPI_Main(argc,argv);
396   FTN_NAME(MPI_MAIN,mpi_main)();
397 }
398
399 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
400 /*Startup routine used if user *doesn't* write
401   a TCHARM_User_setup routine.
402  */
403 CDECL void AMPI_Setup_Switch(void) {
404   _ampi_fallback_setup_count=0;
405   FTN_NAME(AMPI_SETUP,ampi_setup)();
406   AMPI_Setup();
407   if (_ampi_fallback_setup_count==2)
408   { //Missing AMPI_Setup in both C and Fortran:
409     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
410   }
411 }
412
413 static int nodeinit_has_been_called=0;
414 CtvDeclare(ampiParent*, ampiPtr);
415 CtvDeclare(int, ampiInitDone);
416 CtvDeclare(void*,stackBottom);
417 CtvDeclare(int, ampiFinalized);
418 CkpvDeclare(Builtin_kvs, bikvs);
419 CkpvDeclare(int,argvExtracted);
420 static int enableStreaming = 0;
421
422 CDECL long ampiCurrentStackUsage(){
423   int localVariable;
424   
425   unsigned long p1 =  (unsigned long)((void*)&localVariable);
426   unsigned long p2 =  (unsigned long)(CtvAccess(stackBottom));
427
428
429   if(p1 > p2)
430     return p1 - p2;
431   else
432     return  p2 - p1;
433  
434 }
435
436 FDECL void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
437   long usage = ampiCurrentStackUsage();
438   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
439 }
440
441
442 CDECL void AMPI_threadstart(void *data);
443 static int AMPI_threadstart_idx = -1;
444
445 static void ampiNodeInit(void)
446 {
447   _mpi_nworlds=0;
448   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
449   {
450     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
451   }
452   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
453
454   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
455   
456   CmiAssert(AMPI_threadstart_idx == -1);    // only initialize once
457   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
458
459   nodeinit_has_been_called=1;
460 }
461
462 #if PRINT_IDLE
463 static double totalidle=0.0, startT=0.0;
464 static int beginHandle, endHandle;
465 static void BeginIdle(void *dummy,double curWallTime)
466 {
467   startT = curWallTime;
468 }
469 static void EndIdle(void *dummy,double curWallTime)
470 {
471   totalidle += curWallTime - startT;
472 }
473 #endif
474
475 /* for fortran reduction operation table to handle mapping */
476 typedef MPI_Op  MPI_Op_Array[128];
477 CtvDeclare(int, mpi_opc);
478 CtvDeclare(MPI_Op_Array, mpi_ops);
479
480 static void ampiProcInit(void){
481   CtvInitialize(ampiParent*, ampiPtr);
482   CtvInitialize(int,ampiInitDone);
483   CtvInitialize(int,ampiFinalized);
484   CtvInitialize(void*,stackBottom);
485
486
487   CtvInitialize(MPI_Op_Array, mpi_ops);
488   CtvInitialize(int, mpi_opc);
489
490   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
491   CkpvAccess(bikvs) = Builtin_kvs();
492
493   CkpvInitialize(int, argvExtracted);
494   CkpvAccess(argvExtracted) = 0;
495   REGISTER_AMPI
496   initAmpiProjections();
497   char **argv=CkGetArgv();
498 #if AMPI_COMLIB  
499   if(CkpvAccess(argvExtracted)==0){
500     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
501   }
502 #endif
503
504 #ifdef AMPIMSGLOG
505   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
506   msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
507   CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
508   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
509 #endif
510 }
511
512 void AMPI_Install_Idle_Timer(){
513 #if AMPI_PRINT_IDLE
514   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
515   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
516 #endif
517 }
518
519 void AMPI_Uninstall_Idle_Timer(){
520 #if AMPI_PRINT_IDLE
521   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
522   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
523 #endif
524 }
525
526 PUPfunctionpointer(MPI_MainFn)
527
528 class MPI_threadstart_t {
529 public:
530         MPI_MainFn fn;
531         MPI_threadstart_t() {}
532         MPI_threadstart_t(MPI_MainFn fn_)
533                 :fn(fn_) {}
534         void start(void) {
535                 char **argv=CmiCopyArgs(CkGetArgv());
536                 int argc=CkGetArgc();
537
538                 // Set a pointer to somewhere close to the bottom of the stack.
539                 // This is used for roughly estimating the stack usage later.
540                 CtvAccess(stackBottom) = &argv;
541
542 #if CMK_AMPI_FNPTR_HACK
543                 AMPI_Fallback_Main(argc,argv);
544 #else
545                 (fn)(argc,argv);
546 #endif
547         }
548         void pup(PUP::er &p) {
549                 p|fn;
550         }
551 };
552 PUPmarshall(MPI_threadstart_t)
553
554 CDECL void AMPI_threadstart(void *data)
555 {
556         STARTUP_DEBUG("MPI_threadstart")
557         MPI_threadstart_t t;
558         pupFromBuf(data,t);
559 #if CMK_TRACE_IN_CHARM
560         if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
561 #endif
562         t.start();
563 }
564
565 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
566 {
567         STARTUP_DEBUG("ampiCreateMain")
568         int _nchunks=TCHARM_Get_num_chunks();
569         //Make a new threads array:
570         MPI_threadstart_t s(mainFn);
571         memBuf b; pupIntoBuf(b,s);
572         TCHARM_Create_data( _nchunks,AMPI_threadstart_idx,
573                           b.getData(), b.getSize());
574 }
575
576 /* TCharm Semaphore ID's for AMPI startup */
577 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
578 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
579
580 static CProxy_ampiWorlds ampiWorldsGroup;
581
582 static void init_operations()
583 {
584   CtvInitialize(MPI_Op_Array, mpi_ops);
585   int i = 0;
586   MPI_Op *tab = CtvAccess(mpi_ops);
587   tab[i++] = MPI_MAX;
588   tab[i++] = MPI_MIN;
589   tab[i++] = MPI_SUM;
590   tab[i++] = MPI_PROD;
591   tab[i++] = MPI_LAND;
592   tab[i++] = MPI_BAND;
593   tab[i++] = MPI_LOR;
594   tab[i++] = MPI_BOR;
595   tab[i++] = MPI_LXOR;
596   tab[i++] = MPI_BXOR;
597   tab[i++] = MPI_MAXLOC;
598   tab[i++] = MPI_MINLOC;
599
600   CtvInitialize(int, mpi_opc);
601   CtvAccess(mpi_opc) = i;
602 }
603
604 /*
605 Called from MPI_Init, a collective initialization call:
606  creates a new AMPI array and attaches it to the current
607  set of TCHARM threads.
608 */
609 static ampi *ampiInit(char **argv)
610 {
611   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
612   if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
613   STARTUP_DEBUG("ampiInit> begin")
614   
615   MPI_Comm new_world;
616   int _nchunks;
617   CkArrayOptions opts;
618   CProxy_ampiParent parent;
619   if (TCHARM_Element()==0) //the rank of a tcharm object
620   { /* I'm responsible for building the arrays: */
621         STARTUP_DEBUG("ampiInit> creating arrays")
622
623 // FIXME: Need to serialize global communicator allocation in one place.
624         //Allocate the next communicator
625         if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
626         {
627                 CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
628         }
629         int new_idx=_mpi_nworlds;
630         new_world=MPI_COMM_WORLD+new_idx; // Isaac guessed there shouldn't be a +1 here
631
632         //Create and attach the ampiParent array
633         CkArrayID threads;
634         opts=TCHARM_Attach_start(&threads,&_nchunks);
635         parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
636         STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
637   }
638   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
639
640   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
641
642   if (TCHARM_Element()==0)
643   {
644           //Make a new ampi array
645           CkArrayID empty;
646
647         ampiCommStruct worldComm(new_world,empty,_nchunks);
648         CProxy_ampi arr;
649
650
651 #if AMPI_COMLIB
652
653         ComlibInstanceHandle ciStreaming = 1;
654         ComlibInstanceHandle ciBcast = 2;
655         ComlibInstanceHandle ciAllgather = 3;
656         ComlibInstanceHandle ciAlltoall = 4;
657
658         arr=CProxy_ampi::ckNew(parent, worldComm, ciStreaming, ciBcast, ciAllgather, ciAlltoall, opts);
659         
660
661         CkPrintf("Using untested comlib code in ampi.C\n");
662
663         Strategy *sStreaming = new StreamingStrategy(1,10);
664         CkAssert(ciStreaming == ComlibRegister(sStreaming));
665         
666         Strategy *sBcast = new BroadcastStrategy(USE_HYPERCUBE);
667         CkAssert(ciBcast = ComlibRegister(sBcast));
668         
669         Strategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
670         CkAssert(ciAllgather = ComlibRegister(sAllgather));
671
672         Strategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
673         CkAssert(ciAlltoall = ComlibRegister(sAlltoall));
674         
675         CmiPrintf("Created AMPI comlib strategies in new manner\n");
676
677         // FIXME: Propogate the comlib table here
678         CkpvAccess(conv_com_object).doneCreating();
679 #else
680         arr=CProxy_ampi::ckNew(parent,worldComm,opts);
681
682 #endif
683
684         //Broadcast info. to the mpi_worlds array
685         // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
686         ampiCommStruct newComm(new_world,arr,_nchunks);
687         //CkPrintf("In ampiInit: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
688         if (ampiWorldsGroup.ckGetGroupID().isZero())
689                 ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
690         else
691                 ampiWorldsGroup.add(newComm);
692         STARTUP_DEBUG("ampiInit> arrays created")
693
694   }
695
696   // Find our ampi object:
697   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
698   CtvAccess(ampiInitDone)=1;
699   CtvAccess(ampiFinalized)=0;
700   STARTUP_DEBUG("ampiInit> complete")
701 #if CMK_BLUEGENE_CHARM
702 //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
703   TRACE_BG_ADD_TAG("AMPI_START");
704 #endif
705
706   init_operations();     // initialize fortran reduction operation table
707
708   getAmpiParent()->ampiInitCallDone = 0;
709
710   CProxy_ampi cbproxy = ptr->getProxy();
711   CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
712   ptr->contribute(0, NULL, CkReduction::sum_int, cb);
713
714   ampiParent *thisParent = getAmpiParent(); 
715   while(thisParent->ampiInitCallDone!=1){
716     //CkPrintf("In checking ampiInitCallDone(%d) loop at parent %d!\n", thisParent->ampiInitCallDone, thisParent->thisIndex);
717     thisParent->getTCharmThread()->stop();
718     /* 
719      * thisParent needs to be updated in case of the parent is being pupped.
720      * In such case, thisParent got changed
721      */
722     thisParent = getAmpiParent();
723   }
724
725 #ifdef CMK_BLUEGENE_CHARM
726     BgSetStartOutOfCore();
727 #endif
728
729   return ptr;
730 }
731
732 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
733 class ampiWorlds : public CBase_ampiWorlds {
734 public:
735     ampiWorlds(const ampiCommStruct &nextWorld) {
736         ampiWorldsGroup=thisgroup;
737         //CkPrintf("In constructor: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
738         add(nextWorld);
739     }
740     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
741     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
742     void add(const ampiCommStruct &nextWorld) {
743       int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD); // Isaac guessed there shouldn't be a +1 after the MPI_COMM_WORLD
744         mpi_worlds[new_idx].comm=nextWorld;
745         if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
746         STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
747     }
748 };
749
750 //-------------------- ampiParent -------------------------
751 ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
752                 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
753 {
754   int barrier = 0x1234;
755   STARTUP_DEBUG("ampiParent> starting up")
756   thread=NULL;
757   worldPtr=NULL;
758   myDDT=&myDDTsto;
759   prepareCtv();
760
761   init();
762
763   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
764         AsyncEvacuate(CmiFalse);
765 }
766
767 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
768   thread=NULL;
769   worldPtr=NULL;
770   myDDT=&myDDTsto;
771
772   init();
773
774   AsyncEvacuate(CmiFalse);
775 }
776
777 void ampiParent::pup(PUP::er &p) {
778   ArrayElement1D::pup(p);
779   p|threads;
780   p|worldNo;           // why it was missing from here before??
781   p|worldStruct;
782   myDDT->pup(p);
783   p|splitComm;
784   p|groupComm;
785   p|groups;
786
787 //BIGSIM_OOC DEBUGGING
788 //if(!p.isUnpacking()){
789 //    CmiPrintf("ampiParent[%d] packing ampiRequestList: \n", thisIndex);
790 //    ampiReqs.print();
791 //}
792
793   p|ampiReqs;
794
795 //BIGSIM_OOC DEBUGGING
796 //if(p.isUnpacking()){
797 //    CmiPrintf("ampiParent[%d] unpacking ampiRequestList: \n", thisIndex);
798 //    ampiReqs.print();
799 //}
800
801   p|RProxyCnt;
802   p|tmpRProxy;
803   p|winStructList;
804   p|infos;
805
806   p|ampiInitCallDone;
807 }
808 void ampiParent::prepareCtv(void) {
809   thread=threads[thisIndex].ckLocal();
810   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
811   CtvAccessOther(thread->getThread(),ampiPtr) = this;
812   STARTUP_DEBUG("ampiParent> found TCharm")
813 }
814
815 void ampiParent::init(){
816 #ifdef AMPIMSGLOG
817   if(msgLogWrite && msgLogRank == thisIndex){
818 #if CMK_PROJECTIONS_USE_ZLIB
819     fMsgLog = gzopen(msgLogFilename,"wb");
820     toPUPer = new PUP::tozDisk(fMsgLog);
821 #else
822     fMsgLog = fopen(msgLogFilename,"wb");
823     toPUPer = new PUP::toDisk(fMsgLog);
824 #endif
825   }else if(msgLogRead){
826 #if CMK_PROJECTIONS_USE_ZLIB
827     fMsgLog = gzopen(msgLogFilename,"rb");
828     fromPUPer = new PUP::fromzDisk(fMsgLog);
829 #else
830     fMsgLog = fopen(msgLogFilename,"rb");
831     fromPUPer = new PUP::fromDisk(fMsgLog);
832 #endif
833   }
834 #endif
835 }
836
837 void ampiParent::finalize(){
838 #ifdef AMPIMSGLOG
839   if(msgLogWrite && msgLogRank == thisIndex){
840     delete toPUPer;
841 #if CMK_PROJECTIONS_USE_ZLIB
842     gzclose(fMsgLog);
843 #else
844     fclose(fMsgLog);
845 #endif
846   }else if(msgLogRead){
847     delete fromPUPer;
848 #if CMK_PROJECTIONS_USE_ZLIB
849     gzclose(fMsgLog);
850 #else
851     fclose(fMsgLog);
852 #endif
853   }
854 #endif
855 }
856
857 void ampiParent::ckJustMigrated(void) {
858   ArrayElement1D::ckJustMigrated();
859   prepareCtv();
860 }
861
862 void ampiParent::ckJustRestored(void) {
863   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
864   ArrayElement1D::ckJustRestored();
865   prepareCtv();
866   
867   //BIGSIM_OOC DEBUGGING
868   //CkPrintf("In ampiParent[%d] with TCharm thread=%p:   ",thisIndex, thread);
869   //CthPrintThdMagic(thread->getTid()); 
870 }
871
872 ampiParent::~ampiParent() {
873   STARTUP_DEBUG("ampiParent> destructor called");
874   finalize();
875 }
876
877 //Children call this when they are first created or just migrated
878 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
879 {
880   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
881
882   if (s.getComm()>=MPI_COMM_WORLD)
883   { //We now have our COMM_WORLD-- register it
884     //Note that split communicators don't keep a raw pointer, so
885     //they don't need to re-register on migration.
886      if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
887      worldPtr=ptr;
888      worldStruct=s;
889
890     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
891      CkVec<int> _indices;
892      _indices.push_back(thisIndex);
893      selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
894   }
895   
896   if (!forMigration)
897   { //Register the new communicator:
898      MPI_Comm comm = s.getComm();
899      STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
900      if (comm>=MPI_COMM_WORLD) { 
901        // Pass the new ampi to the waiting ampiInit
902        thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
903      } else if (isSplit(comm)) {
904        splitChildRegister(s);
905      } else if (isGroup(comm)) {
906        groupChildRegister(s);
907      } else if (isCart(comm)) {
908        cartChildRegister(s);
909      } else if (isGraph(comm)) {
910        graphChildRegister(s);
911      } else if (isInter(comm)) {
912        interChildRegister(s);
913      } else if (isIntra(comm)) {
914        intraChildRegister(s);
915      }else
916        CkAbort("ampiParent recieved child with bad communicator");
917   }
918
919   return thread;
920 }
921
922 //BIGSIM_OOC DEBUGGING
923 //Move the comm2ampi from inline to normal function for the sake of debugging
924 /*ampi *ampiParent::comm2ampi(MPI_Comm comm){
925       //BIGSIM_OOC DEBUGGING
926       //CmiPrintf("%d, in ampiParent::comm2ampi, comm=%d\n", thisIndex, comm);
927       if (comm==MPI_COMM_WORLD) return worldPtr;
928       if (comm==MPI_COMM_SELF) return worldPtr;
929       if (comm==worldNo) return worldPtr;
930       if (isSplit(comm)) {
931          const ampiCommStruct &st=getSplit(comm);
932          return st.getProxy()[thisIndex].ckLocal();
933       }
934       if (isGroup(comm)) {
935          const ampiCommStruct &st=getGroup(comm);
936          return st.getProxy()[thisIndex].ckLocal();
937       }
938       if (isCart(comm)) {
939         const ampiCommStruct &st = getCart(comm);
940         return st.getProxy()[thisIndex].ckLocal();
941       }
942       if (isGraph(comm)) {
943         const ampiCommStruct &st = getGraph(comm);
944         return st.getProxy()[thisIndex].ckLocal();
945       }
946       if (isInter(comm)) {
947          const ampiCommStruct &st=getInter(comm);
948          return st.getProxy()[thisIndex].ckLocal();
949       }
950       if (isIntra(comm)) {
951          const ampiCommStruct &st=getIntra(comm);
952          return st.getProxy()[thisIndex].ckLocal();
953       }
954       if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
955       CkAbort("Invalid communicator used!");
956       return NULL;
957 }*/
958
959 // reduction client data - preparation for checkpointing
960 class ckptClientStruct {
961 public:
962   const char *dname;
963   ampiParent *ampiPtr;
964   ckptClientStruct(const char *s, ampiParent *a): dname(s), ampiPtr(a) {}
965 };
966
967 static void checkpointClient(void *param,void *msg)
968 {
969   ckptClientStruct *client = (ckptClientStruct*)param;
970   const char *dname = client->dname;
971   ampiParent *ampiPtr = client->ampiPtr;
972   ampiPtr->Checkpoint(strlen(dname), dname);
973   delete client;
974 }
975
976 void ampiParent::startCheckpoint(const char* dname){
977   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
978   if (thisIndex==0) {
979         ckptClientStruct *clientData = new ckptClientStruct(dname, this);
980         CkCallback cb(checkpointClient, clientData);
981         contribute(0, NULL, CkReduction::sum_int, cb);
982   }
983   else
984         contribute(0, NULL, CkReduction::sum_int);
985
986 #if 0
987 #if CMK_BLUEGENE_CHARM
988   void *curLog;         // store current log in timeline
989   _TRACE_BG_TLINE_END(&curLog);
990   TRACE_BG_AMPI_SUSPEND();
991 #endif
992 #endif
993
994   thread->stop();
995
996 #if CMK_BLUEGENE_CHARM
997   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
998   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
999 #endif
1000 }
1001
1002 void ampiParent::Checkpoint(int len, const char* dname){
1003   if (len == 0) {
1004     // memory checkpoint
1005     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1006     CkStartMemCheckpoint(cb);
1007   }
1008   else {
1009     char dirname[256];
1010     strncpy(dirname,dname,len);
1011     dirname[len]='\0';
1012     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1013     CkStartCheckpoint(dirname,cb);
1014   }
1015 }
1016 void ampiParent::ResumeThread(void){
1017   thread->resume();
1018 }
1019
1020 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
1021                              int *keyval, void* extra_state){
1022         KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1023         int idx = kvlist.size();
1024         kvlist.resize(idx+1);
1025         kvlist[idx] = newnode;
1026         *keyval = idx;
1027         return 0;
1028 }
1029 int ampiParent::freeKeyval(int *keyval){
1030         if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
1031                 return -1;
1032         delete kvlist[*keyval];
1033         kvlist[*keyval] = NULL;
1034         *keyval = MPI_KEYVAL_INVALID;
1035         return 0;
1036 }
1037
1038 int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
1039         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1040                 return -1;
1041         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1042         // Enlarge the keyval list:
1043         while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
1044         cs.getKeyvals()[keyval]=attribute_val;
1045         return 0;
1046 }
1047
1048 int ampiParent::kv_is_builtin(int keyval) {
1049         switch(keyval) {
1050         case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
1051         case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
1052         case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
1053         case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
1054         case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
1055         case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
1056         case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
1057         case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
1058         default: return 0;
1059         };
1060 }
1061
1062 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
1063         *flag = false;
1064         if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
1065           *flag=true;
1066           *(int **)attribute_val = kv_builtin_storage;  // all default tags are ints
1067           return 0;
1068         }
1069         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1070                 return -1; /* invalid keyval */
1071         
1072         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1073         if (keyval>=cs.getKeyvals().size())  
1074                 return 0; /* we don't have a value yet */
1075         if (cs.getKeyvals()[keyval]==0)
1076                 return 0; /* we had a value, but now it's zero */
1077         /* Otherwise, we have a good value */
1078         *flag = true;
1079         *(void **)attribute_val = cs.getKeyvals()[keyval];
1080         return 0;
1081 }
1082 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
1083         /* no way to delete an attribute: just overwrite it with 0 */
1084         return putAttr(comm,keyval,0);
1085 }
1086
1087 //----------------------- ampi -------------------------
1088 void ampi::init(void) {
1089   parent=NULL;
1090   thread=NULL;
1091   msgs=NULL;
1092   posted_ireqs=NULL;
1093   resumeOnRecv=false;
1094   AsyncEvacuate(CmiFalse);
1095 }
1096
1097 ampi::ampi()
1098 {
1099   /* this constructor only exists so we can create an empty array during split */
1100   CkAbort("Default ampi constructor should never be called");
1101 }
1102
1103 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
1104    :parentProxy(parent_)
1105 {
1106   init();
1107
1108   myComm=s; myComm.setArrayID(thisArrayID);
1109   myRank=myComm.getRankForIndex(thisIndex);
1110
1111   findParent(false);
1112
1113   msgs = CmmNew();
1114   posted_ireqs = CmmNew();
1115   nbcasts = 0;
1116
1117   comlibProxy = thisProxy; // Will later be associated with comlib
1118   
1119   seqEntries=parent->ckGetArraySize();
1120   oorder.init (seqEntries);
1121 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1122     if(thisIndex == 0){
1123 /*      CkAssert(CkMyPe() == 0);
1124  *              CkGroupID _myManagerGID = thisProxy.ckGetArrayID();     
1125  *                      CkAssert(numElements);
1126  *                              printf("ampi::ampi setting numInitial to %d on manager at gid %d \n",numElements,_myManagerGID.idx);
1127  *                                      CkArray *_myManager = thisProxy.ckLocalBranch();
1128  *                                              _myManager->setNumInitial(numElements);*/
1129     }
1130 #endif
1131 }
1132
1133 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1134                 ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1135    :parentProxy(parent_),ciStreaming(ciStreaming_),ciBcast(ciBcast_),ciAllgather(ciAllgather_),ciAlltoall(ciAlltoall_)
1136 {
1137   init();
1138
1139   myComm=s; myComm.setArrayID(thisArrayID);
1140   myRank=myComm.getRankForIndex(thisIndex);
1141
1142   findParent(false);
1143
1144   msgs = CmmNew();
1145   posted_ireqs = CmmNew();
1146   nbcasts = 0;
1147
1148   comlibProxy = thisProxy;
1149   CmiPrintf("comlibProxy created as a copy of thisProxy, no associate call\n");
1150
1151 #if AMPI_COMLIB
1152   //  ComlibAssociateProxy(ciAlltoall, comlibProxy);
1153 #endif
1154
1155   seqEntries=parent->ckGetArraySize();
1156   oorder.init (seqEntries);
1157 }
1158
1159 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1160 {
1161   init();
1162   
1163   seqEntries=-1;
1164 }
1165
1166 void ampi::ckJustMigrated(void)
1167 {
1168         findParent(true);
1169         ArrayElement1D::ckJustMigrated();
1170 }
1171
1172 void ampi::ckJustRestored(void)
1173 {
1174         FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
1175         findParent(true);
1176         ArrayElement1D::ckJustRestored();
1177         
1178         //BIGSIM_OOC DEBUGGING
1179         //CkPrintf("In ampi[%d] thread[%p]:   ", thisIndex, thread);
1180         //CthPrintThdMagic(thread->getTid()); 
1181 }
1182
1183 void ampi::findParent(bool forMigration) {
1184         STARTUP_DEBUG("ampi> finding my parent")
1185         parent=parentProxy[thisIndex].ckLocal();
1186         if (parent==NULL) CkAbort("AMPI can't find its parent!");
1187         thread=parent->registerAmpi(this,myComm,forMigration);
1188         if (thread==NULL) CkAbort("AMPI can't find its thread!");
1189 //      printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1190 }
1191
1192 //The following method should be called on the first element of the
1193 //ampi array
1194 void ampi::allInitDone(CkReductionMsg *m){
1195     FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
1196     thisProxy.setInitDoneFlag();
1197     delete m;
1198 }
1199
1200 void ampi::setInitDoneFlag(){
1201     //CkPrintf("ampi[%d]::setInitDone called!\n", thisIndex);
1202     parent->ampiInitCallDone=1;
1203     parent->getTCharmThread()->start();
1204 }
1205
1206 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1207         CkPupMessage(*(PUP::er *)p,msg,1);
1208         if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1209 //      printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1210 }
1211
1212 static void cmm_pup_posted_ireq(pup_er p,void **msg) {
1213
1214         pup_int(p, (int *)msg);
1215
1216 /*      if(pup_isUnpacking(p)){
1217             // *msg = new IReq;
1218             //when unpacking, nothing is needed to do since *msg is an index
1219             //(of type integer) to the ampiParent::ampiReqs (the AmpiRequestList)
1220         }
1221         if(!pup_isUnpacking(p)){
1222             AmpiRequestList *reqL = getReqs();
1223             int retIdx = reqL->findRequestIndex((IReq *)*msg);
1224             if(retIdx==-1){
1225                 CmiAbort("An AmpiRequest instance should be found for an instance in posted_ireq!\n");
1226             }
1227             pup_int(p, retIdx)
1228         }
1229 */
1230 //      ((IReq *)*msg)->pup(*(PUP::er *)p);
1231
1232 //      if (pup_isDeleting(p)) delete (IReq *)*msg;
1233 //      printf("[%d] pupping postd irequests %p \n",CkMyPe(),*msg);
1234 }
1235
1236 void ampi::pup(PUP::er &p)
1237 {
1238   if(!p.isUserlevel())
1239     ArrayElement1D::pup(p);//Pack superclass
1240   p|parentProxy;
1241   p|myComm;
1242   p|myRank;
1243   p|nbcasts;
1244   p|tmpVec;
1245   p|remoteProxy;
1246   p|resumeOnRecv;
1247   p|comlibProxy;
1248   p|ciStreaming;
1249   p|ciBcast;
1250   p|ciAllgather;
1251   p|ciAlltoall;
1252
1253 #if AMPI_COMLIB
1254   if(p.isUnpacking()){
1255 //    ciStreaming.setSourcePe();
1256 //    ciBcast.setSourcePe();
1257 //    ciAllgather.setSourcePe();
1258 //    ciAlltoall.setSourcePe();
1259   }
1260 #endif
1261
1262   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1263
1264   //BIGSIM_OOC DEBUGGING
1265   //if(!p.isUnpacking()){
1266     //CkPrintf("ampi[%d]::packing: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1267   //}
1268
1269   posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);
1270
1271   //if(p.isUnpacking()){
1272     //CkPrintf("ampi[%d]::unpacking: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1273   //}
1274
1275   p|seqEntries;
1276   p|oorder;
1277 }
1278
1279 ampi::~ampi()
1280 {
1281   if (CkInRestarting() || _BgOutOfCoreFlag==1) {
1282     // in restarting, we need to flush messages
1283     int tags[3], sts[3];
1284     tags[0] = tags[1] = tags[2] = CmmWildCard;
1285     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1286     while (msg) {
1287       delete msg;
1288       msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1289     }
1290   }
1291
1292   CmmFree(msgs);
1293   CmmFreeAll(posted_ireqs);
1294 }
1295
1296 //------------------------ Communicator Splitting ---------------------
1297 class ampiSplitKey {
1298 public:
1299         int nextSplitComm;
1300         int color; //New class of processes we'll belong to
1301         int key; //To determine rank in new ordering
1302         int rank; //Rank in old ordering
1303         ampiSplitKey() {}
1304         ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1305                 :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1306 };
1307
1308 /* "type" may indicate whether call is for a cartesian topology etc. */
1309
1310 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1311 {
1312 #if CMK_BLUEGENE_CHARM
1313   void *curLog;         // store current log in timeline
1314   _TRACE_BG_TLINE_END(&curLog);
1315 //  TRACE_BG_AMPI_SUSPEND();
1316 #endif
1317   if (type == CART_TOPOL) {
1318         ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1319         int rootIdx=myComm.getIndexForRank(0);
1320         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1321         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1322
1323         thread->suspend(); //Resumed by ampiParent::cartChildRegister
1324         MPI_Comm newComm=parent->getNextCart()-1;
1325         *dest=newComm;
1326   } else {
1327         ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1328         int rootIdx=myComm.getIndexForRank(0);
1329         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1330         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1331
1332         thread->suspend(); //Resumed by ampiParent::splitChildRegister
1333         MPI_Comm newComm=parent->getNextSplit()-1;
1334         *dest=newComm;
1335   }
1336 #if CMK_BLUEGENE_CHARM
1337 //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1338   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1339   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1340 #endif
1341 }
1342
1343 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1344         const ampiSplitKey *a=(const ampiSplitKey *)a_;
1345         const ampiSplitKey *b=(const ampiSplitKey *)b_;
1346         if (a->color!=b->color) return a->color-b->color;
1347         if (a->key!=b->key) return a->key-b->key;
1348         return a->rank-b->rank;
1349 }
1350
1351 void ampi::splitPhase1(CkReductionMsg *msg)
1352 {
1353         //Order the keys, which orders the ranks properly:
1354         int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1355         ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1356         if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1357         qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1358
1359         MPI_Comm newComm = -1;
1360         for(int i=0;i<nKeys;i++)
1361                 if(keys[i].nextSplitComm>newComm)
1362                         newComm = keys[i].nextSplitComm;
1363
1364         //Loop over the sorted keys, which gives us the new arrays:
1365         int lastColor=keys[0].color-1; //The color we're building an array for
1366         CProxy_ampi lastAmpi; //The array for lastColor
1367         int lastRoot=0; //C value for new rank 0 process for latest color
1368         ampiCommStruct lastComm; //Communicator info. for latest color
1369         for (int c=0;c<nKeys;c++) {
1370                 if (keys[c].color!=lastColor)
1371                 { //Hit a new color-- need to build a new communicator and array
1372                         lastColor=keys[c].color;
1373                         lastRoot=c;
1374                         CkArrayOptions opts;
1375                         opts.bindTo(parentProxy);
1376                         opts.setNumInitial(0);
1377                         CkArrayID unusedAID; ampiCommStruct unusedComm;
1378                         lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1379                         lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1380
1381                         CkVec<int> indices; //Maps rank to array indices for new arrau
1382                         for (int i=c;i<nKeys;i++) {
1383                                 if (keys[i].color!=lastColor) break; //Done with this color
1384                                 int idx=myComm.getIndexForRank(keys[i].rank);
1385                                 indices.push_back(idx);
1386                         }
1387
1388                         //FIXME: create a new communicator for each color, instead of
1389                         // (confusingly) re-using the same MPI_Comm number for each.
1390                         lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1391                 }
1392                 int newRank=c-lastRoot;
1393                 int newIdx=lastComm.getIndexForRank(newRank);
1394
1395                 //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1396                 lastAmpi[newIdx].insert(parentProxy,lastComm);
1397         }
1398
1399         delete msg;
1400 }
1401
1402 //...newly created array elements register with the parent, which calls:
1403 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1404         int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1405         if (splitComm.size()<=idx) splitComm.resize(idx+1);
1406         splitComm[idx]=new ampiCommStruct(s);
1407         thread->resume(); //Matches suspend at end of ampi::split
1408 }
1409
1410 //-----------------create communicator from group--------------
1411 // The procedure is like that of comm_split very much,
1412 // so the code is shamelessly copied from above
1413 //   1. reduction to make sure all members have called
1414 //   2. the root in the old communicator create the new array
1415 //   3. ampiParent::register is called to register new array as new comm
1416 class vecStruct {
1417 public:
1418   int nextgroup;
1419   groupStruct vec;
1420   vecStruct():nextgroup(-1){}
1421   vecStruct(int nextgroup_, groupStruct vec_)
1422     : nextgroup(nextgroup_), vec(vec_) { }
1423 };
1424
1425 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1426   int rootIdx=vec[0];
1427   tmpVec = vec;
1428   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1429   MPI_Comm nextgroup = parent->getNextGroup();
1430   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1431
1432   if(getPosOp(thisIndex,vec)>=0){
1433     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1434     MPI_Comm retcomm = parent->getNextGroup()-1;
1435     *newcomm = retcomm;
1436   }else{
1437     *newcomm = MPI_COMM_NULL;
1438   }
1439 }
1440
1441 void ampi::commCreatePhase1(CkReductionMsg *msg){
1442   MPI_Comm *nextGroupComm = (int *)msg->getData();
1443
1444   CkArrayOptions opts;
1445   opts.bindTo(parentProxy);
1446   opts.setNumInitial(0);
1447   CkArrayID unusedAID;
1448   ampiCommStruct unusedComm;
1449   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1450   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1451
1452   groupStruct indices = tmpVec;
1453   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1454   for(int i=0;i<indices.size();i++){
1455     int newIdx=indices[i];
1456     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1457   }
1458   delete msg;
1459 }
1460
1461 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1462   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1463   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1464   groupComm[idx]=new ampiCommStruct(s);
1465   thread->resume(); //Matches suspend at end of ampi::split
1466 }
1467
1468 /* Virtual topology communicator creation */
1469 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1470   int rootIdx=vec[0];
1471   tmpVec = vec;
1472   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1473
1474   MPI_Comm nextcart = parent->getNextCart();
1475   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1476
1477   if(getPosOp(thisIndex,vec)>=0){
1478     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1479      MPI_Comm retcomm = parent->getNextCart()-1;
1480      *newcomm = retcomm;
1481   }else
1482     *newcomm = MPI_COMM_NULL;
1483 }
1484
1485 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1486   MPI_Comm *nextCartComm = (int *)msg->getData();
1487
1488   CkArrayOptions opts;
1489   opts.bindTo(parentProxy);
1490   opts.setNumInitial(0);
1491   CkArrayID unusedAID;
1492   ampiCommStruct unusedComm;
1493   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1494   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1495
1496   groupStruct indices = tmpVec;
1497   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1498                                                 size(),indices);
1499   for(int i=0;i<indices.size();i++){
1500     int newIdx=indices[i];
1501     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1502   }
1503   delete msg;
1504 }
1505
1506 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1507   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1508   if (cartComm.size()<=idx) {
1509     cartComm.resize(idx+1);
1510     cartComm.length()=idx+1;
1511   }
1512   cartComm[idx]=new ampiCommStruct(s);
1513   thread->resume(); //Matches suspend at end of ampi::cartCreate
1514 }
1515
1516 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1517   int rootIdx=vec[0];
1518   tmpVec = vec;
1519   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1520                 myComm.getProxy());
1521   MPI_Comm nextgraph = parent->getNextGraph();
1522   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1523
1524   if(getPosOp(thisIndex,vec)>=0){
1525     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1526     MPI_Comm retcomm = parent->getNextGraph()-1;
1527     *newcomm = retcomm;
1528   }else
1529     *newcomm = MPI_COMM_NULL;
1530 }
1531
1532 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1533   MPI_Comm *nextGraphComm = (int *)msg->getData();
1534
1535   CkArrayOptions opts;
1536   opts.bindTo(parentProxy);
1537   opts.setNumInitial(0);
1538   CkArrayID unusedAID;
1539   ampiCommStruct unusedComm;
1540   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1541   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1542
1543   groupStruct indices = tmpVec;
1544   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1545                                                 .size(),indices);
1546   for(int i=0;i<indices.size();i++){
1547     int newIdx=indices[i];
1548     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1549   }
1550   delete msg;
1551 }
1552
1553 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1554   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1555   if (graphComm.size()<=idx) {
1556     graphComm.resize(idx+1);
1557     graphComm.length()=idx+1;
1558   }
1559   graphComm[idx]=new ampiCommStruct(s);
1560   thread->resume(); //Matches suspend at end of ampi::graphCreate
1561 }
1562
1563 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1564   if(thisIndex==root) { // not everybody gets the valid rvec
1565     tmpVec = rvec;
1566   }
1567   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1568   MPI_Comm nextinter = parent->getNextInter();
1569   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1570
1571   thread->suspend(); //Resumed by ampiParent::interChildRegister
1572   MPI_Comm newcomm=parent->getNextInter()-1;
1573   *ncomm=newcomm;
1574 }
1575
1576 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1577   MPI_Comm *nextInterComm = (int *)msg->getData();
1578
1579   groupStruct lgroup = myComm.getIndices();
1580   CkArrayOptions opts;
1581   opts.bindTo(parentProxy);
1582   opts.setNumInitial(0);
1583   CkArrayID unusedAID;
1584   ampiCommStruct unusedComm;
1585   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1586   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1587
1588   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1589   for(int i=0;i<lgroup.size();i++){
1590     int newIdx=lgroup[i];
1591     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1592   }
1593
1594   parentProxy[0].ExchangeProxy(newAmpi);
1595   delete msg;
1596 }
1597
1598 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1599   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1600   if (interComm.size()<=idx) interComm.resize(idx+1);
1601   interComm[idx]=new ampiCommStruct(s);
1602   //thread->resume(); // don't resume it yet, till parent set remote proxy
1603 }
1604
1605 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1606   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1607     groupStruct lvec = myComm.getIndices();
1608     groupStruct rvec = myComm.getRemoteIndices();
1609     int rsize = rvec.size();
1610     tmpVec = lvec;
1611     for(int i=0;i<rsize;i++)
1612       tmpVec.push_back(rvec[i]);
1613     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1614   }else{
1615     tmpVec.resize(0);
1616   }
1617
1618   int rootIdx=myComm.getIndexForRank(0);
1619   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1620   MPI_Comm nextintra = parent->getNextIntra();
1621   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1622
1623   thread->suspend(); //Resumed by ampiParent::interChildRegister
1624   MPI_Comm newcomm=parent->getNextIntra()-1;
1625   *ncomm=newcomm;
1626 }
1627
1628 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1629   if(tmpVec.size()==0) { delete msg; return; }
1630   MPI_Comm *nextIntraComm = (int *)msg->getData();
1631   CkArrayOptions opts;
1632   opts.bindTo(parentProxy);
1633   opts.setNumInitial(0);
1634   CkArrayID unusedAID;
1635   ampiCommStruct unusedComm;
1636   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1637   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1638
1639   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1640   for(int i=0;i<tmpVec.size();i++){
1641     int newIdx=tmpVec[i];
1642     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1643   }
1644   delete msg;
1645 }
1646
1647 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1648   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1649   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1650   intraComm[idx]=new ampiCommStruct(s);
1651   thread->resume(); //Matches suspend at end of ampi::split
1652 }
1653
1654 //------------------------ communication -----------------------
1655 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1656 {
1657   if (universeNo>MPI_COMM_WORLD) {
1658     int worldDex=universeNo-MPI_COMM_WORLD-1;
1659     if (worldDex>=_mpi_nworlds)
1660       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1661     return mpi_worlds[worldDex].comm;
1662   }
1663   CkAbort("Bad communicator passed to universeComm2CommStruct");
1664   return mpi_worlds[0].comm; // meaningless return
1665 }
1666
1667 void ampi::block(void){
1668         thread->suspend();
1669 }
1670
1671 void ampi::yield(void){
1672         thread->schedule();
1673 }
1674
1675 void ampi::unblock(void){
1676         thread->resume();
1677 }
1678
1679 void ampi::ssend_ack(int sreq_idx){
1680         if (sreq_idx == 1)
1681           thread->resume();           // MPI_Ssend
1682         else {
1683           sreq_idx -= 2;              // start from 2
1684           AmpiRequestList *reqs = &(parent->ampiReqs);
1685           SReq *sreq = (SReq *)(*reqs)[sreq_idx];
1686           sreq->statusIreq = true;
1687           if (resumeOnRecv) {
1688              thread->resume();
1689           }
1690         }
1691 }
1692
1693 void
1694 ampi::generic(AmpiMsg* msg)
1695 {
1696 MSG_ORDER_DEBUG(
1697         CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1698         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1699 )
1700 #if CMK_BLUEGENE_CHARM
1701   TRACE_BG_ADD_TAG("AMPI_generic");
1702   msg->event = NULL;
1703 #endif
1704
1705   int sync = UsrToEnv(msg)->getRef();
1706   int srcIdx;
1707   if (sync)  srcIdx = msg->srcIdx;
1708
1709 //      AmpiMsg *msgcopy = msg;
1710   if(msg->seq != -1) {
1711     int srcIdx=msg->srcIdx;
1712     int n=oorder.put(srcIdx,msg);
1713     if (n>0) { // This message was in-order
1714       inorder(msg);
1715       if (n>1) { // It enables other, previously out-of-order messages
1716         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1717           inorder(msg);
1718         }
1719       }
1720     }
1721   } else { //Cross-world or system messages are unordered
1722     inorder(msg);
1723   }
1724   
1725     // msg may be free'ed from calling inorder()
1726   if (sync>0) {         // send an ack to sender
1727     CProxy_ampi pa(thisArrayID);
1728     pa[srcIdx].ssend_ack(sync);
1729   }
1730
1731   if(resumeOnRecv){
1732     //CkPrintf("Calling TCharm::resume at ampi::generic!\n");
1733     thread->resume();
1734   }
1735 }
1736
1737 inline static AmpiRequestList *getReqs(void); 
1738
1739 void
1740 ampi::inorder(AmpiMsg* msg)
1741 {
1742 MSG_ORDER_DEBUG(
1743   CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1744         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1745 )
1746   // check posted recvs
1747   int tags[3], sts[3];
1748   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1749   IReq *ireq = NULL;
1750   if (CpvAccess(CmiPICMethod) != 2) {
1751 #if 0
1752     //IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1753     ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1754 #else
1755 #if CMK_BLUEGENE_CHARM
1756     _TRACE_BG_TLINE_END(&msg->event);    // store current log
1757     msg->eventPe = CmiMyPe();
1758 #endif
1759     //in case ampi has not initialized and posted_ireqs are only inserted 
1760     //at AMPI_Irecv (MPI_Irecv)
1761     AmpiRequestList *reqL = &(parent->ampiReqs);
1762     //When storing the req index, it's 1-based. The reason is stated in the comments
1763     //in AMPI_Irecv function.
1764     int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
1765     if(reqL->size()>0 && ireqIdx>0)
1766         ireq = (IReq *)(*reqL)[ireqIdx-1];
1767     //CkPrintf("[%d] ampi::inorder, ireqIdx=%d\n", thisIndex, ireqIdx);
1768 #endif
1769     //CkPrintf("[%d] ampi::inorder, ireq=%p\n", thisIndex, ireq);
1770     if (ireq) { // receive posted
1771       ireq->receive(this, msg);
1772       // Isaac changed this so that the IReq stores the tag when receiving the message, 
1773       // instead of using this user supplied tag which could be MPI_ANY_TAG
1774       // Formerly the following line was not commented out:
1775       //ireq->tag = sts[0];         
1776       //ireq->src = sts[1];
1777       //ireq->comm = sts[2];
1778     } else {
1779       CmmPut(msgs, 3, tags, msg);
1780     }
1781   }
1782   else
1783       CmmPut(msgs, 3, tags, msg);
1784 }
1785
1786 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1787 {
1788     int tags[3];
1789     tags[0] = t; tags[1] = s; tags[2] = comm;
1790     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1791     return msg;
1792 }
1793
1794 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1795         int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm, int sync)
1796 {
1797   CkDDT_DataType *ddt = getDDT()->getType(type);
1798   int len = ddt->getSize(count);
1799   int sIdx=thisIndex;
1800   int seq = -1;
1801   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1802   seq = oorder.nextOutgoing(destIdx);
1803   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1804   if (sync) UsrToEnv(msg)->setRef(sync);
1805   TCharm::activateVariable(buf);
1806   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1807   TCharm::deactivateVariable(buf);
1808   return msg;
1809 }
1810
1811 void
1812 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1813 {
1814   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1815 }
1816
1817 void
1818 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, int sync)
1819 {
1820 #if CMK_TRACE_IN_CHARM
1821    TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
1822 #endif
1823
1824 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1825   MPI_Comm disComm = myComm.getComm();
1826   ampi *dis = getAmpiInstance(disComm);
1827   CpvAccess(_currentObj) = dis;
1828 #endif
1829
1830   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1831   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),sync);
1832
1833 #if CMK_TRACE_IN_CHARM
1834    TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
1835 #endif
1836
1837   if (sync == 1) {
1838     // waiting for receiver side
1839     resumeOnRecv = false;            // so no one else awakes it
1840     block();
1841   }
1842 }
1843
1844 void
1845 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1846 {
1847   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1848   memcpy(msg->data, buf, len);
1849   CProxy_ampi pa(aid);
1850   pa[idx].generic(msg);
1851 }
1852
1853 void
1854 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy, int sync)
1855 {
1856   if(rank==MPI_PROC_NULL) return;
1857   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1858   int destIdx = dest.getIndexForRank(rank);
1859   if(isInter()){
1860     sRank = parent->thisIndex;
1861     destcomm = MPI_COMM_FIRST_INTER;
1862     destIdx = dest.getIndexForRemoteRank(rank);
1863     arrproxy = remoteProxy;
1864   }
1865  MSG_ORDER_DEBUG(
1866   CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
1867  )
1868
1869   arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm,sync));
1870
1871 #if 0
1872 #if CMK_TRACE_ENABLED
1873   int size=0;
1874   MPI_Type_size(type,&size);
1875   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
1876 #endif
1877 #endif
1878 }
1879
1880 int
1881 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
1882 {
1883   CkDDT_DataType *ddt = getDDT()->getType(type);
1884   int len = ddt->getSize(count);
1885   
1886   if (msg->length > len && msg->length-len!=sizeof(AmpiOpHeader))
1887   { /* Received more data than we were expecting */
1888     char einfo[1024];
1889     sprintf(einfo, "FATAL ERROR in rank %d MPI_Recv (tag=%d, source=%d)\n"
1890         "  Expecting only %d bytes (%d items of type %d), \n"
1891         "  but received %d bytes from rank %d\nAMPI> MPI_Send was longer than matching MPI_Recv.",
1892             thisIndex,t,s,
1893             len, count, type,
1894             msg->length, msg->srcRank);
1895     CkAbort(einfo);
1896     return -1;
1897   }else if(msg->length < len){ // only at rare case shall we reset count by using divide
1898     count = msg->length/(ddt->getSize(1));
1899   }
1900   //
1901   TCharm::activateVariable(buf);
1902   if (msg->length-len==sizeof(AmpiOpHeader)) {  // reduction msg
1903     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
1904   } else {
1905     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
1906   }
1907   TCharm::deactivateVariable(buf);
1908   return 0;
1909 }
1910
1911 int
1912 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
1913 {
1914   MPI_Comm disComm = myComm.getComm();
1915   if(s==MPI_PROC_NULL) {
1916     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
1917     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
1918     ((MPI_Status *)sts)->MPI_LENGTH = 0;
1919     return 0;
1920   }
1921   _LOG_E_END_AMPI_PROCESSING(thisIndex)
1922 #if CMK_BLUEGENE_CHARM
1923   void *curLog;         // store current log in timeline
1924   _TRACE_BG_TLINE_END(&curLog);
1925 //  TRACE_BG_AMPI_SUSPEND();
1926 #if CMK_TRACE_IN_CHARM
1927   if(CpvAccess(traceOn)) traceSuspend();
1928 #endif
1929 #endif
1930
1931   if(isInter()){
1932     s = myComm.getIndexForRemoteRank(s);
1933     comm = MPI_COMM_FIRST_INTER;
1934   }
1935
1936   int tags[3];
1937   AmpiMsg *msg = 0;
1938   
1939  MSG_ORDER_DEBUG(
1940   CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
1941  )
1942
1943   resumeOnRecv=true;
1944   ampi *dis = getAmpiInstance(disComm);
1945 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1946 //  dis->yield();
1947 //  processRemoteMlogMessages();
1948 #endif
1949   int dosuspend = 0;
1950   while(1) {
1951       //This is done to take into account the case in which an ampi 
1952       // thread has migrated while waiting for a message
1953     tags[0] = t; tags[1] = s; tags[2] = comm;
1954     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
1955     if (msg) break;
1956     dis->thread->suspend();
1957     dosuspend = 1;
1958     dis = getAmpiInstance(disComm);
1959   }
1960
1961 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1962         CpvAccess(_currentObj) = dis;
1963         MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
1964 #endif
1965
1966   dis->resumeOnRecv=false;
1967
1968   if(sts)
1969     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1970   int status = dis->processMessage(msg, t, s, buf, count, type);
1971   if (status != 0) return status;
1972
1973   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
1974
1975 #if CMK_BLUEGENE_CHARM
1976 #if CMK_TRACE_IN_CHARM
1977   //if(CpvAccess(traceOn)) CthTraceResume(thread->getThread());
1978   //Due to the reason mentioned the in the while loop above, we need to 
1979   //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
1980   if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
1981 #endif
1982   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
1983   //TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0);
1984   //_TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
1985 #if 0
1986 #if 1
1987   if (!dosuspend) {
1988     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 1);
1989     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
1990   }
1991   else
1992 #endif
1993   TRACE_BG_ADD_TAG("RECV_RESUME_THREAD");
1994 #else
1995     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
1996     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
1997 #endif
1998 #endif
1999
2000   delete msg;
2001   return 0;
2002 }
2003
2004 void
2005 ampi::probe(int t, int s, int comm, int *sts)
2006 {
2007   int tags[3];
2008 #if CMK_BLUEGENE_CHARM
2009   void *curLog;         // store current log in timeline
2010   _TRACE_BG_TLINE_END(&curLog);
2011 //  TRACE_BG_AMPI_SUSPEND();
2012 #endif
2013
2014   AmpiMsg *msg = 0;
2015   resumeOnRecv=true;
2016   while(1) {
2017     tags[0] = t; tags[1] = s; tags[2] = comm;
2018     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2019     if (msg) break;
2020     thread->suspend();
2021   }
2022   resumeOnRecv=false;
2023   if(sts)
2024     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2025 #if CMK_BLUEGENE_CHARM
2026 //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
2027   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
2028 #endif
2029 }
2030
2031 int
2032 ampi::iprobe(int t, int s, int comm, int *sts)
2033 {
2034   int tags[3];
2035   AmpiMsg *msg = 0;
2036   tags[0] = t; tags[1] = s; tags[2] = comm;
2037   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2038   if (msg) {
2039     if(sts)
2040       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2041     return 1;
2042   }
2043 #if CMK_BLUEGENE_CHARM
2044   void *curLog;         // store current log in timeline
2045   _TRACE_BG_TLINE_END(&curLog);
2046 //  TRACE_BG_AMPI_SUSPEND();
2047 #endif
2048   thread->schedule();
2049 #if CMK_BLUEGENE_CHARM
2050   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
2051   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
2052 #endif
2053   return 0;
2054 }
2055
2056
2057 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
2058 void
2059 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
2060 {
2061   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2062   int rootIdx=dest.getIndexForRank(root);
2063   if(rootIdx==thisIndex) {
2064 #if 0//AMPI_COMLIB
2065     ciBcast.beginIteration();
2066     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2067 #else
2068 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2069         CpvAccess(_currentObj) = this;
2070 #endif
2071     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2072 #endif
2073   }
2074   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
2075   nbcasts++;
2076 }
2077
2078 void
2079 ampi::bcastraw(void* buf, int len, CkArrayID aid)
2080 {
2081   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, 0);
2082   memcpy(msg->data, buf, len);
2083   CProxy_ampi pa(aid);
2084   pa.generic(msg);
2085 }
2086
2087
2088 AmpiMsg* 
2089 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
2090 {
2091   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
2092   int unit;
2093   CkDDT_DataType *ddt = getDDT()->getType(type);
2094   unit = ddt->getSize(1);
2095   int totalsize = unit*cnt;
2096
2097   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
2098   char* addr = (char*)Alltoallbuff+disp*unit;
2099   ddt->serialize((char*)msg->data, addr, cnt, (-1));
2100   return msg;
2101 }
2102
2103 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
2104                         void *attr_in, void *attr_out, int *flag){
2105   (*flag) = 0;
2106   return (MPI_SUCCESS);
2107 }
2108 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
2109                         void *attr_in, void *attr_out, int *flag){
2110   (*(void **)attr_out) = attr_in;
2111   (*flag) = 1;
2112   return (MPI_SUCCESS);
2113 }
2114 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
2115   return (MPI_SUCCESS);
2116 }
2117
2118
2119 void AmpiSeqQ::init(int numP) 
2120 {
2121   elements.init(numP);
2122 }
2123
2124 AmpiSeqQ::~AmpiSeqQ () {
2125 }
2126   
2127 void AmpiSeqQ::pup(PUP::er &p) {
2128   p|out;
2129   p|elements;
2130 }
2131
2132 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
2133 {
2134   AmpiOtherElement &el=elements[srcIdx];
2135 #ifndef CMK_OPTIMIZE
2136   if (msg->seq<el.seqIncoming)
2137     CkAbort("AMPI Logic error: received late out-of-order message!\n");
2138 #endif
2139   out.enq(msg);
2140   el.nOut++; // We have another message in the out-of-order queue
2141 }
2142
2143 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
2144 {
2145   AmpiOtherElement &el=elements[srcIdx];
2146   if (el.nOut==0) return 0; // No more out-of-order left.
2147   // Walk through our out-of-order queue, searching for our next message:
2148   for (int i=0;i<out.length();i++) {
2149     AmpiMsg *msg=out.deq();
2150     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
2151       el.seqIncoming++;
2152       el.nOut--; // We have one less message out-of-order
2153       return msg;
2154     }
2155     else
2156       out.enq(msg);
2157   }
2158   // We walked the whole queue-- ours is not there.
2159   return 0;
2160 }
2161
2162 //BIGSIM_OOC DEBUGGING: Output for AmpiRequest and its children classes
2163 void AmpiRequest::print(){
2164             CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
2165 }
2166
2167 void PersReq::print(){
2168     AmpiRequest::print();
2169     CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
2170 }
2171
2172 void IReq::print(){
2173     AmpiRequest::print();
2174     CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
2175 }
2176
2177 void ATAReq::print(){ //not complete for myreqs
2178     AmpiRequest::print();
2179     CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
2180
2181
2182 void SReq::print(){
2183     AmpiRequest::print();
2184     CmiPrintf("In SReq: this=%p, status=%d\n", this, statusIreq);
2185 }
2186
2187 void AmpiRequestList::pup(PUP::er &p) { 
2188         if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
2189                 return;
2190         }
2191
2192         p(blklen); //Allocated size of block
2193         p(len); //Number of used elements in block
2194         if(p.isUnpacking()){
2195                 makeBlock(blklen,len);
2196         }
2197         int count=0;
2198         for(int i=0;i<len;i++){
2199                 char nonnull;
2200                 if(!p.isUnpacking()){
2201                         if(block[i] == NULL){
2202                                 nonnull = 0;
2203                         }else{
2204                                 nonnull = block[i]->getType();
2205                         }
2206                 }       
2207                 p(nonnull);
2208                 if(nonnull != 0){
2209                         if(p.isUnpacking()){
2210                                 switch(nonnull){
2211                                         case 1:
2212                                                 block[i] = new PersReq;
2213                                                 break;
2214                                         case 2: 
2215                                                 block[i] = new IReq;
2216                                                 break;
2217                                         case 3: 
2218                                                 block[i] = new ATAReq;
2219                                                 break;
2220                                 }
2221                         }       
2222                         block[i]->pup(p);
2223                         count++;
2224                 }else{
2225                         block[i] = 0;
2226                 }
2227         }
2228         if(p.isDeleting()){
2229                 freeBlock();
2230         }
2231 }
2232
2233 //------------------ External Interface -----------------
2234 ampiParent *getAmpiParent(void) {
2235   ampiParent *p = CtvAccess(ampiPtr);
2236 #ifndef CMK_OPTIMIZE
2237   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
2238 #endif
2239   return p;
2240 }
2241
2242 ampi *getAmpiInstance(MPI_Comm comm) {
2243   ampi *ptr=getAmpiParent()->comm2ampi(comm);
2244 #ifndef CMK_OPTIMIZE
2245   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
2246 #endif
2247   return ptr;
2248 }
2249
2250 inline static AmpiRequestList *getReqs(void) {
2251   return &(getAmpiParent()->ampiReqs);
2252 }
2253
2254 inline void checkComm(MPI_Comm comm){
2255 #ifndef CMK_OPTIMIZE
2256   getAmpiParent()->checkComm(comm);
2257 #endif
2258 }
2259
2260 inline void checkRequest(MPI_Request req){
2261 #ifndef CMK_OPTIMIZE
2262   getReqs()->checkRequest(req);
2263 #endif
2264 }
2265
2266 inline void checkRequests(int n, MPI_Request* reqs){
2267 #ifndef CMK_OPTIMIZE
2268   AmpiRequestList* reqlist = getReqs();
2269   for(int i=0;i<n;i++)
2270     reqlist->checkRequest(reqs[i]);
2271 #endif
2272 }
2273
2274 CDECL void AMPI_Migrate(void)
2275 {
2276 //  AMPIAPI("AMPI_Migrate");
2277 #if 0
2278 #if CMK_BLUEGENE_CHARM
2279   TRACE_BG_AMPI_SUSPEND();
2280 #endif
2281 #endif
2282   TCHARM_Migrate();
2283
2284 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2285     ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
2286     CpvAccess(_currentObj) = currentAmpi;
2287 #endif
2288
2289 #if CMK_BLUEGENE_CHARM
2290 //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2291   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
2292 #endif
2293 }
2294
2295
2296 CDECL void AMPI_Evacuate(void)
2297 {
2298   TCHARM_Evacuate();
2299 }
2300
2301
2302
2303 CDECL void AMPI_Migrateto(int destPE)
2304 {
2305   AMPIAPI("AMPI_MigrateTo");
2306 #if 0
2307 #if CMK_BLUEGENE_CHARM
2308   TRACE_BG_AMPI_SUSPEND();
2309 #endif
2310 #endif
2311   TCHARM_Migrate_to(destPE);
2312 #if CMK_BLUEGENE_CHARM
2313   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
2314   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
2315 #endif
2316 }
2317
2318 CDECL void AMPI_MigrateTo(int destPE)
2319 {
2320         AMPI_Migrateto(destPE);
2321 }
2322
2323 CDECL void AMPI_Async_Migrate(void)
2324 {
2325   AMPIAPI("AMPI_Async_Migrate");
2326 #if 0
2327 #if CMK_BLUEGENE_CHARM
2328   TRACE_BG_AMPI_SUSPEND();
2329 #endif
2330 #endif
2331   TCHARM_Async_Migrate();
2332 #if CMK_BLUEGENE_CHARM
2333   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2334   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2335 #endif
2336 }
2337
2338 CDECL void AMPI_Allow_Migrate(void)
2339 {
2340   AMPIAPI("AMPI_Allow_Migrate");
2341 #if 0
2342 #if CMK_BLUEGENE_CHARM
2343   TRACE_BG_AMPI_SUSPEND();
2344 #endif
2345 #endif
2346   TCHARM_Allow_Migrate();
2347 #if CMK_BLUEGENE_CHARM
2348   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2349 #endif
2350 }
2351
2352 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2353 #if CMK_LBDB_ON
2354   //AMPIAPI("AMPI_Setmigratable");
2355   ampi *ptr=getAmpiInstance(comm);
2356   ptr->setMigratable(mig);
2357 #else
2358   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2359 #endif
2360 }
2361
2362 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2363 {
2364     //AMPIAPI("AMPI_Init");
2365   if (nodeinit_has_been_called) {
2366     AMPIAPI("AMPI_Init");
2367     char **argv;
2368     if (p_argv) argv=*p_argv;
2369     else argv=CkGetArgv();
2370     ampiInit(argv);
2371     if (p_argc) *p_argc=CmiGetArgc(argv);
2372   }
2373   else
2374   { /* Charm hasn't been started yet! */
2375     CkAbort("AMPI_Init> Charm is not initialized!");
2376   }
2377
2378   return 0;
2379 }
2380
2381 CDECL int AMPI_Initialized(int *isInit)
2382 {
2383   if (nodeinit_has_been_called) {
2384         AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2385         *isInit=CtvAccess(ampiInitDone);
2386   }
2387   else /* !nodeinit_has_been_called */ {
2388         *isInit=nodeinit_has_been_called;
2389   }
2390   return 0;
2391 }
2392
2393 CDECL int AMPI_Finalized(int *isFinalized)
2394 {
2395     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2396     *isFinalized=CtvAccess(ampiFinalized);
2397     return 0;
2398 }
2399
2400 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2401 {
2402   //AMPIAPI("AMPI_Comm_rank");
2403
2404 #ifdef AMPIMSGLOG
2405   ampiParent* pptr = getAmpiParent();
2406   if(msgLogRead){
2407     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2408     return 0;
2409   }
2410 #endif
2411
2412   *rank = getAmpiInstance(comm)->getRank(comm);
2413
2414 #ifdef AMPIMSGLOG
2415   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2416     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2417   }
2418 #endif
2419   return 0;
2420 }
2421
2422 CDECL
2423 int AMPI_Comm_size(MPI_Comm comm, int *size)
2424 {
2425   //AMPIAPI("AMPI_Comm_size");
2426 #ifdef AMPIMSGLOG
2427   ampiParent* pptr = getAmpiParent();
2428   if(msgLogRead){
2429     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2430     return 0;
2431   }
2432 #endif
2433
2434   *size = getAmpiInstance(comm)->getSize(comm);
2435
2436 #ifdef AMPIMSGLOG
2437   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2438     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2439   }
2440 #endif
2441
2442   return 0;
2443 }
2444
2445 CDECL
2446 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2447 {
2448   AMPIAPI("AMPI_Comm_compare");
2449   if(comm1==comm2) *result=MPI_IDENT;
2450   else{
2451     int equal=1;
2452     CkVec<int> ind1, ind2;
2453     ind1 = getAmpiInstance(comm1)->getIndices();
2454     ind2 = getAmpiInstance(comm2)->getIndices();
2455     if(ind1.size()==ind2.size()){
2456       for(int i=0;i<ind1.size();i++)
2457         if(ind1[i] != ind2[i]) { equal=0; break; }
2458     }
2459     if(equal==1) *result=MPI_CONGRUENT;
2460     else *result=MPI_UNEQUAL;
2461   }
2462   return 0;
2463 }
2464
2465 CDECL void AMPI_Exit(int /*exitCode*/)
2466 {
2467   AMPIAPI("AMPI_Exit");
2468   TCHARM_Done();
2469 }
2470 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2471 {
2472   AMPI_Exit(*exitCode);
2473 }
2474
2475 CDECL
2476 int AMPI_Finalize(void)
2477 {
2478   AMPIAPI("AMPI_Finalize");
2479 #if PRINT_IDLE
2480   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2481 #endif
2482 #if AMPI_COUNTER
2483     getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2484 #endif
2485   CtvAccess(ampiFinalized)=1;
2486
2487 #if CMK_BLUEGENE_CHARM
2488 #if 0
2489   TRACE_BG_AMPI_SUSPEND();
2490 #endif
2491 #if CMK_TRACE_IN_CHARM
2492   if(CpvAccess(traceOn)) traceSuspend();
2493 #endif
2494 #endif
2495
2496 //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2497   AMPI_Exit(0);
2498   return 0;
2499 }
2500
2501 CDECL
2502 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2503                         int tag, MPI_Comm comm)
2504 {
2505   AMPIAPI("AMPI_Send");
2506 #ifdef AMPIMSGLOG
2507   if(msgLogRead){
2508     return 0;
2509   }
2510 #endif
2511
2512   ampi *ptr = getAmpiInstance(comm);
2513 #if AMPI_COMLIB
2514   if(enableStreaming){  
2515 //    ptr->getStreaming().beginIteration();
2516     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2517   } else
2518 #endif
2519     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2520 #if AMPI_COUNTER
2521   getAmpiParent()->counters.send++;
2522 #endif
2523 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2524 //  ptr->yield();
2525 //  //  processRemoteMlogMessages();
2526 #endif
2527   return 0;
2528 }
2529
2530 CDECL
2531 int AMPI_Ssend(void *msg, int count, MPI_Datatype type, int dest,
2532                         int tag, MPI_Comm comm)
2533 {
2534   AMPIAPI("AMPI_Ssend");
2535 #ifdef AMPIMSGLOG
2536   if(msgLogRead){
2537     return 0;
2538   }
2539 #endif
2540
2541   ampi *ptr = getAmpiInstance(comm);
2542 #if AMPI_COMLIB
2543   if(enableStreaming){
2544     ptr->getStreaming().beginIteration();
2545     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2546   } else
2547 #endif
2548     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm, 1);
2549 #if AMPI_COUNTER
2550   getAmpiParent()->counters.send++;
2551 #endif
2552
2553   return 0;
2554 }
2555
2556 CDECL
2557 int AMPI_Issend(void *buf, int count, MPI_Datatype type, int dest,
2558               int tag, MPI_Comm comm, MPI_Request *request)
2559 {
2560   AMPIAPI("AMPI_Issend");
2561
2562 #ifdef AMPIMSGLOG
2563   ampiParent* pptr = getAmpiParent();
2564   if(msgLogRead){
2565     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
2566     return 0;
2567   }
2568 #endif
2569
2570   USER_CALL_DEBUG("AMPI_Issend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
2571   ampi *ptr = getAmpiInstance(comm);
2572   AmpiRequestList* reqs = getReqs();
2573   SReq *newreq = new SReq(comm);
2574   *request = reqs->insert(newreq);
2575     // 1:  blocking now  - used by MPI_Ssend
2576     // >=2:  the index of the requests - used by MPI_Issend
2577   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm, *request+2);
2578 #if AMPI_COUNTER
2579   getAmpiParent()->counters.isend++;
2580 #endif
2581
2582 #ifdef AMPIMSGLOG
2583   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2584     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
2585   }
2586 #endif
2587
2588   return 0;
2589 }
2590
2591 CDECL
2592 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2593               MPI_Comm comm, MPI_Status *status)
2594 {
2595   AMPIAPI("AMPI_Recv");
2596
2597 #ifdef AMPIMSGLOG
2598   ampiParent* pptr = getAmpiParent();
2599   if(msgLogRead){
2600     (*(pptr->fromPUPer))|(pptr->pupBytes);
2601     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2602     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2603     return 0;
2604   }
2605 #endif
2606
2607   ampi *ptr = getAmpiInstance(comm);
2608   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2609   
2610 #if AMPI_COUNTER
2611   getAmpiParent()->counters.recv++;
2612 #endif
2613  
2614 #ifdef AMPIMSGLOG
2615   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2616     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2617     (*(pptr->toPUPer))|(pptr->pupBytes);
2618     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2619     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2620   }
2621 #endif
2622   
2623   return 0;
2624 }
2625
2626 CDECL
2627 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2628 {
2629   AMPIAPI("AMPI_Probe");
2630   ampi *ptr = getAmpiInstance(comm);
2631   ptr->probe(tag,src, comm, (int*) status);
2632   return 0;
2633 }
2634
2635 CDECL
2636 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2637 {
2638   AMPIAPI("AMPI_Iprobe");
2639   ampi *ptr = getAmpiInstance(comm);
2640   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2641   return 0;
2642 }
2643
2644 CDECL
2645 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2646                   int stag, void *rbuf, int rcount, int rtype,
2647                   int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2648 {
2649   AMPIAPI("AMPI_Sendrecv");
2650   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2651   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2652   if (se) return se;
2653   else return re;
2654 }
2655
2656 CDECL
2657 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2658                          int dest, int sendtag, int source, int recvtag,
2659                          MPI_Comm comm, MPI_Status *status)
2660 {
2661   AMPIAPI("AMPI_Sendrecv_replace");
2662   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2663                       buf, count, datatype, source, recvtag, comm, status);
2664 }
2665
2666
2667 CDECL
2668 int AMPI_Barrier(MPI_Comm comm)
2669 {
2670   AMPIAPI("AMPI_Barrier");
2671
2672   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2673
2674   TRACE_BG_AMPI_LOG(1, 0);
2675
2676   //HACK: Use collective operation as a barrier.
2677   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2678
2679   //BIGSIM_OOC DEBUGGING
2680   //CkPrintf("%d: in AMPI_Barrier, after AMPI_Allreduce\n", getAmpiParent()->thisIndex);
2681 #if AMPI_COUNTER
2682   getAmpiParent()->counters.barrier++;
2683 #endif
2684   return 0;
2685 }
2686
2687 CDECL
2688 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2689                          MPI_Comm comm)
2690 {
2691   AMPIAPI("AMPI_Bcast");
2692   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2693   if(comm==MPI_COMM_SELF) return 0;
2694
2695 #ifdef AMPIMSGLOG
2696   ampiParent* pptr = getAmpiParent();
2697   if(msgLogRead){
2698     (*(pptr->fromPUPer))|(pptr->pupBytes);
2699     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2700     return 0;
2701   }
2702 #endif
2703
2704   ampi* ptr = getAmpiInstance(comm);
2705   ptr->bcast(root, buf, count, type,comm);
2706 #if AMPI_COUNTER
2707   getAmpiParent()->counters.bcast++;
2708 #endif
2709
2710 #ifdef AMPIMSGLOG
2711   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2712     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2713     (*(pptr->toPUPer))|(pptr->pupBytes);
2714     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2715   }
2716 #endif
2717 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2718 //  ptr->yield();
2719 //  //  processRemoteMlogMessages();
2720 #endif
2721
2722   return 0;
2723 }
2724
2725 /// This routine is called with the results of a Reduce or AllReduce
2726 const int MPI_REDUCE_SOURCE=0;
2727 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2728 void ampi::reduceResult(CkReductionMsg *msg)
2729 {
2730         MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2731   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2732              thisArrayID,thisIndex);
2733   delete msg;
2734 }
2735
2736 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2737 {
2738   int szdata = ddt->getSize(count);
2739   int szhdr = sizeof(AmpiOpHeader);
2740   AmpiOpHeader newhdr(op,type,count,szdata); 
2741   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2742   memcpy(msg->getData(),&newhdr,szhdr);
2743   TCharm::activateVariable(inbuf);
2744   ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2745   TCharm::deactivateVariable(inbuf);
2746   return msg;
2747 }
2748
2749 // Copy the MPI datatype "type" from inbuf to outbuf
2750 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2751   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2752   //  deserializing into the output.
2753   ampi *ptr = getAmpiInstance(comm);
2754   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2755   int len=ddt->getSize(count);
2756   char *serialized=new char[len];
2757   TCharm::activateVariable(inbuf);
2758   TCharm::activateVariable(outbuf);
2759   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2760   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2761   TCharm::deactivateVariable(outbuf);
2762   TCharm::deactivateVariable(inbuf);
2763   delete [] serialized;         // < memory leak!  // gzheng 
2764   
2765   return MPI_SUCCESS;
2766 }
2767
2768 CDECL
2769 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
2770                int root, MPI_Comm comm)
2771 {
2772   AMPIAPI("AMPI_Reduce");
2773   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2774
2775 #ifdef AMPIMSGLOG
2776   ampiParent* pptr = getAmpiParent();
2777   if(msgLogRead){
2778     (*(pptr->fromPUPer))|(pptr->pupBytes);
2779     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2780     return 0;
2781   }
2782 #endif
2783   
2784   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
2785   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
2786   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
2787
2788   ampi *ptr = getAmpiInstance(comm);
2789   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
2790   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
2791   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
2792
2793   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2794
2795   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
2796   msg->setCallback(reduceCB);
2797         MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
2798   ptr->contribute(msg);
2799   if (ptr->thisIndex == rootIdx){
2800     /*HACK: Use recv() to block until reduction data comes back*/
2801     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2802       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
2803   }
2804 #if AMPI_COUNTER
2805   getAmpiParent()->counters.reduce++;
2806 #endif
2807
2808 #ifdef AMPIMSGLOG
2809   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2810     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2811     (*(pptr->toPUPer))|(pptr->pupBytes);
2812     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
2813   }
2814 #endif
2815   
2816   return 0;
2817 }
2818
2819 CDECL
2820 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
2821                   MPI_Op op, MPI_Comm comm)
2822 {
2823   AMPIAPI("AMPI_Allreduce");
2824   ampi *ptr = getAmpiInstance(comm);
2825  
2826   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
2827   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
2828   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
2829   
2830   CkDDT_DataType *ddt_type = ptr->getDDT()->getType(type);
2831
2832   TRACE_BG_AMPI_LOG(2, count * ddt_type->getSize());
2833
2834   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2835
2836 #ifdef AMPIMSGLOG
2837   ampiParent* pptr = getAmpiParent();
2838   if(msgLogRead){
2839     (*(pptr->fromPUPer))|(pptr->pupBytes);
2840     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2841     //    CkExit();
2842     return 0;
2843   }
2844 #endif
2845
2846   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
2847   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
2848
2849   CkReductionMsg *msg=makeRednMsg(ddt_type, inbuf, count, type, op);
2850   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2851   msg->setCallback(allreduceCB);
2852   ptr->contribute(msg);
2853
2854   /*HACK: Use recv() to block until the reduction data comes back*/
2855   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2856     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
2857 #if AMPI_COUNTER
2858   getAmpiParent()->counters.allreduce++;
2859 #endif
2860
2861 #ifdef AMPIMSGLOG
2862   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2863     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2864     (*(pptr->toPUPer))|(pptr->pupBytes);
2865     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
2866     //    CkExit();
2867   }
2868 #endif
2869
2870   return 0;
2871 }
2872
2873 CDECL
2874 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
2875                    MPI_Op op, MPI_Comm comm, MPI_Request* request)
2876 {
2877   AMPIAPI("AMPI_Iallreduce");
2878   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2879
2880   checkRequest(*request);
2881   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
2882   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
2883   ampi *ptr = getAmpiInstance(comm);
2884
2885   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2886   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2887   msg->setCallback(allreduceCB);
2888   ptr->contribute(msg);
2889
2890   // using irecv instead recv to non-block the call and get request pointer
2891   AmpiRequestList* reqs = getReqs();
2892   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
2893   *request = reqs->insert(newreq);
2894   return 0;
2895 }
2896
2897 CDECL
2898 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
2899                        MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
2900 {
2901   AMPIAPI("AMPI_Reduce_scatter");
2902   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
2903   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
2904   ampi *ptr = getAmpiInstance(comm);
2905   int size = ptr->getSize(comm);
2906   int count=0;
2907   int *displs = new int [size];
2908   int len;
2909   void *tmpbuf;
2910
2911   //under construction
2912   for(int i=0;i<size;i++){
2913     displs[i] = count;
2914     count+= recvcounts[i];
2915   }
2916   len = ptr->getDDT()->getType(datatype)->getSize(count);
2917   tmpbuf = malloc(len);
2918   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
2919   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
2920                recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
2921   free(tmpbuf);
2922   delete [] displs;     // < memory leak ! // gzheng
2923   return 0;
2924 }
2925
2926 /***** MPI_Scan algorithm (from MPICH) *******
2927    recvbuf = sendbuf;
2928    partial_scan = sendbuf;
2929    mask = 0x1;
2930    while (mask < size) {
2931       dst = rank^mask;
2932       if (dst < size) {
2933          send partial_scan to dst;
2934          recv from dst into tmp_buf;
2935          if (rank > dst) {
2936             partial_scan = tmp_buf + partial_scan;
2937             recvbuf = tmp_buf + recvbuf;
2938          }
2939          else {
2940             if (op is commutative)
2941                partial_scan = tmp_buf + partial_scan;
2942             else {
2943                tmp_buf = partial_scan + tmp_buf;
2944                partial_scan = tmp_buf;
2945             }
2946          }
2947       }
2948       mask <<= 1;
2949    }
2950  ***** MPI_Scan algorithm (from MPICH) *******/
2951
2952 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
2953   (op)(invec,inoutvec,&count,&datatype);
2954 }
2955 CDECL
2956 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
2957   AMPIAPI("AMPI_Scan");
2958   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
2959   MPI_Status sts;
2960   ampi *ptr = getAmpiInstance(comm);
2961   int size = ptr->getSize(comm);
2962   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
2963   int rank = ptr->getRank(comm);
2964   int mask = 0x1;
2965   int dst;
2966   void* tmp_buf = malloc(blklen);
2967   void* partial_scan = malloc(blklen);
2968   
2969   memcpy(recvbuf, sendbuf, blklen);
2970   memcpy(partial_scan, sendbuf, blklen);
2971   while(mask < size){
2972     dst = rank^mask;
2973     if(dst < size){
2974       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
2975                    tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
2976       if(rank > dst){
2977         (op)(tmp_buf,partial_scan,&count,&datatype);
2978         (op)(tmp_buf,recvbuf,&count,&datatype);
2979       }else {
2980         (op)(partial_scan,tmp_buf,&count,&datatype);
2981         memcpy(partial_scan,tmp_buf,blklen);
2982       }
2983     }
2984     mask <<= 1;
2985
2986   }
2987
2988   free(tmp_buf);
2989   free(partial_scan);
2990 #if AMPI_COUNTER
2991   getAmpiParent()->counters.scan++;
2992 #endif
2993   return 0;
2994 }
2995
2996 CDECL
2997 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
2998   //AMPIAPI("AMPI_Op_create");
2999   *op = function;
3000   return 0;
3001 }
3002
3003 CDECL
3004 int AMPI_Op_free(MPI_Op *op){
3005   //AMPIAPI("AMPI_Op_free");
3006   *op = MPI_OP_NULL;
3007   return 0;
3008 }
3009
3010
3011 CDECL
3012 double AMPI_Wtime(void)
3013 {
3014 //  AMPIAPI("AMPI_Wtime");
3015
3016 #ifdef AMPIMSGLOG
3017   double ret=TCHARM_Wall_timer();
3018   ampiParent* pptr = getAmpiParent();
3019   if(msgLogRead){
3020     (*(pptr->fromPUPer))|ret;
3021     return ret;
3022   }
3023
3024   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3025     (*(pptr->toPUPer))|ret;
3026   }
3027 #endif
3028
3029 #if CMK_BLUEGENE_CHARM
3030   return BgGetTime();
3031 #else
3032   return TCHARM_Wall_timer();
3033 #endif
3034 }
3035
3036 CDECL
3037 double AMPI_Wtick(void){
3038   //AMPIAPI("AMPI_Wtick");
3039   return 1e-6;
3040 }
3041
3042 int PersReq::start(){
3043   if(sndrcv == 1 || sndrcv == 3) { // send or ssend request
3044     ampi *ptr=getAmpiInstance(comm);
3045     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm, sndrcv==3?1:0);
3046   }
3047   return 0;
3048 }
3049
3050 CDECL
3051 int AMPI_Start(MPI_Request *request)
3052 {
3053   AMPIAPI("AMPI_Start");
3054   checkRequest(*request);
3055   AmpiRequestList *reqs = getReqs();
3056   if(-1==(*reqs)[*request]->start()){
3057     CkAbort("MPI_Start could be used only on persistent communication requests!");
3058   }
3059   return 0;
3060 }
3061
3062 CDECL
3063 int AMPI_Startall(int count, MPI_Request *requests){
3064   AMPIAPI("AMPI_Startall");
3065   checkRequests(count,requests);
3066   AmpiRequestList *reqs = getReqs();
3067   for(int i=0;i<count;i++){
3068     if(-1==(*reqs)[requests[i]]->start())
3069       CkAbort("MPI_Start could be used only on persistent communication requests!");
3070   }
3071   return 0;
3072 }
3073
3074 /* organize the indices of requests into a vector of a vector: 
3075  * level 1 is different msg envelope matches
3076  * level 2 is (posting) ordered requests of with envelope
3077  * each time multiple completion call loop over first elem of level 1
3078  * and move the matched to the NULL request slot.   
3079  * warning: this does not work with I-Alltoall requests */
3080 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
3081   for(int i=0;i<count;i++){
3082     if(reqs[i]!=MPI_REQUEST_NULL)
3083       return 0;
3084   }
3085   return 1;
3086 }
3087 inline int matchReq(MPI_Request ia, MPI_Request ib){
3088   checkRequest(ia);  
3089   checkRequest(ib);
3090   AmpiRequestList* reqs = getReqs();
3091   AmpiRequest *a, *b;
3092   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
3093   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
3094   a=(*reqs)[ia];  b=(*reqs)[ib];
3095   if(a->tag != b->tag) return 0;
3096   if(a->src != b->src) return 0;
3097   if(a->comm != b->comm) return 0;
3098   return 1;
3099 }
3100 inline void swapInt(int& a,int& b){
3101   int tmp;
3102   tmp=a; a=b; b=tmp;
3103 }
3104 inline void sortedIndex(int n, int* arr, int* idx){
3105   int i,j;
3106   for(i=0;i<n;i++) 
3107     idx[i]=i;
3108   for (i=0; i<n-1; i++) 
3109     for (j=0; j<n-1-i; j++)
3110       if (arr[idx[j+1]] < arr[idx[j]]) 
3111         swapInt(idx[j+1],idx[j]);
3112 }
3113 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
3114   CkAssert(count!=0);
3115   int *newidx = new int [count];
3116   int flag;
3117   sortedIndex(count,arr,newidx);
3118   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
3119   CkVec<int> slot;
3120   vec->push_back(slot);
3121   (*vec)[0].push_back(newidx[0]);
3122   for(int i=1;i<count;i++){
3123     flag=0;
3124     for(int j=0;j<vec->size();j++){
3125       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
3126         ((*vec)[j]).push_back(newidx[i]);
3127         flag++;
3128       }
3129     }
3130     if(!flag){
3131       CkVec<int> newslot;
3132       newslot.push_back(newidx[i]);
3133       vec->push_back(newslot);
3134     }else{
3135       CkAssert(flag==1);
3136     }
3137   }
3138   delete [] newidx;
3139   return vec;
3140 }
3141 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
3142   printf("vec content: ");
3143   for(int i=0;i<vec.size();i++){
3144     printf("{");
3145     for(int j=0;j<(vec[i]).size();j++){
3146       printf(" %d ",arr[(vec[i])[j]]);
3147     }
3148     printf("} ");
3149   }
3150   printf("\n");
3151 }
3152
3153 int PersReq::wait(MPI_Status *sts){
3154         if(sndrcv == 2) {
3155                 if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3156                         CkAbort("AMPI> Error in persistent request wait");
3157 #if CMK_BLUEGENE_CHARM
3158                 _TRACE_BG_TLINE_END(&event);
3159 #endif
3160         }
3161         return 0;
3162 }
3163
3164 int IReq::wait(MPI_Status *sts){
3165     if(CpvAccess(CmiPICMethod) == 2) {
3166         AMPI_DEBUG("In weird clause of IReq::wait\n");
3167         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3168             CkAbort("AMPI> Error in non-blocking request wait");
3169
3170         return 0;
3171     }
3172
3173     //Copy "this" to a local variable in the case that "this" pointer
3174     //is updated during the out-of-core emulation.
3175
3176     // optimization for Irecv
3177     // generic() writes directly to the buffer, so the only thing we
3178     // do here is to wait
3179     ampi *ptr = getAmpiInstance(comm);
3180
3181     //BIGSIM_OOC DEBUGGING
3182     //int ooccnt=0;
3183     //int ampiIndex = ptr->thisIndex;
3184     //CmiPrintf("%d: IReq's status=%d\n", ampiIndex, statusIreq);
3185         
3186     while (statusIreq == false) {
3187         //BIGSIM_OOC DEBUGGING
3188         //CmiPrintf("Before blocking: %dth time: %d: in Ireq::wait\n", ++ooccnt, ptr->thisIndex);
3189         //print();
3190
3191         ptr->resumeOnRecv=true;
3192         ptr->block();
3193                         
3194         //BIGSIM_OOC DEBUGGING
3195         //CmiPrintf("[%d] After blocking: in Ireq::wait\n", ptr->thisIndex);
3196         //CmiPrintf("IReq's this pointer: %p\n", this);
3197         //print();
3198
3199 #if CMK_BLUEGENE_CHARM
3200         //Because of the out-of-core emulation, this pointer is changed after in-out
3201         //memory operation. So we need to return from this function and do the while loop
3202         //in the outer function call.   
3203         if(_BgInOutOfCoreMode)
3204             return -1;
3205 #endif  
3206     }   // end of while
3207     ptr->resumeOnRecv=false;
3208
3209     AMPI_DEBUG("IReq::wait has resumed\n");
3210
3211     if(sts) {
3212         AMPI_DEBUG("Setting sts->MPI_TAG to this->tag=%d in IReq::wait  this=%p\n", (int)this->tag, this);
3213         sts->MPI_TAG = tag;
3214         sts->MPI_SOURCE = src;
3215         sts->MPI_COMM = comm;
3216         sts->MPI_LENGTH = length;
3217     }
3218
3219     return 0;
3220 }
3221
3222 int ATAReq::wait(MPI_Status *sts){
3223         int i;
3224         for(i=0;i<count;i++){
3225                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3226                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
3227                         CkAbort("AMPI> Error in alltoall request wait");
3228 #if CMK_BLUEGENE_CHARM
3229                 _TRACE_BG_TLINE_END(&myreqs[i].event);
3230 #endif
3231         }
3232 #if CMK_BLUEGENE_CHARM
3233         //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
3234         TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0, 1);
3235         for (i=0; i<count; i++)
3236           _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
3237         _TRACE_BG_TLINE_END(&event);
3238 #endif
3239         return 0;
3240 }
3241
3242 int SReq::wait(MPI_Status *sts){
3243         ampi *ptr = getAmpiInstance(comm);
3244         while (statusIreq == false) {
3245           ptr->resumeOnRecv = true;
3246           ptr->block();
3247           ptr = getAmpiInstance(comm);
3248           ptr->resumeOnRecv = false;
3249         }
3250         return 0;
3251 }
3252
3253 CDECL
3254 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
3255 {
3256
3257
3258   AMPIAPI("AMPI_Wait");
3259   if(*request == MPI_REQUEST_NULL){
3260     stsempty(*sts);
3261     return 0;
3262   }
3263   checkRequest(*request);
3264   AmpiRequestList* reqs = getReqs();
3265
3266 #ifdef AMPIMSGLOG
3267   ampiParent* pptr = getAmpiParent();
3268   if(msgLogRead){
3269     (*(pptr->fromPUPer))|(pptr->pupBytes);
3270     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3271     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
3272     return 0;
3273   }
3274 #endif
3275
3276   AMPI_DEBUG("AMPI_Wait request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3277   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3278   //(*reqs)[*request]->wait(sts);
3279   int waitResult = -1;
3280   do{
3281     AmpiRequest *waitReq = (*reqs)[*request];
3282     waitResult = waitReq->wait(sts);
3283     if(_BgInOutOfCoreMode){
3284         reqs = getReqs();
3285     }
3286   }while(waitResult==-1);
3287
3288
3289   AMPI_DEBUG("AMPI_Wait after calling wait, request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3290
3291
3292 #ifdef AMPIMSGLOG
3293   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3294     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
3295     (*(pptr->toPUPer))|(pptr->pupBytes);
3296     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3297     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
3298   }
3299 #endif
3300   
3301   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3302     reqs->free(*request);
3303     *request = MPI_REQUEST_NULL;
3304   }
3305
3306     AMPI_DEBUG("End of AMPI_Wait\n");
3307
3308   return 0;
3309 }
3310
3311 CDECL
3312 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
3313 {
3314   AMPIAPI("AMPI_Waitall");
3315   if(count==0) return MPI_SUCCESS;
3316   checkRequests(count,request);
3317   int i,j,oldPe;
3318   AmpiRequestList* reqs = getReqs();
3319   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3320
3321 #ifdef AMPIMSGLOG
3322   ampiParent* pptr = getAmpiParent();
3323   if(msgLogRead){
3324     for(i=0;i<reqvec->size();i++){
3325       for(j=0;j<((*reqvec)[i]).size();j++){
3326         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3327           stsempty(sts[((*reqvec)[i])[j]]);
3328           continue;
3329         }
3330         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3331         (*(pptr->fromPUPer))|(pptr->pupBytes);
3332         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3333         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3334       }
3335     }
3336     return 0;
3337   }
3338 #endif
3339
3340 #if CMK_BLUEGENE_CHARM
3341   void *curLog;         // store current log in timeline
3342   _TRACE_BG_TLINE_END(&curLog);
3343 #if 0
3344   TRACE_BG_AMPI_SUSPEND();
3345 #endif
3346 #endif
3347   for(i=0;i<reqvec->size();i++){
3348     for(j=0;j<((*reqvec)[i]).size();j++){
3349       //CkPrintf("[%d] in loop [%d, %d]\n", pptr->thisIndex,i, j);
3350       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3351         stsempty(sts[((*reqvec)[i])[j]]);
3352         continue;
3353       }
3354       oldPe = CkMyPe();
3355
3356       int waitResult = -1;
3357       do{       
3358         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3359         waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
3360         if(_BgInOutOfCoreMode){
3361             reqs = getReqs();
3362             reqvec = vecIndex(count, request);
3363         }
3364       }while(waitResult==-1);
3365
3366 #ifdef AMPIMSGLOG
3367       if(msgLogWrite && pptr->thisIndex == msgLogRank){
3368         (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
3369         (*(pptr->toPUPer))|(pptr->pupBytes);
3370         PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3371         PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3372       }
3373 #endif
3374     
3375 #if 1
3376 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3377             //for fault evacuation
3378       if(oldPe != CkMyPe()){
3379 #endif
3380                         reqs = getReqs();
3381                         reqvec  = vecIndex(count,request);
3382 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3383             }
3384 #endif
3385 #endif
3386     }
3387   }
3388 #if CMK_BLUEGENE_CHARM
3389   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
3390 #endif
3391   // free memory of requests
3392   for(i=0;i<count;i++){ 
3393     if(request[i] == MPI_REQUEST_NULL)
3394       continue;
3395     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
3396       reqs->free(request[i]);
3397       request[i] = MPI_REQUEST_NULL;
3398     }
3399   }
3400   delete reqvec;
3401   return 0;
3402 }
3403
3404 CDECL
3405 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
3406 {
3407   AMPIAPI("AMPI_Waitany");
3408   
3409   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
3410   checkRequests(count,request);
3411   if(areInactiveReqs(count,request)){
3412     *idx=MPI_UNDEFINED;
3413     stsempty(*sts);
3414     return MPI_SUCCESS;
3415   }
3416   int flag=0;
3417   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3418   while(count>0){ /* keep looping until some request finishes: */
3419     for(int i=0;i<reqvec->size();i++){
3420       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
3421       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
3422         *idx = ((*reqvec)[i])[0];
3423         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
3424         return 0;
3425       }
3426     }
3427     /* no requests have finished yet-- schedule and try again */
3428     AMPI_Yield(MPI_COMM_WORLD);
3429   }
3430   *idx = MPI_UNDEFINED;
3431   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
3432         delete reqvec;
3433   return 0;
3434 }
3435
3436 CDECL
3437 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
3438                  int *array_of_indices, MPI_Status *array_of_statuses)
3439 {
3440   AMPIAPI("AMPI_Waitsome");
3441   checkRequests(incount,array_of_requests);
3442   if(areInactiveReqs(incount,array_of_requests)){
3443     *outcount=MPI_UNDEFINED;
3444     return MPI_SUCCESS;
3445   }
3446   MPI_Status sts;
3447   int i;
3448   int flag=0, realflag=0;
3449   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3450   *outcount = 0;
3451   while(1){
3452     for(i=0;i<reqvec->size();i++){
3453       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3454       if(flag == 1){ 
3455         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3456         array_of_statuses[(*outcount)++]=sts;
3457         if(sts.MPI_COMM != 0)
3458           realflag=1; // there is real(non null) request
3459       }
3460     }
3461     if(realflag && outcount>0) break;
3462   }
3463         delete reqvec;
3464   return 0;
3465 }
3466
3467 CmiBool PersReq::test(MPI_Status *sts){
3468         if(sndrcv == 2)         // recv request
3469                 return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3470         else                    // send request
3471                 return 1;
3472
3473 }
3474 void PersReq::complete(MPI_Status *sts){
3475         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3476                 CkAbort("AMPI> Error in persistent request complete");
3477 }
3478
3479 CmiBool IReq::test(MPI_Status *sts){
3480         if (statusIreq == true) {           
3481           if(sts)
3482             sts->MPI_LENGTH = length;           
3483           return true;
3484         }
3485         else {
3486           getAmpiInstance(comm)->yield();
3487           return false;
3488         }
3489 /*
3490         return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3491 */
3492 }
3493
3494 CmiBool SReq::test(MPI_Status *sts){
3495         if (statusIreq == true) {
3496           return true;
3497         }
3498         else {
3499           getAmpiInstance(comm)->yield();
3500           return false;
3501         }
3502 }
3503
3504 void IReq::complete(MPI_Status *sts){
3505         wait(sts);
3506 /*
3507         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3508                 CkAbort("AMPI> Error in non-blocking request complete");
3509 */
3510 }
3511
3512 void SReq::complete(MPI_Status *sts){
3513         wait(sts);
3514 }
3515
3516 void IReq::receive(ampi *ptr, AmpiMsg *msg)
3517 {
3518     int sts = ptr->processMessage(msg, tag, src, buf, count, type);
3519     statusIreq = (sts == 0);
3520     length = msg->length;
3521     this->tag = msg->tag; // Although not required, we also extract tag from msg
3522     src = msg->srcRank; // Although not required, we also extract src from msg
3523     comm = msg->comm;
3524     AMPI_DEBUG("Setting this->tag to %d in IReq::receive this=%p\n", (int)this->tag, this);
3525 #if CMK_BLUEGENE_CHARM
3526     event = msg->event; 
3527 #endif
3528     delete msg;
3529     
3530     //BIGSIM_OOC DEBUGGING
3531     //CmiPrintf("In IReq::receive, this=%p ", this);
3532     //print();
3533 }
3534
3535 CmiBool ATAReq::test(MPI_Status *sts){
3536         int i, flag=1;
3537         for(i=0;i<count;i++){
3538                 flag *= getAmpiInstance(myreqs[i].comm)->iprobe(myreqs[i].tag, myreqs[i].src,
3539                                         myreqs[i].comm, (int*) sts);
3540         }
3541         return flag;
3542 }
3543 void ATAReq::complete(MPI_Status *sts){
3544         int i;
3545         for(i=0;i<count;i++){
3546                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3547                                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int*)sts))
3548                         CkAbort("AMPI> Error in alltoall request complete");
3549         }
3550 }
3551
3552 CDECL
3553 int AMPI_Test(MPI_Request *request, int *flag, MPI_Status *sts)
3554 {
3555   AMPIAPI("AMPI_Test");
3556   checkRequest(*request);
3557   if(*request==MPI_REQUEST_NULL) {
3558     *flag = 1;
3559     stsempty(*sts);
3560     return 0;
3561   }
3562   AmpiRequestList* reqs = getReqs();
3563   if(1 == (*flag = (*reqs)[*request]->test(sts))){
3564     if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3565       (*reqs)[*request]->complete(sts);
3566       reqs->free(*request);
3567       *request = MPI_REQUEST_NULL;
3568     }
3569   }
3570   return 0;
3571 }
3572
3573 CDECL
3574 int AMPI_Testany(int count, MPI_Request *request, int *index, int *flag, MPI_Status *sts){
3575   AMPIAPI("AMPI_Testany");
3576   checkRequests(count,request);
3577   if(areInactiveReqs(count,request)){
3578     *flag=1;
3579     *index=MPI_UNDEFINED;
3580     stsempty(*sts);
3581     return MPI_SUCCESS;
3582   }
3583   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3584   *flag=0;
3585   for(int i=0;i<reqvec->size();i++){
3586     AMPI_Test(&request[((*reqvec)[i])[0]], flag, sts);
3587     if(*flag==1 && sts->MPI_COMM!=0){ // skip MPI_REQUEST_NULL
3588       *index = ((*reqvec)[i])[0];
3589       return 0;
3590     }
3591   }
3592   *index = MPI_UNDEFINED;
3593         delete reqvec;
3594   return 0;
3595 }
3596
3597 CDECL
3598 int AMPI_Testall(int count, MPI_Request *request, int *flag, MPI_Status *sts)
3599 {
3600   AMPIAPI("AMPI_Testall");
3601   if(count==0) return MPI_SUCCESS;
3602   checkRequests(count,request);
3603   int tmpflag;
3604   int i,j;
3605   AmpiRequestList* reqs = getReqs();
3606   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3607   *flag = 1;  
3608   for(i=0;i<reqvec->size();i++){
3609     for(j=0;j<((*reqvec)[i]).size();j++){
3610       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL)
3611         continue;
3612       tmpflag = (*reqs)[request[((*reqvec)[i])[j]]]->test(&sts[((*reqvec)[i])[j]]);
3613       *flag *= tmpflag;
3614     }
3615   }
3616   if(flag) 
3617     MPI_Waitall(count,request,sts);
3618         delete reqvec;  
3619   return 0;
3620 }
3621
3622 CDECL
3623 int AMPI_Testsome(int incount, MPI_Request *array_of_requests, int *outcount,
3624                  int *array_of_indices, MPI_Status *array_of_statuses)
3625 {
3626   AMPIAPI("AMPI_Testsome");
3627   checkRequests(incount,array_of_requests);
3628   if(areInactiveReqs(incount,array_of_requests)){
3629     *outcount=MPI_UNDEFINED;
3630     return MPI_SUCCESS;
3631   }
3632   MPI_Status sts;
3633   int flag;
3634   int i;
3635   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3636   *outcount = 0;
3637   for(i=0;i<reqvec->size();i++){
3638     AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3639     if(flag == 1){
3640       array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3641       array_of_statuses[(*outcount)++]=sts;
3642     }
3643   }
3644         delete reqvec;
3645   return 0;
3646 }
3647
3648 CDECL
3649 int AMPI_Request_free(MPI_Request *request){
3650   AMPIAPI("AMPI_Request_free");
3651   checkRequest(*request);
3652   if(*request==MPI_REQUEST_NULL) return 0;
3653   AmpiRequestList* reqs = getReqs();
3654   reqs->free(*request);
3655   return 0;
3656 }
3657
3658 CDECL
3659 int AMPI_Cancel(MPI_Request *request){
3660   AMPIAPI("AMPI_Cancel");
3661   return AMPI_Request_free(request);
3662 }
3663
3664 CDECL
3665 int AMPI_Test_cancelled(MPI_Status* status, int* flag) {
3666     /* FIXME: always returns success */
3667     *flag = 1;
3668     return 0;
3669 }
3670
3671 CDECL
3672 int AMPI_Recv_init(void *buf, int count, int type, int src, int tag,
3673                    MPI_Comm comm, MPI_Request *req)
3674 {
3675   AMPIAPI("AMPI_Recv_init");
3676   AmpiRequestList* reqs = getReqs();
3677   PersReq *newreq = new PersReq(buf,count,type,src,tag,comm,2);
3678   *req = reqs->insert(newreq);