AMPI: Formatting fix, to keep output variables together
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #define exit exit /*Supress definition of exit in ampi.h*/
9 #include "ampiimpl.h"
10 #include "tcharm.h"
11 #include "ampiEvents.h" /*** for trace generation for projector *****/
12 #include "ampiProjections.h"
13
14 #define CART_TOPOL 1
15 #define AMPI_PRINT_IDLE 0
16
17 /* change this define to "x" to trace all send/recv's */
18 #define MSG_ORDER_DEBUG(x)  //x /* empty */
19 /* change this define to "x" to trace user calls */
20 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
21 #define STARTUP_DEBUG(x)  //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
22 #define FUNCCALL_DEBUG(x) //x /* empty */
23
24 static CkDDT *getDDT(void) {
25   return getAmpiParent()->myDDT;
26 }
27
28 //------------- startup -------------
29 static mpi_comm_worlds mpi_worlds;
30
31 int mpi_nworlds; /*Accessed by ampif*/
32 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
33
34 /* ampiReducer: AMPI's generic reducer type 
35    MPI_Op is function pointer to MPI_User_function
36    so that it can be packed into AmpiOpHeader, shipped 
37    with the reduction message, and then plugged into 
38    the ampiReducer. 
39    One little trick is the ampi::recv which receives
40    the final reduction message will see additional
41    sizeof(AmpiOpHeader) bytes in the buffer before
42    any user data.                             */
43 class AmpiComplex { 
44 public: 
45         double re, im; 
46         void operator+=(const AmpiComplex &a) {
47                 re+=a.re;
48                 im+=a.im;
49         }
50         void operator*=(const AmpiComplex &a) {
51                 double nu_re=re*a.re-im*a.im;
52                 im=re*a.im+im*a.re;
53                 re=nu_re;
54         }
55         int operator>(const AmpiComplex &a) {
56                 CkAbort("Cannot compare complex numbers with MPI_MAX");
57                 return 0;
58         }
59         int operator<(const AmpiComplex &a) {
60                 CkAbort("Cannot compare complex numbers with MPI_MIN");
61                 return 0;
62         }
63 };
64 typedef struct { float val; int idx; } FloatInt;
65 typedef struct { double val; int idx; } DoubleInt;
66 typedef struct { long val; int idx; } LongInt;
67 typedef struct { int val; int idx; } IntInt;
68 typedef struct { short val; int idx; } ShortInt;
69 typedef struct { long double val; int idx; } LongdoubleInt;
70 typedef struct { float val; float idx; } FloatFloat;
71 typedef struct { double val; double idx; } DoubleDouble;
72
73
74 #define MPI_OP_SWITCH(OPNAME) \
75   int i; \
76   switch (*datatype) { \
77   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
78   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
79   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
80   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
81   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
82   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
83   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
84   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
85   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
86   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
87   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
88   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
89   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
90   default: \
91     ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
92     CmiAbort("Unsupported MPI datatype for MPI Op"); \
93   };\
94
95 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
96 #define MPI_OP_IMPL(type) \
97         if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
98   MPI_OP_SWITCH(MPI_MAX)
99 #undef MPI_OP_IMPL
100 }
101
102 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
103 #define MPI_OP_IMPL(type) \
104         if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
105   MPI_OP_SWITCH(MPI_MIN)
106 #undef MPI_OP_IMPL
107 }
108
109 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
110 #define MPI_OP_IMPL(type) \
111         ((type *)inoutvec)[i] += ((type *)invec)[i];
112   MPI_OP_SWITCH(MPI_SUM)
113 #undef MPI_OP_IMPL
114 }
115
116 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
117 #define MPI_OP_IMPL(type) \
118         ((type *)inoutvec)[i] *= ((type *)invec)[i];
119   MPI_OP_SWITCH(MPI_PROD)
120 #undef MPI_OP_IMPL
121 }
122
123 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
124   int i;  
125   switch (*datatype) {
126   case MPI_INT:
127   case MPI_LOGICAL:
128     for(i=0;i<(*len);i++)
129       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
130     break;
131   default:
132     ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
133     CmiAbort("exiting");
134   }
135 }
136 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
137   int i; 
138   switch (*datatype) {
139   case MPI_INT:
140     for(i=0;i<(*len);i++)
141       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
142     break;
143   case MPI_BYTE:
144     for(i=0;i<(*len);i++)
145       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
146     break;
147   default:
148     ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
149     CmiAbort("exiting");
150   }
151 }
152 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
153   int i;  
154   switch (*datatype) {
155   case MPI_INT:
156   case MPI_LOGICAL:
157     for(i=0;i<(*len);i++)
158       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
159     break;
160   default:
161     ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
162     CmiAbort("exiting");
163   }
164 }
165 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
166   int i;  
167   switch (*datatype) {
168   case MPI_INT:
169     for(i=0;i<(*len);i++)
170       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
171     break;
172   case MPI_BYTE:
173     for(i=0;i<(*len);i++)
174       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
175     break;
176   default:
177     ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
178     CmiAbort("exiting");
179   }
180 }
181 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
182   int i;  
183   switch (*datatype) {
184   case MPI_INT:
185   case MPI_LOGICAL:
186     for(i=0;i<(*len);i++)
187       ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
188     break;
189   default:
190     ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
191     CmiAbort("exiting");
192   }
193 }
194 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
195   int i;  
196   switch (*datatype) {
197   case MPI_INT:
198     for(i=0;i<(*len);i++)
199       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
200     break;
201   case MPI_BYTE:
202     for(i=0;i<(*len);i++)
203       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
204     break;
205         case MPI_UNSIGNED:
206     for(i=0;i<(*len);i++)
207       ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
208     break;
209   default:
210     ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
211     CmiAbort("exiting");
212   }
213 }
214
215 #ifndef MIN
216 #define MIN(a,b) (a < b ? a : b)
217 #endif
218
219 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
220   int i;  
221
222   switch (*datatype) {
223   case MPI_FLOAT_INT:
224     for(i=0;i<(*len);i++)
225       if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
226         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
227       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
228         ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
229     break;
230   case MPI_DOUBLE_INT:
231     for(i=0;i<(*len);i++)
232       if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
233         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
234       else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
235         ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
236
237     break;
238   case MPI_LONG_INT:
239     for(i=0;i<(*len);i++)
240       if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
241         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
242       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
243         ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
244     break;
245   case MPI_2INT:
246     for(i=0;i<(*len);i++)
247       if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
248         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
249       else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
250         ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
251     break;
252   case MPI_SHORT_INT:
253     for(i=0;i<(*len);i++)
254       if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
255         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
256       else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
257         ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
258     break;
259   case MPI_LONG_DOUBLE_INT:
260     for(i=0;i<(*len);i++)
261       if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
262         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
263       else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
264         ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
265     break;
266   case MPI_2FLOAT:
267     for(i=0;i<(*len);i++)
268       if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
269         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
270       else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
271         ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
272     break;
273   case MPI_2DOUBLE:
274     for(i=0;i<(*len);i++)
275       if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
276         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
277       else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
278         ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
279     break;
280   default:
281     ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
282     CmiAbort("exiting");
283   }
284 }
285 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
286   int i;  
287   switch (*datatype) {
288   case MPI_FLOAT_INT:
289     for(i=0;i<(*len);i++)
290       if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
291         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
292       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
293         ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
294     break;
295   case MPI_DOUBLE_INT:
296     for(i=0;i<(*len);i++)
297       if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
298         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
299       else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
300         ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
301     break;
302   case MPI_LONG_INT:
303     for(i=0;i<(*len);i++)
304       if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
305         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
306       else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
307         ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
308     break;
309   case MPI_2INT:
310     for(i=0;i<(*len);i++)
311       if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
312         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
313       else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
314         ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
315     break;
316   case MPI_SHORT_INT:
317     for(i=0;i<(*len);i++)
318       if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
319         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
320       else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
321         ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
322     break;
323   case MPI_LONG_DOUBLE_INT:
324     for(i=0;i<(*len);i++)
325       if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
326         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
327       else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
328         ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
329     break;
330   case MPI_2FLOAT:
331     for(i=0;i<(*len);i++)
332       if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
333         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
334       else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
335         ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
336     break;
337   case MPI_2DOUBLE:
338     for(i=0;i<(*len);i++)
339       if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
340         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
341       else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
342         ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
343     break;
344   default:
345     ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
346     CmiAbort("exiting");
347   }
348 }
349
350 // every msg contains a AmpiOpHeader structure before user data
351 // FIXME: non-commutative operations require messages be ordered by rank
352 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
353   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
354   MPI_Datatype dtype;
355   int szhdr, szdata, len;
356   MPI_User_function* func;
357   func = hdr->func;
358   dtype = hdr->dtype;  
359   szdata = hdr->szdata;
360   len = hdr->len;  
361   szhdr = sizeof(AmpiOpHeader);
362
363   //Assuming extent == size
364   void *ret = malloc(szhdr+szdata);
365   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
366   for(int i=1;i<nMsg;i++){
367     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
368   }
369   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
370   free(ret);
371   return retmsg;
372 }
373
374 CkReduction::reducerType AmpiReducer;
375
376 class Builtin_kvs{
377  public:
378   int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
379   Builtin_kvs(){
380     tag_ub = MPI_TAG_UB_VALUE; 
381     host = MPI_PROC_NULL;
382     io = 0;
383     wtime_is_global = 0;
384     keyval_mype = CkMyPe();
385     keyval_numpes = CkNumPes();
386     keyval_mynode = CkMyNode();
387     keyval_numnodes = CkNumNodes();
388   }
389 };
390
391 // ------------ startup support -----------
392 int _ampi_fallback_setup_count;
393 CDECL void AMPI_Setup(void);
394 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
395
396 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
397
398 /*Main routine used when missing MPI_Setup routine*/
399 CDECL void AMPI_Fallback_Main(int argc,char **argv)
400 {
401   AMPI_Main_cpp(argc,argv);
402   AMPI_Main(argc,argv);
403   FTN_NAME(MPI_MAIN,mpi_main)();
404 }
405
406 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
407 /*Startup routine used if user *doesn't* write
408   a TCHARM_User_setup routine.
409  */
410 CDECL void AMPI_Setup_Switch(void) {
411   _ampi_fallback_setup_count=0;
412   FTN_NAME(AMPI_SETUP,ampi_setup)();
413   AMPI_Setup();
414   if (_ampi_fallback_setup_count==2)
415   { //Missing AMPI_Setup in both C and Fortran:
416     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
417   }
418 }
419
420 static int nodeinit_has_been_called=0;
421 CtvDeclare(ampiParent*, ampiPtr);
422 CtvDeclare(int, ampiInitDone);
423 CtvDeclare(void*,stackBottom);
424 CtvDeclare(int, ampiFinalized);
425 CkpvDeclare(Builtin_kvs, bikvs);
426 CkpvDeclare(int,argvExtracted);
427 static int enableStreaming = 0;
428
429 CDECL long ampiCurrentStackUsage(){
430   int localVariable;
431   
432   unsigned long p1 =  (unsigned long)((void*)&localVariable);
433   unsigned long p2 =  (unsigned long)(CtvAccess(stackBottom));
434
435
436   if(p1 > p2)
437     return p1 - p2;
438   else
439     return  p2 - p1;
440  
441 }
442
443 FDECL void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
444   long usage = ampiCurrentStackUsage();
445   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
446 }
447
448
449 CDECL void AMPI_threadstart(void *data);
450 static int AMPI_threadstart_idx = -1;
451
452 static void ampiNodeInit(void)
453 {
454   mpi_nworlds=0;
455   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
456   {
457     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
458   }
459   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
460
461   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
462   
463   CmiAssert(AMPI_threadstart_idx == -1);    // only initialize once
464   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
465
466   nodeinit_has_been_called=1;
467 }
468
469 #if PRINT_IDLE
470 static double totalidle=0.0, startT=0.0;
471 static int beginHandle, endHandle;
472 static void BeginIdle(void *dummy,double curWallTime)
473 {
474   startT = curWallTime;
475 }
476 static void EndIdle(void *dummy,double curWallTime)
477 {
478   totalidle += curWallTime - startT;
479 }
480 #endif
481
482 /* for fortran reduction operation table to handle mapping */
483 typedef MPI_Op  MPI_Op_Array[128];
484 CtvDeclare(int, mpi_opc);
485 CtvDeclare(MPI_Op_Array, mpi_ops);
486
487 static void ampiProcInit(void){
488   CtvInitialize(ampiParent*, ampiPtr);
489   CtvInitialize(int,ampiInitDone);
490   CtvInitialize(int,ampiFinalized);
491   CtvInitialize(void*,stackBottom);
492
493
494   CtvInitialize(MPI_Op_Array, mpi_ops);
495   CtvInitialize(int, mpi_opc);
496
497   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
498   CkpvAccess(bikvs) = Builtin_kvs();
499
500   CkpvInitialize(int, argvExtracted);
501   CkpvAccess(argvExtracted) = 0;
502   REGISTER_AMPI
503   initAmpiProjections();
504   char **argv=CkGetArgv();
505 #if AMPI_COMLIB  
506   if(CkpvAccess(argvExtracted)==0){
507     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
508   }
509 #endif
510
511 #ifdef AMPIMSGLOG
512   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
513   msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
514   CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
515   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
516 #endif
517 }
518
519 void AMPI_Install_Idle_Timer(){
520 #if AMPI_PRINT_IDLE
521   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
522   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
523 #endif
524 }
525
526 void AMPI_Uninstall_Idle_Timer(){
527 #if AMPI_PRINT_IDLE
528   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
529   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
530 #endif
531 }
532
533 PUPfunctionpointer(MPI_MainFn)
534
535 class MPI_threadstart_t {
536 public:
537         MPI_MainFn fn;
538         MPI_threadstart_t() {}
539         MPI_threadstart_t(MPI_MainFn fn_)
540                 :fn(fn_) {}
541         void start(void) {
542                 char **argv=CmiCopyArgs(CkGetArgv());
543                 int argc=CkGetArgc();
544
545                 // Set a pointer to somewhere close to the bottom of the stack.
546                 // This is used for roughly estimating the stack usage later.
547                 CtvAccess(stackBottom) = &argv;
548
549 #if CMK_AMPI_FNPTR_HACK
550                 AMPI_Fallback_Main(argc,argv);
551 #else
552                 (fn)(argc,argv);
553 #endif
554         }
555         void pup(PUP::er &p) {
556                 p|fn;
557         }
558 };
559 PUPmarshall(MPI_threadstart_t)
560
561 CDECL void AMPI_threadstart(void *data)
562 {
563         STARTUP_DEBUG("MPI_threadstart")
564         MPI_threadstart_t t;
565         pupFromBuf(data,t);
566 #if CMK_TRACE_IN_CHARM
567         if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
568 #endif
569         t.start();
570 }
571
572 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
573 {
574         STARTUP_DEBUG("ampiCreateMain")
575         int _nchunks=TCHARM_Get_num_chunks();
576         //Make a new threads array:
577         MPI_threadstart_t s(mainFn);
578         memBuf b; pupIntoBuf(b,s);
579         TCHARM_Create_data( _nchunks,AMPI_threadstart_idx,
580                           b.getData(), b.getSize());
581 }
582
583 /* TCharm Semaphore ID's for AMPI startup */
584 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
585 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
586
587 static CProxy_ampiWorlds ampiWorldsGroup;
588
589 static void init_operations()
590 {
591   CtvInitialize(MPI_Op_Array, mpi_ops);
592   int i = 0;
593   MPI_Op *tab = CtvAccess(mpi_ops);
594   tab[i++] = MPI_MAX;
595   tab[i++] = MPI_MIN;
596   tab[i++] = MPI_SUM;
597   tab[i++] = MPI_PROD;
598   tab[i++] = MPI_LAND;
599   tab[i++] = MPI_BAND;
600   tab[i++] = MPI_LOR;
601   tab[i++] = MPI_BOR;
602   tab[i++] = MPI_LXOR;
603   tab[i++] = MPI_BXOR;
604   tab[i++] = MPI_MAXLOC;
605   tab[i++] = MPI_MINLOC;
606
607   CtvInitialize(int, mpi_opc);
608   CtvAccess(mpi_opc) = i;
609 }
610
611 /*
612 Called from MPI_Init, a collective initialization call:
613  creates a new AMPI array and attaches it to the current
614  set of TCHARM threads.
615 */
616 static ampi *ampiInit(char **argv)
617 {
618   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
619   if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
620   STARTUP_DEBUG("ampiInit> begin")
621   
622   MPI_Comm new_world;
623   int _nchunks;
624   CkArrayOptions opts;
625   CProxy_ampiParent parent;
626   if (TCHARM_Element()==0) //the rank of a tcharm object
627   { /* I'm responsible for building the arrays: */
628         STARTUP_DEBUG("ampiInit> creating arrays")
629
630 // FIXME: Need to serialize global communicator allocation in one place.
631         //Allocate the next communicator
632         if(mpi_nworlds == MPI_MAX_COMM_WORLDS)
633         {
634                 CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
635         }
636         int new_idx=mpi_nworlds;
637         new_world=MPI_COMM_WORLD+new_idx; // Isaac guessed there shouldn't be a +1 here
638
639         //Create and attach the ampiParent array
640         CkArrayID threads;
641         opts=TCHARM_Attach_start(&threads,&_nchunks);
642         parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
643         STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
644   }
645   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
646
647   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
648
649   if (TCHARM_Element()==0)
650   {
651           //Make a new ampi array
652           CkArrayID empty;
653
654         ampiCommStruct worldComm(new_world,empty,_nchunks);
655         CProxy_ampi arr;
656
657
658 #if AMPI_COMLIB
659
660         ComlibInstanceHandle ciStreaming = 1;
661         ComlibInstanceHandle ciBcast = 2;
662         ComlibInstanceHandle ciAllgather = 3;
663         ComlibInstanceHandle ciAlltoall = 4;
664
665         arr=CProxy_ampi::ckNew(parent, worldComm, ciStreaming, ciBcast, ciAllgather, ciAlltoall, opts);
666         
667
668         CkPrintf("Using untested comlib code in ampi.C\n");
669
670         Strategy *sStreaming = new StreamingStrategy(1,10);
671         CkAssert(ciStreaming == ComlibRegister(sStreaming));
672         
673         Strategy *sBcast = new BroadcastStrategy(USE_HYPERCUBE);
674         CkAssert(ciBcast = ComlibRegister(sBcast));
675         
676         Strategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
677         CkAssert(ciAllgather = ComlibRegister(sAllgather));
678
679         Strategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
680         CkAssert(ciAlltoall = ComlibRegister(sAlltoall));
681         
682         CmiPrintf("Created AMPI comlib strategies in new manner\n");
683
684         // FIXME: Propogate the comlib table here
685         CkpvAccess(conv_com_object).doneCreating();
686 #else
687         arr=CProxy_ampi::ckNew(parent,worldComm,opts);
688
689 #endif
690
691         //Broadcast info. to the mpi_worlds array
692         // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
693         ampiCommStruct newComm(new_world,arr,_nchunks);
694         //CkPrintf("In ampiInit: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
695         if (ampiWorldsGroup.ckGetGroupID().isZero())
696                 ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
697         else
698                 ampiWorldsGroup.add(newComm);
699         STARTUP_DEBUG("ampiInit> arrays created")
700
701   }
702
703   // Find our ampi object:
704   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
705   CtvAccess(ampiInitDone)=1;
706   CtvAccess(ampiFinalized)=0;
707   STARTUP_DEBUG("ampiInit> complete")
708 #if CMK_BLUEGENE_CHARM
709 //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
710   TRACE_BG_ADD_TAG("AMPI_START");
711 #endif
712
713   init_operations();     // initialize fortran reduction operation table
714
715   getAmpiParent()->ampiInitCallDone = 0;
716
717   CProxy_ampi cbproxy = ptr->getProxy();
718   CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
719   ptr->contribute(0, NULL, CkReduction::sum_int, cb);
720
721   ampiParent *thisParent = getAmpiParent(); 
722   while(thisParent->ampiInitCallDone!=1){
723     //CkPrintf("In checking ampiInitCallDone(%d) loop at parent %d!\n", thisParent->ampiInitCallDone, thisParent->thisIndex);
724     thisParent->getTCharmThread()->stop();
725     /* 
726      * thisParent needs to be updated in case of the parent is being pupped.
727      * In such case, thisParent got changed
728      */
729     thisParent = getAmpiParent();
730   }
731
732 #ifdef CMK_BLUEGENE_CHARM
733     BgSetStartOutOfCore();
734 #endif
735
736   return ptr;
737 }
738
739 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
740 class ampiWorlds : public CBase_ampiWorlds {
741 public:
742     ampiWorlds(const ampiCommStruct &nextWorld) {
743         ampiWorldsGroup=thisgroup;
744         //CkPrintf("In constructor: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
745         add(nextWorld);
746     }
747     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
748     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
749     void add(const ampiCommStruct &nextWorld) {
750       int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD); // Isaac guessed there shouldn't be a +1 after the MPI_COMM_WORLD
751         mpi_worlds[new_idx].comm=nextWorld;
752         if (mpi_nworlds<=new_idx) mpi_nworlds=new_idx+1;
753         STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
754     }
755 };
756
757 //-------------------- ampiParent -------------------------
758 ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
759                 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
760 {
761   int barrier = 0x1234;
762   STARTUP_DEBUG("ampiParent> starting up")
763   thread=NULL;
764   worldPtr=NULL;
765   myDDT=&myDDTsto;
766   prepareCtv();
767
768   init();
769
770   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
771         AsyncEvacuate(CmiFalse);
772 }
773
774 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
775   thread=NULL;
776   worldPtr=NULL;
777   myDDT=&myDDTsto;
778
779   init();
780
781   AsyncEvacuate(CmiFalse);
782 }
783
784 void ampiParent::pup(PUP::er &p) {
785   ArrayElement1D::pup(p);
786   p|threads;
787   p|worldNo;           // why it was missing from here before??
788   p|worldStruct;
789   myDDT->pup(p);
790   p|splitComm;
791   p|groupComm;
792   p|groups;
793
794 //BIGSIM_OOC DEBUGGING
795 //if(!p.isUnpacking()){
796 //    CmiPrintf("ampiParent[%d] packing ampiRequestList: \n", thisIndex);
797 //    ampiReqs.print();
798 //}
799
800   p|ampiReqs;
801
802 //BIGSIM_OOC DEBUGGING
803 //if(p.isUnpacking()){
804 //    CmiPrintf("ampiParent[%d] unpacking ampiRequestList: \n", thisIndex);
805 //    ampiReqs.print();
806 //}
807
808   p|RProxyCnt;
809   p|tmpRProxy;
810   p|winStructList;
811   p|infos;
812
813   p|ampiInitCallDone;
814 }
815 void ampiParent::prepareCtv(void) {
816   thread=threads[thisIndex].ckLocal();
817   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
818   CtvAccessOther(thread->getThread(),ampiPtr) = this;
819   STARTUP_DEBUG("ampiParent> found TCharm")
820 }
821
822 void ampiParent::init(){
823 #ifdef AMPIMSGLOG
824   if(msgLogWrite && msgLogRank == thisIndex){
825 #if CMK_PROJECTIONS_USE_ZLIB
826     fMsgLog = gzopen(msgLogFilename,"wb");
827     toPUPer = new PUP::tozDisk(fMsgLog);
828 #else
829     fMsgLog = fopen(msgLogFilename,"wb");
830     toPUPer = new PUP::toDisk(fMsgLog);
831 #endif
832   }else if(msgLogRead){
833 #if CMK_PROJECTIONS_USE_ZLIB
834     fMsgLog = gzopen(msgLogFilename,"rb");
835     fromPUPer = new PUP::fromzDisk(fMsgLog);
836 #else
837     fMsgLog = fopen(msgLogFilename,"rb");
838     fromPUPer = new PUP::fromDisk(fMsgLog);
839 #endif
840   }
841 #endif
842 }
843
844 void ampiParent::finalize(){
845 #ifdef AMPIMSGLOG
846   if(msgLogWrite && msgLogRank == thisIndex){
847     delete toPUPer;
848 #if CMK_PROJECTIONS_USE_ZLIB
849     gzclose(fMsgLog);
850 #else
851     fclose(fMsgLog);
852 #endif
853   }else if(msgLogRead){
854     delete fromPUPer;
855 #if CMK_PROJECTIONS_USE_ZLIB
856     gzclose(fMsgLog);
857 #else
858     fclose(fMsgLog);
859 #endif
860   }
861 #endif
862 }
863
864 void ampiParent::ckJustMigrated(void) {
865   ArrayElement1D::ckJustMigrated();
866   prepareCtv();
867 }
868
869 void ampiParent::ckJustRestored(void) {
870   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
871   ArrayElement1D::ckJustRestored();
872   prepareCtv();
873   
874   //BIGSIM_OOC DEBUGGING
875   //CkPrintf("In ampiParent[%d] with TCharm thread=%p:   ",thisIndex, thread);
876   //CthPrintThdMagic(thread->getTid()); 
877 }
878
879 ampiParent::~ampiParent() {
880   STARTUP_DEBUG("ampiParent> destructor called");
881   finalize();
882 }
883
884 //Children call this when they are first created or just migrated
885 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
886 {
887   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
888
889   if (s.getComm()>=MPI_COMM_WORLD)
890   { //We now have our COMM_WORLD-- register it
891     //Note that split communicators don't keep a raw pointer, so
892     //they don't need to re-register on migration.
893      if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
894      worldPtr=ptr;
895      worldStruct=s;
896
897     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
898      CkVec<int> _indices;
899      _indices.push_back(thisIndex);
900      selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
901   }
902   
903   if (!forMigration)
904   { //Register the new communicator:
905      MPI_Comm comm = s.getComm();
906      STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
907      if (comm>=MPI_COMM_WORLD) { 
908        // Pass the new ampi to the waiting ampiInit
909        thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
910      } else if (isSplit(comm)) {
911        splitChildRegister(s);
912      } else if (isGroup(comm)) {
913        groupChildRegister(s);
914      } else if (isCart(comm)) {
915        cartChildRegister(s);
916      } else if (isGraph(comm)) {
917        graphChildRegister(s);
918      } else if (isInter(comm)) {
919        interChildRegister(s);
920      } else if (isIntra(comm)) {
921        intraChildRegister(s);
922      }else
923        CkAbort("ampiParent recieved child with bad communicator");
924   }
925
926   return thread;
927 }
928
929 //BIGSIM_OOC DEBUGGING
930 //Move the comm2ampi from inline to normal function for the sake of debugging
931 /*ampi *ampiParent::comm2ampi(MPI_Comm comm){
932       //BIGSIM_OOC DEBUGGING
933       //CmiPrintf("%d, in ampiParent::comm2ampi, comm=%d\n", thisIndex, comm);
934       if (comm==MPI_COMM_WORLD) return worldPtr;
935       if (comm==MPI_COMM_SELF) return worldPtr;
936       if (comm==worldNo) return worldPtr;
937       if (isSplit(comm)) {
938          const ampiCommStruct &st=getSplit(comm);
939          return st.getProxy()[thisIndex].ckLocal();
940       }
941       if (isGroup(comm)) {
942          const ampiCommStruct &st=getGroup(comm);
943          return st.getProxy()[thisIndex].ckLocal();
944       }
945       if (isCart(comm)) {
946         const ampiCommStruct &st = getCart(comm);
947         return st.getProxy()[thisIndex].ckLocal();
948       }
949       if (isGraph(comm)) {
950         const ampiCommStruct &st = getGraph(comm);
951         return st.getProxy()[thisIndex].ckLocal();
952       }
953       if (isInter(comm)) {
954          const ampiCommStruct &st=getInter(comm);
955          return st.getProxy()[thisIndex].ckLocal();
956       }
957       if (isIntra(comm)) {
958          const ampiCommStruct &st=getIntra(comm);
959          return st.getProxy()[thisIndex].ckLocal();
960       }
961       if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
962       CkAbort("Invalid communicator used!");
963       return NULL;
964 }*/
965
966 // reduction client data - preparation for checkpointing
967 class ckptClientStruct {
968 public:
969   char *dname;
970   ampiParent *ampiPtr;
971   ckptClientStruct(char *s, ampiParent *a): dname(s), ampiPtr(a) {}
972 };
973
974 static void checkpointClient(void *param,void *msg)
975 {
976   ckptClientStruct *client = (ckptClientStruct*)param;
977   char *dname = client->dname;
978   ampiParent *ampiPtr = client->ampiPtr;
979   ampiPtr->Checkpoint(strlen(dname), dname);
980   delete client;
981 }
982
983 void ampiParent::startCheckpoint(char* dname){
984   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
985   if (thisIndex==0) {
986         ckptClientStruct *clientData = new ckptClientStruct(dname, this);
987         CkCallback cb(checkpointClient, clientData);
988         contribute(0, NULL, CkReduction::sum_int, cb);
989   }
990   else
991         contribute(0, NULL, CkReduction::sum_int);
992
993 #if 0
994 #if CMK_BLUEGENE_CHARM
995   void *curLog;         // store current log in timeline
996   _TRACE_BG_TLINE_END(&curLog);
997   TRACE_BG_AMPI_SUSPEND();
998 #endif
999 #endif
1000
1001   thread->stop();
1002
1003 #if CMK_BLUEGENE_CHARM
1004   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
1005   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
1006 #endif
1007 }
1008 void ampiParent::Checkpoint(int len, char* dname){
1009   if (len == 0) {
1010     // memory checkpoint
1011     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1012     CkStartMemCheckpoint(cb);
1013   }
1014   else {
1015     char dirname[256];
1016     strncpy(dirname,dname,len);
1017     dirname[len]='\0';
1018     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1019     CkStartCheckpoint(dirname,cb);
1020   }
1021 }
1022 void ampiParent::ResumeThread(void){
1023   thread->resume();
1024 }
1025
1026 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
1027                              int *keyval, void* extra_state){
1028         KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1029         int idx = kvlist.size();
1030         kvlist.resize(idx+1);
1031         kvlist[idx] = newnode;
1032         *keyval = idx;
1033         return 0;
1034 }
1035 int ampiParent::freeKeyval(int *keyval){
1036         if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
1037                 return -1;
1038         delete kvlist[*keyval];
1039         kvlist[*keyval] = NULL;
1040         *keyval = MPI_KEYVAL_INVALID;
1041         return 0;
1042 }
1043
1044 int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
1045         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1046                 return -1;
1047         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1048         // Enlarge the keyval list:
1049         while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
1050         cs.getKeyvals()[keyval]=attribute_val;
1051         return 0;
1052 }
1053
1054 int ampiParent::kv_is_builtin(int keyval) {
1055         switch(keyval) {
1056         case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
1057         case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
1058         case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
1059         case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
1060         case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
1061         case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
1062         case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
1063         case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
1064         default: return 0;
1065         };
1066 }
1067
1068 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
1069         *flag = false;
1070         if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
1071           *flag=true;
1072           *(int **)attribute_val = kv_builtin_storage;  // all default tags are ints
1073           return 0;
1074         }
1075         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1076                 return -1; /* invalid keyval */
1077         
1078         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1079         if (keyval>=cs.getKeyvals().size())  
1080                 return 0; /* we don't have a value yet */
1081         if (cs.getKeyvals()[keyval]==0)
1082                 return 0; /* we had a value, but now it's zero */
1083         /* Otherwise, we have a good value */
1084         *flag = true;
1085         *(void **)attribute_val = cs.getKeyvals()[keyval];
1086         return 0;
1087 }
1088 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
1089         /* no way to delete an attribute: just overwrite it with 0 */
1090         return putAttr(comm,keyval,0);
1091 }
1092
1093 //----------------------- ampi -------------------------
1094 void ampi::init(void) {
1095   parent=NULL;
1096   thread=NULL;
1097   msgs=NULL;
1098   posted_ireqs=NULL;
1099   resumeOnRecv=false;
1100   AsyncEvacuate(CmiFalse);
1101 }
1102
1103 ampi::ampi()
1104 {
1105   /* this constructor only exists so we can create an empty array during split */
1106   CkAbort("Default ampi constructor should never be called");
1107 }
1108
1109 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
1110    :parentProxy(parent_)
1111 {
1112   init();
1113
1114   myComm=s; myComm.setArrayID(thisArrayID);
1115   myRank=myComm.getRankForIndex(thisIndex);
1116
1117   findParent(false);
1118
1119   msgs = CmmNew();
1120   posted_ireqs = CmmNew();
1121   nbcasts = 0;
1122
1123   comlibProxy = thisProxy; // Will later be associated with comlib
1124   
1125   seqEntries=parent->numElements;
1126   oorder.init (seqEntries);
1127 #ifdef _FAULT_MLOG_
1128     if(thisIndex == 0){
1129 /*      CkAssert(CkMyPe() == 0);
1130  *              CkGroupID _myManagerGID = thisProxy.ckGetArrayID();     
1131  *                      CkAssert(numElements);
1132  *                              printf("ampi::ampi setting numInitial to %d on manager at gid %d \n",numElements,_myManagerGID.idx);
1133  *                                      CkArray *_myManager = thisProxy.ckLocalBranch();
1134  *                                              _myManager->setNumInitial(numElements);*/
1135     }
1136 #endif
1137 }
1138
1139 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1140                 ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1141    :parentProxy(parent_),ciStreaming(ciStreaming_),ciBcast(ciBcast_),ciAllgather(ciAllgather_),ciAlltoall(ciAlltoall_)
1142 {
1143   init();
1144
1145   myComm=s; myComm.setArrayID(thisArrayID);
1146   myRank=myComm.getRankForIndex(thisIndex);
1147
1148   findParent(false);
1149
1150   msgs = CmmNew();
1151   posted_ireqs = CmmNew();
1152   nbcasts = 0;
1153
1154   comlibProxy = thisProxy;
1155   CmiPrintf("comlibProxy created as a copy of thisProxy, no associate call\n");
1156
1157 #if AMPI_COMLIB
1158   //  ComlibAssociateProxy(ciAlltoall, comlibProxy);
1159 #endif
1160
1161   seqEntries=parent->numElements;
1162   oorder.init (seqEntries);
1163 }
1164
1165 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1166 {
1167   init();
1168   
1169   seqEntries=-1;
1170 }
1171
1172 void ampi::ckJustMigrated(void)
1173 {
1174         findParent(true);
1175         ArrayElement1D::ckJustMigrated();
1176 }
1177
1178 void ampi::ckJustRestored(void)
1179 {
1180         FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
1181         findParent(true);
1182         ArrayElement1D::ckJustRestored();
1183         
1184         //BIGSIM_OOC DEBUGGING
1185         //CkPrintf("In ampi[%d] thread[%p]:   ", thisIndex, thread);
1186         //CthPrintThdMagic(thread->getTid()); 
1187 }
1188
1189 void ampi::findParent(bool forMigration) {
1190         STARTUP_DEBUG("ampi> finding my parent")
1191         parent=parentProxy[thisIndex].ckLocal();
1192         if (parent==NULL) CkAbort("AMPI can't find its parent!");
1193         thread=parent->registerAmpi(this,myComm,forMigration);
1194         if (thread==NULL) CkAbort("AMPI can't find its thread!");
1195 //      printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1196 }
1197
1198 //The following method should be called on the first element of the
1199 //ampi array
1200 void ampi::allInitDone(CkReductionMsg *m){
1201     FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
1202     thisProxy.setInitDoneFlag();
1203     delete m;
1204 }
1205
1206 void ampi::setInitDoneFlag(){
1207     //CkPrintf("ampi[%d]::setInitDone called!\n", thisIndex);
1208     parent->ampiInitCallDone=1;
1209     parent->getTCharmThread()->start();
1210 }
1211
1212 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1213         CkPupMessage(*(PUP::er *)p,msg,1);
1214         if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1215 //      printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1216 }
1217
1218 static void cmm_pup_posted_ireq(pup_er p,void **msg) {
1219
1220         pup_int(p, (int *)msg);
1221
1222 /*      if(pup_isUnpacking(p)){
1223             // *msg = new IReq;
1224             //when unpacking, nothing is needed to do since *msg is an index
1225             //(of type integer) to the ampiParent::ampiReqs (the AmpiRequestList)
1226         }
1227         if(!pup_isUnpacking(p)){
1228             AmpiRequestList *reqL = getReqs();
1229             int retIdx = reqL->findRequestIndex((IReq *)*msg);
1230             if(retIdx==-1){
1231                 CmiAbort("An AmpiRequest instance should be found for an instance in posted_ireq!\n");
1232             }
1233             pup_int(p, retIdx)
1234         }
1235 */
1236 //      ((IReq *)*msg)->pup(*(PUP::er *)p);
1237
1238 //      if (pup_isDeleting(p)) delete (IReq *)*msg;
1239 //      printf("[%d] pupping postd irequests %p \n",CkMyPe(),*msg);
1240 }
1241
1242 void ampi::pup(PUP::er &p)
1243 {
1244   if(!p.isUserlevel())
1245     ArrayElement1D::pup(p);//Pack superclass
1246   p|parentProxy;
1247   p|myComm;
1248   p|myRank;
1249   p|nbcasts;
1250   p|tmpVec;
1251   p|remoteProxy;
1252   p|resumeOnRecv;
1253   p|comlibProxy;
1254   p|ciStreaming;
1255   p|ciBcast;
1256   p|ciAllgather;
1257   p|ciAlltoall;
1258
1259 #if AMPI_COMLIB
1260   if(p.isUnpacking()){
1261 //    ciStreaming.setSourcePe();
1262 //    ciBcast.setSourcePe();
1263 //    ciAllgather.setSourcePe();
1264 //    ciAlltoall.setSourcePe();
1265   }
1266 #endif
1267
1268   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1269
1270   //BIGSIM_OOC DEBUGGING
1271   //if(!p.isUnpacking()){
1272     //CkPrintf("ampi[%d]::packing: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1273   //}
1274
1275   posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);
1276
1277   //if(p.isUnpacking()){
1278     //CkPrintf("ampi[%d]::unpacking: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1279   //}
1280
1281   p|seqEntries;
1282   p|oorder;
1283 }
1284
1285 ampi::~ampi()
1286 {
1287   if (CkInRestarting() || BgOutOfCoreFlag==1) {
1288     // in restarting, we need to flush messages
1289     int tags[3], sts[3];
1290     tags[0] = tags[1] = tags[2] = CmmWildCard;
1291     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1292     while (msg) {
1293       delete msg;
1294       msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1295     }
1296   }
1297
1298   CmmFree(msgs);
1299   CmmFreeAll(posted_ireqs);
1300 }
1301
1302 //------------------------ Communicator Splitting ---------------------
1303 class ampiSplitKey {
1304 public:
1305         int nextSplitComm;
1306         int color; //New class of processes we'll belong to
1307         int key; //To determine rank in new ordering
1308         int rank; //Rank in old ordering
1309         ampiSplitKey() {}
1310         ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1311                 :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1312 };
1313
1314 /* "type" may indicate whether call is for a cartesian topology etc. */
1315
1316 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1317 {
1318 #if CMK_BLUEGENE_CHARM
1319   void *curLog;         // store current log in timeline
1320   _TRACE_BG_TLINE_END(&curLog);
1321 //  TRACE_BG_AMPI_SUSPEND();
1322 #endif
1323   if (type == CART_TOPOL) {
1324         ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1325         int rootIdx=myComm.getIndexForRank(0);
1326         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1327         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1328
1329         thread->suspend(); //Resumed by ampiParent::cartChildRegister
1330         MPI_Comm newComm=parent->getNextCart()-1;
1331         *dest=newComm;
1332   } else {
1333         ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1334         int rootIdx=myComm.getIndexForRank(0);
1335         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1336         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1337
1338         thread->suspend(); //Resumed by ampiParent::splitChildRegister
1339         MPI_Comm newComm=parent->getNextSplit()-1;
1340         *dest=newComm;
1341   }
1342 #if CMK_BLUEGENE_CHARM
1343 //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1344   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1345   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1346 #endif
1347 }
1348
1349 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1350         const ampiSplitKey *a=(const ampiSplitKey *)a_;
1351         const ampiSplitKey *b=(const ampiSplitKey *)b_;
1352         if (a->color!=b->color) return a->color-b->color;
1353         if (a->key!=b->key) return a->key-b->key;
1354         return a->rank-b->rank;
1355 }
1356
1357 void ampi::splitPhase1(CkReductionMsg *msg)
1358 {
1359         //Order the keys, which orders the ranks properly:
1360         int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1361         ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1362         if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1363         qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1364
1365         MPI_Comm newComm = -1;
1366         for(int i=0;i<nKeys;i++)
1367                 if(keys[i].nextSplitComm>newComm)
1368                         newComm = keys[i].nextSplitComm;
1369
1370         //Loop over the sorted keys, which gives us the new arrays:
1371         int lastColor=keys[0].color-1; //The color we're building an array for
1372         CProxy_ampi lastAmpi; //The array for lastColor
1373         int lastRoot=0; //C value for new rank 0 process for latest color
1374         ampiCommStruct lastComm; //Communicator info. for latest color
1375         for (int c=0;c<nKeys;c++) {
1376                 if (keys[c].color!=lastColor)
1377                 { //Hit a new color-- need to build a new communicator and array
1378                         lastColor=keys[c].color;
1379                         lastRoot=c;
1380                         CkArrayOptions opts;
1381                         opts.bindTo(parentProxy);
1382                         opts.setNumInitial(0);
1383                         CkArrayID unusedAID; ampiCommStruct unusedComm;
1384                         lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1385                         lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1386
1387                         CkVec<int> indices; //Maps rank to array indices for new arrau
1388                         for (int i=c;i<nKeys;i++) {
1389                                 if (keys[i].color!=lastColor) break; //Done with this color
1390                                 int idx=myComm.getIndexForRank(keys[i].rank);
1391                                 indices.push_back(idx);
1392                         }
1393
1394                         //FIXME: create a new communicator for each color, instead of
1395                         // (confusingly) re-using the same MPI_Comm number for each.
1396                         lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1397                 }
1398                 int newRank=c-lastRoot;
1399                 int newIdx=lastComm.getIndexForRank(newRank);
1400
1401                 //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1402                 lastAmpi[newIdx].insert(parentProxy,lastComm);
1403         }
1404
1405         delete msg;
1406 }
1407
1408 //...newly created array elements register with the parent, which calls:
1409 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1410         int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1411         if (splitComm.size()<=idx) splitComm.resize(idx+1);
1412         splitComm[idx]=new ampiCommStruct(s);
1413         thread->resume(); //Matches suspend at end of ampi::split
1414 }
1415
1416 //-----------------create communicator from group--------------
1417 // The procedure is like that of comm_split very much,
1418 // so the code is shamelessly copied from above
1419 //   1. reduction to make sure all members have called
1420 //   2. the root in the old communicator create the new array
1421 //   3. ampiParent::register is called to register new array as new comm
1422 class vecStruct {
1423 public:
1424   int nextgroup;
1425   groupStruct vec;
1426   vecStruct():nextgroup(-1){}
1427   vecStruct(int nextgroup_, groupStruct vec_)
1428     : nextgroup(nextgroup_), vec(vec_) { }
1429 };
1430
1431 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1432   int rootIdx=vec[0];
1433   tmpVec = vec;
1434   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1435   MPI_Comm nextgroup = parent->getNextGroup();
1436   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1437
1438   if(getPosOp(thisIndex,vec)>=0){
1439     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1440     MPI_Comm retcomm = parent->getNextGroup()-1;
1441     *newcomm = retcomm;
1442   }else{
1443     *newcomm = MPI_COMM_NULL;
1444   }
1445 }
1446
1447 void ampi::commCreatePhase1(CkReductionMsg *msg){
1448   MPI_Comm *nextGroupComm = (int *)msg->getData();
1449
1450   CkArrayOptions opts;
1451   opts.bindTo(parentProxy);
1452   opts.setNumInitial(0);
1453   CkArrayID unusedAID;
1454   ampiCommStruct unusedComm;
1455   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1456   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1457
1458   groupStruct indices = tmpVec;
1459   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1460   for(int i=0;i<indices.size();i++){
1461     int newIdx=indices[i];
1462     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1463   }
1464   delete msg;
1465 }
1466
1467 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1468   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1469   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1470   groupComm[idx]=new ampiCommStruct(s);
1471   thread->resume(); //Matches suspend at end of ampi::split
1472 }
1473
1474 /* Virtual topology communicator creation */
1475 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1476   int rootIdx=vec[0];
1477   tmpVec = vec;
1478   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1479
1480   MPI_Comm nextcart = parent->getNextCart();
1481   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1482
1483   if(getPosOp(thisIndex,vec)>=0){
1484     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1485      MPI_Comm retcomm = parent->getNextCart()-1;
1486      *newcomm = retcomm;
1487   }else
1488     *newcomm = MPI_COMM_NULL;
1489 }
1490
1491 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1492   MPI_Comm *nextCartComm = (int *)msg->getData();
1493
1494   CkArrayOptions opts;
1495   opts.bindTo(parentProxy);
1496   opts.setNumInitial(0);
1497   CkArrayID unusedAID;
1498   ampiCommStruct unusedComm;
1499   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1500   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1501
1502   groupStruct indices = tmpVec;
1503   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1504                                                 size(),indices);
1505   for(int i=0;i<indices.size();i++){
1506     int newIdx=indices[i];
1507     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1508   }
1509   delete msg;
1510 }
1511
1512 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1513   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1514   if (cartComm.size()<=idx) {
1515     cartComm.resize(idx+1);
1516     cartComm.length()=idx+1;
1517   }
1518   cartComm[idx]=new ampiCommStruct(s);
1519   thread->resume(); //Matches suspend at end of ampi::cartCreate
1520 }
1521
1522 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1523   int rootIdx=vec[0];
1524   tmpVec = vec;
1525   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1526                 myComm.getProxy());
1527   MPI_Comm nextgraph = parent->getNextGraph();
1528   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1529
1530   if(getPosOp(thisIndex,vec)>=0){
1531     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1532     MPI_Comm retcomm = parent->getNextGraph()-1;
1533     *newcomm = retcomm;
1534   }else
1535     *newcomm = MPI_COMM_NULL;
1536 }
1537
1538 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1539   MPI_Comm *nextGraphComm = (int *)msg->getData();
1540
1541   CkArrayOptions opts;
1542   opts.bindTo(parentProxy);
1543   opts.setNumInitial(0);
1544   CkArrayID unusedAID;
1545   ampiCommStruct unusedComm;
1546   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1547   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1548
1549   groupStruct indices = tmpVec;
1550   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1551                                                 .size(),indices);
1552   for(int i=0;i<indices.size();i++){
1553     int newIdx=indices[i];
1554     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1555   }
1556   delete msg;
1557 }
1558
1559 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1560   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1561   if (graphComm.size()<=idx) {
1562     graphComm.resize(idx+1);
1563     graphComm.length()=idx+1;
1564   }
1565   graphComm[idx]=new ampiCommStruct(s);
1566   thread->resume(); //Matches suspend at end of ampi::graphCreate
1567 }
1568
1569 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1570   if(thisIndex==root) { // not everybody gets the valid rvec
1571     tmpVec = rvec;
1572   }
1573   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1574   MPI_Comm nextinter = parent->getNextInter();
1575   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1576
1577   thread->suspend(); //Resumed by ampiParent::interChildRegister
1578   MPI_Comm newcomm=parent->getNextInter()-1;
1579   *ncomm=newcomm;
1580 }
1581
1582 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1583   MPI_Comm *nextInterComm = (int *)msg->getData();
1584
1585   groupStruct lgroup = myComm.getIndices();
1586   CkArrayOptions opts;
1587   opts.bindTo(parentProxy);
1588   opts.setNumInitial(0);
1589   CkArrayID unusedAID;
1590   ampiCommStruct unusedComm;
1591   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1592   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1593
1594   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1595   for(int i=0;i<lgroup.size();i++){
1596     int newIdx=lgroup[i];
1597     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1598   }
1599
1600   parentProxy[0].ExchangeProxy(newAmpi);
1601   delete msg;
1602 }
1603
1604 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1605   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1606   if (interComm.size()<=idx) interComm.resize(idx+1);
1607   interComm[idx]=new ampiCommStruct(s);
1608   //thread->resume(); // don't resume it yet, till parent set remote proxy
1609 }
1610
1611 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1612   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1613     groupStruct lvec = myComm.getIndices();
1614     groupStruct rvec = myComm.getRemoteIndices();
1615     int rsize = rvec.size();
1616     tmpVec = lvec;
1617     for(int i=0;i<rsize;i++)
1618       tmpVec.push_back(rvec[i]);
1619     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1620   }else{
1621     tmpVec.resize(0);
1622   }
1623
1624   int rootIdx=myComm.getIndexForRank(0);
1625   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1626   MPI_Comm nextintra = parent->getNextIntra();
1627   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1628
1629   thread->suspend(); //Resumed by ampiParent::interChildRegister
1630   MPI_Comm newcomm=parent->getNextIntra()-1;
1631   *ncomm=newcomm;
1632 }
1633
1634 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1635   if(tmpVec.size()==0) { delete msg; return; }
1636   MPI_Comm *nextIntraComm = (int *)msg->getData();
1637   CkArrayOptions opts;
1638   opts.bindTo(parentProxy);
1639   opts.setNumInitial(0);
1640   CkArrayID unusedAID;
1641   ampiCommStruct unusedComm;
1642   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1643   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1644
1645   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1646   for(int i=0;i<tmpVec.size();i++){
1647     int newIdx=tmpVec[i];
1648     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1649   }
1650   delete msg;
1651 }
1652
1653 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1654   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1655   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1656   intraComm[idx]=new ampiCommStruct(s);
1657   thread->resume(); //Matches suspend at end of ampi::split
1658 }
1659
1660 //------------------------ communication -----------------------
1661 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1662 {
1663   if (universeNo>MPI_COMM_WORLD) {
1664     int worldDex=universeNo-MPI_COMM_WORLD-1;
1665     if (worldDex>=mpi_nworlds)
1666       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1667     return mpi_worlds[worldDex].comm;
1668   }
1669   CkAbort("Bad communicator passed to universeComm2CommStruct");
1670   return mpi_worlds[0].comm; // meaningless return
1671 }
1672
1673 void ampi::block(void){
1674         thread->suspend();
1675 }
1676
1677 void ampi::yield(void){
1678         thread->schedule();
1679 }
1680
1681 void ampi::unblock(void){
1682         thread->resume();
1683 }
1684
1685 void
1686 ampi::generic(AmpiMsg* msg)
1687 {
1688 MSG_ORDER_DEBUG(
1689         CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1690         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1691 )
1692 #if CMK_BLUEGENE_CHARM
1693   TRACE_BG_ADD_TAG("AMPI_generic");
1694   msg->event = NULL;
1695 #endif
1696
1697   int sync = UsrToEnv(msg)->getRef();
1698   int srcIdx;
1699   if (sync)  srcIdx = msg->srcIdx;
1700
1701 //      AmpiMsg *msgcopy = msg;
1702   if(msg->seq != -1) {
1703     int srcIdx=msg->srcIdx;
1704     int n=oorder.put(srcIdx,msg);
1705     if (n>0) { // This message was in-order
1706       inorder(msg);
1707       if (n>1) { // It enables other, previously out-of-order messages
1708         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1709           inorder(msg);
1710         }
1711       }
1712     }
1713   } else { //Cross-world or system messages are unordered
1714     inorder(msg);
1715   }
1716   
1717     // msg may be free'ed from calling inorder()
1718   if (sync==1) {         // send an ack to sender
1719     CProxy_ampi pa(thisArrayID);
1720     pa[srcIdx].unblock();
1721   }
1722
1723   if(resumeOnRecv){
1724     //CkPrintf("Calling TCharm::resume at ampi::generic!\n");
1725     thread->resume();
1726   }
1727 }
1728
1729 inline static AmpiRequestList *getReqs(void); 
1730
1731 void
1732 ampi::inorder(AmpiMsg* msg)
1733 {
1734 MSG_ORDER_DEBUG(
1735   CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1736         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1737 )
1738   // check posted recvs
1739   int tags[3], sts[3];
1740   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1741   IReq *ireq = NULL;
1742   if (CpvAccess(CmiPICMethod) != 2) {
1743 #if 0
1744     //IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1745     ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1746 #else
1747 #if CMK_BLUEGENE_CHARM
1748     _TRACE_BG_TLINE_END(&msg->event);    // store current log
1749     msg->eventPe = CmiMyPe();
1750 #endif
1751     //in case ampi has not initialized and posted_ireqs are only inserted 
1752     //at AMPI_Irecv (MPI_Irecv)
1753     AmpiRequestList *reqL = &(parent->ampiReqs);
1754     //When storing the req index, it's 1-based. The reason is stated in the comments
1755     //in AMPI_Irecv function.
1756     int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
1757     if(reqL->size()>0 && ireqIdx>0)
1758         ireq = (IReq *)(*reqL)[ireqIdx-1];
1759     //CkPrintf("[%d] ampi::inorder, ireqIdx=%d\n", thisIndex, ireqIdx);
1760 #endif
1761     //CkPrintf("[%d] ampi::inorder, ireq=%p\n", thisIndex, ireq);
1762     if (ireq) { // receive posted
1763       ireq->receive(this, msg);
1764       // Isaac changed this so that the IReq stores the tag when receiving the message, 
1765       // instead of using this user supplied tag which could be MPI_ANY_TAG
1766       // Formerly the following line was not commented out:
1767       //ireq->tag = sts[0];         
1768       //ireq->src = sts[1];
1769       //ireq->comm = sts[2];
1770     } else {
1771       CmmPut(msgs, 3, tags, msg);
1772     }
1773   }
1774   else
1775       CmmPut(msgs, 3, tags, msg);
1776 }
1777
1778 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1779 {
1780     int tags[3];
1781     tags[0] = t; tags[1] = s; tags[2] = comm;
1782     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1783     return msg;
1784 }
1785
1786 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1787         int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm, int sync)
1788 {
1789   CkDDT_DataType *ddt = getDDT()->getType(type);
1790   int len = ddt->getSize(count);
1791   int sIdx=thisIndex;
1792   int seq = -1;
1793   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1794   seq = oorder.nextOutgoing(destIdx);
1795   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1796   if (sync) UsrToEnv(msg)->setRef(sync);
1797   TCharm::activateVariable(buf);
1798   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1799   TCharm::deactivateVariable(buf);
1800   return msg;
1801 }
1802
1803 void
1804 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1805 {
1806   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1807 }
1808
1809 void
1810 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, int sync)
1811 {
1812 #if CMK_TRACE_IN_CHARM
1813    TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
1814 #endif
1815
1816 #ifdef _FAULT_MLOG_
1817   MPI_Comm disComm = myComm.getComm();
1818   ampi *dis = getAmpiInstance(disComm);
1819   CpvAccess(_currentObj) = dis;
1820 #endif
1821
1822   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1823   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),sync);
1824
1825 #if CMK_TRACE_IN_CHARM
1826    TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
1827 #endif
1828
1829   if (sync) {
1830     // waiting for receiver side
1831     resumeOnRecv = false;            // so no one else awakes it
1832     block();
1833   }
1834 }
1835
1836 void
1837 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1838 {
1839   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1840   memcpy(msg->data, buf, len);
1841   CProxy_ampi pa(aid);
1842   pa[idx].generic(msg);
1843 }
1844
1845 void
1846 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy, int sync)
1847 {
1848   if(rank==MPI_PROC_NULL) return;
1849   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1850   int destIdx = dest.getIndexForRank(rank);
1851   if(isInter()){
1852     sRank = parent->thisIndex;
1853     destcomm = MPI_COMM_FIRST_INTER;
1854     destIdx = dest.getIndexForRemoteRank(rank);
1855     arrproxy = remoteProxy;
1856   }
1857  MSG_ORDER_DEBUG(
1858   CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
1859  )
1860
1861   arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm,sync));
1862
1863 #ifndef CMK_OPTIMIZE
1864   int size=0;
1865   MPI_Type_size(type,&size);
1866   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
1867 #endif
1868 }
1869
1870 int
1871 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
1872 {
1873   CkDDT_DataType *ddt = getDDT()->getType(type);
1874   int len = ddt->getSize(count);
1875   
1876   if (msg->length > len && msg->length-len!=sizeof(AmpiOpHeader))
1877   { /* Received more data than we were expecting */
1878     char einfo[1024];
1879     sprintf(einfo, "FATAL ERROR in rank %d MPI_Recv (tag=%d, source=%d)\n"
1880         "  Expecting only %d bytes (%d items of type %d), \n"
1881         "  but received %d bytes from rank %d\nAMPI> MPI_Send was longer than matching MPI_Recv.",
1882             thisIndex,t,s,
1883             len, count, type,
1884             msg->length, msg->srcRank);
1885     CkAbort(einfo);
1886     return -1;
1887   }else if(msg->length < len){ // only at rare case shall we reset count by using divide
1888     count = msg->length/(ddt->getSize(1));
1889   }
1890   //
1891   TCharm::activateVariable(buf);
1892   if (msg->length-len==sizeof(AmpiOpHeader)) {  // reduction msg
1893     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
1894   } else {
1895     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
1896   }
1897   TCharm::deactivateVariable(buf);
1898   return 0;
1899 }
1900
1901 int
1902 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
1903 {
1904   MPI_Comm disComm = myComm.getComm();
1905   if(s==MPI_PROC_NULL) {
1906     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
1907     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
1908     ((MPI_Status *)sts)->MPI_LENGTH = 0;
1909     return 0;
1910   }
1911   _LOG_E_END_AMPI_PROCESSING(thisIndex)
1912 #if CMK_BLUEGENE_CHARM
1913   void *curLog;         // store current log in timeline
1914   _TRACE_BG_TLINE_END(&curLog);
1915 //  TRACE_BG_AMPI_SUSPEND();
1916 #if CMK_TRACE_IN_CHARM
1917   if(CpvAccess(traceOn)) traceSuspend();
1918 #endif
1919 #endif
1920
1921   if(isInter()){
1922     s = myComm.getIndexForRemoteRank(s);
1923     comm = MPI_COMM_FIRST_INTER;
1924   }
1925
1926   int tags[3];
1927   AmpiMsg *msg = 0;
1928   
1929  MSG_ORDER_DEBUG(
1930   CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
1931  )
1932
1933   resumeOnRecv=true;
1934   ampi *dis = getAmpiInstance(disComm);
1935 #ifdef _FAULT_MLOG_
1936 //  dis->yield();
1937 //  processRemoteMlogMessages();
1938 #endif
1939   int dosuspend = 0;
1940   while(1) {
1941       //This is done to take into account the case in which an ampi 
1942       // thread has migrated while waiting for a message
1943     tags[0] = t; tags[1] = s; tags[2] = comm;
1944     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
1945     if (msg) break;
1946     dis->thread->suspend();
1947     dosuspend = 1;
1948     dis = getAmpiInstance(disComm);
1949   }
1950
1951 #ifdef _FAULT_MLOG_
1952         CpvAccess(_currentObj) = dis;
1953         MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
1954 #endif
1955
1956   dis->resumeOnRecv=false;
1957
1958   if(sts)
1959     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1960   int status = dis->processMessage(msg, t, s, buf, count, type);
1961   if (status != 0) return status;
1962
1963   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
1964
1965 #if CMK_BLUEGENE_CHARM
1966 #if CMK_TRACE_IN_CHARM
1967   //if(CpvAccess(traceOn)) CthTraceResume(thread->getThread());
1968   //Due to the reason mentioned the in the while loop above, we need to 
1969   //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
1970   if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
1971 #endif
1972   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
1973   //TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0);
1974   //_TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
1975 #if 0
1976 #if 1
1977   if (!dosuspend) {
1978     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 1);
1979     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
1980   }
1981   else
1982 #endif
1983   TRACE_BG_ADD_TAG("RECV_RESUME_THREAD");
1984 #else
1985     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
1986     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
1987 #endif
1988 #endif
1989
1990   delete msg;
1991   return 0;
1992 }
1993
1994 void
1995 ampi::probe(int t, int s, int comm, int *sts)
1996 {
1997   int tags[3];
1998 #if CMK_BLUEGENE_CHARM
1999   void *curLog;         // store current log in timeline
2000   _TRACE_BG_TLINE_END(&curLog);
2001 //  TRACE_BG_AMPI_SUSPEND();
2002 #endif
2003
2004   AmpiMsg *msg = 0;
2005   resumeOnRecv=true;
2006   while(1) {
2007     tags[0] = t; tags[1] = s; tags[2] = comm;
2008     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2009     if (msg) break;
2010     thread->suspend();
2011   }
2012   resumeOnRecv=false;
2013   if(sts)
2014     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2015 #if CMK_BLUEGENE_CHARM
2016 //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
2017   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
2018 #endif
2019 }
2020
2021 int
2022 ampi::iprobe(int t, int s, int comm, int *sts)
2023 {
2024   int tags[3];
2025   AmpiMsg *msg = 0;
2026   tags[0] = t; tags[1] = s; tags[2] = comm;
2027   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2028   if (msg) {
2029     if(sts)
2030       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2031     return 1;
2032   }
2033 #if CMK_BLUEGENE_CHARM
2034   void *curLog;         // store current log in timeline
2035   _TRACE_BG_TLINE_END(&curLog);
2036 //  TRACE_BG_AMPI_SUSPEND();
2037 #endif
2038   thread->schedule();
2039 #if CMK_BLUEGENE_CHARM
2040   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
2041   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
2042 #endif
2043   return 0;
2044 }
2045
2046
2047 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
2048 void
2049 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
2050 {
2051   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2052   int rootIdx=dest.getIndexForRank(root);
2053   if(rootIdx==thisIndex) {
2054 #if 0//AMPI_COMLIB
2055     ciBcast.beginIteration();
2056     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2057 #else
2058 #ifdef _FAULT_MLOG_
2059         CpvAccess(_currentObj) = this;
2060 #endif
2061     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2062 #endif
2063   }
2064   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
2065   nbcasts++;
2066 }
2067
2068 void
2069 ampi::bcastraw(void* buf, int len, CkArrayID aid)
2070 {
2071   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, 0);
2072   memcpy(msg->data, buf, len);
2073   CProxy_ampi pa(aid);
2074   pa.generic(msg);
2075 }
2076
2077
2078 AmpiMsg* 
2079 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
2080 {
2081   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
2082   int unit;
2083   CkDDT_DataType *ddt = getDDT()->getType(type);
2084   unit = ddt->getSize(1);
2085   int totalsize = unit*cnt;
2086
2087   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
2088   char* addr = (char*)Alltoallbuff+disp*unit;
2089   ddt->serialize((char*)msg->data, addr, cnt, (-1));
2090   return msg;
2091 }
2092
2093 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
2094                         void *attr_in, void *attr_out, int *flag){
2095   (*flag) = 0;
2096   return (MPI_SUCCESS);
2097 }
2098 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
2099                         void *attr_in, void *attr_out, int *flag){
2100   (*(void **)attr_out) = attr_in;
2101   (*flag) = 1;
2102   return (MPI_SUCCESS);
2103 }
2104 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
2105   return (MPI_SUCCESS);
2106 }
2107
2108
2109 void AmpiSeqQ::init(int numP) 
2110 {
2111   elements.init(numP);
2112 }
2113
2114 AmpiSeqQ::~AmpiSeqQ () {
2115 }
2116   
2117 void AmpiSeqQ::pup(PUP::er &p) {
2118   p|out;
2119   p|elements;
2120 }
2121
2122 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
2123 {
2124   AmpiOtherElement &el=elements[srcIdx];
2125 #ifndef CMK_OPTIMIZE
2126   if (msg->seq<el.seqIncoming)
2127     CkAbort("AMPI Logic error: received late out-of-order message!\n");
2128 #endif
2129   out.enq(msg);
2130   el.nOut++; // We have another message in the out-of-order queue
2131 }
2132
2133 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
2134 {
2135   AmpiOtherElement &el=elements[srcIdx];
2136   if (el.nOut==0) return 0; // No more out-of-order left.
2137   // Walk through our out-of-order queue, searching for our next message:
2138   for (int i=0;i<out.length();i++) {
2139     AmpiMsg *msg=out.deq();
2140     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
2141       el.seqIncoming++;
2142       el.nOut--; // We have one less message out-of-order
2143       return msg;
2144     }
2145     else
2146       out.enq(msg);
2147   }
2148   // We walked the whole queue-- ours is not there.
2149   return 0;
2150 }
2151
2152 //BIGSIM_OOC DEBUGGING: Output for AmpiRequest and its children classes
2153 void AmpiRequest::print(){
2154             CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
2155 }
2156
2157 void PersReq::print(){
2158     AmpiRequest::print();
2159     CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
2160 }
2161
2162 void IReq::print(){
2163     AmpiRequest::print();
2164     CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
2165 }
2166
2167 void ATAReq::print(){ //not complete for myreqs
2168     AmpiRequest::print();
2169     CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
2170
2171
2172 void AmpiRequestList::pup(PUP::er &p) { 
2173         if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
2174                 return;
2175         }
2176
2177         p(blklen); //Allocated size of block
2178         p(len); //Number of used elements in block
2179         if(p.isUnpacking()){
2180                 makeBlock(blklen,len);
2181         }
2182         int count=0;
2183         for(int i=0;i<len;i++){
2184                 char nonnull;
2185                 if(!p.isUnpacking()){
2186                         if(block[i] == NULL){
2187                                 nonnull = 0;
2188                         }else{
2189                                 nonnull = block[i]->getType();
2190                         }
2191                 }       
2192                 p(nonnull);
2193                 if(nonnull != 0){
2194                         if(p.isUnpacking()){
2195                                 switch(nonnull){
2196                                         case 1:
2197                                                 block[i] = new PersReq;
2198                                                 break;
2199                                         case 2: 
2200                                                 block[i] = new IReq;
2201                                                 break;
2202                                         case 3: 
2203                                                 block[i] = new ATAReq;
2204                                                 break;
2205                                 }
2206                         }       
2207                         block[i]->pup(p);
2208                         count++;
2209                 }else{
2210                         block[i] = 0;
2211                 }
2212         }
2213         if(p.isDeleting()){
2214                 freeBlock();
2215         }
2216 }
2217
2218 //------------------ External Interface -----------------
2219 ampiParent *getAmpiParent(void) {
2220   ampiParent *p = CtvAccess(ampiPtr);
2221 #ifndef CMK_OPTIMIZE
2222   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
2223 #endif
2224   return p;
2225 }
2226
2227 ampi *getAmpiInstance(MPI_Comm comm) {
2228   ampi *ptr=getAmpiParent()->comm2ampi(comm);
2229 #ifndef CMK_OPTIMIZE
2230   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
2231 #endif
2232   return ptr;
2233 }
2234
2235 inline static AmpiRequestList *getReqs(void) {
2236   return &(getAmpiParent()->ampiReqs);
2237 }
2238
2239 inline void checkComm(MPI_Comm comm){
2240 #ifndef CMK_OPTIMIZE
2241   getAmpiParent()->checkComm(comm);
2242 #endif
2243 }
2244
2245 inline void checkRequest(MPI_Request req){
2246 #ifndef CMK_OPTIMIZE
2247   getReqs()->checkRequest(req);
2248 #endif
2249 }
2250
2251 inline void checkRequests(int n, MPI_Request* reqs){
2252 #ifndef CMK_OPTIMIZE
2253   AmpiRequestList* reqlist = getReqs();
2254   for(int i=0;i<n;i++)
2255     reqlist->checkRequest(reqs[i]);
2256 #endif
2257 }
2258
2259 CDECL void AMPI_Migrate(void)
2260 {
2261 //  AMPIAPI("AMPI_Migrate");
2262 #if 0
2263 #if CMK_BLUEGENE_CHARM
2264   TRACE_BG_AMPI_SUSPEND();
2265 #endif
2266 #endif
2267   TCHARM_Migrate();
2268
2269 #ifdef _FAULT_MLOG_
2270     ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
2271     CpvAccess(_currentObj) = currentAmpi;
2272 #endif
2273
2274 #if CMK_BLUEGENE_CHARM
2275 //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2276   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
2277 #endif
2278 }
2279
2280
2281 CDECL void AMPI_Evacuate(void)
2282 {
2283   TCHARM_Evacuate();
2284 }
2285
2286
2287
2288 CDECL void AMPI_Migrateto(int destPE)
2289 {
2290   AMPIAPI("AMPI_MigrateTo");
2291 #if 0
2292 #if CMK_BLUEGENE_CHARM
2293   TRACE_BG_AMPI_SUSPEND();
2294 #endif
2295 #endif
2296   TCHARM_Migrate_to(destPE);
2297 #if CMK_BLUEGENE_CHARM
2298   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
2299   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
2300 #endif
2301 }
2302
2303 CDECL void AMPI_MigrateTo(int destPE)
2304 {
2305         AMPI_Migrateto(destPE);
2306 }
2307
2308 CDECL void AMPI_Async_Migrate(void)
2309 {
2310   AMPIAPI("AMPI_Async_Migrate");
2311 #if 0
2312 #if CMK_BLUEGENE_CHARM
2313   TRACE_BG_AMPI_SUSPEND();
2314 #endif
2315 #endif
2316   TCHARM_Async_Migrate();
2317 #if CMK_BLUEGENE_CHARM
2318   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2319   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2320 #endif
2321 }
2322
2323 CDECL void AMPI_Allow_Migrate(void)
2324 {
2325   AMPIAPI("AMPI_Allow_Migrate");
2326 #if 0
2327 #if CMK_BLUEGENE_CHARM
2328   TRACE_BG_AMPI_SUSPEND();
2329 #endif
2330 #endif
2331   TCHARM_Allow_Migrate();
2332 #if CMK_BLUEGENE_CHARM
2333   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2334 #endif
2335 }
2336
2337 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2338 #if CMK_LBDB_ON
2339   //AMPIAPI("AMPI_Setmigratable");
2340   ampi *ptr=getAmpiInstance(comm);
2341   ptr->setMigratable(mig);
2342 #else
2343   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2344 #endif
2345 }
2346
2347 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2348 {
2349     //AMPIAPI("AMPI_Init");
2350   if (nodeinit_has_been_called) {
2351     AMPIAPI("AMPI_Init");
2352     char **argv;
2353     if (p_argv) argv=*p_argv;
2354     else argv=CkGetArgv();
2355     ampiInit(argv);
2356     if (p_argc) *p_argc=CmiGetArgc(argv);
2357   }
2358   else
2359   { /* Charm hasn't been started yet! */
2360     CkAbort("AMPI_Init> Charm is not initialized!");
2361   }
2362
2363   return 0;
2364 }
2365
2366 CDECL int AMPI_Initialized(int *isInit)
2367 {
2368   if (nodeinit_has_been_called) {
2369         AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2370         *isInit=CtvAccess(ampiInitDone);
2371   }
2372   else /* !nodeinit_has_been_called */ {
2373         *isInit=nodeinit_has_been_called;
2374   }
2375   return 0;
2376 }
2377
2378 CDECL int AMPI_Finalized(int *isFinalized)
2379 {
2380     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2381     *isFinalized=CtvAccess(ampiFinalized);
2382     return 0;
2383 }
2384
2385 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2386 {
2387   //AMPIAPI("AMPI_Comm_rank");
2388
2389 #ifdef AMPIMSGLOG
2390   ampiParent* pptr = getAmpiParent();
2391   if(msgLogRead){
2392     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2393     return 0;
2394   }
2395 #endif
2396
2397   *rank = getAmpiInstance(comm)->getRank(comm);
2398
2399 #ifdef AMPIMSGLOG
2400   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2401     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2402   }
2403 #endif
2404   return 0;
2405 }
2406
2407 CDECL
2408 int AMPI_Comm_size(MPI_Comm comm, int *size)
2409 {
2410   //AMPIAPI("AMPI_Comm_size");
2411 #ifdef AMPIMSGLOG
2412   ampiParent* pptr = getAmpiParent();
2413   if(msgLogRead){
2414     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2415     return 0;
2416   }
2417 #endif
2418
2419   *size = getAmpiInstance(comm)->getSize(comm);
2420
2421 #ifdef AMPIMSGLOG
2422   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2423     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2424   }
2425 #endif
2426
2427   return 0;
2428 }
2429
2430 CDECL
2431 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2432 {
2433   AMPIAPI("AMPI_Comm_compare");
2434   if(comm1==comm2) *result=MPI_IDENT;
2435   else{
2436     int equal=1;
2437     CkVec<int> ind1, ind2;
2438     ind1 = getAmpiInstance(comm1)->getIndices();
2439     ind2 = getAmpiInstance(comm2)->getIndices();
2440     if(ind1.size()==ind2.size()){
2441       for(int i=0;i<ind1.size();i++)
2442         if(ind1[i] != ind2[i]) { equal=0; break; }
2443     }
2444     if(equal==1) *result=MPI_CONGRUENT;
2445     else *result=MPI_UNEQUAL;
2446   }
2447   return 0;
2448 }
2449
2450 CDECL void AMPI_Exit(int /*exitCode*/)
2451 {
2452   AMPIAPI("AMPI_Exit");
2453   TCHARM_Done();
2454 }
2455 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2456 {
2457   AMPI_Exit(*exitCode);
2458 }
2459
2460 CDECL
2461 int AMPI_Finalize(void)
2462 {
2463   AMPIAPI("AMPI_Finalize");
2464 #if PRINT_IDLE
2465   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2466 #endif
2467 #if AMPI_COUNTER
2468     getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2469 #endif
2470   CtvAccess(ampiFinalized)=1;
2471
2472 #if CMK_BLUEGENE_CHARM
2473 #if 0
2474   TRACE_BG_AMPI_SUSPEND();
2475 #endif
2476 #if CMK_TRACE_IN_CHARM
2477   if(CpvAccess(traceOn)) traceSuspend();
2478 #endif
2479 #endif
2480
2481 //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2482   AMPI_Exit(0);
2483   return 0;
2484 }
2485
2486 CDECL
2487 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2488                         int tag, MPI_Comm comm)
2489 {
2490   AMPIAPI("AMPI_Send");
2491 #ifdef AMPIMSGLOG
2492   if(msgLogRead){
2493     return 0;
2494   }
2495 #endif
2496
2497   ampi *ptr = getAmpiInstance(comm);
2498 #if AMPI_COMLIB
2499   if(enableStreaming){  
2500 //    ptr->getStreaming().beginIteration();
2501     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2502   } else
2503 #endif
2504     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2505 #if AMPI_COUNTER
2506   getAmpiParent()->counters.send++;
2507 #endif
2508 #ifdef _FAULT_MLOG_
2509 //  ptr->yield();
2510 //  //  processRemoteMlogMessages();
2511 #endif
2512   return 0;
2513 }
2514
2515 CDECL
2516 int AMPI_Ssend(void *msg, int count, MPI_Datatype type, int dest,
2517                         int tag, MPI_Comm comm)
2518 {
2519   AMPIAPI("AMPI_Ssend");
2520 #ifdef AMPIMSGLOG
2521   if(msgLogRead){
2522     return 0;
2523   }
2524 #endif
2525
2526   ampi *ptr = getAmpiInstance(comm);
2527 #if AMPI_COMLIB
2528   if(enableStreaming){
2529     ptr->getStreaming().beginIteration();
2530     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2531   } else
2532 #endif
2533     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm, 1);
2534 #if AMPI_COUNTER
2535   getAmpiParent()->counters.send++;
2536 #endif
2537
2538   return 0;
2539 }
2540
2541 CDECL
2542 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2543               MPI_Comm comm, MPI_Status *status)
2544 {
2545   AMPIAPI("AMPI_Recv");
2546
2547 #ifdef AMPIMSGLOG
2548   ampiParent* pptr = getAmpiParent();
2549   if(msgLogRead){
2550     (*(pptr->fromPUPer))|(pptr->pupBytes);
2551     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2552     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2553     return 0;
2554   }
2555 #endif
2556
2557   ampi *ptr = getAmpiInstance(comm);
2558   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2559   
2560 #if AMPI_COUNTER
2561   getAmpiParent()->counters.recv++;
2562 #endif
2563  
2564 #ifdef AMPIMSGLOG
2565   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2566     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2567     (*(pptr->toPUPer))|(pptr->pupBytes);
2568     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2569     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2570   }
2571 #endif
2572   
2573   return 0;
2574 }
2575
2576 CDECL
2577 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2578 {
2579   AMPIAPI("AMPI_Probe");
2580   ampi *ptr = getAmpiInstance(comm);
2581   ptr->probe(tag,src, comm, (int*) status);
2582   return 0;
2583 }
2584
2585 CDECL
2586 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2587 {
2588   AMPIAPI("AMPI_Iprobe");
2589   ampi *ptr = getAmpiInstance(comm);
2590   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2591   return 0;
2592 }
2593
2594 CDECL
2595 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2596                   int stag, void *rbuf, int rcount, int rtype,
2597                   int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2598 {
2599   AMPIAPI("AMPI_Sendrecv");
2600   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2601   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2602   if (se) return se;
2603   else return re;
2604 }
2605
2606 CDECL
2607 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2608                          int dest, int sendtag, int source, int recvtag,
2609                          MPI_Comm comm, MPI_Status *status)
2610 {
2611   AMPIAPI("AMPI_Sendrecv_replace");
2612   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2613                       buf, count, datatype, source, recvtag, comm, status);
2614 }
2615
2616
2617 CDECL
2618 int AMPI_Barrier(MPI_Comm comm)
2619 {
2620   AMPIAPI("AMPI_Barrier");
2621
2622   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2623 #if CMK_BLUEGENE_CHARM
2624   void *barrierLog;             // store current log in timeline
2625   //TRACE_BG_AMPI_BARRIER_START(barrierLog);
2626   TRACE_BG_AMPI_BREAK(NULL, "AMPI_Barrier", NULL, 0, 1);
2627   _TRACE_BG_TLINE_END(&barrierLog); 
2628 #endif
2629   //HACK: Use collective operation as a barrier.
2630   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2631
2632   //BIGSIM_OOC DEBUGGING
2633   //CkPrintf("%d: in AMPI_Barrier, after AMPI_Allreduce\n", getAmpiParent()->thisIndex);
2634 #if CMK_BLUEGENE_CHARM
2635   //TRACE_BG_AMPI_BARRIER_END(barrierLog);
2636   //_TRACE_BG_SET_INFO(NULL, "AMPI_Barrier_END",  &barrierLog, 1);
2637   TRACE_BG_AMPI_BREAK(NULL, "AMPI_Barrier_END", &barrierLog, 1, 1);
2638 #endif
2639 #if AMPI_COUNTER
2640   getAmpiParent()->counters.barrier++;
2641 #endif
2642   return 0;
2643 }
2644
2645 CDECL
2646 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2647                          MPI_Comm comm)
2648 {
2649   AMPIAPI("AMPI_Bcast");
2650   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2651   if(comm==MPI_COMM_SELF) return 0;
2652
2653 #ifdef AMPIMSGLOG
2654   ampiParent* pptr = getAmpiParent();
2655   if(msgLogRead){
2656     (*(pptr->fromPUPer))|(pptr->pupBytes);
2657     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2658     return 0;
2659   }
2660 #endif
2661
2662   ampi* ptr = getAmpiInstance(comm);
2663   ptr->bcast(root, buf, count, type,comm);
2664 #if AMPI_COUNTER
2665   getAmpiParent()->counters.bcast++;
2666 #endif
2667
2668 #ifdef AMPIMSGLOG
2669   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2670     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2671     (*(pptr->toPUPer))|(pptr->pupBytes);
2672     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2673   }
2674 #endif
2675 #ifdef _FAULT_MLOG_
2676 //  ptr->yield();
2677 //  //  processRemoteMlogMessages();
2678 #endif
2679
2680   return 0;
2681 }
2682
2683 /// This routine is called with the results of a Reduce or AllReduce
2684 const int MPI_REDUCE_SOURCE=0;
2685 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2686 void ampi::reduceResult(CkReductionMsg *msg)
2687 {
2688         MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2689   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2690              thisArrayID,thisIndex);
2691   delete msg;
2692 }
2693
2694 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2695 {
2696   int szdata = ddt->getSize(count);
2697   int szhdr = sizeof(AmpiOpHeader);
2698   AmpiOpHeader newhdr(op,type,count,szdata); 
2699   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2700   memcpy(msg->getData(),&newhdr,szhdr);
2701   TCharm::activateVariable(inbuf);
2702   ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2703   TCharm::deactivateVariable(inbuf);
2704   return msg;
2705 }
2706
2707 // Copy the MPI datatype "type" from inbuf to outbuf
2708 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2709   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2710   //  deserializing into the output.
2711   ampi *ptr = getAmpiInstance(comm);
2712   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2713   int len=ddt->getSize(count);
2714   char *serialized=new char[len];
2715   TCharm::activateVariable(inbuf);
2716   TCharm::activateVariable(outbuf);
2717   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2718   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2719   TCharm::deactivateVariable(outbuf);
2720   TCharm::deactivateVariable(inbuf);
2721   delete [] serialized;         // < memory leak!  // gzheng 
2722   
2723   return MPI_SUCCESS;
2724 }
2725
2726 CDECL
2727 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
2728                int root, MPI_Comm comm)
2729 {
2730   AMPIAPI("AMPI_Reduce");
2731   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2732
2733 #ifdef AMPIMSGLOG
2734   ampiParent* pptr = getAmpiParent();
2735   if(msgLogRead){
2736     (*(pptr->fromPUPer))|(pptr->pupBytes);
2737     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2738     return 0;
2739   }
2740 #endif
2741   
2742   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
2743   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
2744   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
2745
2746   ampi *ptr = getAmpiInstance(comm);
2747   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
2748   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
2749   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
2750
2751   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2752
2753   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
2754   msg->setCallback(reduceCB);
2755         MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
2756   ptr->contribute(msg);
2757   if (ptr->thisIndex == rootIdx){
2758     /*HACK: Use recv() to block until reduction data comes back*/
2759     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2760       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
2761   }
2762 #if AMPI_COUNTER
2763   getAmpiParent()->counters.reduce++;
2764 #endif
2765
2766 #ifdef AMPIMSGLOG
2767   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2768     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2769     (*(pptr->toPUPer))|(pptr->pupBytes);
2770     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
2771   }
2772 #endif
2773   
2774   return 0;
2775 }
2776
2777 CDECL
2778 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
2779                   MPI_Op op, MPI_Comm comm)
2780 {
2781   AMPIAPI("AMPI_Allreduce");
2782   ampi *ptr = getAmpiInstance(comm);
2783  
2784   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
2785   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
2786   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
2787   
2788   CkDDT_DataType *ddt_type = ptr->getDDT()->getType(type);
2789   TRACE_BG_AMPI_SET_SIZE(count * ddt_type->getSize());
2790   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2791
2792 #ifdef AMPIMSGLOG
2793   ampiParent* pptr = getAmpiParent();
2794   if(msgLogRead){
2795     (*(pptr->fromPUPer))|(pptr->pupBytes);
2796     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2797     //    CkExit();
2798     return 0;
2799   }
2800 #endif
2801
2802   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
2803   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
2804
2805   CkReductionMsg *msg=makeRednMsg(ddt_type, inbuf, count, type, op);
2806   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2807   msg->setCallback(allreduceCB);
2808   ptr->contribute(msg);
2809
2810   /*HACK: Use recv() to block until the reduction data comes back*/
2811   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2812     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
2813 #if AMPI_COUNTER
2814   getAmpiParent()->counters.allreduce++;
2815 #endif
2816
2817 #ifdef AMPIMSGLOG
2818   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2819     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2820     (*(pptr->toPUPer))|(pptr->pupBytes);
2821     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
2822     //    CkExit();
2823   }
2824 #endif
2825
2826   return 0;
2827 }
2828
2829 CDECL
2830 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
2831                    MPI_Op op, MPI_Comm comm, MPI_Request* request)
2832 {
2833   AMPIAPI("AMPI_Iallreduce");
2834   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2835
2836   checkRequest(*request);
2837   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
2838   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
2839   ampi *ptr = getAmpiInstance(comm);
2840
2841   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2842   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2843   msg->setCallback(allreduceCB);
2844   ptr->contribute(msg);
2845
2846   // using irecv instead recv to non-block the call and get request pointer
2847   AmpiRequestList* reqs = getReqs();
2848   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
2849   *request = reqs->insert(newreq);
2850   return 0;
2851 }
2852
2853 CDECL
2854 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
2855                        MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
2856 {
2857   AMPIAPI("AMPI_Reduce_scatter");
2858   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
2859   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
2860   ampi *ptr = getAmpiInstance(comm);
2861   int size = ptr->getSize(comm);
2862   int count=0;
2863   int *displs = new int [size];
2864   int len;
2865   void *tmpbuf;
2866
2867   //under construction
2868   for(int i=0;i<size;i++){
2869     displs[i] = count;
2870     count+= recvcounts[i];
2871   }
2872   len = ptr->getDDT()->getType(datatype)->getSize(count);
2873   tmpbuf = malloc(len);
2874   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
2875   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
2876                recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
2877   free(tmpbuf);
2878   delete [] displs;     // < memory leak ! // gzheng
2879   return 0;
2880 }
2881
2882 /***** MPI_Scan algorithm (from MPICH) *******
2883    recvbuf = sendbuf;
2884    partial_scan = sendbuf;
2885    mask = 0x1;
2886    while (mask < size) {
2887       dst = rank^mask;
2888       if (dst < size) {
2889          send partial_scan to dst;
2890          recv from dst into tmp_buf;
2891          if (rank > dst) {
2892             partial_scan = tmp_buf + partial_scan;
2893             recvbuf = tmp_buf + recvbuf;
2894          }
2895          else {
2896             if (op is commutative)
2897                partial_scan = tmp_buf + partial_scan;
2898             else {
2899                tmp_buf = partial_scan + tmp_buf;
2900                partial_scan = tmp_buf;
2901             }
2902          }
2903       }
2904       mask <<= 1;
2905    }
2906  ***** MPI_Scan algorithm (from MPICH) *******/
2907
2908 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
2909   (op)(invec,inoutvec,&count,&datatype);
2910 }
2911 CDECL
2912 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
2913   AMPIAPI("AMPI_Scan");
2914   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
2915   MPI_Status sts;
2916   ampi *ptr = getAmpiInstance(comm);
2917   int size = ptr->getSize(comm);
2918   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
2919   int rank = ptr->getRank(comm);
2920   int mask = 0x1;
2921   int dst;
2922   void* tmp_buf = malloc(blklen);
2923   void* partial_scan = malloc(blklen);
2924   
2925   memcpy(recvbuf, sendbuf, blklen);
2926   memcpy(partial_scan, sendbuf, blklen);
2927   while(mask < size){
2928     dst = rank^mask;
2929     if(dst < size){
2930       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
2931                    tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
2932       if(rank > dst){
2933         (op)(tmp_buf,partial_scan,&count,&datatype);
2934         (op)(tmp_buf,recvbuf,&count,&datatype);
2935       }else {
2936         (op)(partial_scan,tmp_buf,&count,&datatype);
2937         memcpy(partial_scan,tmp_buf,blklen);
2938       }
2939     }
2940     mask <<= 1;
2941
2942   }
2943
2944   free(tmp_buf);
2945   free(partial_scan);
2946 #if AMPI_COUNTER
2947   getAmpiParent()->counters.scan++;
2948 #endif
2949   return 0;
2950 }
2951
2952 CDECL
2953 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
2954   //AMPIAPI("AMPI_Op_create");
2955   *op = function;
2956   return 0;
2957 }
2958
2959 CDECL
2960 int AMPI_Op_free(MPI_Op *op){
2961   //AMPIAPI("AMPI_Op_free");
2962   *op = MPI_OP_NULL;
2963   return 0;
2964 }
2965
2966
2967 CDECL
2968 double AMPI_Wtime(void)
2969 {
2970 //  AMPIAPI("AMPI_Wtime");
2971
2972 #ifdef AMPIMSGLOG
2973   double ret=TCHARM_Wall_timer();
2974   ampiParent* pptr = getAmpiParent();
2975   if(msgLogRead){
2976     (*(pptr->fromPUPer))|ret;
2977     return ret;
2978   }
2979
2980   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2981     (*(pptr->toPUPer))|ret;
2982   }
2983 #endif
2984
2985 #if CMK_BLUEGENE_CHARM
2986   return BgGetTime();
2987 #else
2988   return TCHARM_Wall_timer();
2989 #endif
2990 }
2991
2992 CDECL
2993 double AMPI_Wtick(void){
2994   //AMPIAPI("AMPI_Wtick");
2995   return 1e-6;
2996 }
2997
2998 int PersReq::start(){
2999   if(sndrcv == 1 || sndrcv == 3) { // send or ssend request
3000     ampi *ptr=getAmpiInstance(comm);
3001     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm, sndrcv==3?1:0);
3002   }
3003   return 0;
3004 }
3005
3006 CDECL
3007 int AMPI_Start(MPI_Request *request)
3008 {
3009   AMPIAPI("AMPI_Start");
3010   checkRequest(*request);
3011   AmpiRequestList *reqs = getReqs();
3012   if(-1==(*reqs)[*request]->start()){
3013     CkAbort("MPI_Start could be used only on persistent communication requests!");
3014   }
3015   return 0;
3016 }
3017
3018 CDECL
3019 int AMPI_Startall(int count, MPI_Request *requests){
3020   AMPIAPI("AMPI_Startall");
3021   checkRequests(count,requests);
3022   AmpiRequestList *reqs = getReqs();
3023   for(int i=0;i<count;i++){
3024     if(-1==(*reqs)[requests[i]]->start())
3025       CkAbort("MPI_Start could be used only on persistent communication requests!");
3026   }
3027   return 0;
3028 }
3029
3030 /* organize the indices of requests into a vector of a vector: 
3031  * level 1 is different msg envelope matches
3032  * level 2 is (posting) ordered requests of with envelope
3033  * each time multiple completion call loop over first elem of level 1
3034  * and move the matched to the NULL request slot.   
3035  * warning: this does not work with I-Alltoall requests */
3036 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
3037   for(int i=0;i<count;i++){
3038     if(reqs[i]!=MPI_REQUEST_NULL)
3039       return 0;
3040   }
3041   return 1;
3042 }
3043 inline int matchReq(MPI_Request ia, MPI_Request ib){
3044   checkRequest(ia);  
3045   checkRequest(ib);
3046   AmpiRequestList* reqs = getReqs();
3047   AmpiRequest *a, *b;
3048   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
3049   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
3050   a=(*reqs)[ia];  b=(*reqs)[ib];
3051   if(a->tag != b->tag) return 0;
3052   if(a->src != b->src) return 0;
3053   if(a->comm != b->comm) return 0;
3054   return 1;
3055 }
3056 inline void swapInt(int& a,int& b){
3057   int tmp;
3058   tmp=a; a=b; b=tmp;
3059 }
3060 inline void sortedIndex(int n, int* arr, int* idx){
3061   int i,j;
3062   for(i=0;i<n;i++) 
3063     idx[i]=i;
3064   for (i=0; i<n-1; i++) 
3065     for (j=0; j<n-1-i; j++)
3066       if (arr[idx[j+1]] < arr[idx[j]]) 
3067         swapInt(idx[j+1],idx[j]);
3068 }
3069 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
3070   CkAssert(count!=0);
3071   int *newidx = new int [count];
3072   int flag;
3073   sortedIndex(count,arr,newidx);
3074   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
3075   CkVec<int> slot;
3076   vec->push_back(slot);
3077   (*vec)[0].push_back(newidx[0]);
3078   for(int i=1;i<count;i++){
3079     flag=0;
3080     for(int j=0;j<vec->size();j++){
3081       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
3082         ((*vec)[j]).push_back(newidx[i]);
3083         flag++;
3084       }
3085     }
3086     if(!flag){
3087       CkVec<int> newslot;
3088       newslot.push_back(newidx[i]);
3089       vec->push_back(newslot);
3090     }else{
3091       CkAssert(flag==1);
3092     }
3093   }
3094   delete [] newidx;
3095   return vec;
3096 }
3097 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
3098   printf("vec content: ");
3099   for(int i=0;i<vec.size();i++){
3100     printf("{");
3101     for(int j=0;j<(vec[i]).size();j++){
3102       printf(" %d ",arr[(vec[i])[j]]);
3103     }
3104     printf("} ");
3105   }
3106   printf("\n");
3107 }
3108
3109 int PersReq::wait(MPI_Status *sts){
3110         if(sndrcv == 2) {
3111                 if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3112                         CkAbort("AMPI> Error in persistent request wait");
3113 #if CMK_BLUEGENE_CHARM
3114                 _TRACE_BG_TLINE_END(&event);
3115 #endif
3116         }
3117         return 0;
3118 }
3119
3120 int IReq::wait(MPI_Status *sts){
3121   if (CpvAccess(CmiPICMethod) != 2) 
3122   {
3123         //Copy "this" to a local variable in the case that "this" pointer
3124         //is updated during the out-of-core emulation.
3125
3126           // optimization for Irecv
3127           // generic() writes directly to the buffer, so the only thing we
3128           // do here is to wait
3129         ampi *ptr = getAmpiInstance(comm);
3130
3131         //BIGSIM_OOC DEBUGGING
3132         //int ooccnt=0;
3133         //int ampiIndex = ptr->thisIndex;
3134         //CmiPrintf("%d: IReq's status=%d\n", ampiIndex, statusIreq);
3135         
3136         while (statusIreq == false) {
3137                 //BIGSIM_OOC DEBUGGING
3138                 //CmiPrintf("Before blocking: %dth time: %d: in Ireq::wait\n", ++ooccnt, ptr->thisIndex);
3139                 //print();
3140
3141                 ptr->resumeOnRecv=true;
3142                 ptr->block();
3143                         
3144                 //BIGSIM_OOC DEBUGGING
3145                 //CmiPrintf("[%d] After blocking: in Ireq::wait\n", ptr->thisIndex);
3146                 //CmiPrintf("IReq's this pointer: %p\n", this);
3147                 //print();
3148
3149             #if CMK_BLUEGENE_CHARM
3150                 //Because of the out-of-core emulation, this pointer is changed after in-out
3151                 //memory operation. So we need to return from this function and do the while loop
3152                 //in the outer function call.   
3153                 if(BgInOutOfCoreMode)
3154                     return -1;
3155             #endif      
3156         }   // end of while
3157         ptr->resumeOnRecv=false;
3158
3159         AMPI_DEBUG("IReq::wait has resumed\n");
3160
3161         if(sts) {
3162           AMPI_DEBUG("Setting sts->MPI_TAG to this->tag=%d in IReq::wait  this=%p\n", (int)this->tag, this);
3163           sts->MPI_TAG = tag;
3164           sts->MPI_SOURCE = src;
3165           sts->MPI_COMM = comm;
3166           sts->MPI_LENGTH = length;
3167         }
3168   }
3169   else {
3170     AMPI_DEBUG("In else clause of IReq::wait\n");
3171         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3172                 CkAbort("AMPI> Error in non-blocking request wait");
3173   }
3174         return 0;
3175 }
3176
3177 int ATAReq::wait(MPI_Status *sts){
3178         int i;
3179         for(i=0;i<count;i++){
3180                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3181                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
3182                         CkAbort("AMPI> Error in alltoall request wait");
3183 #if CMK_BLUEGENE_CHARM
3184                 _TRACE_BG_TLINE_END(&myreqs[i].event);
3185 #endif
3186         }
3187 #if CMK_BLUEGENE_CHARM
3188         //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
3189         TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0, 1);
3190         for (i=0; i<count; i++)
3191           _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
3192         _TRACE_BG_TLINE_END(&event);
3193 #endif
3194         return 0;
3195 }
3196
3197 CDECL
3198 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
3199 {
3200
3201
3202   AMPIAPI("AMPI_Wait");
3203   if(*request == MPI_REQUEST_NULL){
3204     stsempty(*sts);
3205     return 0;
3206   }
3207   checkRequest(*request);
3208   AmpiRequestList* reqs = getReqs();
3209
3210 #ifdef AMPIMSGLOG
3211   ampiParent* pptr = getAmpiParent();
3212   if(msgLogRead){
3213     (*(pptr->fromPUPer))|(pptr->pupBytes);
3214     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3215     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
3216     return 0;
3217   }
3218 #endif
3219
3220   AMPI_DEBUG("AMPI_Wait request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3221   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3222   //(*reqs)[*request]->wait(sts);
3223   int waitResult = -1;
3224   do{
3225     AmpiRequest *waitReq = (*reqs)[*request];
3226     waitResult = waitReq->wait(sts);
3227     if(BgInOutOfCoreMode){
3228         reqs = getReqs();
3229     }
3230   }while(waitResult==-1);
3231
3232
3233   AMPI_DEBUG("AMPI_Wait after calling wait, request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3234
3235
3236 #ifdef AMPIMSGLOG
3237   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3238     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
3239     (*(pptr->toPUPer))|(pptr->pupBytes);
3240     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3241     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
3242   }
3243 #endif
3244   
3245   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3246     reqs->free(*request);
3247     *request = MPI_REQUEST_NULL;
3248   }
3249
3250     AMPI_DEBUG("End of AMPI_Wait\n");
3251
3252   return 0;
3253 }
3254
3255 CDECL
3256 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
3257 {
3258   AMPIAPI("AMPI_Waitall");
3259   if(count==0) return MPI_SUCCESS;
3260   checkRequests(count,request);
3261   int i,j,oldPe;
3262   AmpiRequestList* reqs = getReqs();
3263   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3264
3265 #ifdef AMPIMSGLOG
3266   ampiParent* pptr = getAmpiParent();
3267   if(msgLogRead){
3268     for(i=0;i<reqvec->size();i++){
3269       for(j=0;j<((*reqvec)[i]).size();j++){
3270         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3271           stsempty(sts[((*reqvec)[i])[j]]);
3272           continue;
3273         }
3274         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3275         (*(pptr->fromPUPer))|(pptr->pupBytes);
3276         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3277         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3278       }
3279     }
3280     return 0;
3281   }
3282 #endif
3283
3284 #if CMK_BLUEGENE_CHARM
3285   void *curLog;         // store current log in timeline
3286   _TRACE_BG_TLINE_END(&curLog);
3287 #if 0
3288   TRACE_BG_AMPI_SUSPEND();
3289 #endif
3290 #endif
3291   for(i=0;i<reqvec->size();i++){
3292     for(j=0;j<((*reqvec)[i]).size();j++){
3293       //CkPrintf("[%d] in loop [%d, %d]\n", pptr->thisIndex,i, j);
3294       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3295         stsempty(sts[((*reqvec)[i])[j]]);
3296         continue;
3297       }
3298       oldPe = CkMyPe();
3299
3300       int waitResult = -1;
3301       do{       
3302         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3303         waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
3304         if(BgInOutOfCoreMode){
3305             reqs = getReqs();
3306             reqvec = vecIndex(count, request);
3307         }
3308       }while(waitResult==-1);
3309
3310 #ifdef AMPIMSGLOG
3311       if(msgLogWrite && pptr->thisIndex == msgLogRank){
3312         (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
3313         (*(pptr->toPUPer))|(pptr->pupBytes);
3314         PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3315         PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3316       }
3317 #endif
3318     
3319 #if 1
3320 #ifndef _FAULT_MLOG_
3321             //for fault evacuation
3322       if(oldPe != CkMyPe()){
3323 #endif
3324                         reqs = getReqs();
3325                         reqvec  = vecIndex(count,request);
3326 #ifndef _FAULT_MLOG_
3327             }
3328 #endif
3329 #endif
3330     }
3331   }
3332 #if CMK_BLUEGENE_CHARM
3333   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
3334 #endif
3335   // free memory of requests
3336   for(i=0;i<count;i++){ 
3337     if(request[i] == MPI_REQUEST_NULL)
3338       continue;
3339     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
3340       reqs->free(request[i]);
3341       request[i] = MPI_REQUEST_NULL;
3342     }
3343   }
3344   delete reqvec;
3345   return 0;
3346 }
3347
3348 CDECL
3349 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
3350 {
3351   AMPIAPI("AMPI_Waitany");
3352   
3353   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
3354   checkRequests(count,request);
3355   if(areInactiveReqs(count,request)){
3356     *idx=MPI_UNDEFINED;
3357     stsempty(*sts);
3358     return MPI_SUCCESS;
3359   }
3360   int flag=0;
3361   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3362   while(count>0){ /* keep looping until some request finishes: */
3363     for(int i=0;i<reqvec->size();i++){
3364       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
3365       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
3366         *idx = ((*reqvec)[i])[0];
3367         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
3368         return 0;
3369       }
3370     }
3371     /* no requests have finished yet-- schedule and try again */
3372     AMPI_Yield(MPI_COMM_WORLD);
3373   }
3374   *idx = MPI_UNDEFINED;
3375   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
3376         delete reqvec;
3377   return 0;
3378 }
3379
3380 CDECL
3381 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
3382                  int *array_of_indices, MPI_Status *array_of_statuses)
3383 {
3384   AMPIAPI("AMPI_Waitsome");
3385   checkRequests(incount,array_of_requests);
3386   if(areInactiveReqs(incount,array_of_requests)){
3387     *outcount=MPI_UNDEFINED;
3388     return MPI_SUCCESS;
3389   }
3390   MPI_Status sts;
3391   int i;
3392   int flag=0, realflag=0;
3393   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3394   *outcount = 0;
3395   while(1){
3396     for(i=0;i<reqvec->size();i++){
3397       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3398       if(flag == 1){ 
3399         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3400         array_of_statuses[(*outcount)++]=sts;
3401         if(sts.MPI_COMM != 0)
3402           realflag=1; // there is real(non null) request
3403       }
3404     }
3405     if(realflag && outcount>0) break;
3406   }
3407         delete reqvec;
3408   return 0;
3409 }
3410
3411 CmiBool PersReq::test(MPI_Status *sts){
3412         if(sndrcv == 2)         // recv request
3413                 return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3414         else                    // send request
3415                 return 1;
3416
3417 }
3418 void PersReq::complete(MPI_Status *sts){
3419         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3420                 CkAbort("AMPI> Error in persistent request complete");
3421 }
3422
3423 CmiBool IReq::test(MPI_Status *sts){
3424         if (statusIreq == true) {           
3425           if(sts)
3426             sts->MPI_LENGTH = length;           
3427           return true;
3428         }
3429         else {
3430           getAmpiInstance(comm)->yield();
3431           return false;
3432         }
3433 /*
3434         return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3435 */
3436 }
3437 void IReq::complete(MPI_Status *sts){
3438         wait(sts);
3439 /*
3440         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3441                 CkAbort("AMPI> Error in non-blocking request complete");
3442 */
3443 }
3444
3445 void IReq::receive(ampi *ptr, AmpiMsg *msg)
3446 {
3447     int sts = ptr->processMessage(msg, tag, src, buf, count, type);
3448     statusIreq = (sts == 0);
3449     length = msg->length;
3450     this->tag = msg->tag; // Although not required, we also extract tag from msg
3451     src = msg->srcRank; // Although not required, we also extract src from msg
3452     comm = msg->comm;
3453     AMPI_DEBUG("Setting this->tag to %d in IReq::receive this=%p\n", (int)this->tag, this);
3454 #if CMK_BLUEGENE_CHARM
3455     event = msg->event; 
3456 #endif
3457     delete msg;
3458     
3459     //BIGSIM_OOC DEBUGGING
3460     //CmiPrintf("In IReq::receive, this=%p ", this);
3461     //print();
3462 }
3463
3464 CmiBool ATAReq::test(MPI_Status *sts){
3465         int i, flag=1;
3466         for(i=0;i<count;i++){
3467                 flag *= getAmpiInstance(myreqs[i].comm)->iprobe(myreqs[i].tag, myreqs[i].src,
3468                                         myreqs[i].comm, (int*) sts);
3469         }
3470         return flag;
3471 }
3472 void ATAReq::complete(MPI_Status *sts){
3473         int i;
3474         for(i=0;i<count;i++){
3475                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3476                                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int*)sts))
3477                         CkAbort("AMPI> Error in alltoall request complete");
3478         }
3479 }
3480
3481 CDECL
3482 int AMPI_Test(MPI_Request *request, int *flag, MPI_Status *sts)
3483 {
3484   AMPIAPI("AMPI_Test");
3485   checkRequest(*request);
3486   if(*request==MPI_REQUEST_NULL) {
3487     *flag = 1;
3488     stsempty(*sts);
3489     return 0;
3490   }
3491   AmpiRequestList* reqs = getReqs();
3492   if(1 == (*flag = (*reqs)[*request]->test(sts))){
3493     if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3494       (*reqs)[*request]->complete(sts);
3495       reqs->free(*request);
3496       *request = MPI_REQUEST_NULL;
3497     }
3498   }
3499   return 0;
3500 }
3501
3502 CDECL
3503 int AMPI_Testany(int count, MPI_Request *request, int *index, int *flag, MPI_Status *sts){
3504   AMPIAPI("AMPI_Testany");
3505   checkRequests(count,request);
3506   if(areInactiveReqs(count,request)){
3507     *flag=1;
3508     *index=MPI_UNDEFINED;
3509     stsempty(*sts);
3510     return MPI_SUCCESS;
3511   }
3512   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3513   *flag=0;
3514   for(int i=0;i<reqvec->size();i++){
3515     AMPI_Test(&request[((*reqvec)[i])[0]], flag, sts);
3516     if(*flag==1 && sts->MPI_COMM!=0){ // skip MPI_REQUEST_NULL
3517       *index = ((*reqvec)[i])[0];
3518       return 0;
3519     }
3520   }
3521   *index = MPI_UNDEFINED;
3522         delete reqvec;
3523   return 0;
3524 }
3525
3526 CDECL
3527 int AMPI_Testall(int count, MPI_Request *request, int *flag, MPI_Status *sts)
3528 {
3529   AMPIAPI("AMPI_Testall");
3530   if(count==0) return MPI_SUCCESS;
3531   checkRequests(count,request);
3532   int tmpflag;
3533   int i,j;
3534   AmpiRequestList* reqs = getReqs();
3535   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3536   *flag = 1;  
3537   for(i=0;i<reqvec->size();i++){
3538     for(j=0;j<((*reqvec)[i]).size();j++){
3539       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL)
3540         continue;
3541       tmpflag = (*reqs)[request[((*reqvec)[i])[j]]]->test(&sts[((*reqvec)[i])[j]]);
3542       *flag *= tmpflag;
3543     }
3544   }
3545   if(flag) 
3546     MPI_Waitall(count,request,sts);
3547         delete reqvec;  
3548   return 0;
3549 }
3550
3551 CDECL
3552 int AMPI_Testsome(int incount, MPI_Request *array_of_requests, int *outcount,
3553                  int *array_of_indices, MPI_Status *array_of_statuses)
3554 {
3555   AMPIAPI("AMPI_Testsome");
3556   checkRequests(incount,array_of_requests);
3557   if(areInactiveReqs(incount,array_of_requests)){
3558     *outcount=MPI_UNDEFINED;
3559     return MPI_SUCCESS;
3560   }
3561   MPI_Status sts;
3562   int flag;
3563   int i;
3564   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3565   *outcount = 0;
3566   for(i=0;i<reqvec->size();i++){
3567     AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3568     if(flag == 1){
3569       array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3570       array_of_statuses[(*outcount)++]=sts;
3571     }
3572   }
3573         delete reqvec;
3574   return 0;
3575 }
3576
3577 CDECL
3578 int AMPI_Request_free(MPI_Request *request){
3579   AMPIAPI("AMPI_Request_free");
3580   checkRequest(*request);
3581   if(*request==MPI_REQUEST_NULL) return 0;
3582   AmpiRequestList* reqs = getReqs();
3583   reqs->free(*request);
3584   return 0;
3585 }
3586
3587 CDECL
3588 int AMPI_Cancel(MPI_Request *request){
3589   AMPIAPI("AMPI_Request_free");
3590   return AMPI_Request_free(request);
3591 }
3592
3593 CDECL
3594 int AMPI_Test_cancelled(MPI_Status* status, int* flag) {
3595     /* FIXME: always returns success */
3596     *flag = 1;
3597     return 0;
3598 }
3599
3600 CDECL
3601 int AMPI_Recv_init(void *buf, int count, int type, int src, int tag,
3602                    MPI_Comm comm, MPI_Request *req)
3603 {
3604   AMPIAPI("AMPI_Recv_init");
3605   AmpiRequestList* reqs = getReqs();
3606   PersReq *newreq = new PersReq(buf,count,type,src,tag,comm,2);
3607   *req = reqs->insert(newreq);
3608   return 0;
3609 }
3610
3611 CDECL
3612 int AMPI_Send_init(void *buf, int count, int type, int dest, int tag,
3613                    MPI_Comm comm, MPI_Request *req)
3614 {
3615   AMPIAPI("AMPI_Send_init");
3616   AmpiRequestList* reqs = getReqs();
3617   PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,1);
3618   *req = reqs->insert(newreq);
3619   return 0;
3620 }
3621
3622 CDECL
3623 int AMPI_Ssend_init(void *buf, int count, int type, int dest, int tag,
3624                    MPI_Comm comm, MPI_Request *req)
3625 {
3626   AMPIAPI("AMPI_Send_init");
3627   AmpiRequestList* reqs = getReqs();
3628   PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,3);
3629   *req = reqs->insert(newreq);
3630   return 0;
3631 }
3632
3633 CDECL
3634 int AMPI_Type_contiguous(int count, MPI_Datatype oldtype,
3635                          MPI_Datatype *newtype)
3636 {
3637   AMPIAPI("AMPI_Type_contiguous");
3638   getDDT()->newContiguous(count, oldtype, newtype);
3639   return 0;
3640 }
3641
3642 CDECL
3643 int AMPI_Type_vector(int count, int blocklength, int stride,
3644                      MPI_Datatype oldtype, MPI_Datatype*  newtype)
3645 {
3646   AMPIAPI("AMPI_Type_vector");
3647   getDDT()->newVector(count, blocklength, stride, oldtype, newtype);
3648   return 0 ;
3649 }
3650
3651 CDECL
3652 int AMPI_Type_hvector(int count, int blocklength, MPI_Aint stride, 
3653                       MPI_Datatype oldtype, MPI_Datatype*  newtype)
3654 {
3655   AMPIAPI("AMPI_Type_hvector");
3656   getDDT()->newHVector(count, blocklength, stride, oldtype, newtype);
3657   return 0 ;
3658 }
3659
3660 CDECL
3661 int AMPI_Type_indexed(int count, int* arrBlength, int* arrDisp, 
3662                       MPI_Datatype oldtype, MPI_Datatype*  newtype)
3663 {
3664   AMPIAPI("AMPI_Type_indexed");
3665   getDDT()->newIndexed(count, arrBlength, arrDisp, oldtype, newtype);
3666   return 0 ;
3667 }
3668
3669 CDECL
3670 int AMPI_Type_hindexed(int count, int* arrBlength, MPI_Aint* arrDisp,
3671                        MPI_Datatype oldtype, MPI_Datatype*  newtype)
3672 {
3673   AMPIAPI("AMPI_Type_hindexed");
3674   getDDT()->newHIndexed(count, arrBlength, arrDisp, oldtype, newtype);
3675   return 0 ;