a little utility to record start/end events.
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1 #define exit exit /*Supress definition of exit in ampi.h*/
2 #include "ampiimpl.h"
3 #include "tcharm.h"
4 #include "ampiEvents.h" /*** for trace generation for projector *****/
5 #include "ampiProjections.h"
6 #include "cktiming.h"
7
8 #define CART_TOPOL 1
9 #define AMPI_PRINT_IDLE 0
10
11 /* change this define to "x" to trace all send/recv's */
12 #define MSG_ORDER_DEBUG(x)  //x /* empty */
13 /* change this define to "x" to trace user calls */
14 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
15 #define STARTUP_DEBUG(x)  //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
16 #define FUNCCALL_DEBUG(x) //x /* empty */
17
18 static CkDDT *getDDT(void) {
19   return getAmpiParent()->myDDT;
20 }
21
22 //------------- startup -------------
23 static mpi_comm_worlds mpi_worlds;
24
25 int _mpi_nworlds; /*Accessed by ampif*/
26 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
27
28 /* ampiReducer: AMPI's generic reducer type 
29    MPI_Op is function pointer to MPI_User_function
30    so that it can be packed into AmpiOpHeader, shipped 
31    with the reduction message, and then plugged into 
32    the ampiReducer. 
33    One little trick is the ampi::recv which receives
34    the final reduction message will see additional
35    sizeof(AmpiOpHeader) bytes in the buffer before
36    any user data.                             */
37 class AmpiComplex { 
38 public: 
39         double re, im; 
40         void operator+=(const AmpiComplex &a) {
41                 re+=a.re;
42                 im+=a.im;
43         }
44         void operator*=(const AmpiComplex &a) {
45                 double nu_re=re*a.re-im*a.im;
46                 im=re*a.im+im*a.re;
47                 re=nu_re;
48         }
49         int operator>(const AmpiComplex &a) {
50                 CkAbort("Cannot compare complex numbers with MPI_MAX");
51                 return 0;
52         }
53         int operator<(const AmpiComplex &a) {
54                 CkAbort("Cannot compare complex numbers with MPI_MIN");
55                 return 0;
56         }
57 };
58 typedef struct { float val; int idx; } FloatInt;
59 typedef struct { double val; int idx; } DoubleInt;
60 typedef struct { long val; int idx; } LongInt;
61 typedef struct { int val; int idx; } IntInt;
62 typedef struct { short val; int idx; } ShortInt;
63 typedef struct { long double val; int idx; } LongdoubleInt;
64 typedef struct { float val; float idx; } FloatFloat;
65 typedef struct { double val; double idx; } DoubleDouble;
66
67
68 #define MPI_OP_SWITCH(OPNAME) \
69   int i; \
70   switch (*datatype) { \
71   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
72   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
73   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
74   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
75   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
76   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
77   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
78   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
79   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
80   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
81   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
82   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
83   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
84   default: \
85     ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
86     CmiAbort("Unsupported MPI datatype for MPI Op"); \
87   };\
88
89 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
90 #define MPI_OP_IMPL(type) \
91         if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
92   MPI_OP_SWITCH(MPI_MAX)
93 #undef MPI_OP_IMPL
94 }
95
96 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
97 #define MPI_OP_IMPL(type) \
98         if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
99   MPI_OP_SWITCH(MPI_MIN)
100 #undef MPI_OP_IMPL
101 }
102
103 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
104 #define MPI_OP_IMPL(type) \
105         ((type *)inoutvec)[i] += ((type *)invec)[i];
106   MPI_OP_SWITCH(MPI_SUM)
107 #undef MPI_OP_IMPL
108 }
109
110 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
111 #define MPI_OP_IMPL(type) \
112         ((type *)inoutvec)[i] *= ((type *)invec)[i];
113   MPI_OP_SWITCH(MPI_PROD)
114 #undef MPI_OP_IMPL
115 }
116
117 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
118   int i;  
119   switch (*datatype) {
120   case MPI_INT:
121   case MPI_LOGICAL:
122     for(i=0;i<(*len);i++)
123       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
124     break;
125   default:
126     ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
127     CmiAbort("exiting");
128   }
129 }
130 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
131   int i; 
132   switch (*datatype) {
133   case MPI_INT:
134     for(i=0;i<(*len);i++)
135       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
136     break;
137   case MPI_BYTE:
138     for(i=0;i<(*len);i++)
139       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
140     break;
141   default:
142     ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
143     CmiAbort("exiting");
144   }
145 }
146 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
147   int i;  
148   switch (*datatype) {
149   case MPI_INT:
150   case MPI_LOGICAL:
151     for(i=0;i<(*len);i++)
152       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
153     break;
154   default:
155     ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
156     CmiAbort("exiting");
157   }
158 }
159 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
160   int i;  
161   switch (*datatype) {
162   case MPI_INT:
163     for(i=0;i<(*len);i++)
164       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
165     break;
166   case MPI_BYTE:
167     for(i=0;i<(*len);i++)
168       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
169     break;
170   default:
171     ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
172     CmiAbort("exiting");
173   }
174 }
175 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
176   int i;  
177   switch (*datatype) {
178   case MPI_INT:
179   case MPI_LOGICAL:
180     for(i=0;i<(*len);i++)
181       ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
182     break;
183   default:
184     ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
185     CmiAbort("exiting");
186   }
187 }
188 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
189   int i;  
190   switch (*datatype) {
191   case MPI_INT:
192     for(i=0;i<(*len);i++)
193       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
194     break;
195   case MPI_BYTE:
196     for(i=0;i<(*len);i++)
197       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
198     break;
199         case MPI_UNSIGNED:
200     for(i=0;i<(*len);i++)
201       ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
202     break;
203   default:
204     ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
205     CmiAbort("exiting");
206   }
207 }
208
209 #ifndef MIN
210 #define MIN(a,b) (a < b ? a : b)
211 #endif
212
213 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
214   int i;  
215
216   switch (*datatype) {
217   case MPI_FLOAT_INT:
218     for(i=0;i<(*len);i++)
219       if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
220         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
221       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
222         ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
223     break;
224   case MPI_DOUBLE_INT:
225     for(i=0;i<(*len);i++)
226       if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
227         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
228       else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
229         ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
230
231     break;
232   case MPI_LONG_INT:
233     for(i=0;i<(*len);i++)
234       if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
235         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
236       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
237         ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
238     break;
239   case MPI_2INT:
240     for(i=0;i<(*len);i++)
241       if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
242         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
243       else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
244         ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
245     break;
246   case MPI_SHORT_INT:
247     for(i=0;i<(*len);i++)
248       if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
249         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
250       else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
251         ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
252     break;
253   case MPI_LONG_DOUBLE_INT:
254     for(i=0;i<(*len);i++)
255       if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
256         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
257       else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
258         ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
259     break;
260   case MPI_2FLOAT:
261     for(i=0;i<(*len);i++)
262       if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
263         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
264       else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
265         ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
266     break;
267   case MPI_2DOUBLE:
268     for(i=0;i<(*len);i++)
269       if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
270         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
271       else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
272         ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
273     break;
274   default:
275     ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
276     CmiAbort("exiting");
277   }
278 }
279 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
280   int i;  
281   switch (*datatype) {
282   case MPI_FLOAT_INT:
283     for(i=0;i<(*len);i++)
284       if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
285         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
286       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
287         ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
288     break;
289   case MPI_DOUBLE_INT:
290     for(i=0;i<(*len);i++)
291       if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
292         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
293       else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
294         ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
295     break;
296   case MPI_LONG_INT:
297     for(i=0;i<(*len);i++)
298       if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
299         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
300       else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
301         ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
302     break;
303   case MPI_2INT:
304     for(i=0;i<(*len);i++)
305       if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
306         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
307       else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
308         ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
309     break;
310   case MPI_SHORT_INT:
311     for(i=0;i<(*len);i++)
312       if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
313         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
314       else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
315         ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
316     break;
317   case MPI_LONG_DOUBLE_INT:
318     for(i=0;i<(*len);i++)
319       if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
320         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
321       else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
322         ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
323     break;
324   case MPI_2FLOAT:
325     for(i=0;i<(*len);i++)
326       if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
327         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
328       else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
329         ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
330     break;
331   case MPI_2DOUBLE:
332     for(i=0;i<(*len);i++)
333       if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
334         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
335       else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
336         ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
337     break;
338   default:
339     ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
340     CmiAbort("exiting");
341   }
342 }
343
344 // every msg contains a AmpiOpHeader structure before user data
345 // FIXME: non-commutative operations require messages be ordered by rank
346 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
347   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
348   MPI_Datatype dtype;
349   int szhdr, szdata, len;
350   MPI_User_function* func;
351   func = hdr->func;
352   dtype = hdr->dtype;  
353   szdata = hdr->szdata;
354   len = hdr->len;  
355   szhdr = sizeof(AmpiOpHeader);
356
357   //Assuming extent == size
358   void *ret = malloc(szhdr+szdata);
359   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
360   for(int i=1;i<nMsg;i++){
361     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
362   }
363   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
364   free(ret);
365   return retmsg;
366 }
367
368 CkReduction::reducerType AmpiReducer;
369
370 class Builtin_kvs{
371  public:
372   int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
373   Builtin_kvs(){
374     tag_ub = MPI_TAG_UB_VALUE; 
375     host = MPI_PROC_NULL;
376     io = 0;
377     wtime_is_global = 0;
378     keyval_mype = CkMyPe();
379     keyval_numpes = CkNumPes();
380     keyval_mynode = CkMyNode();
381     keyval_numnodes = CkNumNodes();
382   }
383 };
384
385 // ------------ startup support -----------
386 int _ampi_fallback_setup_count;
387 CDECL void AMPI_Setup(void);
388 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
389
390 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
391
392 /*Main routine used when missing MPI_Setup routine*/
393 CDECL void AMPI_Fallback_Main(int argc,char **argv)
394 {
395   AMPI_Main_cpp(argc,argv);
396   AMPI_Main(argc,argv);
397   FTN_NAME(MPI_MAIN,mpi_main)();
398 }
399
400 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
401 /*Startup routine used if user *doesn't* write
402   a TCHARM_User_setup routine.
403  */
404 CDECL void AMPI_Setup_Switch(void) {
405   _ampi_fallback_setup_count=0;
406   FTN_NAME(AMPI_SETUP,ampi_setup)();
407   AMPI_Setup();
408   if (_ampi_fallback_setup_count==2)
409   { //Missing AMPI_Setup in both C and Fortran:
410     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
411   }
412 }
413
414 static int nodeinit_has_been_called=0;
415 CtvDeclare(ampiParent*, ampiPtr);
416 CtvDeclare(int, ampiInitDone);
417 CtvDeclare(void*,stackBottom);
418 CtvDeclare(int, ampiFinalized);
419 CkpvDeclare(Builtin_kvs, bikvs);
420 CkpvDeclare(int,argvExtracted);
421 static int enableStreaming = 0;
422
423 CDECL long ampiCurrentStackUsage(){
424   int localVariable;
425   
426   unsigned long p1 =  (unsigned long)((void*)&localVariable);
427   unsigned long p2 =  (unsigned long)(CtvAccess(stackBottom));
428
429
430   if(p1 > p2)
431     return p1 - p2;
432   else
433     return  p2 - p1;
434  
435 }
436
437 FDECL void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
438   long usage = ampiCurrentStackUsage();
439   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
440 }
441
442
443 CDECL void AMPI_threadstart(void *data);
444 static int AMPI_threadstart_idx = -1;
445
446 static void ampiNodeInit(void)
447 {
448   _mpi_nworlds=0;
449   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
450   {
451     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
452   }
453   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
454
455   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
456   
457   CmiAssert(AMPI_threadstart_idx == -1);    // only initialize once
458   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
459
460   nodeinit_has_been_called=1;
461 }
462
463 #if PRINT_IDLE
464 static double totalidle=0.0, startT=0.0;
465 static int beginHandle, endHandle;
466 static void BeginIdle(void *dummy,double curWallTime)
467 {
468   startT = curWallTime;
469 }
470 static void EndIdle(void *dummy,double curWallTime)
471 {
472   totalidle += curWallTime - startT;
473 }
474 #endif
475
476 /* for fortran reduction operation table to handle mapping */
477 typedef MPI_Op  MPI_Op_Array[128];
478 CtvDeclare(int, mpi_opc);
479 CtvDeclare(MPI_Op_Array, mpi_ops);
480
481 static void ampiProcInit(void){
482   CtvInitialize(ampiParent*, ampiPtr);
483   CtvInitialize(int,ampiInitDone);
484   CtvInitialize(int,ampiFinalized);
485   CtvInitialize(void*,stackBottom);
486
487
488   CtvInitialize(MPI_Op_Array, mpi_ops);
489   CtvInitialize(int, mpi_opc);
490
491   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
492   CkpvAccess(bikvs) = Builtin_kvs();
493
494   CkpvInitialize(int, argvExtracted);
495   CkpvAccess(argvExtracted) = 0;
496
497   initBigSimTrace(1);
498
499   REGISTER_AMPI
500   initAmpiProjections();
501   char **argv=CkGetArgv();
502 #if AMPI_COMLIB  
503   if(CkpvAccess(argvExtracted)==0){
504     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
505   }
506 #endif
507
508 #ifdef AMPIMSGLOG
509   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
510   msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
511   CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
512   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
513 #endif
514 }
515
516 void AMPI_Install_Idle_Timer(){
517 #if AMPI_PRINT_IDLE
518   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
519   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
520 #endif
521 }
522
523 void AMPI_Uninstall_Idle_Timer(){
524 #if AMPI_PRINT_IDLE
525   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
526   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
527 #endif
528 }
529
530 PUPfunctionpointer(MPI_MainFn)
531
532 class MPI_threadstart_t {
533 public:
534         MPI_MainFn fn;
535         MPI_threadstart_t() {}
536         MPI_threadstart_t(MPI_MainFn fn_)
537                 :fn(fn_) {}
538         void start(void) {
539                 char **argv=CmiCopyArgs(CkGetArgv());
540                 int argc=CkGetArgc();
541
542                 // Set a pointer to somewhere close to the bottom of the stack.
543                 // This is used for roughly estimating the stack usage later.
544                 CtvAccess(stackBottom) = &argv;
545
546 #if CMK_AMPI_FNPTR_HACK
547                 AMPI_Fallback_Main(argc,argv);
548 #else
549                 (fn)(argc,argv);
550 #endif
551         }
552         void pup(PUP::er &p) {
553                 p|fn;
554         }
555 };
556 PUPmarshall(MPI_threadstart_t)
557
558 CDECL void AMPI_threadstart(void *data)
559 {
560         STARTUP_DEBUG("MPI_threadstart")
561         MPI_threadstart_t t;
562         pupFromBuf(data,t);
563 #if CMK_TRACE_IN_CHARM
564         if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
565 #endif
566         t.start();
567 }
568
569 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
570 {
571         STARTUP_DEBUG("ampiCreateMain")
572         int _nchunks=TCHARM_Get_num_chunks();
573         //Make a new threads array:
574         MPI_threadstart_t s(mainFn);
575         memBuf b; pupIntoBuf(b,s);
576         TCHARM_Create_data( _nchunks,AMPI_threadstart_idx,
577                           b.getData(), b.getSize());
578 }
579
580 /* TCharm Semaphore ID's for AMPI startup */
581 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
582 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
583
584 static CProxy_ampiWorlds ampiWorldsGroup;
585
586 static void init_operations()
587 {
588   CtvInitialize(MPI_Op_Array, mpi_ops);
589   int i = 0;
590   MPI_Op *tab = CtvAccess(mpi_ops);
591   tab[i++] = MPI_MAX;
592   tab[i++] = MPI_MIN;
593   tab[i++] = MPI_SUM;
594   tab[i++] = MPI_PROD;
595   tab[i++] = MPI_LAND;
596   tab[i++] = MPI_BAND;
597   tab[i++] = MPI_LOR;
598   tab[i++] = MPI_BOR;
599   tab[i++] = MPI_LXOR;
600   tab[i++] = MPI_BXOR;
601   tab[i++] = MPI_MAXLOC;
602   tab[i++] = MPI_MINLOC;
603
604   CtvInitialize(int, mpi_opc);
605   CtvAccess(mpi_opc) = i;
606 }
607
608 /*
609 Called from MPI_Init, a collective initialization call:
610  creates a new AMPI array and attaches it to the current
611  set of TCHARM threads.
612 */
613 static ampi *ampiInit(char **argv)
614 {
615   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
616   if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
617   STARTUP_DEBUG("ampiInit> begin")
618   
619   MPI_Comm new_world;
620   int _nchunks;
621   CkArrayOptions opts;
622   CProxy_ampiParent parent;
623   if (TCHARM_Element()==0) //the rank of a tcharm object
624   { /* I'm responsible for building the arrays: */
625         STARTUP_DEBUG("ampiInit> creating arrays")
626
627 // FIXME: Need to serialize global communicator allocation in one place.
628         //Allocate the next communicator
629         if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
630         {
631                 CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
632         }
633         int new_idx=_mpi_nworlds;
634         new_world=MPI_COMM_WORLD+new_idx; // Isaac guessed there shouldn't be a +1 here
635
636         //Create and attach the ampiParent array
637         CkArrayID threads;
638         opts=TCHARM_Attach_start(&threads,&_nchunks);
639         parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
640         STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
641   }
642   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
643
644   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
645
646   if (TCHARM_Element()==0)
647   {
648           //Make a new ampi array
649           CkArrayID empty;
650
651         ampiCommStruct worldComm(new_world,empty,_nchunks);
652         CProxy_ampi arr;
653
654
655 #if AMPI_COMLIB
656
657         ComlibInstanceHandle ciStreaming = 1;
658         ComlibInstanceHandle ciBcast = 2;
659         ComlibInstanceHandle ciAllgather = 3;
660         ComlibInstanceHandle ciAlltoall = 4;
661
662         arr=CProxy_ampi::ckNew(parent, worldComm, ciStreaming, ciBcast, ciAllgather, ciAlltoall, opts);
663         
664
665         CkPrintf("Using untested comlib code in ampi.C\n");
666
667         Strategy *sStreaming = new StreamingStrategy(1,10);
668         CkAssert(ciStreaming == ComlibRegister(sStreaming));
669         
670         Strategy *sBcast = new BroadcastStrategy(USE_HYPERCUBE);
671         CkAssert(ciBcast = ComlibRegister(sBcast));
672         
673         Strategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
674         CkAssert(ciAllgather = ComlibRegister(sAllgather));
675
676         Strategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
677         CkAssert(ciAlltoall = ComlibRegister(sAlltoall));
678         
679         CmiPrintf("Created AMPI comlib strategies in new manner\n");
680
681         // FIXME: Propogate the comlib table here
682         CkpvAccess(conv_com_object).doneCreating();
683 #else
684         arr=CProxy_ampi::ckNew(parent,worldComm,opts);
685
686 #endif
687
688         //Broadcast info. to the mpi_worlds array
689         // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
690         ampiCommStruct newComm(new_world,arr,_nchunks);
691         //CkPrintf("In ampiInit: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
692         if (ampiWorldsGroup.ckGetGroupID().isZero())
693                 ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
694         else
695                 ampiWorldsGroup.add(newComm);
696         STARTUP_DEBUG("ampiInit> arrays created")
697
698   }
699
700   // Find our ampi object:
701   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
702   CtvAccess(ampiInitDone)=1;
703   CtvAccess(ampiFinalized)=0;
704   STARTUP_DEBUG("ampiInit> complete")
705 #if CMK_BLUEGENE_CHARM
706 //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
707   TRACE_BG_ADD_TAG("AMPI_START");
708 #endif
709
710   init_operations();     // initialize fortran reduction operation table
711
712   getAmpiParent()->ampiInitCallDone = 0;
713
714   CProxy_ampi cbproxy = ptr->getProxy();
715   CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
716   ptr->contribute(0, NULL, CkReduction::sum_int, cb);
717
718   ampiParent *thisParent = getAmpiParent(); 
719   while(thisParent->ampiInitCallDone!=1){
720     //CkPrintf("In checking ampiInitCallDone(%d) loop at parent %d!\n", thisParent->ampiInitCallDone, thisParent->thisIndex);
721     thisParent->getTCharmThread()->stop();
722     /* 
723      * thisParent needs to be updated in case of the parent is being pupped.
724      * In such case, thisParent got changed
725      */
726     thisParent = getAmpiParent();
727   }
728
729 #ifdef CMK_BLUEGENE_CHARM
730     BgSetStartOutOfCore();
731 #endif
732
733   return ptr;
734 }
735
736 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
737 class ampiWorlds : public CBase_ampiWorlds {
738 public:
739     ampiWorlds(const ampiCommStruct &nextWorld) {
740         ampiWorldsGroup=thisgroup;
741         //CkPrintf("In constructor: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
742         add(nextWorld);
743     }
744     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
745     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
746     void add(const ampiCommStruct &nextWorld) {
747       int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD); // Isaac guessed there shouldn't be a +1 after the MPI_COMM_WORLD
748         mpi_worlds[new_idx].comm=nextWorld;
749         if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
750         STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
751     }
752 };
753
754 //-------------------- ampiParent -------------------------
755 ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
756                 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
757 {
758   int barrier = 0x1234;
759   STARTUP_DEBUG("ampiParent> starting up")
760   thread=NULL;
761   worldPtr=NULL;
762   myDDT=&myDDTsto;
763   prepareCtv();
764
765   init();
766
767   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
768         AsyncEvacuate(CmiFalse);
769 }
770
771 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
772   thread=NULL;
773   worldPtr=NULL;
774   myDDT=&myDDTsto;
775
776   init();
777
778   AsyncEvacuate(CmiFalse);
779 }
780
781 void ampiParent::pup(PUP::er &p) {
782   ArrayElement1D::pup(p);
783   p|threads;
784   p|worldNo;           // why it was missing from here before??
785   p|worldStruct;
786   myDDT->pup(p);
787   p|splitComm;
788   p|groupComm;
789   p|groups;
790
791 //BIGSIM_OOC DEBUGGING
792 //if(!p.isUnpacking()){
793 //    CmiPrintf("ampiParent[%d] packing ampiRequestList: \n", thisIndex);
794 //    ampiReqs.print();
795 //}
796
797   p|ampiReqs;
798
799 //BIGSIM_OOC DEBUGGING
800 //if(p.isUnpacking()){
801 //    CmiPrintf("ampiParent[%d] unpacking ampiRequestList: \n", thisIndex);
802 //    ampiReqs.print();
803 //}
804
805   p|RProxyCnt;
806   p|tmpRProxy;
807   p|winStructList;
808   p|infos;
809
810   p|ampiInitCallDone;
811 }
812 void ampiParent::prepareCtv(void) {
813   thread=threads[thisIndex].ckLocal();
814   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
815   CtvAccessOther(thread->getThread(),ampiPtr) = this;
816   STARTUP_DEBUG("ampiParent> found TCharm")
817 }
818
819 void ampiParent::init(){
820 #ifdef AMPIMSGLOG
821   if(msgLogWrite && msgLogRank == thisIndex){
822 #if CMK_PROJECTIONS_USE_ZLIB
823     fMsgLog = gzopen(msgLogFilename,"wb");
824     toPUPer = new PUP::tozDisk(fMsgLog);
825 #else
826     fMsgLog = fopen(msgLogFilename,"wb");
827     toPUPer = new PUP::toDisk(fMsgLog);
828 #endif
829   }else if(msgLogRead){
830 #if CMK_PROJECTIONS_USE_ZLIB
831     fMsgLog = gzopen(msgLogFilename,"rb");
832     fromPUPer = new PUP::fromzDisk(fMsgLog);
833 #else
834     fMsgLog = fopen(msgLogFilename,"rb");
835     fromPUPer = new PUP::fromDisk(fMsgLog);
836 #endif
837   }
838 #endif
839 }
840
841 void ampiParent::finalize(){
842 #ifdef AMPIMSGLOG
843   if(msgLogWrite && msgLogRank == thisIndex){
844     delete toPUPer;
845 #if CMK_PROJECTIONS_USE_ZLIB
846     gzclose(fMsgLog);
847 #else
848     fclose(fMsgLog);
849 #endif
850   }else if(msgLogRead){
851     delete fromPUPer;
852 #if CMK_PROJECTIONS_USE_ZLIB
853     gzclose(fMsgLog);
854 #else
855     fclose(fMsgLog);
856 #endif
857   }
858 #endif
859 }
860
861 void ampiParent::ckJustMigrated(void) {
862   ArrayElement1D::ckJustMigrated();
863   prepareCtv();
864 }
865
866 void ampiParent::ckJustRestored(void) {
867   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
868   ArrayElement1D::ckJustRestored();
869   prepareCtv();
870   
871   //BIGSIM_OOC DEBUGGING
872   //CkPrintf("In ampiParent[%d] with TCharm thread=%p:   ",thisIndex, thread);
873   //CthPrintThdMagic(thread->getTid()); 
874 }
875
876 ampiParent::~ampiParent() {
877   STARTUP_DEBUG("ampiParent> destructor called");
878   finalize();
879 }
880
881 //Children call this when they are first created or just migrated
882 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
883 {
884   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
885
886   if (s.getComm()>=MPI_COMM_WORLD)
887   { //We now have our COMM_WORLD-- register it
888     //Note that split communicators don't keep a raw pointer, so
889     //they don't need to re-register on migration.
890      if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
891      worldPtr=ptr;
892      worldStruct=s;
893
894     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
895      CkVec<int> _indices;
896      _indices.push_back(thisIndex);
897      selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
898   }
899   
900   if (!forMigration)
901   { //Register the new communicator:
902      MPI_Comm comm = s.getComm();
903      STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
904      if (comm>=MPI_COMM_WORLD) { 
905        // Pass the new ampi to the waiting ampiInit
906        thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
907      } else if (isSplit(comm)) {
908        splitChildRegister(s);
909      } else if (isGroup(comm)) {
910        groupChildRegister(s);
911      } else if (isCart(comm)) {
912        cartChildRegister(s);
913      } else if (isGraph(comm)) {
914        graphChildRegister(s);
915      } else if (isInter(comm)) {
916        interChildRegister(s);
917      } else if (isIntra(comm)) {
918        intraChildRegister(s);
919      }else
920        CkAbort("ampiParent recieved child with bad communicator");
921   }
922
923   return thread;
924 }
925
926 //BIGSIM_OOC DEBUGGING
927 //Move the comm2ampi from inline to normal function for the sake of debugging
928 /*ampi *ampiParent::comm2ampi(MPI_Comm comm){
929       //BIGSIM_OOC DEBUGGING
930       //CmiPrintf("%d, in ampiParent::comm2ampi, comm=%d\n", thisIndex, comm);
931       if (comm==MPI_COMM_WORLD) return worldPtr;
932       if (comm==MPI_COMM_SELF) return worldPtr;
933       if (comm==worldNo) return worldPtr;
934       if (isSplit(comm)) {
935          const ampiCommStruct &st=getSplit(comm);
936          return st.getProxy()[thisIndex].ckLocal();
937       }
938       if (isGroup(comm)) {
939          const ampiCommStruct &st=getGroup(comm);
940          return st.getProxy()[thisIndex].ckLocal();
941       }
942       if (isCart(comm)) {
943         const ampiCommStruct &st = getCart(comm);
944         return st.getProxy()[thisIndex].ckLocal();
945       }
946       if (isGraph(comm)) {
947         const ampiCommStruct &st = getGraph(comm);
948         return st.getProxy()[thisIndex].ckLocal();
949       }
950       if (isInter(comm)) {
951          const ampiCommStruct &st=getInter(comm);
952          return st.getProxy()[thisIndex].ckLocal();
953       }
954       if (isIntra(comm)) {
955          const ampiCommStruct &st=getIntra(comm);
956          return st.getProxy()[thisIndex].ckLocal();
957       }
958       if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
959       CkAbort("Invalid communicator used!");
960       return NULL;
961 }*/
962
963 // reduction client data - preparation for checkpointing
964 class ckptClientStruct {
965 public:
966   const char *dname;
967   ampiParent *ampiPtr;
968   ckptClientStruct(const char *s, ampiParent *a): dname(s), ampiPtr(a) {}
969 };
970
971 static void checkpointClient(void *param,void *msg)
972 {
973   ckptClientStruct *client = (ckptClientStruct*)param;
974   const char *dname = client->dname;
975   ampiParent *ampiPtr = client->ampiPtr;
976   ampiPtr->Checkpoint(strlen(dname), dname);
977   delete client;
978 }
979
980 void ampiParent::startCheckpoint(const char* dname){
981   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
982   if (thisIndex==0) {
983         ckptClientStruct *clientData = new ckptClientStruct(dname, this);
984         CkCallback cb(checkpointClient, clientData);
985         contribute(0, NULL, CkReduction::sum_int, cb);
986   }
987   else
988         contribute(0, NULL, CkReduction::sum_int);
989
990 #if 0
991 #if CMK_BLUEGENE_CHARM
992   void *curLog;         // store current log in timeline
993   _TRACE_BG_TLINE_END(&curLog);
994   TRACE_BG_AMPI_SUSPEND();
995 #endif
996 #endif
997
998   thread->stop();
999
1000 #if CMK_BLUEGENE_CHARM
1001   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
1002   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
1003 #endif
1004 }
1005
1006 void ampiParent::Checkpoint(int len, const char* dname){
1007   if (len == 0) {
1008     // memory checkpoint
1009     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1010     CkStartMemCheckpoint(cb);
1011   }
1012   else {
1013     char dirname[256];
1014     strncpy(dirname,dname,len);
1015     dirname[len]='\0';
1016     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1017     CkStartCheckpoint(dirname,cb);
1018   }
1019 }
1020 void ampiParent::ResumeThread(void){
1021   thread->resume();
1022 }
1023
1024 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
1025                              int *keyval, void* extra_state){
1026         KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1027         int idx = kvlist.size();
1028         kvlist.resize(idx+1);
1029         kvlist[idx] = newnode;
1030         *keyval = idx;
1031         return 0;
1032 }
1033 int ampiParent::freeKeyval(int *keyval){
1034         if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
1035                 return -1;
1036         delete kvlist[*keyval];
1037         kvlist[*keyval] = NULL;
1038         *keyval = MPI_KEYVAL_INVALID;
1039         return 0;
1040 }
1041
1042 int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
1043         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1044                 return -1;
1045         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1046         // Enlarge the keyval list:
1047         while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
1048         cs.getKeyvals()[keyval]=attribute_val;
1049         return 0;
1050 }
1051
1052 int ampiParent::kv_is_builtin(int keyval) {
1053         switch(keyval) {
1054         case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
1055         case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
1056         case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
1057         case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
1058         case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
1059         case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
1060         case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
1061         case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
1062         default: return 0;
1063         };
1064 }
1065
1066 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
1067         *flag = false;
1068         if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
1069           *flag=true;
1070           *(int **)attribute_val = kv_builtin_storage;  // all default tags are ints
1071           return 0;
1072         }
1073         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1074                 return -1; /* invalid keyval */
1075         
1076         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1077         if (keyval>=cs.getKeyvals().size())  
1078                 return 0; /* we don't have a value yet */
1079         if (cs.getKeyvals()[keyval]==0)
1080                 return 0; /* we had a value, but now it's zero */
1081         /* Otherwise, we have a good value */
1082         *flag = true;
1083         *(void **)attribute_val = cs.getKeyvals()[keyval];
1084         return 0;
1085 }
1086 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
1087         /* no way to delete an attribute: just overwrite it with 0 */
1088         return putAttr(comm,keyval,0);
1089 }
1090
1091 //----------------------- ampi -------------------------
1092 void ampi::init(void) {
1093   parent=NULL;
1094   thread=NULL;
1095   msgs=NULL;
1096   posted_ireqs=NULL;
1097   resumeOnRecv=false;
1098   AsyncEvacuate(CmiFalse);
1099 }
1100
1101 ampi::ampi()
1102 {
1103   /* this constructor only exists so we can create an empty array during split */
1104   CkAbort("Default ampi constructor should never be called");
1105 }
1106
1107 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
1108    :parentProxy(parent_)
1109 {
1110   init();
1111
1112   myComm=s; myComm.setArrayID(thisArrayID);
1113   myRank=myComm.getRankForIndex(thisIndex);
1114
1115   findParent(false);
1116
1117   msgs = CmmNew();
1118   posted_ireqs = CmmNew();
1119   nbcasts = 0;
1120
1121   comlibProxy = thisProxy; // Will later be associated with comlib
1122   
1123   seqEntries=parent->ckGetArraySize();
1124   oorder.init (seqEntries);
1125 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1126     if(thisIndex == 0){
1127 /*      CkAssert(CkMyPe() == 0);
1128  *              CkGroupID _myManagerGID = thisProxy.ckGetArrayID();     
1129  *                      CkAssert(numElements);
1130  *                              printf("ampi::ampi setting numInitial to %d on manager at gid %d \n",numElements,_myManagerGID.idx);
1131  *                                      CkArray *_myManager = thisProxy.ckLocalBranch();
1132  *                                              _myManager->setNumInitial(numElements);*/
1133     }
1134 #endif
1135 }
1136
1137 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1138                 ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1139    :parentProxy(parent_),ciStreaming(ciStreaming_),ciBcast(ciBcast_),ciAllgather(ciAllgather_),ciAlltoall(ciAlltoall_)
1140 {
1141   init();
1142
1143   myComm=s; myComm.setArrayID(thisArrayID);
1144   myRank=myComm.getRankForIndex(thisIndex);
1145
1146   findParent(false);
1147
1148   msgs = CmmNew();
1149   posted_ireqs = CmmNew();
1150   nbcasts = 0;
1151
1152   comlibProxy = thisProxy;
1153   CmiPrintf("comlibProxy created as a copy of thisProxy, no associate call\n");
1154
1155 #if AMPI_COMLIB
1156   //  ComlibAssociateProxy(ciAlltoall, comlibProxy);
1157 #endif
1158
1159   seqEntries=parent->ckGetArraySize();
1160   oorder.init (seqEntries);
1161 }
1162
1163 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1164 {
1165   init();
1166   
1167   seqEntries=-1;
1168 }
1169
1170 void ampi::ckJustMigrated(void)
1171 {
1172         findParent(true);
1173         ArrayElement1D::ckJustMigrated();
1174 }
1175
1176 void ampi::ckJustRestored(void)
1177 {
1178         FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
1179         findParent(true);
1180         ArrayElement1D::ckJustRestored();
1181         
1182         //BIGSIM_OOC DEBUGGING
1183         //CkPrintf("In ampi[%d] thread[%p]:   ", thisIndex, thread);
1184         //CthPrintThdMagic(thread->getTid()); 
1185 }
1186
1187 void ampi::findParent(bool forMigration) {
1188         STARTUP_DEBUG("ampi> finding my parent")
1189         parent=parentProxy[thisIndex].ckLocal();
1190         if (parent==NULL) CkAbort("AMPI can't find its parent!");
1191         thread=parent->registerAmpi(this,myComm,forMigration);
1192         if (thread==NULL) CkAbort("AMPI can't find its thread!");
1193 //      printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1194 }
1195
1196 //The following method should be called on the first element of the
1197 //ampi array
1198 void ampi::allInitDone(CkReductionMsg *m){
1199     FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
1200     thisProxy.setInitDoneFlag();
1201     delete m;
1202 }
1203
1204 void ampi::setInitDoneFlag(){
1205     //CkPrintf("ampi[%d]::setInitDone called!\n", thisIndex);
1206     parent->ampiInitCallDone=1;
1207     parent->getTCharmThread()->start();
1208 }
1209
1210 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1211         CkPupMessage(*(PUP::er *)p,msg,1);
1212         if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1213 //      printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1214 }
1215
1216 static void cmm_pup_posted_ireq(pup_er p,void **msg) {
1217
1218         pup_int(p, (int *)msg);
1219
1220 /*      if(pup_isUnpacking(p)){
1221             // *msg = new IReq;
1222             //when unpacking, nothing is needed to do since *msg is an index
1223             //(of type integer) to the ampiParent::ampiReqs (the AmpiRequestList)
1224         }
1225         if(!pup_isUnpacking(p)){
1226             AmpiRequestList *reqL = getReqs();
1227             int retIdx = reqL->findRequestIndex((IReq *)*msg);
1228             if(retIdx==-1){
1229                 CmiAbort("An AmpiRequest instance should be found for an instance in posted_ireq!\n");
1230             }
1231             pup_int(p, retIdx)
1232         }
1233 */
1234 //      ((IReq *)*msg)->pup(*(PUP::er *)p);
1235
1236 //      if (pup_isDeleting(p)) delete (IReq *)*msg;
1237 //      printf("[%d] pupping postd irequests %p \n",CkMyPe(),*msg);
1238 }
1239
1240 void ampi::pup(PUP::er &p)
1241 {
1242   if(!p.isUserlevel())
1243     ArrayElement1D::pup(p);//Pack superclass
1244   p|parentProxy;
1245   p|myComm;
1246   p|myRank;
1247   p|nbcasts;
1248   p|tmpVec;
1249   p|remoteProxy;
1250   p|resumeOnRecv;
1251   p|comlibProxy;
1252   p|ciStreaming;
1253   p|ciBcast;
1254   p|ciAllgather;
1255   p|ciAlltoall;
1256
1257 #if AMPI_COMLIB
1258   if(p.isUnpacking()){
1259 //    ciStreaming.setSourcePe();
1260 //    ciBcast.setSourcePe();
1261 //    ciAllgather.setSourcePe();
1262 //    ciAlltoall.setSourcePe();
1263   }
1264 #endif
1265
1266   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1267
1268   //BIGSIM_OOC DEBUGGING
1269   //if(!p.isUnpacking()){
1270     //CkPrintf("ampi[%d]::packing: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1271   //}
1272
1273   posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);
1274
1275   //if(p.isUnpacking()){
1276     //CkPrintf("ampi[%d]::unpacking: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1277   //}
1278
1279   p|seqEntries;
1280   p|oorder;
1281 }
1282
1283 ampi::~ampi()
1284 {
1285   if (CkInRestarting() || _BgOutOfCoreFlag==1) {
1286     // in restarting, we need to flush messages
1287     int tags[3], sts[3];
1288     tags[0] = tags[1] = tags[2] = CmmWildCard;
1289     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1290     while (msg) {
1291       delete msg;
1292       msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1293     }
1294   }
1295
1296   CmmFree(msgs);
1297   CmmFreeAll(posted_ireqs);
1298 }
1299
1300 //------------------------ Communicator Splitting ---------------------
1301 class ampiSplitKey {
1302 public:
1303         int nextSplitComm;
1304         int color; //New class of processes we'll belong to
1305         int key; //To determine rank in new ordering
1306         int rank; //Rank in old ordering
1307         ampiSplitKey() {}
1308         ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1309                 :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1310 };
1311
1312 /* "type" may indicate whether call is for a cartesian topology etc. */
1313
1314 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1315 {
1316 #if CMK_BLUEGENE_CHARM
1317   void *curLog;         // store current log in timeline
1318   _TRACE_BG_TLINE_END(&curLog);
1319 //  TRACE_BG_AMPI_SUSPEND();
1320 #endif
1321   if (type == CART_TOPOL) {
1322         ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1323         int rootIdx=myComm.getIndexForRank(0);
1324         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1325         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1326
1327         thread->suspend(); //Resumed by ampiParent::cartChildRegister
1328         MPI_Comm newComm=parent->getNextCart()-1;
1329         *dest=newComm;
1330   } else {
1331         ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1332         int rootIdx=myComm.getIndexForRank(0);
1333         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1334         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1335
1336         thread->suspend(); //Resumed by ampiParent::splitChildRegister
1337         MPI_Comm newComm=parent->getNextSplit()-1;
1338         *dest=newComm;
1339   }
1340 #if CMK_BLUEGENE_CHARM
1341 //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1342   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1343   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1344 #endif
1345 }
1346
1347 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1348         const ampiSplitKey *a=(const ampiSplitKey *)a_;
1349         const ampiSplitKey *b=(const ampiSplitKey *)b_;
1350         if (a->color!=b->color) return a->color-b->color;
1351         if (a->key!=b->key) return a->key-b->key;
1352         return a->rank-b->rank;
1353 }
1354
1355 void ampi::splitPhase1(CkReductionMsg *msg)
1356 {
1357         //Order the keys, which orders the ranks properly:
1358         int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1359         ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1360         if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1361         qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1362
1363         MPI_Comm newComm = -1;
1364         for(int i=0;i<nKeys;i++)
1365                 if(keys[i].nextSplitComm>newComm)
1366                         newComm = keys[i].nextSplitComm;
1367
1368         //Loop over the sorted keys, which gives us the new arrays:
1369         int lastColor=keys[0].color-1; //The color we're building an array for
1370         CProxy_ampi lastAmpi; //The array for lastColor
1371         int lastRoot=0; //C value for new rank 0 process for latest color
1372         ampiCommStruct lastComm; //Communicator info. for latest color
1373         for (int c=0;c<nKeys;c++) {
1374                 if (keys[c].color!=lastColor)
1375                 { //Hit a new color-- need to build a new communicator and array
1376                         lastColor=keys[c].color;
1377                         lastRoot=c;
1378                         CkArrayOptions opts;
1379                         opts.bindTo(parentProxy);
1380                         opts.setNumInitial(0);
1381                         CkArrayID unusedAID; ampiCommStruct unusedComm;
1382                         lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1383                         lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1384
1385                         CkVec<int> indices; //Maps rank to array indices for new arrau
1386                         for (int i=c;i<nKeys;i++) {
1387                                 if (keys[i].color!=lastColor) break; //Done with this color
1388                                 int idx=myComm.getIndexForRank(keys[i].rank);
1389                                 indices.push_back(idx);
1390                         }
1391
1392                         //FIXME: create a new communicator for each color, instead of
1393                         // (confusingly) re-using the same MPI_Comm number for each.
1394                         lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1395                 }
1396                 int newRank=c-lastRoot;
1397                 int newIdx=lastComm.getIndexForRank(newRank);
1398
1399                 //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1400                 lastAmpi[newIdx].insert(parentProxy,lastComm);
1401         }
1402
1403         delete msg;
1404 }
1405
1406 //...newly created array elements register with the parent, which calls:
1407 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1408         int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1409         if (splitComm.size()<=idx) splitComm.resize(idx+1);
1410         splitComm[idx]=new ampiCommStruct(s);
1411         thread->resume(); //Matches suspend at end of ampi::split
1412 }
1413
1414 //-----------------create communicator from group--------------
1415 // The procedure is like that of comm_split very much,
1416 // so the code is shamelessly copied from above
1417 //   1. reduction to make sure all members have called
1418 //   2. the root in the old communicator create the new array
1419 //   3. ampiParent::register is called to register new array as new comm
1420 class vecStruct {
1421 public:
1422   int nextgroup;
1423   groupStruct vec;
1424   vecStruct():nextgroup(-1){}
1425   vecStruct(int nextgroup_, groupStruct vec_)
1426     : nextgroup(nextgroup_), vec(vec_) { }
1427 };
1428
1429 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1430   int rootIdx=vec[0];
1431   tmpVec = vec;
1432   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1433   MPI_Comm nextgroup = parent->getNextGroup();
1434   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1435
1436   if(getPosOp(thisIndex,vec)>=0){
1437     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1438     MPI_Comm retcomm = parent->getNextGroup()-1;
1439     *newcomm = retcomm;
1440   }else{
1441     *newcomm = MPI_COMM_NULL;
1442   }
1443 }
1444
1445 void ampi::commCreatePhase1(CkReductionMsg *msg){
1446   MPI_Comm *nextGroupComm = (int *)msg->getData();
1447
1448   CkArrayOptions opts;
1449   opts.bindTo(parentProxy);
1450   opts.setNumInitial(0);
1451   CkArrayID unusedAID;
1452   ampiCommStruct unusedComm;
1453   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1454   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1455
1456   groupStruct indices = tmpVec;
1457   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1458   for(int i=0;i<indices.size();i++){
1459     int newIdx=indices[i];
1460     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1461   }
1462   delete msg;
1463 }
1464
1465 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1466   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1467   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1468   groupComm[idx]=new ampiCommStruct(s);
1469   thread->resume(); //Matches suspend at end of ampi::split
1470 }
1471
1472 /* Virtual topology communicator creation */
1473 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1474   int rootIdx=vec[0];
1475   tmpVec = vec;
1476   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1477
1478   MPI_Comm nextcart = parent->getNextCart();
1479   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1480
1481   if(getPosOp(thisIndex,vec)>=0){
1482     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1483      MPI_Comm retcomm = parent->getNextCart()-1;
1484      *newcomm = retcomm;
1485   }else
1486     *newcomm = MPI_COMM_NULL;
1487 }
1488
1489 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1490   MPI_Comm *nextCartComm = (int *)msg->getData();
1491
1492   CkArrayOptions opts;
1493   opts.bindTo(parentProxy);
1494   opts.setNumInitial(0);
1495   CkArrayID unusedAID;
1496   ampiCommStruct unusedComm;
1497   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1498   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1499
1500   groupStruct indices = tmpVec;
1501   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1502                                                 size(),indices);
1503   for(int i=0;i<indices.size();i++){
1504     int newIdx=indices[i];
1505     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1506   }
1507   delete msg;
1508 }
1509
1510 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1511   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1512   if (cartComm.size()<=idx) {
1513     cartComm.resize(idx+1);
1514     cartComm.length()=idx+1;
1515   }
1516   cartComm[idx]=new ampiCommStruct(s);
1517   thread->resume(); //Matches suspend at end of ampi::cartCreate
1518 }
1519
1520 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1521   int rootIdx=vec[0];
1522   tmpVec = vec;
1523   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1524                 myComm.getProxy());
1525   MPI_Comm nextgraph = parent->getNextGraph();
1526   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1527
1528   if(getPosOp(thisIndex,vec)>=0){
1529     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1530     MPI_Comm retcomm = parent->getNextGraph()-1;
1531     *newcomm = retcomm;
1532   }else
1533     *newcomm = MPI_COMM_NULL;
1534 }
1535
1536 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1537   MPI_Comm *nextGraphComm = (int *)msg->getData();
1538
1539   CkArrayOptions opts;
1540   opts.bindTo(parentProxy);
1541   opts.setNumInitial(0);
1542   CkArrayID unusedAID;
1543   ampiCommStruct unusedComm;
1544   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1545   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1546
1547   groupStruct indices = tmpVec;
1548   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1549                                                 .size(),indices);
1550   for(int i=0;i<indices.size();i++){
1551     int newIdx=indices[i];
1552     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1553   }
1554   delete msg;
1555 }
1556
1557 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1558   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1559   if (graphComm.size()<=idx) {
1560     graphComm.resize(idx+1);
1561     graphComm.length()=idx+1;
1562   }
1563   graphComm[idx]=new ampiCommStruct(s);
1564   thread->resume(); //Matches suspend at end of ampi::graphCreate
1565 }
1566
1567 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1568   if(thisIndex==root) { // not everybody gets the valid rvec
1569     tmpVec = rvec;
1570   }
1571   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1572   MPI_Comm nextinter = parent->getNextInter();
1573   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1574
1575   thread->suspend(); //Resumed by ampiParent::interChildRegister
1576   MPI_Comm newcomm=parent->getNextInter()-1;
1577   *ncomm=newcomm;
1578 }
1579
1580 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1581   MPI_Comm *nextInterComm = (int *)msg->getData();
1582
1583   groupStruct lgroup = myComm.getIndices();
1584   CkArrayOptions opts;
1585   opts.bindTo(parentProxy);
1586   opts.setNumInitial(0);
1587   CkArrayID unusedAID;
1588   ampiCommStruct unusedComm;
1589   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1590   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1591
1592   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1593   for(int i=0;i<lgroup.size();i++){
1594     int newIdx=lgroup[i];
1595     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1596   }
1597
1598   parentProxy[0].ExchangeProxy(newAmpi);
1599   delete msg;
1600 }
1601
1602 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1603   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1604   if (interComm.size()<=idx) interComm.resize(idx+1);
1605   interComm[idx]=new ampiCommStruct(s);
1606   //thread->resume(); // don't resume it yet, till parent set remote proxy
1607 }
1608
1609 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1610   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1611     groupStruct lvec = myComm.getIndices();
1612     groupStruct rvec = myComm.getRemoteIndices();
1613     int rsize = rvec.size();
1614     tmpVec = lvec;
1615     for(int i=0;i<rsize;i++)
1616       tmpVec.push_back(rvec[i]);
1617     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1618   }else{
1619     tmpVec.resize(0);
1620   }
1621
1622   int rootIdx=myComm.getIndexForRank(0);
1623   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1624   MPI_Comm nextintra = parent->getNextIntra();
1625   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1626
1627   thread->suspend(); //Resumed by ampiParent::interChildRegister
1628   MPI_Comm newcomm=parent->getNextIntra()-1;
1629   *ncomm=newcomm;
1630 }
1631
1632 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1633   if(tmpVec.size()==0) { delete msg; return; }
1634   MPI_Comm *nextIntraComm = (int *)msg->getData();
1635   CkArrayOptions opts;
1636   opts.bindTo(parentProxy);
1637   opts.setNumInitial(0);
1638   CkArrayID unusedAID;
1639   ampiCommStruct unusedComm;
1640   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1641   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1642
1643   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1644   for(int i=0;i<tmpVec.size();i++){
1645     int newIdx=tmpVec[i];
1646     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1647   }
1648   delete msg;
1649 }
1650
1651 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1652   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1653   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1654   intraComm[idx]=new ampiCommStruct(s);
1655   thread->resume(); //Matches suspend at end of ampi::split
1656 }
1657
1658 //------------------------ communication -----------------------
1659 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1660 {
1661   if (universeNo>MPI_COMM_WORLD) {
1662     int worldDex=universeNo-MPI_COMM_WORLD-1;
1663     if (worldDex>=_mpi_nworlds)
1664       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1665     return mpi_worlds[worldDex].comm;
1666   }
1667   CkAbort("Bad communicator passed to universeComm2CommStruct");
1668   return mpi_worlds[0].comm; // meaningless return
1669 }
1670
1671 void ampi::block(void){
1672         thread->suspend();
1673 }
1674
1675 void ampi::yield(void){
1676         thread->schedule();
1677 }
1678
1679 void ampi::unblock(void){
1680         thread->resume();
1681 }
1682
1683 void ampi::ssend_ack(int sreq_idx){
1684         if (sreq_idx == 1)
1685           thread->resume();           // MPI_Ssend
1686         else {
1687           sreq_idx -= 2;              // start from 2
1688           AmpiRequestList *reqs = &(parent->ampiReqs);
1689           SReq *sreq = (SReq *)(*reqs)[sreq_idx];
1690           sreq->statusIreq = true;
1691           if (resumeOnRecv) {
1692              thread->resume();
1693           }
1694         }
1695 }
1696
1697 void
1698 ampi::generic(AmpiMsg* msg)
1699 {
1700 MSG_ORDER_DEBUG(
1701         CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1702         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1703 )
1704 #if CMK_BLUEGENE_CHARM
1705   TRACE_BG_ADD_TAG("AMPI_generic");
1706   msg->event = NULL;
1707 #endif
1708
1709   int sync = UsrToEnv(msg)->getRef();
1710   int srcIdx;
1711   if (sync)  srcIdx = msg->srcIdx;
1712
1713 //      AmpiMsg *msgcopy = msg;
1714   if(msg->seq != -1) {
1715     int srcIdx=msg->srcIdx;
1716     int n=oorder.put(srcIdx,msg);
1717     if (n>0) { // This message was in-order
1718       inorder(msg);
1719       if (n>1) { // It enables other, previously out-of-order messages
1720         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1721           inorder(msg);
1722         }
1723       }
1724     }
1725   } else { //Cross-world or system messages are unordered
1726     inorder(msg);
1727   }
1728   
1729     // msg may be free'ed from calling inorder()
1730   if (sync>0) {         // send an ack to sender
1731     CProxy_ampi pa(thisArrayID);
1732     pa[srcIdx].ssend_ack(sync);
1733   }
1734
1735   if(resumeOnRecv){
1736     //CkPrintf("Calling TCharm::resume at ampi::generic!\n");
1737     thread->resume();
1738   }
1739 }
1740
1741 inline static AmpiRequestList *getReqs(void); 
1742
1743 void
1744 ampi::inorder(AmpiMsg* msg)
1745 {
1746 MSG_ORDER_DEBUG(
1747   CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1748         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1749 )
1750   // check posted recvs
1751   int tags[3], sts[3];
1752   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1753   IReq *ireq = NULL;
1754   if (CpvAccess(CmiPICMethod) != 2) {
1755 #if 0
1756     //IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1757     ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1758 #else
1759 #if CMK_BLUEGENE_CHARM
1760     _TRACE_BG_TLINE_END(&msg->event);    // store current log
1761     msg->eventPe = CmiMyPe();
1762 #endif
1763     //in case ampi has not initialized and posted_ireqs are only inserted 
1764     //at AMPI_Irecv (MPI_Irecv)
1765     AmpiRequestList *reqL = &(parent->ampiReqs);
1766     //When storing the req index, it's 1-based. The reason is stated in the comments
1767     //in AMPI_Irecv function.
1768     int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
1769     if(reqL->size()>0 && ireqIdx>0)
1770         ireq = (IReq *)(*reqL)[ireqIdx-1];
1771     //CkPrintf("[%d] ampi::inorder, ireqIdx=%d\n", thisIndex, ireqIdx);
1772 #endif
1773     //CkPrintf("[%d] ampi::inorder, ireq=%p\n", thisIndex, ireq);
1774     if (ireq) { // receive posted
1775       ireq->receive(this, msg);
1776       // Isaac changed this so that the IReq stores the tag when receiving the message, 
1777       // instead of using this user supplied tag which could be MPI_ANY_TAG
1778       // Formerly the following line was not commented out:
1779       //ireq->tag = sts[0];         
1780       //ireq->src = sts[1];
1781       //ireq->comm = sts[2];
1782     } else {
1783       CmmPut(msgs, 3, tags, msg);
1784     }
1785   }
1786   else
1787       CmmPut(msgs, 3, tags, msg);
1788 }
1789
1790 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1791 {
1792     int tags[3];
1793     tags[0] = t; tags[1] = s; tags[2] = comm;
1794     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1795     return msg;
1796 }
1797
1798 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1799         int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm, int sync)
1800 {
1801   CkDDT_DataType *ddt = getDDT()->getType(type);
1802   int len = ddt->getSize(count);
1803   int sIdx=thisIndex;
1804   int seq = -1;
1805   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1806   seq = oorder.nextOutgoing(destIdx);
1807   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1808   if (sync) UsrToEnv(msg)->setRef(sync);
1809   TCharm::activateVariable(buf);
1810   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1811   TCharm::deactivateVariable(buf);
1812   return msg;
1813 }
1814
1815 void
1816 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1817 {
1818   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1819 }
1820
1821 void
1822 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, int sync)
1823 {
1824 #if CMK_TRACE_IN_CHARM
1825    TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
1826 #endif
1827
1828 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1829   MPI_Comm disComm = myComm.getComm();
1830   ampi *dis = getAmpiInstance(disComm);
1831   CpvAccess(_currentObj) = dis;
1832 #endif
1833
1834   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1835   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),sync);
1836
1837 #if CMK_TRACE_IN_CHARM
1838    TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
1839 #endif
1840
1841   if (sync == 1) {
1842     // waiting for receiver side
1843     resumeOnRecv = false;            // so no one else awakes it
1844     block();
1845   }
1846 }
1847
1848 void
1849 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1850 {
1851   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1852   memcpy(msg->data, buf, len);
1853   CProxy_ampi pa(aid);
1854   pa[idx].generic(msg);
1855 }
1856
1857 void
1858 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy, int sync)
1859 {
1860   if(rank==MPI_PROC_NULL) return;
1861   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1862   int destIdx = dest.getIndexForRank(rank);
1863   if(isInter()){
1864     sRank = parent->thisIndex;
1865     destcomm = MPI_COMM_FIRST_INTER;
1866     destIdx = dest.getIndexForRemoteRank(rank);
1867     arrproxy = remoteProxy;
1868   }
1869  MSG_ORDER_DEBUG(
1870   CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
1871  )
1872
1873   arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm,sync));
1874
1875 #if 0
1876 #if CMK_TRACE_ENABLED
1877   int size=0;
1878   MPI_Type_size(type,&size);
1879   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
1880 #endif
1881 #endif
1882 }
1883
1884 int
1885 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
1886 {
1887   CkDDT_DataType *ddt = getDDT()->getType(type);
1888   int len = ddt->getSize(count);
1889   
1890   if (msg->length > len && msg->length-len!=sizeof(AmpiOpHeader))
1891   { /* Received more data than we were expecting */
1892     char einfo[1024];
1893     sprintf(einfo, "FATAL ERROR in rank %d MPI_Recv (tag=%d, source=%d)\n"
1894         "  Expecting only %d bytes (%d items of type %d), \n"
1895         "  but received %d bytes from rank %d\nAMPI> MPI_Send was longer than matching MPI_Recv.",
1896             thisIndex,t,s,
1897             len, count, type,
1898             msg->length, msg->srcRank);
1899     CkAbort(einfo);
1900     return -1;
1901   }else if(msg->length < len){ // only at rare case shall we reset count by using divide
1902     count = msg->length/(ddt->getSize(1));
1903   }
1904   //
1905   TCharm::activateVariable(buf);
1906   if (msg->length-len==sizeof(AmpiOpHeader)) {  // reduction msg
1907     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
1908   } else {
1909     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
1910   }
1911   TCharm::deactivateVariable(buf);
1912   return 0;
1913 }
1914
1915 int
1916 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
1917 {
1918   MPI_Comm disComm = myComm.getComm();
1919   if(s==MPI_PROC_NULL) {
1920     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
1921     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
1922     ((MPI_Status *)sts)->MPI_LENGTH = 0;
1923     return 0;
1924   }
1925   _LOG_E_END_AMPI_PROCESSING(thisIndex)
1926 #if CMK_BLUEGENE_CHARM
1927   void *curLog;         // store current log in timeline
1928   _TRACE_BG_TLINE_END(&curLog);
1929 //  TRACE_BG_AMPI_SUSPEND();
1930 #if CMK_TRACE_IN_CHARM
1931   if(CpvAccess(traceOn)) traceSuspend();
1932 #endif
1933 #endif
1934
1935   if(isInter()){
1936     s = myComm.getIndexForRemoteRank(s);
1937     comm = MPI_COMM_FIRST_INTER;
1938   }
1939
1940   int tags[3];
1941   AmpiMsg *msg = 0;
1942   
1943  MSG_ORDER_DEBUG(
1944   CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
1945  )
1946
1947   resumeOnRecv=true;
1948   ampi *dis = getAmpiInstance(disComm);
1949 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1950 //  dis->yield();
1951 //  processRemoteMlogMessages();
1952 #endif
1953   int dosuspend = 0;
1954   while(1) {
1955       //This is done to take into account the case in which an ampi 
1956       // thread has migrated while waiting for a message
1957     tags[0] = t; tags[1] = s; tags[2] = comm;
1958     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
1959     if (msg) break;
1960     dis->thread->suspend();
1961     dosuspend = 1;
1962     dis = getAmpiInstance(disComm);
1963   }
1964
1965 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1966         CpvAccess(_currentObj) = dis;
1967         MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
1968 #endif
1969
1970   dis->resumeOnRecv=false;
1971
1972   if(sts)
1973     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1974   int status = dis->processMessage(msg, t, s, buf, count, type);
1975   if (status != 0) return status;
1976
1977   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
1978
1979 #if CMK_BLUEGENE_CHARM
1980 #if CMK_TRACE_IN_CHARM
1981   //if(CpvAccess(traceOn)) CthTraceResume(thread->getThread());
1982   //Due to the reason mentioned the in the while loop above, we need to 
1983   //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
1984   if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
1985 #endif
1986   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
1987   //TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0);
1988   //_TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
1989 #if 0
1990 #if 1
1991   if (!dosuspend) {
1992     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 1);
1993     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
1994   }
1995   else
1996 #endif
1997   TRACE_BG_ADD_TAG("RECV_RESUME_THREAD");
1998 #else
1999     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
2000     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2001 #endif
2002 #endif
2003
2004   delete msg;
2005   return 0;
2006 }
2007
2008 void
2009 ampi::probe(int t, int s, int comm, int *sts)
2010 {
2011   int tags[3];
2012 #if CMK_BLUEGENE_CHARM
2013   void *curLog;         // store current log in timeline
2014   _TRACE_BG_TLINE_END(&curLog);
2015 //  TRACE_BG_AMPI_SUSPEND();
2016 #endif
2017
2018   AmpiMsg *msg = 0;
2019   resumeOnRecv=true;
2020   while(1) {
2021     tags[0] = t; tags[1] = s; tags[2] = comm;
2022     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2023     if (msg) break;
2024     thread->suspend();
2025   }
2026   resumeOnRecv=false;
2027   if(sts)
2028     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2029 #if CMK_BLUEGENE_CHARM
2030 //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
2031   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
2032 #endif
2033 }
2034
2035 int
2036 ampi::iprobe(int t, int s, int comm, int *sts)
2037 {
2038   int tags[3];
2039   AmpiMsg *msg = 0;
2040   tags[0] = t; tags[1] = s; tags[2] = comm;
2041   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2042   if (msg) {
2043     if(sts)
2044       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2045     return 1;
2046   }
2047 #if CMK_BLUEGENE_CHARM
2048   void *curLog;         // store current log in timeline
2049   _TRACE_BG_TLINE_END(&curLog);
2050 //  TRACE_BG_AMPI_SUSPEND();
2051 #endif
2052   thread->schedule();
2053 #if CMK_BLUEGENE_CHARM
2054   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
2055   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
2056 #endif
2057   return 0;
2058 }
2059
2060
2061 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
2062 void
2063 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
2064 {
2065   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2066   int rootIdx=dest.getIndexForRank(root);
2067   if(rootIdx==thisIndex) {
2068 #if 0//AMPI_COMLIB
2069     ciBcast.beginIteration();
2070     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2071 #else
2072 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2073         CpvAccess(_currentObj) = this;
2074 #endif
2075     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2076 #endif
2077   }
2078   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
2079   nbcasts++;
2080 }
2081
2082 void
2083 ampi::bcastraw(void* buf, int len, CkArrayID aid)
2084 {
2085   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, 0);
2086   memcpy(msg->data, buf, len);
2087   CProxy_ampi pa(aid);
2088   pa.generic(msg);
2089 }
2090
2091
2092 AmpiMsg* 
2093 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
2094 {
2095   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
2096   int unit;
2097   CkDDT_DataType *ddt = getDDT()->getType(type);
2098   unit = ddt->getSize(1);
2099   int totalsize = unit*cnt;
2100
2101   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
2102   char* addr = (char*)Alltoallbuff+disp*unit;
2103   ddt->serialize((char*)msg->data, addr, cnt, (-1));
2104   return msg;
2105 }
2106
2107 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
2108                         void *attr_in, void *attr_out, int *flag){
2109   (*flag) = 0;
2110   return (MPI_SUCCESS);
2111 }
2112 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
2113                         void *attr_in, void *attr_out, int *flag){
2114   (*(void **)attr_out) = attr_in;
2115   (*flag) = 1;
2116   return (MPI_SUCCESS);
2117 }
2118 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
2119   return (MPI_SUCCESS);
2120 }
2121
2122
2123 void AmpiSeqQ::init(int numP) 
2124 {
2125   elements.init(numP);
2126 }
2127
2128 AmpiSeqQ::~AmpiSeqQ () {
2129 }
2130   
2131 void AmpiSeqQ::pup(PUP::er &p) {
2132   p|out;
2133   p|elements;
2134 }
2135
2136 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
2137 {
2138   AmpiOtherElement &el=elements[srcIdx];
2139 #ifndef CMK_OPTIMIZE
2140   if (msg->seq<el.seqIncoming)
2141     CkAbort("AMPI Logic error: received late out-of-order message!\n");
2142 #endif
2143   out.enq(msg);
2144   el.nOut++; // We have another message in the out-of-order queue
2145 }
2146
2147 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
2148 {
2149   AmpiOtherElement &el=elements[srcIdx];
2150   if (el.nOut==0) return 0; // No more out-of-order left.
2151   // Walk through our out-of-order queue, searching for our next message:
2152   for (int i=0;i<out.length();i++) {
2153     AmpiMsg *msg=out.deq();
2154     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
2155       el.seqIncoming++;
2156       el.nOut--; // We have one less message out-of-order
2157       return msg;
2158     }
2159     else
2160       out.enq(msg);
2161   }
2162   // We walked the whole queue-- ours is not there.
2163   return 0;
2164 }
2165
2166 //BIGSIM_OOC DEBUGGING: Output for AmpiRequest and its children classes
2167 void AmpiRequest::print(){
2168             CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
2169 }
2170
2171 void PersReq::print(){
2172     AmpiRequest::print();
2173     CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
2174 }
2175
2176 void IReq::print(){
2177     AmpiRequest::print();
2178     CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
2179 }
2180
2181 void ATAReq::print(){ //not complete for myreqs
2182     AmpiRequest::print();
2183     CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
2184
2185
2186 void SReq::print(){
2187     AmpiRequest::print();
2188     CmiPrintf("In SReq: this=%p, status=%d\n", this, statusIreq);
2189 }
2190
2191 void AmpiRequestList::pup(PUP::er &p) { 
2192         if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
2193                 return;
2194         }
2195
2196         p(blklen); //Allocated size of block
2197         p(len); //Number of used elements in block
2198         if(p.isUnpacking()){
2199                 makeBlock(blklen,len);
2200         }
2201         int count=0;
2202         for(int i=0;i<len;i++){
2203                 char nonnull;
2204                 if(!p.isUnpacking()){
2205                         if(block[i] == NULL){
2206                                 nonnull = 0;
2207                         }else{
2208                                 nonnull = block[i]->getType();
2209                         }
2210                 }       
2211                 p(nonnull);
2212                 if(nonnull != 0){
2213                         if(p.isUnpacking()){
2214                                 switch(nonnull){
2215                                         case 1:
2216                                                 block[i] = new PersReq;
2217                                                 break;
2218                                         case 2: 
2219                                                 block[i] = new IReq;
2220                                                 break;
2221                                         case 3: 
2222                                                 block[i] = new ATAReq;
2223                                                 break;
2224                                 }
2225                         }       
2226                         block[i]->pup(p);
2227                         count++;
2228                 }else{
2229                         block[i] = 0;
2230                 }
2231         }
2232         if(p.isDeleting()){
2233                 freeBlock();
2234         }
2235 }
2236
2237 //------------------ External Interface -----------------
2238 ampiParent *getAmpiParent(void) {
2239   ampiParent *p = CtvAccess(ampiPtr);
2240 #ifndef CMK_OPTIMIZE
2241   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
2242 #endif
2243   return p;
2244 }
2245
2246 ampi *getAmpiInstance(MPI_Comm comm) {
2247   ampi *ptr=getAmpiParent()->comm2ampi(comm);
2248 #ifndef CMK_OPTIMIZE
2249   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
2250 #endif
2251   return ptr;
2252 }
2253
2254 inline static AmpiRequestList *getReqs(void) {
2255   return &(getAmpiParent()->ampiReqs);
2256 }
2257
2258 inline void checkComm(MPI_Comm comm){
2259 #ifndef CMK_OPTIMIZE
2260   getAmpiParent()->checkComm(comm);
2261 #endif
2262 }
2263
2264 inline void checkRequest(MPI_Request req){
2265 #ifndef CMK_OPTIMIZE
2266   getReqs()->checkRequest(req);
2267 #endif
2268 }
2269
2270 inline void checkRequests(int n, MPI_Request* reqs){
2271 #ifndef CMK_OPTIMIZE
2272   AmpiRequestList* reqlist = getReqs();
2273   for(int i=0;i<n;i++)
2274     reqlist->checkRequest(reqs[i]);
2275 #endif
2276 }
2277
2278 CDECL void AMPI_Migrate(void)
2279 {
2280 //  AMPIAPI("AMPI_Migrate");
2281 #if 0
2282 #if CMK_BLUEGENE_CHARM
2283   TRACE_BG_AMPI_SUSPEND();
2284 #endif
2285 #endif
2286   TCHARM_Migrate();
2287
2288 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2289     ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
2290     CpvAccess(_currentObj) = currentAmpi;
2291 #endif
2292
2293 #if CMK_BLUEGENE_CHARM
2294 //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2295   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
2296 #endif
2297 }
2298
2299
2300 CDECL void AMPI_Evacuate(void)
2301 {
2302   TCHARM_Evacuate();
2303 }
2304
2305
2306
2307 CDECL void AMPI_Migrateto(int destPE)
2308 {
2309   AMPIAPI("AMPI_MigrateTo");
2310 #if 0
2311 #if CMK_BLUEGENE_CHARM
2312   TRACE_BG_AMPI_SUSPEND();
2313 #endif
2314 #endif
2315   TCHARM_Migrate_to(destPE);
2316 #if CMK_BLUEGENE_CHARM
2317   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
2318   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
2319 #endif
2320 }
2321
2322 CDECL void AMPI_MigrateTo(int destPE)
2323 {
2324         AMPI_Migrateto(destPE);
2325 }
2326
2327 CDECL void AMPI_Async_Migrate(void)
2328 {
2329   AMPIAPI("AMPI_Async_Migrate");
2330 #if 0
2331 #if CMK_BLUEGENE_CHARM
2332   TRACE_BG_AMPI_SUSPEND();
2333 #endif
2334 #endif
2335   TCHARM_Async_Migrate();
2336 #if CMK_BLUEGENE_CHARM
2337   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2338   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2339 #endif
2340 }
2341
2342 CDECL void AMPI_Allow_Migrate(void)
2343 {
2344   AMPIAPI("AMPI_Allow_Migrate");
2345 #if 0
2346 #if CMK_BLUEGENE_CHARM
2347   TRACE_BG_AMPI_SUSPEND();
2348 #endif
2349 #endif
2350   TCHARM_Allow_Migrate();
2351 #if CMK_BLUEGENE_CHARM
2352   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2353 #endif
2354 }
2355
2356 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2357 #if CMK_LBDB_ON
2358   //AMPIAPI("AMPI_Setmigratable");
2359   ampi *ptr=getAmpiInstance(comm);
2360   ptr->setMigratable(mig);
2361 #else
2362   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2363 #endif
2364 }
2365
2366 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2367 {
2368     //AMPIAPI("AMPI_Init");
2369   if (nodeinit_has_been_called) {
2370     AMPIAPI("AMPI_Init");
2371     char **argv;
2372     if (p_argv) argv=*p_argv;
2373     else argv=CkGetArgv();
2374     ampiInit(argv);
2375     if (p_argc) *p_argc=CmiGetArgc(argv);
2376   }
2377   else
2378   { /* Charm hasn't been started yet! */
2379     CkAbort("AMPI_Init> Charm is not initialized!");
2380   }
2381
2382   return 0;
2383 }
2384
2385 CDECL int AMPI_Initialized(int *isInit)
2386 {
2387   if (nodeinit_has_been_called) {
2388         AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2389         *isInit=CtvAccess(ampiInitDone);
2390   }
2391   else /* !nodeinit_has_been_called */ {
2392         *isInit=nodeinit_has_been_called;
2393   }
2394   return 0;
2395 }
2396
2397 CDECL int AMPI_Finalized(int *isFinalized)
2398 {
2399     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2400     *isFinalized=CtvAccess(ampiFinalized);
2401     return 0;
2402 }
2403
2404 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2405 {
2406   //AMPIAPI("AMPI_Comm_rank");
2407
2408 #ifdef AMPIMSGLOG
2409   ampiParent* pptr = getAmpiParent();
2410   if(msgLogRead){
2411     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2412     return 0;
2413   }
2414 #endif
2415
2416   *rank = getAmpiInstance(comm)->getRank(comm);
2417
2418 #ifdef AMPIMSGLOG
2419   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2420     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2421   }
2422 #endif
2423   return 0;
2424 }
2425
2426 CDECL
2427 int AMPI_Comm_size(MPI_Comm comm, int *size)
2428 {
2429   //AMPIAPI("AMPI_Comm_size");
2430 #ifdef AMPIMSGLOG
2431   ampiParent* pptr = getAmpiParent();
2432   if(msgLogRead){
2433     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2434     return 0;
2435   }
2436 #endif
2437
2438   *size = getAmpiInstance(comm)->getSize(comm);
2439
2440 #ifdef AMPIMSGLOG
2441   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2442     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2443   }
2444 #endif
2445
2446   return 0;
2447 }
2448
2449 CDECL
2450 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2451 {
2452   AMPIAPI("AMPI_Comm_compare");
2453   if(comm1==comm2) *result=MPI_IDENT;
2454   else{
2455     int equal=1;
2456     CkVec<int> ind1, ind2;
2457     ind1 = getAmpiInstance(comm1)->getIndices();
2458     ind2 = getAmpiInstance(comm2)->getIndices();
2459     if(ind1.size()==ind2.size()){
2460       for(int i=0;i<ind1.size();i++)
2461         if(ind1[i] != ind2[i]) { equal=0; break; }
2462     }
2463     if(equal==1) *result=MPI_CONGRUENT;
2464     else *result=MPI_UNEQUAL;
2465   }
2466   return 0;
2467 }
2468
2469 CDECL void AMPI_Exit(int /*exitCode*/)
2470 {
2471   AMPIAPI("AMPI_Exit");
2472   finalizeBigSimTrace();
2473   TCHARM_Done();
2474 }
2475 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2476 {
2477   AMPI_Exit(*exitCode);
2478 }
2479
2480 CDECL
2481 int AMPI_Finalize(void)
2482 {
2483   AMPIAPI("AMPI_Finalize");
2484 #if PRINT_IDLE
2485   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2486 #endif
2487 #if AMPI_COUNTER
2488     getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2489 #endif
2490   CtvAccess(ampiFinalized)=1;
2491
2492 #if CMK_BLUEGENE_CHARM
2493 #if 0
2494   TRACE_BG_AMPI_SUSPEND();
2495 #endif
2496 #if CMK_TRACE_IN_CHARM
2497   if(CpvAccess(traceOn)) traceSuspend();
2498 #endif
2499 #endif
2500
2501 //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2502   AMPI_Exit(0);
2503   return 0;
2504 }
2505
2506 CDECL
2507 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2508                         int tag, MPI_Comm comm)
2509 {
2510   AMPIAPI("AMPI_Send");
2511 #ifdef AMPIMSGLOG
2512   if(msgLogRead){
2513     return 0;
2514   }
2515 #endif
2516
2517   ampi *ptr = getAmpiInstance(comm);
2518 #if AMPI_COMLIB
2519   if(enableStreaming){  
2520 //    ptr->getStreaming().beginIteration();
2521     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2522   } else
2523 #endif
2524     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2525 #if AMPI_COUNTER
2526   getAmpiParent()->counters.send++;
2527 #endif
2528 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2529 //  ptr->yield();
2530 //  //  processRemoteMlogMessages();
2531 #endif
2532   return 0;
2533 }
2534
2535 CDECL
2536 int AMPI_Ssend(void *msg, int count, MPI_Datatype type, int dest,
2537                         int tag, MPI_Comm comm)
2538 {
2539   AMPIAPI("AMPI_Ssend");
2540 #ifdef AMPIMSGLOG
2541   if(msgLogRead){
2542     return 0;
2543   }
2544 #endif
2545
2546   ampi *ptr = getAmpiInstance(comm);
2547 #if AMPI_COMLIB
2548   if(enableStreaming){
2549     ptr->getStreaming().beginIteration();
2550     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2551   } else
2552 #endif
2553     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm, 1);
2554 #if AMPI_COUNTER
2555   getAmpiParent()->counters.send++;
2556 #endif
2557
2558   return 0;
2559 }
2560
2561 CDECL
2562 int AMPI_Issend(void *buf, int count, MPI_Datatype type, int dest,
2563               int tag, MPI_Comm comm, MPI_Request *request)
2564 {
2565   AMPIAPI("AMPI_Issend");
2566
2567 #ifdef AMPIMSGLOG
2568   ampiParent* pptr = getAmpiParent();
2569   if(msgLogRead){
2570     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
2571     return 0;
2572   }
2573 #endif
2574
2575   USER_CALL_DEBUG("AMPI_Issend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
2576   ampi *ptr = getAmpiInstance(comm);
2577   AmpiRequestList* reqs = getReqs();
2578   SReq *newreq = new SReq(comm);
2579   *request = reqs->insert(newreq);
2580     // 1:  blocking now  - used by MPI_Ssend
2581     // >=2:  the index of the requests - used by MPI_Issend
2582   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm, *request+2);
2583 #if AMPI_COUNTER
2584   getAmpiParent()->counters.isend++;
2585 #endif
2586
2587 #ifdef AMPIMSGLOG
2588   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2589     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
2590   }
2591 #endif
2592
2593   return 0;
2594 }
2595
2596 CDECL
2597 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2598               MPI_Comm comm, MPI_Status *status)
2599 {
2600   AMPIAPI("AMPI_Recv");
2601
2602 #ifdef AMPIMSGLOG
2603   ampiParent* pptr = getAmpiParent();
2604   if(msgLogRead){
2605     (*(pptr->fromPUPer))|(pptr->pupBytes);
2606     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2607     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2608     return 0;
2609   }
2610 #endif
2611
2612   ampi *ptr = getAmpiInstance(comm);
2613   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2614   
2615 #if AMPI_COUNTER
2616   getAmpiParent()->counters.recv++;
2617 #endif
2618  
2619 #ifdef AMPIMSGLOG
2620   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2621     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2622     (*(pptr->toPUPer))|(pptr->pupBytes);
2623     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2624     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2625   }
2626 #endif
2627   
2628   return 0;
2629 }
2630
2631 CDECL
2632 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2633 {
2634   AMPIAPI("AMPI_Probe");
2635   ampi *ptr = getAmpiInstance(comm);
2636   ptr->probe(tag,src, comm, (int*) status);
2637   return 0;
2638 }
2639
2640 CDECL
2641 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2642 {
2643   AMPIAPI("AMPI_Iprobe");
2644   ampi *ptr = getAmpiInstance(comm);
2645   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2646   return 0;
2647 }
2648
2649 CDECL
2650 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2651                   int stag, void *rbuf, int rcount, int rtype,
2652                   int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2653 {
2654   AMPIAPI("AMPI_Sendrecv");
2655   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2656   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2657   if (se) return se;
2658   else return re;
2659 }
2660
2661 CDECL
2662 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2663                          int dest, int sendtag, int source, int recvtag,
2664                          MPI_Comm comm, MPI_Status *status)
2665 {
2666   AMPIAPI("AMPI_Sendrecv_replace");
2667   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2668                       buf, count, datatype, source, recvtag, comm, status);
2669 }
2670
2671
2672 CDECL
2673 int AMPI_Barrier(MPI_Comm comm)
2674 {
2675   AMPIAPI("AMPI_Barrier");
2676
2677   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2678
2679   TRACE_BG_AMPI_LOG(1, 0);
2680
2681   //HACK: Use collective operation as a barrier.
2682   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2683
2684   //BIGSIM_OOC DEBUGGING
2685   //CkPrintf("%d: in AMPI_Barrier, after AMPI_Allreduce\n", getAmpiParent()->thisIndex);
2686 #if AMPI_COUNTER
2687   getAmpiParent()->counters.barrier++;
2688 #endif
2689   return 0;
2690 }
2691
2692 CDECL
2693 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2694                          MPI_Comm comm)
2695 {
2696   AMPIAPI("AMPI_Bcast");
2697   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2698   if(comm==MPI_COMM_SELF) return 0;
2699
2700 #ifdef AMPIMSGLOG
2701   ampiParent* pptr = getAmpiParent();
2702   if(msgLogRead){
2703     (*(pptr->fromPUPer))|(pptr->pupBytes);
2704     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2705     return 0;
2706   }
2707 #endif
2708
2709   ampi* ptr = getAmpiInstance(comm);
2710   ptr->bcast(root, buf, count, type,comm);
2711 #if AMPI_COUNTER
2712   getAmpiParent()->counters.bcast++;
2713 #endif
2714
2715 #ifdef AMPIMSGLOG
2716   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2717     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2718     (*(pptr->toPUPer))|(pptr->pupBytes);
2719     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2720   }
2721 #endif
2722 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2723 //  ptr->yield();
2724 //  //  processRemoteMlogMessages();
2725 #endif
2726
2727   return 0;
2728 }
2729
2730 /// This routine is called with the results of a Reduce or AllReduce
2731 const int MPI_REDUCE_SOURCE=0;
2732 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2733 void ampi::reduceResult(CkReductionMsg *msg)
2734 {
2735         MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2736   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2737              thisArrayID,thisIndex);
2738   delete msg;
2739 }
2740
2741 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2742 {
2743   int szdata = ddt->getSize(count);
2744   int szhdr = sizeof(AmpiOpHeader);
2745   AmpiOpHeader newhdr(op,type,count,szdata); 
2746   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2747   memcpy(msg->getData(),&newhdr,szhdr);
2748   TCharm::activateVariable(inbuf);
2749   ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2750   TCharm::deactivateVariable(inbuf);
2751   return msg;
2752 }
2753
2754 // Copy the MPI datatype "type" from inbuf to outbuf
2755 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2756   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2757   //  deserializing into the output.
2758   ampi *ptr = getAmpiInstance(comm);
2759   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2760   int len=ddt->getSize(count);
2761   char *serialized=new char[len];
2762   TCharm::activateVariable(inbuf);
2763   TCharm::activateVariable(outbuf);
2764   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2765   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2766   TCharm::deactivateVariable(outbuf);
2767   TCharm::deactivateVariable(inbuf);
2768   delete [] serialized;         // < memory leak!  // gzheng 
2769   
2770   return MPI_SUCCESS;
2771 }
2772
2773 CDECL
2774 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
2775                int root, MPI_Comm comm)
2776 {
2777   AMPIAPI("AMPI_Reduce");
2778   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2779
2780 #ifdef AMPIMSGLOG
2781   ampiParent* pptr = getAmpiParent();
2782   if(msgLogRead){
2783     (*(pptr->fromPUPer))|(pptr->pupBytes);
2784     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2785     return 0;
2786   }
2787 #endif
2788   
2789   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
2790   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
2791   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
2792
2793   ampi *ptr = getAmpiInstance(comm);
2794   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
2795   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
2796   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
2797
2798   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2799
2800   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
2801   msg->setCallback(reduceCB);
2802         MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
2803   ptr->contribute(msg);
2804   if (ptr->thisIndex == rootIdx){
2805     /*HACK: Use recv() to block until reduction data comes back*/
2806     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2807       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
2808   }
2809 #if AMPI_COUNTER
2810   getAmpiParent()->counters.reduce++;
2811 #endif
2812
2813 #ifdef AMPIMSGLOG
2814   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2815     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2816     (*(pptr->toPUPer))|(pptr->pupBytes);
2817     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
2818   }
2819 #endif
2820   
2821   return 0;
2822 }
2823
2824 CDECL
2825 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
2826                   MPI_Op op, MPI_Comm comm)
2827 {
2828   AMPIAPI("AMPI_Allreduce");
2829   ampi *ptr = getAmpiInstance(comm);
2830  
2831   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
2832   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
2833   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
2834   
2835   CkDDT_DataType *ddt_type = ptr->getDDT()->getType(type);
2836
2837   TRACE_BG_AMPI_LOG(2, count * ddt_type->getSize());
2838
2839   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2840
2841 #ifdef AMPIMSGLOG
2842   ampiParent* pptr = getAmpiParent();
2843   if(msgLogRead){
2844     (*(pptr->fromPUPer))|(pptr->pupBytes);
2845     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2846     //    CkExit();
2847     return 0;
2848   }
2849 #endif
2850
2851   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
2852   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
2853
2854   CkReductionMsg *msg=makeRednMsg(ddt_type, inbuf, count, type, op);
2855   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2856   msg->setCallback(allreduceCB);
2857   ptr->contribute(msg);
2858
2859   /*HACK: Use recv() to block until the reduction data comes back*/
2860   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2861     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
2862 #if AMPI_COUNTER
2863   getAmpiParent()->counters.allreduce++;
2864 #endif
2865
2866 #ifdef AMPIMSGLOG
2867   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2868     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2869     (*(pptr->toPUPer))|(pptr->pupBytes);
2870     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
2871     //    CkExit();
2872   }
2873 #endif
2874
2875   return 0;
2876 }
2877
2878 CDECL
2879 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
2880                    MPI_Op op, MPI_Comm comm, MPI_Request* request)
2881 {
2882   AMPIAPI("AMPI_Iallreduce");
2883   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2884
2885   checkRequest(*request);
2886   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
2887   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
2888   ampi *ptr = getAmpiInstance(comm);
2889
2890   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2891   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2892   msg->setCallback(allreduceCB);
2893   ptr->contribute(msg);
2894
2895   // using irecv instead recv to non-block the call and get request pointer
2896   AmpiRequestList* reqs = getReqs();
2897   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
2898   *request = reqs->insert(newreq);
2899   return 0;
2900 }
2901
2902 CDECL
2903 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
2904                        MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
2905 {
2906   AMPIAPI("AMPI_Reduce_scatter");
2907   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
2908   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
2909   ampi *ptr = getAmpiInstance(comm);
2910   int size = ptr->getSize(comm);
2911   int count=0;
2912   int *displs = new int [size];
2913   int len;
2914   void *tmpbuf;
2915
2916   //under construction
2917   for(int i=0;i<size;i++){
2918     displs[i] = count;
2919     count+= recvcounts[i];
2920   }
2921   len = ptr->getDDT()->getType(datatype)->getSize(count);
2922   tmpbuf = malloc(len);
2923   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
2924   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
2925                recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
2926   free(tmpbuf);
2927   delete [] displs;     // < memory leak ! // gzheng
2928   return 0;
2929 }
2930
2931 /***** MPI_Scan algorithm (from MPICH) *******
2932    recvbuf = sendbuf;
2933    partial_scan = sendbuf;
2934    mask = 0x1;
2935    while (mask < size) {
2936       dst = rank^mask;
2937       if (dst < size) {
2938          send partial_scan to dst;
2939          recv from dst into tmp_buf;
2940          if (rank > dst) {
2941             partial_scan = tmp_buf + partial_scan;
2942             recvbuf = tmp_buf + recvbuf;
2943          }
2944          else {
2945             if (op is commutative)
2946                partial_scan = tmp_buf + partial_scan;
2947             else {
2948                tmp_buf = partial_scan + tmp_buf;
2949                partial_scan = tmp_buf;
2950             }
2951          }
2952       }
2953       mask <<= 1;
2954    }
2955  ***** MPI_Scan algorithm (from MPICH) *******/
2956
2957 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
2958   (op)(invec,inoutvec,&count,&datatype);
2959 }
2960 CDECL
2961 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
2962   AMPIAPI("AMPI_Scan");
2963   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
2964   MPI_Status sts;
2965   ampi *ptr = getAmpiInstance(comm);
2966   int size = ptr->getSize(comm);
2967   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
2968   int rank = ptr->getRank(comm);
2969   int mask = 0x1;
2970   int dst;
2971   void* tmp_buf = malloc(blklen);
2972   void* partial_scan = malloc(blklen);
2973   
2974   memcpy(recvbuf, sendbuf, blklen);
2975   memcpy(partial_scan, sendbuf, blklen);
2976   while(mask < size){
2977     dst = rank^mask;
2978     if(dst < size){
2979       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
2980                    tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
2981       if(rank > dst){
2982         (op)(tmp_buf,partial_scan,&count,&datatype);
2983         (op)(tmp_buf,recvbuf,&count,&datatype);
2984       }else {
2985         (op)(partial_scan,tmp_buf,&count,&datatype);
2986         memcpy(partial_scan,tmp_buf,blklen);
2987       }
2988     }
2989     mask <<= 1;
2990
2991   }
2992
2993   free(tmp_buf);
2994   free(partial_scan);
2995 #if AMPI_COUNTER
2996   getAmpiParent()->counters.scan++;
2997 #endif
2998   return 0;
2999 }
3000
3001 CDECL
3002 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
3003   //AMPIAPI("AMPI_Op_create");
3004   *op = function;
3005   return 0;
3006 }
3007
3008 CDECL
3009 int AMPI_Op_free(MPI_Op *op){
3010   //AMPIAPI("AMPI_Op_free");
3011   *op = MPI_OP_NULL;
3012   return 0;
3013 }
3014
3015
3016 CDECL
3017 double AMPI_Wtime(void)
3018 {
3019 //  AMPIAPI("AMPI_Wtime");
3020
3021 #ifdef AMPIMSGLOG
3022   double ret=TCHARM_Wall_timer();
3023   ampiParent* pptr = getAmpiParent();
3024   if(msgLogRead){
3025     (*(pptr->fromPUPer))|ret;
3026     return ret;
3027   }
3028
3029   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3030     (*(pptr->toPUPer))|ret;
3031   }
3032 #endif
3033
3034 #if CMK_BLUEGENE_CHARM
3035   return BgGetTime();
3036 #else
3037   return TCHARM_Wall_timer();
3038 #endif
3039 }
3040
3041 CDECL
3042 double AMPI_Wtick(void){
3043   //AMPIAPI("AMPI_Wtick");
3044   return 1e-6;
3045 }
3046
3047 int PersReq::start(){
3048   if(sndrcv == 1 || sndrcv == 3) { // send or ssend request
3049     ampi *ptr=getAmpiInstance(comm);
3050     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm, sndrcv==3?1:0);
3051   }
3052   return 0;
3053 }
3054
3055 CDECL
3056 int AMPI_Start(MPI_Request *request)
3057 {
3058   AMPIAPI("AMPI_Start");
3059   checkRequest(*request);
3060   AmpiRequestList *reqs = getReqs();
3061   if(-1==(*reqs)[*request]->start()){
3062     CkAbort("MPI_Start could be used only on persistent communication requests!");
3063   }
3064   return 0;
3065 }
3066
3067 CDECL
3068 int AMPI_Startall(int count, MPI_Request *requests){
3069   AMPIAPI("AMPI_Startall");
3070   checkRequests(count,requests);
3071   AmpiRequestList *reqs = getReqs();
3072   for(int i=0;i<count;i++){
3073     if(-1==(*reqs)[requests[i]]->start())
3074       CkAbort("MPI_Start could be used only on persistent communication requests!");
3075   }
3076   return 0;
3077 }
3078
3079 /* organize the indices of requests into a vector of a vector: 
3080  * level 1 is different msg envelope matches
3081  * level 2 is (posting) ordered requests of with envelope
3082  * each time multiple completion call loop over first elem of level 1
3083  * and move the matched to the NULL request slot.   
3084  * warning: this does not work with I-Alltoall requests */
3085 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
3086   for(int i=0;i<count;i++){
3087     if(reqs[i]!=MPI_REQUEST_NULL)
3088       return 0;
3089   }
3090   return 1;
3091 }
3092 inline int matchReq(MPI_Request ia, MPI_Request ib){
3093   checkRequest(ia);  
3094   checkRequest(ib);
3095   AmpiRequestList* reqs = getReqs();
3096   AmpiRequest *a, *b;
3097   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
3098   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
3099   a=(*reqs)[ia];  b=(*reqs)[ib];
3100   if(a->tag != b->tag) return 0;
3101   if(a->src != b->src) return 0;
3102   if(a->comm != b->comm) return 0;
3103   return 1;
3104 }
3105 inline void swapInt(int& a,int& b){
3106   int tmp;
3107   tmp=a; a=b; b=tmp;
3108 }
3109 inline void sortedIndex(int n, int* arr, int* idx){
3110   int i,j;
3111   for(i=0;i<n;i++) 
3112     idx[i]=i;
3113   for (i=0; i<n-1; i++) 
3114     for (j=0; j<n-1-i; j++)
3115       if (arr[idx[j+1]] < arr[idx[j]]) 
3116         swapInt(idx[j+1],idx[j]);
3117 }
3118 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
3119   CkAssert(count!=0);
3120   int *newidx = new int [count];
3121   int flag;
3122   sortedIndex(count,arr,newidx);
3123   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
3124   CkVec<int> slot;
3125   vec->push_back(slot);
3126   (*vec)[0].push_back(newidx[0]);
3127   for(int i=1;i<count;i++){
3128     flag=0;
3129     for(int j=0;j<vec->size();j++){
3130       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
3131         ((*vec)[j]).push_back(newidx[i]);
3132         flag++;
3133       }
3134     }
3135     if(!flag){
3136       CkVec<int> newslot;
3137       newslot.push_back(newidx[i]);
3138       vec->push_back(newslot);
3139     }else{
3140       CkAssert(flag==1);
3141     }
3142   }
3143   delete [] newidx;
3144   return vec;
3145 }
3146 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
3147   printf("vec content: ");
3148   for(int i=0;i<vec.size();i++){
3149     printf("{");
3150     for(int j=0;j<(vec[i]).size();j++){
3151       printf(" %d ",arr[(vec[i])[j]]);
3152     }
3153     printf("} ");
3154   }
3155   printf("\n");
3156 }
3157
3158 int PersReq::wait(MPI_Status *sts){
3159         if(sndrcv == 2) {
3160                 if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3161                         CkAbort("AMPI> Error in persistent request wait");
3162 #if CMK_BLUEGENE_CHARM
3163                 _TRACE_BG_TLINE_END(&event);
3164 #endif
3165         }
3166         return 0;
3167 }
3168
3169 int IReq::wait(MPI_Status *sts){
3170     if(CpvAccess(CmiPICMethod) == 2) {
3171         AMPI_DEBUG("In weird clause of IReq::wait\n");
3172         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3173             CkAbort("AMPI> Error in non-blocking request wait");
3174
3175         return 0;
3176     }
3177
3178     //Copy "this" to a local variable in the case that "this" pointer
3179     //is updated during the out-of-core emulation.
3180
3181     // optimization for Irecv
3182     // generic() writes directly to the buffer, so the only thing we
3183     // do here is to wait
3184     ampi *ptr = getAmpiInstance(comm);
3185
3186     //BIGSIM_OOC DEBUGGING
3187     //int ooccnt=0;
3188     //int ampiIndex = ptr->thisIndex;
3189     //CmiPrintf("%d: IReq's status=%d\n", ampiIndex, statusIreq);
3190         
3191     while (statusIreq == false) {
3192         //BIGSIM_OOC DEBUGGING
3193         //CmiPrintf("Before blocking: %dth time: %d: in Ireq::wait\n", ++ooccnt, ptr->thisIndex);
3194         //print();
3195
3196         ptr->resumeOnRecv=true;
3197         ptr->block();
3198                         
3199         //BIGSIM_OOC DEBUGGING
3200         //CmiPrintf("[%d] After blocking: in Ireq::wait\n", ptr->thisIndex);
3201         //CmiPrintf("IReq's this pointer: %p\n", this);
3202         //print();
3203
3204 #if CMK_BLUEGENE_CHARM
3205         //Because of the out-of-core emulation, this pointer is changed after in-out
3206         //memory operation. So we need to return from this function and do the while loop
3207         //in the outer function call.   
3208         if(_BgInOutOfCoreMode)
3209             return -1;
3210 #endif  
3211     }   // end of while
3212     ptr->resumeOnRecv=false;
3213
3214     AMPI_DEBUG("IReq::wait has resumed\n");
3215
3216     if(sts) {
3217         AMPI_DEBUG("Setting sts->MPI_TAG to this->tag=%d in IReq::wait  this=%p\n", (int)this->tag, this);
3218         sts->MPI_TAG = tag;
3219         sts->MPI_SOURCE = src;
3220         sts->MPI_COMM = comm;
3221         sts->MPI_LENGTH = length;
3222     }
3223
3224     return 0;
3225 }
3226
3227 int ATAReq::wait(MPI_Status *sts){
3228         int i;
3229         for(i=0;i<count;i++){
3230                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3231                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
3232                         CkAbort("AMPI> Error in alltoall request wait");
3233 #if CMK_BLUEGENE_CHARM
3234                 _TRACE_BG_TLINE_END(&myreqs[i].event);
3235 #endif
3236         }
3237 #if CMK_BLUEGENE_CHARM
3238         //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
3239         TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0, 1);
3240         for (i=0; i<count; i++)
3241           _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
3242         _TRACE_BG_TLINE_END(&event);
3243 #endif
3244         return 0;
3245 }
3246
3247 int SReq::wait(MPI_Status *sts){
3248         ampi *ptr = getAmpiInstance(comm);
3249         while (statusIreq == false) {
3250           ptr->resumeOnRecv = true;
3251           ptr->block();
3252           ptr = getAmpiInstance(comm);
3253           ptr->resumeOnRecv = false;
3254         }
3255         return 0;
3256 }
3257
3258 CDECL
3259 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
3260 {
3261
3262
3263   AMPIAPI("AMPI_Wait");
3264   if(*request == MPI_REQUEST_NULL){
3265     stsempty(*sts);
3266     return 0;
3267   }
3268   checkRequest(*request);
3269   AmpiRequestList* reqs = getReqs();
3270
3271 #ifdef AMPIMSGLOG
3272   ampiParent* pptr = getAmpiParent();
3273   if(msgLogRead){
3274     (*(pptr->fromPUPer))|(pptr->pupBytes);
3275     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3276     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
3277     return 0;
3278   }
3279 #endif
3280
3281   AMPI_DEBUG("AMPI_Wait request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3282   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3283   //(*reqs)[*request]->wait(sts);
3284   int waitResult = -1;
3285   do{
3286     AmpiRequest *waitReq = (*reqs)[*request];
3287     waitResult = waitReq->wait(sts);
3288     if(_BgInOutOfCoreMode){
3289         reqs = getReqs();
3290     }
3291   }while(waitResult==-1);
3292
3293
3294   AMPI_DEBUG("AMPI_Wait after calling wait, request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3295
3296
3297 #ifdef AMPIMSGLOG
3298   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3299     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
3300     (*(pptr->toPUPer))|(pptr->pupBytes);
3301     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3302     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
3303   }
3304 #endif
3305   
3306   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3307     reqs->free(*request);
3308     *request = MPI_REQUEST_NULL;
3309   }
3310
3311     AMPI_DEBUG("End of AMPI_Wait\n");
3312
3313   return 0;
3314 }
3315
3316 CDECL
3317 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
3318 {
3319   AMPIAPI("AMPI_Waitall");
3320   if(count==0) return MPI_SUCCESS;
3321   checkRequests(count,request);
3322   int i,j,oldPe;
3323   AmpiRequestList* reqs = getReqs();
3324   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3325
3326 #ifdef AMPIMSGLOG
3327   ampiParent* pptr = getAmpiParent();
3328   if(msgLogRead){
3329     for(i=0;i<reqvec->size();i++){
3330       for(j=0;j<((*reqvec)[i]).size();j++){
3331         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3332           stsempty(sts[((*reqvec)[i])[j]]);
3333           continue;
3334         }
3335         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3336         (*(pptr->fromPUPer))|(pptr->pupBytes);
3337         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3338         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3339       }
3340     }
3341     return 0;
3342   }
3343 #endif
3344
3345 #if CMK_BLUEGENE_CHARM
3346   void *curLog;         // store current log in timeline
3347   _TRACE_BG_TLINE_END(&curLog);
3348 #if 0
3349   TRACE_BG_AMPI_SUSPEND();
3350 #endif
3351 #endif
3352   for(i=0;i<reqvec->size();i++){
3353     for(j=0;j<((*reqvec)[i]).size();j++){
3354       //CkPrintf("[%d] in loop [%d, %d]\n", pptr->thisIndex,i, j);
3355       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3356         stsempty(sts[((*reqvec)[i])[j]]);
3357         continue;
3358       }
3359       oldPe = CkMyPe();
3360
3361       int waitResult = -1;
3362       do{       
3363         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3364         waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
3365         if(_BgInOutOfCoreMode){
3366             reqs = getReqs();
3367             reqvec = vecIndex(count, request);
3368         }
3369       }while(waitResult==-1);
3370
3371 #ifdef AMPIMSGLOG
3372       if(msgLogWrite && pptr->thisIndex == msgLogRank){
3373         (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
3374         (*(pptr->toPUPer))|(pptr->pupBytes);
3375         PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3376         PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3377       }
3378 #endif
3379     
3380 #if 1
3381 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3382             //for fault evacuation
3383       if(oldPe != CkMyPe()){
3384 #endif
3385                         reqs = getReqs();
3386                         reqvec  = vecIndex(count,request);
3387 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3388             }
3389 #endif
3390 #endif
3391     }
3392   }
3393 #if CMK_BLUEGENE_CHARM
3394   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
3395 #endif
3396   // free memory of requests
3397   for(i=0;i<count;i++){ 
3398     if(request[i] == MPI_REQUEST_NULL)
3399       continue;
3400     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
3401       reqs->free(request[i]);
3402       request[i] = MPI_REQUEST_NULL;
3403     }
3404   }
3405   delete reqvec;
3406   return 0;
3407 }
3408
3409 CDECL
3410 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
3411 {
3412   AMPIAPI("AMPI_Waitany");
3413   
3414   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
3415   checkRequests(count,request);
3416   if(areInactiveReqs(count,request)){
3417     *idx=MPI_UNDEFINED;
3418     stsempty(*sts);
3419     return MPI_SUCCESS;
3420   }
3421   int flag=0;
3422   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3423   while(count>0){ /* keep looping until some request finishes: */
3424     for(int i=0;i<reqvec->size();i++){
3425       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
3426       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
3427         *idx = ((*reqvec)[i])[0];
3428         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
3429         return 0;
3430       }
3431     }
3432     /* no requests have finished yet-- schedule and try again */
3433     AMPI_Yield(MPI_COMM_WORLD);
3434   }
3435   *idx = MPI_UNDEFINED;
3436   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
3437         delete reqvec;
3438   return 0;
3439 }
3440
3441 CDECL
3442 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
3443                  int *array_of_indices, MPI_Status *array_of_statuses)
3444 {
3445   AMPIAPI("AMPI_Waitsome");
3446   checkRequests(incount,array_of_requests);
3447   if(areInactiveReqs(incount,array_of_requests)){
3448     *outcount=MPI_UNDEFINED;
3449     return MPI_SUCCESS;
3450   }
3451   MPI_Status sts;
3452   int i;
3453   int flag=0, realflag=0;
3454   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3455   *outcount = 0;
3456   while(1){
3457     for(i=0;i<reqvec->size();i++){
3458       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3459       if(flag == 1){ 
3460         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3461         array_of_statuses[(*outcount)++]=sts;
3462         if(sts.MPI_COMM != 0)
3463           realflag=1; // there is real(non null) request
3464       }
3465     }
3466     if(realflag && outcount>0) break;
3467   }
3468         delete reqvec;
3469   return 0;
3470 }
3471
3472 CmiBool PersReq::test(MPI_Status *sts){
3473         if(sndrcv == 2)         // recv request
3474                 return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3475         else                    // send request
3476                 return 1;
3477
3478 }
3479 void PersReq::complete(MPI_Status *sts){
3480         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3481                 CkAbort("AMPI> Error in persistent request complete");
3482 }
3483
3484 CmiBool IReq::test(MPI_Status *sts){
3485         if (statusIreq == true) {           
3486           if(sts)
3487             sts->MPI_LENGTH = length;           
3488           return true;
3489         }
3490         else {
3491           getAmpiInstance(comm)->yield();
3492           return false;
3493         }
3494 /*
3495         return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3496 */
3497 }
3498
3499 CmiBool SReq::test(MPI_Status *sts){
3500         if (statusIreq == true) {
3501           return true;
3502         }
3503         else {
3504           getAmpiInstance(comm)->yield();
3505           return false;
3506         }
3507 }
3508
3509 void IReq::complete(MPI_Status *sts){
3510         wait(sts);
3511 /*
3512         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3513                 CkAbort("AMPI> Error in non-blocking request complete");
3514 */
3515 }
3516
3517 void SReq::complete(MPI_Status *sts){
3518         wait(sts);
3519 }
3520
3521 void IReq::receive(ampi *ptr, AmpiMsg *msg)
3522 {
3523     int sts = ptr->processMessage(msg, tag, src, buf, count, type);
3524     statusIreq = (sts == 0);
3525     length = msg->length;
3526     this->tag = msg->tag; // Although not required, we also extract tag from msg
3527     src = msg->srcRank; // Although not required, we also extract src from msg
3528     comm = msg->comm;
3529     AMPI_DEBUG("Setting this->tag to %d in IReq::receive this=%p\n", (int)this->tag, this);
3530 #if CMK_BLUEGENE_CHARM
3531     event = msg->event; 
3532 #endif
3533     delete msg;
3534     
3535     //BIGSIM_OOC DEBUGGING
3536     //CmiPrintf("In IReq::receive, this=%p ", this);
3537     //print();
3538 }
3539
3540 CmiBool ATAReq::test(MPI_Status *sts){
3541         int i, flag=1;
3542         for(i=0;i<count;i++){
3543                 flag *= getAmpiInstance(myreqs[i].comm)->iprobe(myreqs[i].tag, myreqs[i].src,
3544                                         myreqs[i].comm, (int*) sts);
3545         }
3546         return flag;
3547 }
3548 void ATAReq::complete(MPI_Status *sts){
3549         int i;
3550         for(i=0;i<count;i++){
3551                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3552                                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int*)sts))
3553                         CkAbort("AMPI> Error in alltoall request complete");
3554         }
3555 }
3556
3557 CDECL
3558 int AMPI_Test(MPI_Request *request, int *flag, MPI_Status *sts)
3559 {
3560   AMPIAPI("AMPI_Test");
3561   checkRequest(*request);
3562   if(*request==MPI_REQUEST_NULL) {
3563     *flag = 1;
3564     stsempty(*sts);
3565     return 0;
3566   }
3567   AmpiRequestList* reqs = getReqs();
3568   if(1 == (*flag = (*reqs)[*request]->test(sts))){
3569     if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3570       (*reqs)[*request]->complete(sts);
3571       reqs->free(*request);
3572       *request = MPI_REQUEST_NULL;
3573     }
3574   }
3575   return 0;
3576 }
3577
3578 CDECL
3579 int AMPI_Testany(int count, MPI_Request *request, int *index, int *flag, MPI_Status *sts){
3580   AMPIAPI("AMPI_Testany");
3581   checkRequests(count,request);
3582   if(areInactiveReqs(count,request)){
3583     *flag=1;
3584     *index=MPI_UNDEFINED;
3585     stsempty(*sts);
3586     return MPI_SUCCESS;
3587   }
3588   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3589   *flag=0;
3590   for(int i=0;i<reqvec->size();i++){
3591     AMPI_Test(&request[((*reqvec)[i])[0]], flag, sts);
3592     if(*flag==1 && sts->MPI_COMM!=0){ // skip MPI_REQUEST_NULL
3593       *index = ((*reqvec)[i])[0];
3594       return 0;
3595     }
3596   }
3597   *index = MPI_UNDEFINED;
3598         delete reqvec;
3599   return 0;
3600 }
3601
3602 CDECL
3603 int AMPI_Testall(int count, MPI_Request *request, int *flag, MPI_Status *sts)
3604 {
3605   AMPIAPI("AMPI_Testall");
3606   if(count==0) return MPI_SUCCESS;
3607   checkRequests(count,request);
3608   int tmpflag;
3609   int i,j;
3610   AmpiRequestList* reqs = getReqs();
3611   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3612   *flag = 1;  
3613   for(i=0;i<reqvec->size();i++){
3614     for(j=0;j<((*reqvec)[i]).size();j++){
3615       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL)
3616         continue;
3617       tmpflag = (*reqs)[request[((*reqvec)[i])[j]]]->test(&sts[((*reqvec)[i])[j]]);
3618       *flag *= tmpflag;
3619     }
3620   }
3621   if(flag) 
3622     MPI_Waitall(count,request,sts);
3623         delete reqvec;  
3624   return 0;
3625 }
3626
3627 CDECL
3628 int AMPI_Testsome(int incount, MPI_Request *array_of_requests, int *outcount,
3629                  int *array_of_indices, MPI_Status *array_of_statuses)
3630 {
3631   AMPIAPI("AMPI_Testsome");
3632   checkRequests(incount,array_of_requests);
3633   if(areInactiveReqs(incount,array_of_requests)){
3634     *outcount=MPI_UNDEFINED;
3635     return MPI_SUCCESS;
3636   }
3637   MPI_Status sts;
3638   int flag;
3639   int i;
3640   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3641   *outcount = 0;
3642   for(i=0;i<reqvec->size();i++){
3643     AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3644     if(flag == 1){
3645       array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3646       array_of_statuses[(*outcount)++]=sts;
3647     }
3648   }
3649         delete reqvec;
3650   return 0;
3651 }
3652
3653 CDECL
3654 int AMPI_Request_free(MPI_Request *request){
3655   AMPIAPI("AMPI_Request_free");
3656   checkRequest(*request);
3657   if(*request==MPI_REQUEST_NULL) return 0;
3658   AmpiRequestList* reqs = getReqs();
3659   reqs->free(*request);
3660   return 0;
3661 }
3662
3663 CDECL
3664 int AMPI_Cancel(MPI_Request *request){
3665   AMPIAPI("AMPI_Cancel");
3666   return AMPI_Request_free(request);
3667 }
3668
3669 CDECL
3670 int AMPI_Test_cancelled(MPI_Status* status, int* flag) {
3671     /* FIXME: always returns success */
3672     *flag = 1;
3673     return 0;
3674 }
3675
3676 CDECL
3677 int AMPI_Recv_init(void *buf, int count, int type, int src, int tag,
3678                    MPI_Comm comm, MPI_Request *req)
3679 {