AMPI: always assume no anytime migration and static insertion.
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1
2 #define AMPIMSGLOG    0
3
4 #define exit exit /*Supress definition of exit in ampi.h*/
5 #include "ampiimpl.h"
6 #include "tcharm.h"
7 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
8 #include "ampiEvents.h" /*** for trace generation for projector *****/
9 #include "ampiProjections.h"
10 #endif
11
12 #if CMK_BIGSIM_CHARM
13 #include "bigsim_logs.h"
14 #endif
15
16 #define CART_TOPOL 1
17 #define AMPI_PRINT_IDLE 0
18
19 /* change this define to "x" to trace all send/recv's */
20 #define MSG_ORDER_DEBUG(x)  //x /* empty */
21 /* change this define to "x" to trace user calls */
22 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
23 #define STARTUP_DEBUG(x)  //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
24 #define FUNCCALL_DEBUG(x) //x /* empty */
25
26 static CkDDT *getDDT(void) {
27   return getAmpiParent()->myDDT;
28 }
29
30 inline int errorCheck(MPI_Comm comm, int ifComm, int count, int ifCount, 
31     MPI_Datatype data, int ifData, int tag, int ifTag, 
32     int rank, int ifRank, void *buf, int ifBuf);
33
34   inline int checkCommunicator(MPI_Comm comm) {
35     if(comm == MPI_COMM_NULL)
36       return MPI_ERR_COMM;
37     return MPI_SUCCESS;
38   }
39
40   inline int checkCount(int count) {
41     if(count < 0)
42       return MPI_ERR_COUNT;
43     return MPI_SUCCESS;
44   }
45
46   inline int checkData(MPI_Datatype data) {
47     if(data == MPI_DATATYPE_NULL)
48       return MPI_ERR_TYPE;
49     return MPI_SUCCESS;
50   }
51
52   inline int checkTag(int tag) {
53     if(tag != MPI_ANY_TAG && tag < 0)
54       return MPI_ERR_TAG;
55     return MPI_SUCCESS;
56   }
57
58 inline int checkRank(int rank, MPI_Comm comm) {
59   int size;
60   AMPI_Comm_size(comm, &size);
61   if(((rank >= 0) && (rank < size)) || (rank == MPI_ANY_SOURCE) || (rank ==
62         MPI_PROC_NULL))
63     return MPI_SUCCESS;
64   return MPI_ERR_RANK;
65 }
66
67   inline int checkBuf(void *buf, int count) {
68     if((count != 0 && buf == NULL))
69       return MPI_ERR_BUFFER;
70     return MPI_SUCCESS;
71   }
72
73 inline int errorCheck(MPI_Comm comm, int ifComm, int count, int ifCount,
74     MPI_Datatype data, int ifData, int tag, int ifTag,
75     int rank, int ifRank, void *buf, int ifBuf) {
76   int ret;
77   if(ifComm) { 
78     ret = checkCommunicator(comm);
79     if(ret != MPI_SUCCESS)
80       return ret;
81   }
82   if(ifCount) {
83     ret = checkCount(count);
84     if(ret != MPI_SUCCESS)
85       return ret;
86   }
87   if(ifData) {
88     ret = checkData(data);
89     if(ret != MPI_SUCCESS)
90       return ret;
91   }
92   if(ifTag) {
93     ret = checkTag(tag);
94     if(ret != MPI_SUCCESS)
95       return ret;
96   }
97   if(ifRank) {
98     ret = checkRank(rank,comm);
99     if(ret != MPI_SUCCESS)
100       return ret;
101   }
102   if(ifBuf) {
103     ret = checkBuf(buf,count);
104     if(ret != MPI_SUCCESS)
105       return ret;
106   }
107   return MPI_SUCCESS;
108 }
109
110 //------------- startup -------------
111 static mpi_comm_worlds mpi_worlds;
112
113 int _mpi_nworlds; /*Accessed by ampif*/
114 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
115
116 /* ampiReducer: AMPI's generic reducer type 
117    MPI_Op is function pointer to MPI_User_function
118    so that it can be packed into AmpiOpHeader, shipped 
119    with the reduction message, and then plugged into 
120    the ampiReducer. 
121    One little trick is the ampi::recv which receives
122    the final reduction message will see additional
123    sizeof(AmpiOpHeader) bytes in the buffer before
124    any user data.                             */
125 class AmpiComplex { 
126   public: 
127     double re, im; 
128     void operator+=(const AmpiComplex &a) {
129       re+=a.re;
130       im+=a.im;
131     }
132     void operator*=(const AmpiComplex &a) {
133       double nu_re=re*a.re-im*a.im;
134       im=re*a.im+im*a.re;
135       re=nu_re;
136     }
137     int operator>(const AmpiComplex &a) {
138       CkAbort("Cannot compare complex numbers with MPI_MAX");
139       return 0;
140     }
141     int operator<(const AmpiComplex &a) {
142       CkAbort("Cannot compare complex numbers with MPI_MIN");
143       return 0;
144     }
145 };
146 typedef struct { float val; int idx; } FloatInt;
147 typedef struct { double val; int idx; } DoubleInt;
148 typedef struct { long val; int idx; } LongInt;
149 typedef struct { int val; int idx; } IntInt;
150 typedef struct { short val; int idx; } ShortInt;
151 typedef struct { long double val; int idx; } LongdoubleInt;
152 typedef struct { float val; float idx; } FloatFloat;
153 typedef struct { double val; double idx; } DoubleDouble;
154
155
156 #define MPI_OP_SWITCH(OPNAME) \
157   int i; \
158 switch (*datatype) { \
159   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
160   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
161   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
162   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
163   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
164   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
165   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
166   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
167   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
168   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
169   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
170   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
171   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
172   default: \
173            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
174   CmiAbort("Unsupported MPI datatype for MPI Op"); \
175 };\
176
177 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
178 #define MPI_OP_IMPL(type) \
179   if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
180   MPI_OP_SWITCH(MPI_MAX)
181 #undef MPI_OP_IMPL
182 }
183
184 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
185 #define MPI_OP_IMPL(type) \
186   if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
187   MPI_OP_SWITCH(MPI_MIN)
188 #undef MPI_OP_IMPL
189 }
190
191 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
192 #define MPI_OP_IMPL(type) \
193   ((type *)inoutvec)[i] += ((type *)invec)[i];
194   MPI_OP_SWITCH(MPI_SUM)
195 #undef MPI_OP_IMPL
196 }
197
198 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
199 #define MPI_OP_IMPL(type) \
200   ((type *)inoutvec)[i] *= ((type *)invec)[i];
201   MPI_OP_SWITCH(MPI_PROD)
202 #undef MPI_OP_IMPL
203 }
204
205 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
206   int i;  
207   switch (*datatype) {
208     case MPI_INT:
209     case MPI_LOGICAL:
210       for(i=0;i<(*len);i++)
211         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
212       break;
213     default:
214       ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
215       CmiAbort("exiting");
216   }
217 }
218 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
219   int i; 
220   switch (*datatype) {
221     case MPI_INT:
222       for(i=0;i<(*len);i++)
223         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
224       break;
225     case MPI_BYTE:
226       for(i=0;i<(*len);i++)
227         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
228       break;
229     default:
230       ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
231       CmiAbort("exiting");
232   }
233 }
234 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
235   int i;  
236   switch (*datatype) {
237     case MPI_INT:
238     case MPI_LOGICAL:
239       for(i=0;i<(*len);i++)
240         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
241       break;
242     default:
243       ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
244       CmiAbort("exiting");
245   }
246 }
247 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
248   int i;  
249   switch (*datatype) {
250     case MPI_INT:
251       for(i=0;i<(*len);i++)
252         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
253       break;
254     case MPI_BYTE:
255       for(i=0;i<(*len);i++)
256         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
257       break;
258     default:
259       ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
260       CmiAbort("exiting");
261   }
262 }
263 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
264   int i;  
265   switch (*datatype) {
266     case MPI_INT:
267     case MPI_LOGICAL:
268       for(i=0;i<(*len);i++)
269         ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
270       break;
271     default:
272       ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
273       CmiAbort("exiting");
274   }
275 }
276 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
277   int i;  
278   switch (*datatype) {
279     case MPI_INT:
280       for(i=0;i<(*len);i++)
281         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
282       break;
283     case MPI_BYTE:
284       for(i=0;i<(*len);i++)
285         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
286       break;
287     case MPI_UNSIGNED:
288       for(i=0;i<(*len);i++)
289         ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
290       break;
291     default:
292       ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
293       CmiAbort("exiting");
294   }
295 }
296
297 #ifndef MIN
298 #define MIN(a,b) (a < b ? a : b)
299 #endif
300
301 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
302   int i;  
303
304   switch (*datatype) {
305     case MPI_FLOAT_INT:
306       for(i=0;i<(*len);i++)
307         if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
308           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
309         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
310           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
311       break;
312     case MPI_DOUBLE_INT:
313       for(i=0;i<(*len);i++)
314         if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
315           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
316         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
317           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
318
319       break;
320     case MPI_LONG_INT:
321       for(i=0;i<(*len);i++)
322         if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
323           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
324         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
325           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
326       break;
327     case MPI_2INT:
328       for(i=0;i<(*len);i++)
329         if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
330           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
331         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
332           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
333       break;
334     case MPI_SHORT_INT:
335       for(i=0;i<(*len);i++)
336         if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
337           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
338         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
339           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
340       break;
341     case MPI_LONG_DOUBLE_INT:
342       for(i=0;i<(*len);i++)
343         if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
344           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
345         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
346           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
347       break;
348     case MPI_2FLOAT:
349       for(i=0;i<(*len);i++)
350         if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
351           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
352         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
353           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
354       break;
355     case MPI_2DOUBLE:
356       for(i=0;i<(*len);i++)
357         if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
358           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
359         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
360           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
361       break;
362     default:
363       ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
364       CmiAbort("exiting");
365   }
366 }
367 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
368   int i;  
369   switch (*datatype) {
370     case MPI_FLOAT_INT:
371       for(i=0;i<(*len);i++)
372         if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
373           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
374         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
375           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
376       break;
377     case MPI_DOUBLE_INT:
378       for(i=0;i<(*len);i++)
379         if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
380           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
381         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
382           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
383       break;
384     case MPI_LONG_INT:
385       for(i=0;i<(*len);i++)
386         if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
387           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
388         else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
389           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
390       break;
391     case MPI_2INT:
392       for(i=0;i<(*len);i++)
393         if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
394           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
395         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
396           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
397       break;
398     case MPI_SHORT_INT:
399       for(i=0;i<(*len);i++)
400         if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
401           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
402         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
403           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
404       break;
405     case MPI_LONG_DOUBLE_INT:
406       for(i=0;i<(*len);i++)
407         if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
408           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
409         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
410           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
411       break;
412     case MPI_2FLOAT:
413       for(i=0;i<(*len);i++)
414         if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
415           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
416         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
417           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
418       break;
419     case MPI_2DOUBLE:
420       for(i=0;i<(*len);i++)
421         if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
422           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
423         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
424           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
425       break;
426     default:
427       ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
428       CmiAbort("exiting");
429   }
430 }
431
432 // every msg contains a AmpiOpHeader structure before user data
433 // FIXME: non-commutative operations require messages be ordered by rank
434 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
435   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
436   MPI_Datatype dtype;
437   int szhdr, szdata, len;
438   MPI_User_function* func;
439   func = hdr->func;
440   dtype = hdr->dtype;  
441   szdata = hdr->szdata;
442   len = hdr->len;  
443   szhdr = sizeof(AmpiOpHeader);
444
445   //Assuming extent == size
446   void *ret = malloc(szhdr+szdata);
447   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
448   for(int i=1;i<nMsg;i++){
449     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
450   }
451   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
452   free(ret);
453   return retmsg;
454 }
455
456 CkReduction::reducerType AmpiReducer;
457
458 class Builtin_kvs{
459   public:
460     int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
461     Builtin_kvs(){
462       tag_ub = MPI_TAG_UB_VALUE; 
463       host = MPI_PROC_NULL;
464       io = 0;
465       wtime_is_global = 0;
466       keyval_mype = CkMyPe();
467       keyval_numpes = CkNumPes();
468       keyval_mynode = CkMyNode();
469       keyval_numnodes = CkNumNodes();
470     }
471 };
472
473 // ------------ startup support -----------
474 int _ampi_fallback_setup_count;
475 CDECL void AMPI_Setup(void);
476 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
477
478 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
479
480 /*Main routine used when missing MPI_Setup routine*/
481 CDECL void AMPI_Fallback_Main(int argc,char **argv)
482 {
483   AMPI_Main_cpp(argc,argv);
484   AMPI_Main(argc,argv);
485   FTN_NAME(MPI_MAIN,mpi_main)();
486 }
487
488 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
489 /*Startup routine used if user *doesn't* write
490   a TCHARM_User_setup routine.
491  */
492 CDECL void AMPI_Setup_Switch(void) {
493   _ampi_fallback_setup_count=0;
494   FTN_NAME(AMPI_SETUP,ampi_setup)();
495   AMPI_Setup();
496   if (_ampi_fallback_setup_count==2)
497   { //Missing AMPI_Setup in both C and Fortran:
498     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
499   }
500 }
501
502 static int nodeinit_has_been_called=0;
503 CtvDeclare(ampiParent*, ampiPtr);
504 CtvDeclare(int, ampiInitDone);
505 CtvDeclare(void*,stackBottom);
506 CtvDeclare(int, ampiFinalized);
507 CkpvDeclare(Builtin_kvs, bikvs);
508 CkpvDeclare(int,argvExtracted);
509 static int enableStreaming = 0;
510
511 CDECL long ampiCurrentStackUsage(){
512   int localVariable;
513
514   unsigned long p1 =  (unsigned long)((void*)&localVariable);
515   unsigned long p2 =  (unsigned long)(CtvAccess(stackBottom));
516
517
518   if(p1 > p2)
519     return p1 - p2;
520   else
521     return  p2 - p1;
522
523 }
524
525 FDECL void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
526   long usage = ampiCurrentStackUsage();
527   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
528 }
529
530
531 CDECL void AMPI_threadstart(void *data);
532 static int AMPI_threadstart_idx = -1;
533
534 static void ampiNodeInit(void)
535 {
536   _mpi_nworlds=0;
537   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
538   {
539     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
540   }
541   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
542
543   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
544
545   CmiAssert(AMPI_threadstart_idx == -1);    // only initialize once
546   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
547
548   nodeinit_has_been_called=1;
549
550    // ASSUME NO ANYTIME MIGRATION and STATIC INSERTON
551   _isAnytimeMigration = false;
552   _isStaticInsertion = true;
553 }
554
555 #if PRINT_IDLE
556 static double totalidle=0.0, startT=0.0;
557 static int beginHandle, endHandle;
558 static void BeginIdle(void *dummy,double curWallTime)
559 {
560   startT = curWallTime;
561 }
562 static void EndIdle(void *dummy,double curWallTime)
563 {
564   totalidle += curWallTime - startT;
565 }
566 #endif
567
568 /* for fortran reduction operation table to handle mapping */
569 typedef MPI_Op  MPI_Op_Array[128];
570 CtvDeclare(int, mpi_opc);
571 CtvDeclare(MPI_Op_Array, mpi_ops);
572
573 static void ampiProcInit(void){
574   CtvInitialize(ampiParent*, ampiPtr);
575   CtvInitialize(int,ampiInitDone);
576   CtvInitialize(int,ampiFinalized);
577   CtvInitialize(void*,stackBottom);
578
579
580   CtvInitialize(MPI_Op_Array, mpi_ops);
581   CtvInitialize(int, mpi_opc);
582
583   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
584   CkpvAccess(bikvs) = Builtin_kvs();
585
586   CkpvInitialize(int, argvExtracted);
587   CkpvAccess(argvExtracted) = 0;
588
589 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
590   REGISTER_AMPI
591 #endif
592   initAmpiProjections();
593
594   char **argv=CkGetArgv();
595 #if AMPI_COMLIB  
596   if(CkpvAccess(argvExtracted)==0){
597     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
598   }
599 #endif
600
601 #if AMPIMSGLOG
602   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
603   //msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
604   if (CmiGetArgIntDesc(argv,"+msgLogRead", &msgLogRank, "Re-play message processing order for AMPI")) {
605     msgLogRead = 1;
606   }
607   //CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
608   char *procs = NULL;
609   if (CmiGetArgStringDesc(argv, "+msgLogRanks", &procs, "A list of AMPI processors to record , e.g. 0,10,20-30")) {
610     msgLogRanks.set(procs);
611   }
612   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
613   if (CkMyPe() == 0) {
614     if (msgLogWrite) CmiPrintf("Writing AMPI messages of rank %s to log: %s\n", procs?procs:"", msgLogFilename);
615     if (msgLogRead) CmiPrintf("Reading AMPI messages of rank %s from log: %s\n", procs?procs:"", msgLogFilename);
616   }
617 #endif
618
619   // initBigSimTrace(1,outtiming);
620 }
621
622 #if AMPIMSGLOG
623 static inline int record_msglog(int rank){
624   return msgLogRanks.includes(rank);
625 }
626 #endif
627
628 void AMPI_Install_Idle_Timer(){
629 #if AMPI_PRINT_IDLE
630   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
631   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
632 #endif
633 }
634
635 void AMPI_Uninstall_Idle_Timer(){
636 #if AMPI_PRINT_IDLE
637   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
638   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
639 #endif
640 }
641
642 PUPfunctionpointer(MPI_MainFn)
643
644   class MPI_threadstart_t {
645     public:
646       MPI_MainFn fn;
647       MPI_threadstart_t() {}
648       MPI_threadstart_t(MPI_MainFn fn_)
649         :fn(fn_) {}
650       void start(void) {
651         char **argv=CmiCopyArgs(CkGetArgv());
652         int argc=CkGetArgc();
653
654         // Set a pointer to somewhere close to the bottom of the stack.
655         // This is used for roughly estimating the stack usage later.
656         CtvAccess(stackBottom) = &argv;
657
658 #if CMK_AMPI_FNPTR_HACK
659         AMPI_Fallback_Main(argc,argv);
660 #else
661         (fn)(argc,argv);
662 #endif
663       }
664       void pup(PUP::er &p) {
665         p|fn;
666       }
667   };
668 PUPmarshall(MPI_threadstart_t)
669
670 CDECL void AMPI_threadstart(void *data)
671 {
672   STARTUP_DEBUG("MPI_threadstart")
673     MPI_threadstart_t t;
674   pupFromBuf(data,t);
675 #if CMK_TRACE_IN_CHARM
676   if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
677 #endif
678   t.start();
679 }
680
681 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
682 {
683   STARTUP_DEBUG("ampiCreateMain")
684     int _nchunks=TCHARM_Get_num_chunks();
685   //Make a new threads array:
686   MPI_threadstart_t s(mainFn);
687   memBuf b; pupIntoBuf(b,s);
688   TCHARM_Create_data( _nchunks,AMPI_threadstart_idx,
689       b.getData(), b.getSize());
690 }
691
692 /* TCharm Semaphore ID's for AMPI startup */
693 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
694 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
695
696 static CProxy_ampiWorlds ampiWorldsGroup;
697
698 static void init_operations()
699 {
700   CtvInitialize(MPI_Op_Array, mpi_ops);
701   int i = 0;
702   MPI_Op *tab = CtvAccess(mpi_ops);
703   tab[i++] = MPI_MAX;
704   tab[i++] = MPI_MIN;
705   tab[i++] = MPI_SUM;
706   tab[i++] = MPI_PROD;
707   tab[i++] = MPI_LAND;
708   tab[i++] = MPI_BAND;
709   tab[i++] = MPI_LOR;
710   tab[i++] = MPI_BOR;
711   tab[i++] = MPI_LXOR;
712   tab[i++] = MPI_BXOR;
713   tab[i++] = MPI_MAXLOC;
714   tab[i++] = MPI_MINLOC;
715
716   CtvInitialize(int, mpi_opc);
717   CtvAccess(mpi_opc) = i;
718 }
719
720 /*
721    Called from MPI_Init, a collective initialization call:
722    creates a new AMPI array and attaches it to the current
723    set of TCHARM threads.
724  */
725 static ampi *ampiInit(char **argv)
726 {
727   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
728     if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
729   STARTUP_DEBUG("ampiInit> begin")
730
731     MPI_Comm new_world;
732   int _nchunks;
733   CkArrayOptions opts;
734   CProxy_ampiParent parent;
735   if (TCHARM_Element()==0) //the rank of a tcharm object
736   { /* I'm responsible for building the arrays: */
737     STARTUP_DEBUG("ampiInit> creating arrays")
738
739       // FIXME: Need to serialize global communicator allocation in one place.
740       //Allocate the next communicator
741       if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
742       {
743         CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
744       }
745     int new_idx=_mpi_nworlds;
746     new_world=MPI_COMM_WORLD+new_idx; // Isaac guessed there shouldn't be a +1 here
747
748     //Create and attach the ampiParent array
749     CkArrayID threads;
750     opts=TCHARM_Attach_start(&threads,&_nchunks);
751     parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
752     STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
753   }
754   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
755
756   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
757
758     if (TCHARM_Element()==0)
759     {
760       //Make a new ampi array
761       CkArrayID empty;
762
763       ampiCommStruct worldComm(new_world,empty,_nchunks);
764       CProxy_ampi arr;
765
766
767 #if AMPI_COMLIB
768
769       ComlibInstanceHandle ciStreaming = 1;
770       ComlibInstanceHandle ciBcast = 2;
771       ComlibInstanceHandle ciAllgather = 3;
772       ComlibInstanceHandle ciAlltoall = 4;
773
774       arr=CProxy_ampi::ckNew(parent, worldComm, ciStreaming, ciBcast, ciAllgather, ciAlltoall, opts);
775
776
777       CkPrintf("Using untested comlib code in ampi.C\n");
778
779       Strategy *sStreaming = new StreamingStrategy(1,10);
780       CkAssert(ciStreaming == ComlibRegister(sStreaming));
781
782       Strategy *sBcast = new BroadcastStrategy(USE_HYPERCUBE);
783       CkAssert(ciBcast = ComlibRegister(sBcast));
784
785       Strategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
786       CkAssert(ciAllgather = ComlibRegister(sAllgather));
787
788       Strategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
789       CkAssert(ciAlltoall = ComlibRegister(sAlltoall));
790
791       CmiPrintf("Created AMPI comlib strategies in new manner\n");
792
793       // FIXME: Propogate the comlib table here
794       CkpvAccess(conv_com_object).doneCreating();
795 #else
796       arr=CProxy_ampi::ckNew(parent,worldComm,opts);
797 #endif
798
799       //Broadcast info. to the mpi_worlds array
800       // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
801       ampiCommStruct newComm(new_world,arr,_nchunks);
802       //CkPrintf("In ampiInit: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
803       if (ampiWorldsGroup.ckGetGroupID().isZero())
804         ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
805       else
806         ampiWorldsGroup.add(newComm);
807       STARTUP_DEBUG("ampiInit> arrays created")
808
809     }
810
811   // Find our ampi object:
812   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
813   CtvAccess(ampiInitDone)=1;
814   CtvAccess(ampiFinalized)=0;
815   STARTUP_DEBUG("ampiInit> complete")
816 #if CMK_BIGSIM_CHARM
817     //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
818     TRACE_BG_ADD_TAG("AMPI_START");
819 #endif
820
821   init_operations();     // initialize fortran reduction operation table
822
823   getAmpiParent()->ampiInitCallDone = 0;
824
825   CProxy_ampi cbproxy = ptr->getProxy();
826   CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
827   ptr->contribute(0, NULL, CkReduction::sum_int, cb);
828
829   ampiParent *thisParent = getAmpiParent(); 
830   while(thisParent->ampiInitCallDone!=1){
831     //CkPrintf("In checking ampiInitCallDone(%d) loop at parent %d!\n", thisParent->ampiInitCallDone, thisParent->thisIndex);
832     thisParent->getTCharmThread()->stop();
833     /* 
834      * thisParent needs to be updated in case of the parent is being pupped.
835      * In such case, thisParent got changed
836      */
837     thisParent = getAmpiParent();
838   }
839
840 #ifdef CMK_BIGSIM_CHARM
841   BgSetStartOutOfCore();
842 #endif
843
844   return ptr;
845 }
846
847 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
848 class ampiWorlds : public CBase_ampiWorlds {
849   public:
850     ampiWorlds(const ampiCommStruct &nextWorld) {
851       ampiWorldsGroup=thisgroup;
852       //CkPrintf("In constructor: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
853       add(nextWorld);
854     }
855     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
856     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
857     void add(const ampiCommStruct &nextWorld) {
858       int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD); // Isaac guessed there shouldn't be a +1 after the MPI_COMM_WORLD
859       mpi_worlds[new_idx].comm=nextWorld;
860       if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
861       STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
862     }
863 };
864
865 //-------------------- ampiParent -------------------------
866   ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
867 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
868 {
869   int barrier = 0x1234;
870   STARTUP_DEBUG("ampiParent> starting up")
871     thread=NULL;
872   worldPtr=NULL;
873   myDDT=&myDDTsto;
874   prepareCtv();
875
876   init();
877
878   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
879   AsyncEvacuate(CmiFalse);
880 }
881
882 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
883   thread=NULL;
884   worldPtr=NULL;
885   myDDT=&myDDTsto;
886
887   init();
888
889   AsyncEvacuate(CmiFalse);
890 }
891
892 void ampiParent::pup(PUP::er &p) {
893   ArrayElement1D::pup(p);
894   p|threads;
895   p|worldNo;           // why it was missing from here before??
896   p|worldStruct;
897   myDDT->pup(p);
898   p|splitComm;
899   p|groupComm;
900   p|groups;
901
902   //BIGSIM_OOC DEBUGGING
903   //if(!p.isUnpacking()){
904   //    CmiPrintf("ampiParent[%d] packing ampiRequestList: \n", thisIndex);
905   //    ampiReqs.print();
906   //}
907
908   p|ampiReqs;
909
910   //BIGSIM_OOC DEBUGGING
911   //if(p.isUnpacking()){
912   //    CmiPrintf("ampiParent[%d] unpacking ampiRequestList: \n", thisIndex);
913   //    ampiReqs.print();
914   //}
915
916   p|RProxyCnt;
917   p|tmpRProxy;
918   p|winStructList;
919   p|infos;
920
921   p|ampiInitCallDone;
922 }
923 void ampiParent::prepareCtv(void) {
924   thread=threads[thisIndex].ckLocal();
925   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
926   CtvAccessOther(thread->getThread(),ampiPtr) = this;
927   STARTUP_DEBUG("ampiParent> found TCharm")
928 }
929
930 void ampiParent::init(){
931   CkAssert(groups.size() == 0);
932   groups.push_back(new groupStruct);
933
934 #if AMPIMSGLOG
935   if(msgLogWrite && record_msglog(thisIndex)){
936     char fname[128];
937     sprintf(fname, "%s.%d", msgLogFilename,thisIndex);
938 #if CMK_PROJECTIONS_USE_ZLIB && 0
939     fMsgLog = gzopen(fname,"wb");
940     toPUPer = new PUP::tozDisk(fMsgLog);
941 #else
942     fMsgLog = fopen(fname,"wb");
943     CmiAssert(fMsgLog != NULL);
944     toPUPer = new PUP::toDisk(fMsgLog);
945 #endif
946   }else if(msgLogRead){
947     char fname[128];
948     sprintf(fname, "%s.%d", msgLogFilename,msgLogRank);
949 #if CMK_PROJECTIONS_USE_ZLIB && 0
950     fMsgLog = gzopen(fname,"rb");
951     fromPUPer = new PUP::fromzDisk(fMsgLog);
952 #else
953     fMsgLog = fopen(fname,"rb");
954     CmiAssert(fMsgLog != NULL);
955     fromPUPer = new PUP::fromDisk(fMsgLog);
956 #endif
957     CkPrintf("AMPI> opened message log file: %s for replay\n", fname);
958   }
959 #endif
960 }
961
962 void ampiParent::finalize(){
963 #if AMPIMSGLOG
964   if(msgLogWrite && record_msglog(thisIndex)){
965     delete toPUPer;
966 #if CMK_PROJECTIONS_USE_ZLIB && 0
967     gzclose(fMsgLog);
968 #else
969     fclose(fMsgLog);
970 #endif
971   }else if(msgLogRead){
972     delete fromPUPer;
973 #if CMK_PROJECTIONS_USE_ZLIB && 0
974     gzclose(fMsgLog);
975 #else
976     fclose(fMsgLog);
977 #endif
978   }
979 #endif
980 }
981
982 void ampiParent::ckJustMigrated(void) {
983   ArrayElement1D::ckJustMigrated();
984   prepareCtv();
985 }
986
987 void ampiParent::ckJustRestored(void) {
988   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
989     ArrayElement1D::ckJustRestored();
990   prepareCtv();
991
992   //BIGSIM_OOC DEBUGGING
993   //CkPrintf("In ampiParent[%d] with TCharm thread=%p:   ",thisIndex, thread);
994   //CthPrintThdMagic(thread->getTid()); 
995 }
996
997 ampiParent::~ampiParent() {
998   STARTUP_DEBUG("ampiParent> destructor called");
999   finalize();
1000 }
1001
1002 //Children call this when they are first created or just migrated
1003 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
1004 {
1005   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
1006
1007   if (s.getComm()>=MPI_COMM_WORLD)
1008   { //We now have our COMM_WORLD-- register it
1009     //Note that split communicators don't keep a raw pointer, so
1010     //they don't need to re-register on migration.
1011     if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
1012     worldPtr=ptr;
1013     worldStruct=s;
1014
1015     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
1016     CkVec<int> _indices;
1017     _indices.push_back(thisIndex);
1018     selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
1019   }
1020
1021   if (!forMigration)
1022   { //Register the new communicator:
1023     MPI_Comm comm = s.getComm();
1024     STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
1025       if (comm>=MPI_COMM_WORLD) { 
1026         // Pass the new ampi to the waiting ampiInit
1027         thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
1028       } else if (isSplit(comm)) {
1029         splitChildRegister(s);
1030       } else if (isGroup(comm)) {
1031         groupChildRegister(s);
1032       } else if (isCart(comm)) {
1033         cartChildRegister(s);
1034       } else if (isGraph(comm)) {
1035         graphChildRegister(s);
1036       } else if (isInter(comm)) {
1037         interChildRegister(s);
1038       } else if (isIntra(comm)) {
1039         intraChildRegister(s);
1040       }else
1041         CkAbort("ampiParent recieved child with bad communicator");
1042   }
1043
1044   return thread;
1045 }
1046
1047 //BIGSIM_OOC DEBUGGING
1048 //Move the comm2ampi from inline to normal function for the sake of debugging
1049 /*ampi *ampiParent::comm2ampi(MPI_Comm comm){
1050 //BIGSIM_OOC DEBUGGING
1051 //CmiPrintf("%d, in ampiParent::comm2ampi, comm=%d\n", thisIndex, comm);
1052 if (comm==MPI_COMM_WORLD) return worldPtr;
1053 if (comm==MPI_COMM_SELF) return worldPtr;
1054 if (comm==worldNo) return worldPtr;
1055 if (isSplit(comm)) {
1056 const ampiCommStruct &st=getSplit(comm);
1057 return st.getProxy()[thisIndex].ckLocal();
1058 }
1059 if (isGroup(comm)) {
1060 const ampiCommStruct &st=getGroup(comm);
1061 return st.getProxy()[thisIndex].ckLocal();
1062 }
1063 if (isCart(comm)) {
1064 const ampiCommStruct &st = getCart(comm);
1065 return st.getProxy()[thisIndex].ckLocal();
1066 }
1067 if (isGraph(comm)) {
1068 const ampiCommStruct &st = getGraph(comm);
1069 return st.getProxy()[thisIndex].ckLocal();
1070 }
1071 if (isInter(comm)) {
1072 const ampiCommStruct &st=getInter(comm);
1073 return st.getProxy()[thisIndex].ckLocal();
1074 }
1075 if (isIntra(comm)) {
1076 const ampiCommStruct &st=getIntra(comm);
1077 return st.getProxy()[thisIndex].ckLocal();
1078 }
1079 if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
1080 CkAbort("Invalid communicator used!");
1081 return NULL;
1082 }*/
1083
1084 // reduction client data - preparation for checkpointing
1085 class ckptClientStruct {
1086   public:
1087     const char *dname;
1088     ampiParent *ampiPtr;
1089     ckptClientStruct(const char *s, ampiParent *a): dname(s), ampiPtr(a) {}
1090 };
1091
1092 static void checkpointClient(void *param,void *msg)
1093 {
1094   ckptClientStruct *client = (ckptClientStruct*)param;
1095   const char *dname = client->dname;
1096   ampiParent *ampiPtr = client->ampiPtr;
1097   ampiPtr->Checkpoint(strlen(dname), dname);
1098   delete client;
1099 }
1100
1101 void ampiParent::startCheckpoint(const char* dname){
1102   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
1103   if (thisIndex==0) {
1104     ckptClientStruct *clientData = new ckptClientStruct(dname, this);
1105     CkCallback cb(checkpointClient, clientData);
1106     contribute(0, NULL, CkReduction::sum_int, cb);
1107   }
1108   else
1109     contribute(0, NULL, CkReduction::sum_int);
1110
1111 #if 0
1112 #if CMK_BIGSIM_CHARM
1113   void *curLog;         // store current log in timeline
1114   _TRACE_BG_TLINE_END(&curLog);
1115   TRACE_BG_AMPI_SUSPEND();
1116 #endif
1117 #endif
1118
1119   thread->stop();
1120
1121 #if CMK_BIGSIM_CHARM
1122   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
1123   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
1124 #endif
1125 }
1126
1127 void ampiParent::Checkpoint(int len, const char* dname){
1128   if (len == 0) {
1129     // memory checkpoint
1130     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1131     CkStartMemCheckpoint(cb);
1132   }
1133   else {
1134     char dirname[256];
1135     strncpy(dirname,dname,len);
1136     dirname[len]='\0';
1137     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1138     CkStartCheckpoint(dirname,cb);
1139   }
1140 }
1141 void ampiParent::ResumeThread(void){
1142   thread->resume();
1143 }
1144
1145 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
1146     int *keyval, void* extra_state){
1147   KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1148   int idx = kvlist.size();
1149   kvlist.resize(idx+1);
1150   kvlist[idx] = newnode;
1151   *keyval = idx;
1152   return 0;
1153 }
1154   int ampiParent::freeKeyval(int *keyval){
1155     if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
1156       return -1;
1157     delete kvlist[*keyval];
1158     kvlist[*keyval] = NULL;
1159     *keyval = MPI_KEYVAL_INVALID;
1160     return 0;
1161   }
1162
1163   int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
1164     if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1165       return -1;
1166     ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1167     // Enlarge the keyval list:
1168     while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
1169     cs.getKeyvals()[keyval]=attribute_val;
1170     return 0;
1171   }
1172
1173 int ampiParent::kv_is_builtin(int keyval) {
1174   switch(keyval) {
1175     case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
1176     case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
1177     case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
1178     case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
1179     case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
1180     case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
1181     case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
1182     case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
1183     default: return 0;
1184   };
1185 }
1186
1187 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
1188   *flag = false;
1189   if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
1190     *flag=true;
1191     *(int **)attribute_val = kv_builtin_storage;  // all default tags are ints
1192     return 0;
1193   }
1194   if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1195     return -1; /* invalid keyval */
1196
1197   ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1198   if (keyval>=cs.getKeyvals().size())  
1199     return 0; /* we don't have a value yet */
1200   if (cs.getKeyvals()[keyval]==0)
1201     return 0; /* we had a value, but now it's zero */
1202   /* Otherwise, we have a good value */
1203   *flag = true;
1204   *(void **)attribute_val = cs.getKeyvals()[keyval];
1205   return 0;
1206 }
1207 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
1208   /* no way to delete an attribute: just overwrite it with 0 */
1209   return putAttr(comm,keyval,0);
1210 }
1211
1212 //----------------------- ampi -------------------------
1213 void ampi::init(void) {
1214   parent=NULL;
1215   thread=NULL;
1216   msgs=NULL;
1217   posted_ireqs=NULL;
1218   resumeOnRecv=false;
1219   AsyncEvacuate(CmiFalse);
1220 }
1221
1222 ampi::ampi()
1223 {
1224   /* this constructor only exists so we can create an empty array during split */
1225   CkAbort("Default ampi constructor should never be called");
1226 }
1227
1228   ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
1229 :parentProxy(parent_)
1230 {
1231   init();
1232
1233   myComm=s; myComm.setArrayID(thisArrayID);
1234   myRank=myComm.getRankForIndex(thisIndex);
1235
1236   findParent(false);
1237
1238   msgs = CmmNew();
1239   posted_ireqs = CmmNew();
1240   nbcasts = 0;
1241
1242 #if AMPI_COMLIB
1243   comlibProxy = thisProxy; // Will later be associated with comlib
1244 #endif
1245
1246   seqEntries=parent->ckGetArraySize();
1247   oorder.init (seqEntries);
1248 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1249   if(thisIndex == 0){
1250     /*      CkAssert(CkMyPe() == 0);
1251      *              CkGroupID _myManagerGID = thisProxy.ckGetArrayID();     
1252      *                      CkAssert(numElements);
1253      *                              printf("ampi::ampi setting numInitial to %d on manager at gid %d \n",numElements,_myManagerGID.idx);
1254      *                                      CkArray *_myManager = thisProxy.ckLocalBranch();
1255      *                                              _myManager->setNumInitial(numElements);*/
1256   }
1257 #endif
1258 }
1259
1260 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1261     ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1262 :parentProxy(parent_)
1263 {
1264 #if AMPI_COMLIB
1265   ciStreaming = ciStreaming_;
1266   ciBcast = ciBcast_;
1267   ciAllgather = ciAllgather_;
1268   ciAlltoall = ciAlltoall_;
1269
1270   init();
1271
1272   myComm=s; myComm.setArrayID(thisArrayID);
1273   myRank=myComm.getRankForIndex(thisIndex);
1274
1275   findParent(false);
1276
1277   msgs = CmmNew();
1278   posted_ireqs = CmmNew();
1279   nbcasts = 0;
1280
1281   comlibProxy = thisProxy;
1282   CmiPrintf("comlibProxy created as a copy of thisProxy, no associate call\n");
1283
1284 #if AMPI_COMLIB
1285   //  ComlibAssociateProxy(ciAlltoall, comlibProxy);
1286 #endif
1287
1288   seqEntries=parent->ckGetArraySize();
1289   oorder.init (seqEntries);
1290 #endif
1291 }
1292
1293 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1294 {
1295   init();
1296
1297   seqEntries=-1;
1298 }
1299
1300 void ampi::ckJustMigrated(void)
1301 {
1302   findParent(true);
1303   ArrayElement1D::ckJustMigrated();
1304 }
1305
1306 void ampi::ckJustRestored(void)
1307 {
1308   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
1309     findParent(true);
1310   ArrayElement1D::ckJustRestored();
1311
1312   //BIGSIM_OOC DEBUGGING
1313   //CkPrintf("In ampi[%d] thread[%p]:   ", thisIndex, thread);
1314   //CthPrintThdMagic(thread->getTid()); 
1315 }
1316
1317 void ampi::findParent(bool forMigration) {
1318   STARTUP_DEBUG("ampi> finding my parent")
1319     parent=parentProxy[thisIndex].ckLocal();
1320   if (parent==NULL) CkAbort("AMPI can't find its parent!");
1321   thread=parent->registerAmpi(this,myComm,forMigration);
1322   if (thread==NULL) CkAbort("AMPI can't find its thread!");
1323   //    printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1324 }
1325
1326 //The following method should be called on the first element of the
1327 //ampi array
1328 void ampi::allInitDone(CkReductionMsg *m){
1329   FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
1330     thisProxy.setInitDoneFlag();
1331   delete m;
1332 }
1333
1334 void ampi::setInitDoneFlag(){
1335   //CkPrintf("ampi[%d]::setInitDone called!\n", thisIndex);
1336   parent->ampiInitCallDone=1;
1337   parent->getTCharmThread()->start();
1338 }
1339
1340 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1341   CkPupMessage(*(PUP::er *)p,msg,1);
1342   if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1343   //    printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1344 }
1345
1346 static void cmm_pup_posted_ireq(pup_er p,void **msg) {
1347
1348   pup_int(p, (int *)msg);
1349
1350   /*    if(pup_isUnpacking(p)){
1351   // *msg = new IReq;
1352   //when unpacking, nothing is needed to do since *msg is an index
1353   //(of type integer) to the ampiParent::ampiReqs (the AmpiRequestList)
1354   }
1355   if(!pup_isUnpacking(p)){
1356   AmpiRequestList *reqL = getReqs();
1357   int retIdx = reqL->findRequestIndex((IReq *)*msg);
1358   if(retIdx==-1){
1359   CmiAbort("An AmpiRequest instance should be found for an instance in posted_ireq!\n");
1360   }
1361   pup_int(p, retIdx)
1362   }
1363    */
1364   //    ((IReq *)*msg)->pup(*(PUP::er *)p);
1365
1366   //    if (pup_isDeleting(p)) delete (IReq *)*msg;
1367   //    printf("[%d] pupping postd irequests %p \n",CkMyPe(),*msg);
1368 }
1369
1370 void ampi::pup(PUP::er &p)
1371 {
1372   if(!p.isUserlevel())
1373     ArrayElement1D::pup(p);//Pack superclass
1374   p|parentProxy;
1375   p|myComm;
1376   p|myRank;
1377   p|nbcasts;
1378   p|tmpVec;
1379   p|remoteProxy;
1380   p|resumeOnRecv;
1381 #if AMPI_COMLIB
1382   p|comlibProxy;
1383   p|ciStreaming;
1384   p|ciBcast;
1385   p|ciAllgather;
1386   p|ciAlltoall;
1387
1388   if(p.isUnpacking()){
1389     //    ciStreaming.setSourcePe();
1390     //    ciBcast.setSourcePe();
1391     //    ciAllgather.setSourcePe();
1392     //    ciAlltoall.setSourcePe();
1393   }
1394 #endif
1395
1396   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1397
1398   //BIGSIM_OOC DEBUGGING
1399   //if(!p.isUnpacking()){
1400   //CkPrintf("ampi[%d]::packing: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1401   //}
1402
1403   posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);
1404
1405   //if(p.isUnpacking()){
1406   //CkPrintf("ampi[%d]::unpacking: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1407   //}
1408
1409   p|seqEntries;
1410   p|oorder;
1411 }
1412
1413 ampi::~ampi()
1414 {
1415   if (CkInRestarting() || _BgOutOfCoreFlag==1) {
1416     // in restarting, we need to flush messages
1417     int tags[3], sts[3];
1418     tags[0] = tags[1] = tags[2] = CmmWildCard;
1419     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1420     while (msg) {
1421       delete msg;
1422       msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1423     }
1424   }
1425
1426   CmmFree(msgs);
1427   CmmFreeAll(posted_ireqs);
1428 }
1429
1430 //------------------------ Communicator Splitting ---------------------
1431 class ampiSplitKey {
1432   public:
1433     int nextSplitComm;
1434     int color; //New class of processes we'll belong to
1435     int key; //To determine rank in new ordering
1436     int rank; //Rank in old ordering
1437     ampiSplitKey() {}
1438     ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1439       :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1440 };
1441
1442 /* "type" may indicate whether call is for a cartesian topology etc. */
1443
1444 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1445 {
1446 #if CMK_BIGSIM_CHARM
1447   void *curLog;         // store current log in timeline
1448   _TRACE_BG_TLINE_END(&curLog);
1449   //  TRACE_BG_AMPI_SUSPEND();
1450 #endif
1451   if (type == CART_TOPOL) {
1452     ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1453     int rootIdx=myComm.getIndexForRank(0);
1454     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1455     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1456
1457     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1458     MPI_Comm newComm=parent->getNextCart()-1;
1459     *dest=newComm;
1460   } else {
1461     ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1462     int rootIdx=myComm.getIndexForRank(0);
1463     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1464     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1465
1466     thread->suspend(); //Resumed by ampiParent::splitChildRegister
1467     MPI_Comm newComm=parent->getNextSplit()-1;
1468     *dest=newComm;
1469   }
1470 #if CMK_BIGSIM_CHARM
1471   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1472   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1473   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1474 #endif
1475 }
1476
1477 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1478   const ampiSplitKey *a=(const ampiSplitKey *)a_;
1479   const ampiSplitKey *b=(const ampiSplitKey *)b_;
1480   if (a->color!=b->color) return a->color-b->color;
1481   if (a->key!=b->key) return a->key-b->key;
1482   return a->rank-b->rank;
1483 }
1484
1485 void ampi::splitPhase1(CkReductionMsg *msg)
1486 {
1487   //Order the keys, which orders the ranks properly:
1488   int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1489   ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1490   if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1491   qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1492
1493   MPI_Comm newComm = -1;
1494   for(int i=0;i<nKeys;i++)
1495     if(keys[i].nextSplitComm>newComm)
1496       newComm = keys[i].nextSplitComm;
1497
1498   //Loop over the sorted keys, which gives us the new arrays:
1499   int lastColor=keys[0].color-1; //The color we're building an array for
1500   CProxy_ampi lastAmpi; //The array for lastColor
1501   int lastRoot=0; //C value for new rank 0 process for latest color
1502   ampiCommStruct lastComm; //Communicator info. for latest color
1503   for (int c=0;c<nKeys;c++) {
1504     if (keys[c].color!=lastColor)
1505     { //Hit a new color-- need to build a new communicator and array
1506       lastColor=keys[c].color;
1507       lastRoot=c;
1508       CkArrayOptions opts;
1509       opts.bindTo(parentProxy);
1510       opts.setNumInitial(0);
1511       CkArrayID unusedAID; ampiCommStruct unusedComm;
1512       lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1513       lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1514
1515       CkVec<int> indices; //Maps rank to array indices for new arrau
1516       for (int i=c;i<nKeys;i++) {
1517         if (keys[i].color!=lastColor) break; //Done with this color
1518         int idx=myComm.getIndexForRank(keys[i].rank);
1519         indices.push_back(idx);
1520       }
1521
1522       //FIXME: create a new communicator for each color, instead of
1523       // (confusingly) re-using the same MPI_Comm number for each.
1524       lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1525     }
1526     int newRank=c-lastRoot;
1527     int newIdx=lastComm.getIndexForRank(newRank);
1528
1529     //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1530     lastAmpi[newIdx].insert(parentProxy,lastComm);
1531   }
1532
1533   delete msg;
1534 }
1535
1536 //...newly created array elements register with the parent, which calls:
1537 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1538   int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1539   if (splitComm.size()<=idx) splitComm.resize(idx+1);
1540   splitComm[idx]=new ampiCommStruct(s);
1541   thread->resume(); //Matches suspend at end of ampi::split
1542 }
1543
1544 //-----------------create communicator from group--------------
1545 // The procedure is like that of comm_split very much,
1546 // so the code is shamelessly copied from above
1547 //   1. reduction to make sure all members have called
1548 //   2. the root in the old communicator create the new array
1549 //   3. ampiParent::register is called to register new array as new comm
1550 class vecStruct {
1551   public:
1552     int nextgroup;
1553     groupStruct vec;
1554     vecStruct():nextgroup(-1){}
1555     vecStruct(int nextgroup_, groupStruct vec_)
1556       : nextgroup(nextgroup_), vec(vec_) { }
1557 };
1558
1559 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1560   int rootIdx=vec[0];
1561   tmpVec = vec;
1562   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1563   MPI_Comm nextgroup = parent->getNextGroup();
1564   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1565
1566   if(getPosOp(thisIndex,vec)>=0){
1567     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1568     MPI_Comm retcomm = parent->getNextGroup()-1;
1569     *newcomm = retcomm;
1570   }else{
1571     *newcomm = MPI_COMM_NULL;
1572   }
1573 }
1574
1575 void ampi::commCreatePhase1(CkReductionMsg *msg){
1576   MPI_Comm *nextGroupComm = (int *)msg->getData();
1577
1578   CkArrayOptions opts;
1579   opts.bindTo(parentProxy);
1580   opts.setNumInitial(0);
1581   CkArrayID unusedAID;
1582   ampiCommStruct unusedComm;
1583   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1584   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1585
1586   groupStruct indices = tmpVec;
1587   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1588   for(int i=0;i<indices.size();i++){
1589     int newIdx=indices[i];
1590     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1591   }
1592   delete msg;
1593 }
1594
1595 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1596   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1597   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1598   groupComm[idx]=new ampiCommStruct(s);
1599   thread->resume(); //Matches suspend at end of ampi::split
1600 }
1601
1602 /* Virtual topology communicator creation */
1603 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1604   int rootIdx=vec[0];
1605   tmpVec = vec;
1606   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1607
1608   MPI_Comm nextcart = parent->getNextCart();
1609   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1610
1611   if(getPosOp(thisIndex,vec)>=0){
1612     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1613     MPI_Comm retcomm = parent->getNextCart()-1;
1614     *newcomm = retcomm;
1615   }else
1616     *newcomm = MPI_COMM_NULL;
1617 }
1618
1619 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1620   MPI_Comm *nextCartComm = (int *)msg->getData();
1621
1622   CkArrayOptions opts;
1623   opts.bindTo(parentProxy);
1624   opts.setNumInitial(0);
1625   CkArrayID unusedAID;
1626   ampiCommStruct unusedComm;
1627   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1628   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1629
1630   groupStruct indices = tmpVec;
1631   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1632       size(),indices);
1633   for(int i=0;i<indices.size();i++){
1634     int newIdx=indices[i];
1635     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1636   }
1637   delete msg;
1638 }
1639
1640 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1641   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1642   if (cartComm.size()<=idx) {
1643     cartComm.resize(idx+1);
1644     cartComm.length()=idx+1;
1645   }
1646   cartComm[idx]=new ampiCommStruct(s);
1647   thread->resume(); //Matches suspend at end of ampi::cartCreate
1648 }
1649
1650 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1651   int rootIdx=vec[0];
1652   tmpVec = vec;
1653   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1654       myComm.getProxy());
1655   MPI_Comm nextgraph = parent->getNextGraph();
1656   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1657
1658   if(getPosOp(thisIndex,vec)>=0){
1659     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1660     MPI_Comm retcomm = parent->getNextGraph()-1;
1661     *newcomm = retcomm;
1662   }else
1663     *newcomm = MPI_COMM_NULL;
1664 }
1665
1666 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1667   MPI_Comm *nextGraphComm = (int *)msg->getData();
1668
1669   CkArrayOptions opts;
1670   opts.bindTo(parentProxy);
1671   opts.setNumInitial(0);
1672   CkArrayID unusedAID;
1673   ampiCommStruct unusedComm;
1674   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1675   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1676
1677   groupStruct indices = tmpVec;
1678   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1679       .size(),indices);
1680   for(int i=0;i<indices.size();i++){
1681     int newIdx=indices[i];
1682     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1683   }
1684   delete msg;
1685 }
1686
1687 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1688   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1689   if (graphComm.size()<=idx) {
1690     graphComm.resize(idx+1);
1691     graphComm.length()=idx+1;
1692   }
1693   graphComm[idx]=new ampiCommStruct(s);
1694   thread->resume(); //Matches suspend at end of ampi::graphCreate
1695 }
1696
1697 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1698   if(thisIndex==root) { // not everybody gets the valid rvec
1699     tmpVec = rvec;
1700   }
1701   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1702   MPI_Comm nextinter = parent->getNextInter();
1703   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1704
1705   thread->suspend(); //Resumed by ampiParent::interChildRegister
1706   MPI_Comm newcomm=parent->getNextInter()-1;
1707   *ncomm=newcomm;
1708 }
1709
1710 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1711   MPI_Comm *nextInterComm = (int *)msg->getData();
1712
1713   groupStruct lgroup = myComm.getIndices();
1714   CkArrayOptions opts;
1715   opts.bindTo(parentProxy);
1716   opts.setNumInitial(0);
1717   CkArrayID unusedAID;
1718   ampiCommStruct unusedComm;
1719   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1720   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1721
1722   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1723   for(int i=0;i<lgroup.size();i++){
1724     int newIdx=lgroup[i];
1725     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1726   }
1727
1728   parentProxy[0].ExchangeProxy(newAmpi);
1729   delete msg;
1730 }
1731
1732 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1733   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1734   if (interComm.size()<=idx) interComm.resize(idx+1);
1735   interComm[idx]=new ampiCommStruct(s);
1736   //thread->resume(); // don't resume it yet, till parent set remote proxy
1737 }
1738
1739 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1740   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1741     groupStruct lvec = myComm.getIndices();
1742     groupStruct rvec = myComm.getRemoteIndices();
1743     int rsize = rvec.size();
1744     tmpVec = lvec;
1745     for(int i=0;i<rsize;i++)
1746       tmpVec.push_back(rvec[i]);
1747     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1748   }else{
1749     tmpVec.resize(0);
1750   }
1751
1752   int rootIdx=myComm.getIndexForRank(0);
1753   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1754   MPI_Comm nextintra = parent->getNextIntra();
1755   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1756
1757   thread->suspend(); //Resumed by ampiParent::interChildRegister
1758   MPI_Comm newcomm=parent->getNextIntra()-1;
1759   *ncomm=newcomm;
1760 }
1761
1762 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1763   if(tmpVec.size()==0) { delete msg; return; }
1764   MPI_Comm *nextIntraComm = (int *)msg->getData();
1765   CkArrayOptions opts;
1766   opts.bindTo(parentProxy);
1767   opts.setNumInitial(0);
1768   CkArrayID unusedAID;
1769   ampiCommStruct unusedComm;
1770   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1771   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1772
1773   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1774   for(int i=0;i<tmpVec.size();i++){
1775     int newIdx=tmpVec[i];
1776     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1777   }
1778   delete msg;
1779 }
1780
1781 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1782   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1783   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1784   intraComm[idx]=new ampiCommStruct(s);
1785   thread->resume(); //Matches suspend at end of ampi::split
1786 }
1787
1788 //------------------------ communication -----------------------
1789 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1790 {
1791   if (universeNo>MPI_COMM_WORLD) {
1792     int worldDex=universeNo-MPI_COMM_WORLD-1;
1793     if (worldDex>=_mpi_nworlds)
1794       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1795     return mpi_worlds[worldDex].comm;
1796   }
1797   CkAbort("Bad communicator passed to universeComm2CommStruct");
1798   return mpi_worlds[0].comm; // meaningless return
1799 }
1800
1801 void ampi::block(void){
1802   thread->suspend();
1803 }
1804
1805 void ampi::yield(void){
1806   thread->schedule();
1807 }
1808
1809 void ampi::unblock(void){
1810   thread->resume();
1811 }
1812
1813   void ampi::ssend_ack(int sreq_idx){
1814     if (sreq_idx == 1)
1815       thread->resume();           // MPI_Ssend
1816     else {
1817       sreq_idx -= 2;              // start from 2
1818       AmpiRequestList *reqs = &(parent->ampiReqs);
1819       SReq *sreq = (SReq *)(*reqs)[sreq_idx];
1820       sreq->statusIreq = true;
1821       if (resumeOnRecv) {
1822         thread->resume();
1823       }
1824     }
1825   }
1826
1827   void
1828 ampi::generic(AmpiMsg* msg)
1829 {
1830   MSG_ORDER_DEBUG(
1831       CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1832         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1833       )
1834 #if CMK_BIGSIM_CHARM
1835     TRACE_BG_ADD_TAG("AMPI_generic");
1836   msg->event = NULL;
1837 #endif
1838
1839   int sync = UsrToEnv(msg)->getRef();
1840   int srcIdx;
1841   if (sync)  srcIdx = msg->srcIdx;
1842
1843   //    AmpiMsg *msgcopy = msg;
1844   if(msg->seq != -1) {
1845     int srcIdx=msg->srcIdx;
1846     int n=oorder.put(srcIdx,msg);
1847     if (n>0) { // This message was in-order
1848       inorder(msg);
1849       if (n>1) { // It enables other, previously out-of-order messages
1850         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1851           inorder(msg);
1852         }
1853       }
1854     }
1855   } else { //Cross-world or system messages are unordered
1856     inorder(msg);
1857   }
1858
1859   // msg may be free'ed from calling inorder()
1860   if (sync>0) {         // send an ack to sender
1861     CProxy_ampi pa(thisArrayID);
1862     pa[srcIdx].ssend_ack(sync);
1863   }
1864
1865   if(resumeOnRecv){
1866     //CkPrintf("Calling TCharm::resume at ampi::generic!\n");
1867     thread->resume();
1868   }
1869 }
1870
1871 inline static AmpiRequestList *getReqs(void); 
1872
1873   void
1874 ampi::inorder(AmpiMsg* msg)
1875 {
1876   MSG_ORDER_DEBUG(
1877       CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1878         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1879       )
1880     // check posted recvs
1881     int tags[3], sts[3];
1882   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1883   IReq *ireq = NULL;
1884   if (CpvAccess(CmiPICMethod) != 2) {
1885 #if 0
1886     //IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1887     ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1888 #else
1889 #if CMK_BIGSIM_CHARM
1890     _TRACE_BG_TLINE_END(&msg->event);    // store current log
1891     msg->eventPe = CmiMyPe();
1892 #endif
1893     //in case ampi has not initialized and posted_ireqs are only inserted 
1894     //at AMPI_Irecv (MPI_Irecv)
1895     AmpiRequestList *reqL = &(parent->ampiReqs);
1896     //When storing the req index, it's 1-based. The reason is stated in the comments
1897     //in AMPI_Irecv function.
1898     int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
1899     if(reqL->size()>0 && ireqIdx>0)
1900       ireq = (IReq *)(*reqL)[ireqIdx-1];
1901     //CkPrintf("[%d] ampi::inorder, ireqIdx=%d\n", thisIndex, ireqIdx);
1902 #endif
1903     //CkPrintf("[%d] ampi::inorder, ireq=%p\n", thisIndex, ireq);
1904     if (ireq) { // receive posted
1905       ireq->receive(this, msg);
1906       // Isaac changed this so that the IReq stores the tag when receiving the message, 
1907       // instead of using this user supplied tag which could be MPI_ANY_TAG
1908       // Formerly the following line was not commented out:
1909       //ireq->tag = sts[0];         
1910       //ireq->src = sts[1];
1911       //ireq->comm = sts[2];
1912     } else {
1913       CmmPut(msgs, 3, tags, msg);
1914     }
1915   }
1916   else
1917     CmmPut(msgs, 3, tags, msg);
1918 }
1919
1920 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1921 {
1922   int tags[3];
1923   tags[0] = t; tags[1] = s; tags[2] = comm;
1924   AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1925   return msg;
1926 }
1927
1928 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1929     int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm, int sync)
1930 {
1931   CkDDT_DataType *ddt = getDDT()->getType(type);
1932   int len = ddt->getSize(count);
1933   int sIdx=thisIndex;
1934   int seq = -1;
1935   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1936     seq = oorder.nextOutgoing(destIdx);
1937   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1938   if (sync) UsrToEnv(msg)->setRef(sync);
1939   TCharm::activateVariable(buf);
1940   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1941   TCharm::deactivateVariable(buf);
1942   return msg;
1943 }
1944
1945 #if AMPI_COMLIB
1946   void
1947 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1948 {
1949   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1950 }
1951 #endif
1952
1953   void
1954 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, int sync)
1955 {
1956 #if CMK_TRACE_IN_CHARM
1957   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
1958 #endif
1959
1960 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1961   MPI_Comm disComm = myComm.getComm();
1962   ampi *dis = getAmpiInstance(disComm);
1963   CpvAccess(_currentObj) = dis;
1964 #endif
1965
1966   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1967   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),sync);
1968
1969 #if CMK_TRACE_IN_CHARM
1970   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
1971 #endif
1972
1973   if (sync == 1) {
1974     // waiting for receiver side
1975     resumeOnRecv = false;            // so no one else awakes it
1976     block();
1977   }
1978 }
1979
1980   void
1981 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1982 {
1983   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1984   memcpy(msg->data, buf, len);
1985   CProxy_ampi pa(aid);
1986   pa[idx].generic(msg);
1987 }
1988
1989   void
1990 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy, int sync)
1991 {
1992   if(rank==MPI_PROC_NULL) return;
1993   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1994   int destIdx = dest.getIndexForRank(rank);
1995   if(isInter()){
1996     sRank = parent->thisIndex;
1997     destcomm = MPI_COMM_FIRST_INTER;
1998     destIdx = dest.getIndexForRemoteRank(rank);
1999     arrproxy = remoteProxy;
2000   }
2001   MSG_ORDER_DEBUG(
2002       CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
2003       )
2004
2005     arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm,sync));
2006
2007 #if 0
2008 #if CMK_TRACE_ENABLED
2009   int size=0;
2010   MPI_Type_size(type,&size);
2011   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
2012 #endif
2013 #endif
2014 }
2015
2016   int
2017 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
2018 {
2019   CkDDT_DataType *ddt = getDDT()->getType(type);
2020   int len = ddt->getSize(count);
2021
2022   if(msg->length < len){ // only at rare case shall we reset count by using divide
2023     count = msg->length/(ddt->getSize(1));
2024   }
2025
2026   TCharm::activateVariable(buf);
2027   if (t==MPI_REDUCE_TAG) {      // reduction msg
2028     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
2029   } else {
2030     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
2031   }
2032   TCharm::deactivateVariable(buf);
2033   return 0;
2034 }
2035
2036   int
2037 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
2038 {
2039   MPI_Comm disComm = myComm.getComm();
2040   if(s==MPI_PROC_NULL) {
2041     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
2042     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
2043     ((MPI_Status *)sts)->MPI_LENGTH = 0;
2044     return 0;
2045   }
2046 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
2047   _LOG_E_END_AMPI_PROCESSING(thisIndex)
2048 #endif
2049 #if CMK_BIGSIM_CHARM
2050    void *curLog;                // store current log in timeline
2051   _TRACE_BG_TLINE_END(&curLog);
2052   //  TRACE_BG_AMPI_SUSPEND();
2053 #if CMK_TRACE_IN_CHARM
2054   if(CpvAccess(traceOn)) traceSuspend();
2055 #endif
2056 #endif
2057
2058   if(isInter()){
2059     s = myComm.getIndexForRemoteRank(s);
2060     comm = MPI_COMM_FIRST_INTER;
2061   }
2062
2063   int tags[3];
2064   AmpiMsg *msg = 0;
2065
2066   MSG_ORDER_DEBUG(
2067       CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
2068       )
2069
2070   ampi *dis = getAmpiInstance(disComm);
2071 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2072   //  dis->yield();
2073   //  processRemoteMlogMessages();
2074 #endif
2075   int dosuspend = 0;
2076   while(1) {
2077     //This is done to take into account the case in which an ampi 
2078     // thread has migrated while waiting for a message
2079     tags[0] = t; tags[1] = s; tags[2] = comm;
2080     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
2081     if (msg) break;
2082     dis->resumeOnRecv=true;
2083     dis->thread->suspend();
2084     dosuspend = 1;
2085     dis = getAmpiInstance(disComm);
2086   }
2087
2088 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2089   CpvAccess(_currentObj) = dis;
2090   MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
2091 #endif
2092
2093     dis->resumeOnRecv=false;
2094
2095   if(sts)
2096     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2097   int status = dis->processMessage(msg, t, s, buf, count, type);
2098   if (status != 0) return status;
2099
2100 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
2101   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
2102 #endif
2103
2104 #if CMK_BIGSIM_CHARM
2105 #if CMK_TRACE_IN_CHARM
2106     //if(CpvAccess(traceOn)) CthTraceResume(thread->getThread());
2107     //Due to the reason mentioned the in the while loop above, we need to 
2108     //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
2109     if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
2110 #endif
2111   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
2112   //TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0);
2113   //_TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
2114 #if 0
2115 #if 1
2116   if (!dosuspend) {
2117     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 1);
2118     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2119   }
2120   else
2121 #endif
2122     TRACE_BG_ADD_TAG("RECV_RESUME_THREAD");
2123 #else
2124   TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
2125   if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2126 #endif
2127 #endif
2128
2129   delete msg;
2130   return 0;
2131 }
2132
2133   void
2134 ampi::probe(int t, int s, int comm, int *sts)
2135 {
2136   int tags[3];
2137 #if CMK_BIGSIM_CHARM
2138   void *curLog;         // store current log in timeline
2139   _TRACE_BG_TLINE_END(&curLog);
2140   //  TRACE_BG_AMPI_SUSPEND();
2141 #endif
2142
2143   AmpiMsg *msg = 0;
2144   resumeOnRecv=true;
2145   while(1) {
2146     tags[0] = t; tags[1] = s; tags[2] = comm;
2147     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2148     if (msg) break;
2149     thread->suspend();
2150   }
2151   resumeOnRecv=false;
2152   if(sts)
2153     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2154 #if CMK_BIGSIM_CHARM
2155   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
2156   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
2157 #endif
2158 }
2159
2160   int
2161 ampi::iprobe(int t, int s, int comm, int *sts)
2162 {
2163   int tags[3];
2164   AmpiMsg *msg = 0;
2165   tags[0] = t; tags[1] = s; tags[2] = comm;
2166   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2167   if (msg) {
2168     if(sts)
2169       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2170     return 1;
2171   }
2172 #if CMK_BIGSIM_CHARM
2173   void *curLog;         // store current log in timeline
2174   _TRACE_BG_TLINE_END(&curLog);
2175   //  TRACE_BG_AMPI_SUSPEND();
2176 #endif
2177   thread->schedule();
2178 #if CMK_BIGSIM_CHARM
2179   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
2180   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
2181 #endif
2182   return 0;
2183 }
2184
2185
2186 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
2187   void
2188 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
2189 {
2190   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2191   int rootIdx=dest.getIndexForRank(root);
2192   if(rootIdx==thisIndex) {
2193 #if 0//AMPI_COMLIB
2194     ciBcast.beginIteration();
2195     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2196 #else
2197 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2198     CpvAccess(_currentObj) = this;
2199 #endif
2200     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2201 #endif
2202   }
2203   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
2204   nbcasts++;
2205 }
2206
2207   void
2208 ampi::bcastraw(void* buf, int len, CkArrayID aid)
2209 {
2210   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, MPI_COMM_WORLD);
2211   memcpy(msg->data, buf, len);
2212   CProxy_ampi pa(aid);
2213   pa.generic(msg);
2214 }
2215
2216
2217   AmpiMsg* 
2218 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
2219 {
2220   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
2221   int unit;
2222   CkDDT_DataType *ddt = getDDT()->getType(type);
2223   unit = ddt->getSize(1);
2224   int totalsize = unit*cnt;
2225
2226   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
2227   char* addr = (char*)Alltoallbuff+disp*unit;
2228   ddt->serialize((char*)msg->data, addr, cnt, (-1));
2229   return msg;
2230 }
2231
2232 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
2233     void *attr_in, void *attr_out, int *flag){
2234   (*flag) = 0;
2235   return (MPI_SUCCESS);
2236 }
2237 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
2238     void *attr_in, void *attr_out, int *flag){
2239   (*(void **)attr_out) = attr_in;
2240   (*flag) = 1;
2241   return (MPI_SUCCESS);
2242 }
2243 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
2244   return (MPI_SUCCESS);
2245 }
2246
2247
2248 void AmpiSeqQ::init(int numP) 
2249 {
2250   elements.init(numP);
2251 }
2252
2253 AmpiSeqQ::~AmpiSeqQ () {
2254 }
2255
2256 void AmpiSeqQ::pup(PUP::er &p) {
2257   p|out;
2258   p|elements;
2259 }
2260
2261 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
2262 {
2263   AmpiOtherElement &el=elements[srcIdx];
2264 #ifndef CMK_OPTIMIZE
2265   if (msg->seq<el.seqIncoming)
2266     CkAbort("AMPI Logic error: received late out-of-order message!\n");
2267 #endif
2268   out.enq(msg);
2269   el.nOut++; // We have another message in the out-of-order queue
2270 }
2271
2272 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
2273 {
2274   AmpiOtherElement &el=elements[srcIdx];
2275   if (el.nOut==0) return 0; // No more out-of-order left.
2276   // Walk through our out-of-order queue, searching for our next message:
2277   for (int i=0;i<out.length();i++) {
2278     AmpiMsg *msg=out.deq();
2279     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
2280       el.seqIncoming++;
2281       el.nOut--; // We have one less message out-of-order
2282       return msg;
2283     }
2284     else
2285       out.enq(msg);
2286   }
2287   // We walked the whole queue-- ours is not there.
2288   return 0;
2289 }
2290
2291 //BIGSIM_OOC DEBUGGING: Output for AmpiRequest and its children classes
2292 void AmpiRequest::print(){
2293   CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
2294 }
2295
2296 void PersReq::print(){
2297   AmpiRequest::print();
2298   CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
2299 }
2300
2301 void IReq::print(){
2302   AmpiRequest::print();
2303   CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
2304 }
2305
2306 void ATAReq::print(){ //not complete for myreqs
2307   AmpiRequest::print();
2308   CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
2309
2310
2311 void SReq::print(){
2312   AmpiRequest::print();
2313   CmiPrintf("In SReq: this=%p, status=%d\n", this, statusIreq);
2314 }
2315
2316 void AmpiRequestList::pup(PUP::er &p) { 
2317   if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
2318     return;
2319   }
2320
2321   p(blklen); //Allocated size of block
2322   p(len); //Number of used elements in block
2323   if(p.isUnpacking()){
2324     makeBlock(blklen,len);
2325   }
2326   int count=0;
2327   for(int i=0;i<len;i++){
2328     char nonnull;
2329     if(!p.isUnpacking()){
2330       if(block[i] == NULL){
2331         nonnull = 0;
2332       }else{
2333         nonnull = block[i]->getType();
2334       }
2335     }   
2336     p(nonnull);
2337     if(nonnull != 0){
2338       if(p.isUnpacking()){
2339         switch(nonnull){
2340           case 1:
2341             block[i] = new PersReq;
2342             break;
2343           case 2:       
2344             block[i] = new IReq;
2345             break;
2346           case 3:       
2347             block[i] = new ATAReq;
2348             break;
2349         }
2350       } 
2351       block[i]->pup(p);
2352       count++;
2353     }else{
2354       block[i] = 0;
2355     }
2356   }
2357   if(p.isDeleting()){
2358     freeBlock();
2359   }
2360 }
2361
2362 //------------------ External Interface -----------------
2363 ampiParent *getAmpiParent(void) {
2364   ampiParent *p = CtvAccess(ampiPtr);
2365 #ifndef CMK_OPTIMIZE
2366   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
2367 #endif
2368   return p;
2369 }
2370
2371 ampi *getAmpiInstance(MPI_Comm comm) {
2372   ampi *ptr=getAmpiParent()->comm2ampi(comm);
2373 #ifndef CMK_OPTIMIZE
2374   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
2375 #endif
2376   return ptr;
2377 }
2378
2379 inline static AmpiRequestList *getReqs(void) {
2380   return &(getAmpiParent()->ampiReqs);
2381 }
2382
2383 inline void checkComm(MPI_Comm comm){
2384 #ifndef CMK_OPTIMIZE
2385   getAmpiParent()->checkComm(comm);
2386 #endif
2387 }
2388
2389 inline void checkRequest(MPI_Request req){
2390 #ifndef CMK_OPTIMIZE
2391   getReqs()->checkRequest(req);
2392 #endif
2393 }
2394
2395 inline void checkRequests(int n, MPI_Request* reqs){
2396 #ifndef CMK_OPTIMIZE
2397   AmpiRequestList* reqlist = getReqs();
2398   for(int i=0;i<n;i++)
2399     reqlist->checkRequest(reqs[i]);
2400 #endif
2401 }
2402
2403 CDECL void AMPI_Migrate(void)
2404 {
2405   //  AMPIAPI("AMPI_Migrate");
2406 #if 0
2407 #if CMK_BIGSIM_CHARM
2408   TRACE_BG_AMPI_SUSPEND();
2409 #endif
2410 #endif
2411   TCHARM_Migrate();
2412
2413 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2414   ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
2415   CpvAccess(_currentObj) = currentAmpi;
2416 #endif
2417
2418 #if CMK_BIGSIM_CHARM
2419   //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2420   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
2421 #endif
2422 }
2423
2424
2425 CDECL void AMPI_Evacuate(void)
2426 {
2427   TCHARM_Evacuate();
2428 }
2429
2430
2431
2432 CDECL void AMPI_Migrateto(int destPE)
2433 {
2434   AMPIAPI("AMPI_MigrateTo");
2435 #if 0
2436 #if CMK_BIGSIM_CHARM
2437   TRACE_BG_AMPI_SUSPEND();
2438 #endif
2439 #endif
2440   TCHARM_Migrate_to(destPE);
2441 #if CMK_BIGSIM_CHARM
2442   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
2443   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
2444 #endif
2445 }
2446
2447 CDECL void AMPI_MigrateTo(int destPE)
2448 {
2449   AMPI_Migrateto(destPE);
2450 }
2451
2452 CDECL void AMPI_Async_Migrate(void)
2453 {
2454   AMPIAPI("AMPI_Async_Migrate");
2455 #if 0
2456 #if CMK_BIGSIM_CHARM
2457   TRACE_BG_AMPI_SUSPEND();
2458 #endif
2459 #endif
2460   TCHARM_Async_Migrate();
2461 #if CMK_BIGSIM_CHARM
2462   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2463   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2464 #endif
2465 }
2466
2467 CDECL void AMPI_Allow_Migrate(void)
2468 {
2469   AMPIAPI("AMPI_Allow_Migrate");
2470 #if 0
2471 #if CMK_BIGSIM_CHARM
2472   TRACE_BG_AMPI_SUSPEND();
2473 #endif
2474 #endif
2475   TCHARM_Allow_Migrate();
2476 #if CMK_BIGSIM_CHARM
2477   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2478 #endif
2479 }
2480
2481 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2482 #if CMK_LBDB_ON
2483   //AMPIAPI("AMPI_Setmigratable");
2484   ampi *ptr=getAmpiInstance(comm);
2485   ptr->setMigratable(mig);
2486 #else
2487   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2488 #endif
2489 }
2490
2491 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2492 {
2493   //AMPIAPI("AMPI_Init");
2494   if (nodeinit_has_been_called) {
2495     AMPIAPI("AMPI_Init");
2496     char **argv;
2497     if (p_argv) argv=*p_argv;
2498     else argv=CkGetArgv();
2499     ampiInit(argv);
2500     if (p_argc) *p_argc=CmiGetArgc(argv);
2501   }
2502   else
2503   { /* Charm hasn't been started yet! */
2504     CkAbort("AMPI_Init> Charm is not initialized!");
2505   }
2506
2507   return 0;
2508 }
2509
2510 CDECL int AMPI_Initialized(int *isInit)
2511 {
2512   if (nodeinit_has_been_called) {
2513     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2514     *isInit=CtvAccess(ampiInitDone);
2515   }
2516   else /* !nodeinit_has_been_called */ {
2517     *isInit=nodeinit_has_been_called;
2518   }
2519   return 0;
2520 }
2521
2522 CDECL int AMPI_Finalized(int *isFinalized)
2523 {
2524   AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2525   *isFinalized=CtvAccess(ampiFinalized);
2526   return 0;
2527 }
2528
2529 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2530 {
2531   //AMPIAPI("AMPI_Comm_rank");
2532
2533 #ifndef CMK_OPTIMIZE 
2534   if(checkCommunicator(comm) != MPI_SUCCESS)
2535     return checkCommunicator(comm);
2536 #endif
2537
2538 #if AMPIMSGLOG
2539   ampiParent* pptr = getAmpiParent();
2540   if(msgLogRead){
2541     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2542     return 0;
2543   }
2544 #endif
2545
2546   *rank = getAmpiInstance(comm)->getRank(comm);
2547
2548 #if AMPIMSGLOG
2549   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2550     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2551   }
2552 #endif
2553   return 0;
2554 }
2555
2556   CDECL
2557 int AMPI_Comm_size(MPI_Comm comm, int *size)
2558 {
2559   //AMPIAPI("AMPI_Comm_size");
2560
2561 #ifndef CMK_OPTIMIZE 
2562   if(checkCommunicator(comm) != MPI_SUCCESS)
2563     return checkCommunicator(comm);
2564 #endif
2565
2566 #if AMPIMSGLOG
2567   ampiParent* pptr = getAmpiParent();
2568   if(msgLogRead){
2569     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2570     return 0;
2571   }
2572 #endif
2573
2574   *size = getAmpiInstance(comm)->getSize(comm);
2575
2576 #if AMPIMSGLOG
2577   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2578     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2579   }
2580 #endif
2581
2582   return 0;
2583 }
2584
2585   CDECL
2586 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2587 {
2588
2589 #ifndef CMK_OPTIMIZE 
2590   if(checkCommunicator(comm1) != MPI_SUCCESS)
2591     return checkCommunicator(comm1);
2592   if(checkCommunicator(comm2) != MPI_SUCCESS)
2593     return checkCommunicator(comm2);
2594 #endif
2595
2596   AMPIAPI("AMPI_Comm_compare");
2597   if(comm1==comm2) *result=MPI_IDENT;
2598   else{
2599     int equal=1;
2600     CkVec<int> ind1, ind2;
2601     ind1 = getAmpiInstance(comm1)->getIndices();
2602     ind2 = getAmpiInstance(comm2)->getIndices();
2603     if(ind1.size()==ind2.size()){
2604       for(int i=0;i<ind1.size();i++)
2605         if(ind1[i] != ind2[i]) { equal=0; break; }
2606     }
2607     if(equal==1) *result=MPI_CONGRUENT;
2608     else *result=MPI_UNEQUAL;
2609   }
2610   return 0;
2611 }
2612
2613 CDECL void AMPI_Exit(int /*exitCode*/)
2614 {
2615   AMPIAPI("AMPI_Exit");
2616   //finalizeBigSimTrace();
2617   TCHARM_Done();
2618 }
2619 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2620 {
2621   AMPI_Exit(*exitCode);
2622 }
2623
2624   CDECL
2625 int AMPI_Finalize(void)
2626 {
2627   AMPIAPI("AMPI_Finalize");
2628 #if PRINT_IDLE
2629   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2630 #endif
2631 #if AMPI_COUNTER
2632   getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2633 #endif
2634   CtvAccess(ampiFinalized)=1;
2635
2636 #if CMK_BIGSIM_CHARM
2637 #if 0
2638   TRACE_BG_AMPI_SUSPEND();
2639 #endif
2640 #if CMK_TRACE_IN_CHARM
2641   if(CpvAccess(traceOn)) traceSuspend();
2642 #endif
2643 #endif
2644
2645   //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2646   AMPI_Exit(0);
2647   return 0;
2648 }
2649
2650 CDECL
2651 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2652     int tag, MPI_Comm comm) {
2653
2654 #ifndef CMK_OPTIMIZE
2655   int ret;
2656   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2657   if(ret != MPI_SUCCESS)
2658     return ret;
2659 #endif
2660
2661   AMPIAPI("AMPI_Send");
2662 #if AMPIMSGLOG
2663   if(msgLogRead){
2664     return 0;
2665   }
2666 #endif
2667
2668   ampi *ptr = getAmpiInstance(comm);
2669 #if AMPI_COMLIB
2670   if(enableStreaming){  
2671     //    ptr->getStreaming().beginIteration();
2672     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2673   } else
2674 #endif
2675     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2676 #if AMPI_COUNTER
2677   getAmpiParent()->counters.send++;
2678 #endif
2679 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2680   //  ptr->yield();
2681   //  //  processRemoteMlogMessages();
2682 #endif
2683   return 0;
2684 }
2685
2686   CDECL
2687 int AMPI_Ssend(void *msg, int count, MPI_Datatype type, int dest,
2688     int tag, MPI_Comm comm)
2689 {
2690 #ifndef CMK_OPTIMIZE
2691   int ret;
2692   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2693   if(ret != MPI_SUCCESS)
2694     return ret;
2695 #endif
2696
2697   AMPIAPI("AMPI_Ssend");
2698 #if AMPIMSGLOG
2699   if(msgLogRead){
2700     return 0;
2701   }
2702 #endif
2703
2704   ampi *ptr = getAmpiInstance(comm);
2705 #if AMPI_COMLIB
2706   if(enableStreaming){
2707     ptr->getStreaming().beginIteration();
2708     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2709   } else
2710 #endif
2711     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm, 1);
2712 #if AMPI_COUNTER
2713   getAmpiParent()->counters.send++;
2714 #endif
2715
2716   return 0;
2717 }
2718
2719   CDECL
2720 int AMPI_Issend(void *buf, int count, MPI_Datatype type, int dest,
2721     int tag, MPI_Comm comm, MPI_Request *request)
2722 {
2723   AMPIAPI("AMPI_Issend");
2724
2725 #ifndef CMK_OPTIMIZE
2726   int ret;
2727   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
2728   if(ret != MPI_SUCCESS)
2729   {
2730     *request = MPI_REQUEST_NULL;
2731     return ret;
2732   }
2733 #endif
2734
2735 #if AMPIMSGLOG
2736   ampiParent* pptr = getAmpiParent();
2737   if(msgLogRead){
2738     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
2739     return 0;
2740   }
2741 #endif
2742
2743   USER_CALL_DEBUG("AMPI_Issend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
2744   ampi *ptr = getAmpiInstance(comm);
2745   AmpiRequestList* reqs = getReqs();
2746   SReq *newreq = new SReq(comm);
2747   *request = reqs->insert(newreq);
2748   // 1:  blocking now  - used by MPI_Ssend
2749   // >=2:  the index of the requests - used by MPI_Issend
2750   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm, *request+2);
2751 #if AMPI_COUNTER
2752   getAmpiParent()->counters.isend++;
2753 #endif
2754
2755 #if AMPIMSGLOG
2756   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2757     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
2758   }
2759 #endif
2760
2761   return 0;
2762 }
2763
2764   CDECL
2765 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2766     MPI_Comm comm, MPI_Status *status)
2767 {
2768   AMPIAPI("AMPI_Recv");
2769
2770 #ifndef CMK_OPTIMIZE
2771   int ret;
2772   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, msg, 1);
2773   if(ret != MPI_SUCCESS)
2774     return ret;
2775 #endif
2776
2777 #if AMPIMSGLOG
2778   ampiParent* pptr = getAmpiParent();
2779   if(msgLogRead){
2780     (*(pptr->fromPUPer))|(pptr->pupBytes);
2781     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2782     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2783     return 0;
2784   }
2785 #endif
2786
2787   ampi *ptr = getAmpiInstance(comm);
2788   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2789
2790 #if AMPI_COUNTER
2791   getAmpiParent()->counters.recv++;
2792 #endif
2793
2794 #if AMPIMSGLOG
2795   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2796     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2797     (*(pptr->toPUPer))|(pptr->pupBytes);
2798     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2799     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2800   }
2801 #endif
2802
2803   return 0;
2804 }
2805
2806   CDECL
2807 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2808 {
2809
2810 #ifndef CMK_OPTIMIZE
2811   int ret;
2812   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2813   if(ret != MPI_SUCCESS)
2814     return ret;
2815 #endif
2816
2817   AMPIAPI("AMPI_Probe");
2818   ampi *ptr = getAmpiInstance(comm);
2819   ptr->probe(tag,src, comm, (int*) status);
2820   return 0;
2821 }
2822
2823   CDECL
2824 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2825 {
2826   AMPIAPI("AMPI_Iprobe");
2827
2828 #ifndef CMK_OPTIMIZE
2829   int ret;
2830   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2831   if(ret != MPI_SUCCESS)
2832     return ret;
2833 #endif
2834
2835   ampi *ptr = getAmpiInstance(comm);
2836   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2837   return 0;
2838 }
2839
2840   CDECL
2841 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2842     int stag, void *rbuf, int rcount, int rtype,
2843     int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2844 {
2845   AMPIAPI("AMPI_Sendrecv");
2846
2847 #ifndef CMK_OPTIMIZE
2848   int ret;
2849   ret = errorCheck(comm, 1, scount, 1, stype, 1, stag, 1, dest, 1, sbuf, 1);
2850   if(ret != MPI_SUCCESS)
2851     return ret;
2852   ret = errorCheck(comm, 1, rcount, 1, rtype, 1, rtag, 1, src, 1, rbuf, 1);
2853   if(ret != MPI_SUCCESS)
2854     return ret;
2855 #endif
2856
2857   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2858   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2859   if (se) return se;
2860   else return re;
2861 }
2862
2863   CDECL
2864 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2865     int dest, int sendtag, int source, int recvtag,
2866     MPI_Comm comm, MPI_Status *status)
2867 {
2868   AMPIAPI("AMPI_Sendrecv_replace");
2869   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2870       buf, count, datatype, source, recvtag, comm, status);
2871 }
2872
2873
2874   CDECL
2875 int AMPI_Barrier(MPI_Comm comm)
2876 {
2877   AMPIAPI("AMPI_Barrier");
2878
2879 #ifndef CMK_OPTIMIZE
2880   if(checkCommunicator(comm) != MPI_SUCCESS)
2881     return checkCommunicator(comm);
2882 #endif
2883
2884   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2885
2886   TRACE_BG_AMPI_LOG(MPI_BARRIER, 0);
2887
2888   //HACK: Use collective operation as a barrier.
2889   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2890
2891   //BIGSIM_OOC DEBUGGING
2892   //CkPrintf("%d: in AMPI_Barrier, after AMPI_Allreduce\n", getAmpiParent()->thisIndex);
2893 #if AMPI_COUNTER
2894   getAmpiParent()->counters.barrier++;
2895 #endif
2896   return 0;
2897 }
2898
2899   CDECL
2900 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2901     MPI_Comm comm)
2902 {
2903   AMPIAPI("AMPI_Bcast");
2904
2905 #ifndef CMK_OPTIMIZE
2906   int ret;
2907   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, buf, 1);
2908   if(ret != MPI_SUCCESS)
2909     return ret;
2910 #endif
2911
2912   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2913   if(comm==MPI_COMM_SELF) return 0;
2914
2915 #if AMPIMSGLOG
2916   ampiParent* pptr = getAmpiParent();
2917   if(msgLogRead){
2918     (*(pptr->fromPUPer))|(pptr->pupBytes);
2919     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2920     return 0;
2921   }
2922 #endif
2923
2924   ampi* ptr = getAmpiInstance(comm);
2925   ptr->bcast(root, buf, count, type,comm);
2926 #if AMPI_COUNTER
2927   getAmpiParent()->counters.bcast++;
2928 #endif
2929
2930 #if AMPIMSGLOG
2931   if(msgLogWrite && record_msglog(pptr->thisIndex)) {
2932     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2933     (*(pptr->toPUPer))|(pptr->pupBytes);
2934     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2935   }
2936 #endif
2937 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2938   //  ptr->yield();
2939   //  //  processRemoteMlogMessages();
2940 #endif
2941
2942   return 0;
2943 }
2944
2945 /// This routine is called with the results of a Reduce or AllReduce
2946 const int MPI_REDUCE_SOURCE=0;
2947 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2948 void ampi::reduceResult(CkReductionMsg *msg)
2949 {
2950   MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2951   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2952       thisArrayID,thisIndex);
2953   delete msg;
2954 }
2955
2956 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2957 {
2958   int szdata = ddt->getSize(count);
2959   int szhdr = sizeof(AmpiOpHeader);
2960   AmpiOpHeader newhdr(op,type,count,szdata); 
2961   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2962   memcpy(msg->getData(),&newhdr,szhdr);
2963   if (count > 0) {
2964     TCharm::activateVariable(inbuf);
2965     ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2966     TCharm::deactivateVariable(inbuf);
2967   }
2968   return msg;
2969 }
2970
2971 // Copy the MPI datatype "type" from inbuf to outbuf
2972 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2973   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2974   //  deserializing into the output.
2975   ampi *ptr = getAmpiInstance(comm);
2976   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2977   int len=ddt->getSize(count);
2978   char *serialized=new char[len];
2979   TCharm::activateVariable(inbuf);
2980   TCharm::activateVariable(outbuf);
2981   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2982   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2983   TCharm::deactivateVariable(outbuf);
2984   TCharm::deactivateVariable(inbuf);
2985   delete [] serialized;         // < memory leak!  // gzheng 
2986
2987   return MPI_SUCCESS;
2988 }
2989
2990 #define SYNCHRONOUS_REDUCE                           0
2991
2992   CDECL
2993 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
2994     int root, MPI_Comm comm)
2995 {
2996   AMPIAPI("AMPI_Reduce");
2997
2998 #ifndef CMK_OPTIMIZE
2999   int ret;
3000   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, 0, 0);
3001   if(ret != MPI_SUCCESS)
3002     return ret;
3003 #endif
3004
3005   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3006
3007 #if AMPIMSGLOG
3008   ampiParent* pptr = getAmpiParent();
3009   if(msgLogRead){
3010     (*(pptr->fromPUPer))|(pptr->pupBytes);
3011     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3012     return 0;
3013   }
3014 #endif
3015
3016   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
3017   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
3018   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
3019
3020   ampi *ptr = getAmpiInstance(comm);
3021   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
3022   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
3023   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
3024
3025   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3026
3027   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
3028   msg->setCallback(reduceCB);
3029   MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
3030   ptr->contribute(msg);
3031
3032   if (ptr->thisIndex == rootIdx){
3033     /*HACK: Use recv() to block until reduction data comes back*/
3034     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3035       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
3036
3037 #if SYNCHRONOUS_REDUCE
3038       AmpiMsg *msg = new (0, 0) AmpiMsg(-1, MPI_REDUCE_TAG, -1, rootIdx, 0, MPI_REDUCE_COMM);
3039       CProxy_ampi pa(ptr->getProxy());
3040       pa.generic(msg);
3041 #endif
3042   }
3043 #if SYNCHRONOUS_REDUCE
3044   ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, NULL, 0, type, MPI_REDUCE_COMM);
3045 #endif
3046
3047 #if AMPI_COUNTER
3048   getAmpiParent()->counters.reduce++;
3049 #endif
3050
3051 #if AMPIMSGLOG
3052   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3053     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3054     (*(pptr->toPUPer))|(pptr->pupBytes);
3055     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3056   }
3057 #endif
3058
3059   return 0;
3060 }
3061
3062   CDECL
3063 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
3064     MPI_Op op, MPI_Comm comm)
3065 {
3066   AMPIAPI("AMPI_Allreduce");
3067
3068 #ifndef CMK_OPTIMIZE
3069   int ret;
3070   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, 0, 0);
3071   if(ret != MPI_SUCCESS)
3072     return ret;
3073 #endif
3074
3075   ampi *ptr = getAmpiInstance(comm);
3076
3077   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
3078   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
3079   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
3080
3081   CkDDT_DataType *ddt_type = ptr->getDDT()->getType(type);
3082
3083 #if CMK_BIGSIM_CHARM
3084   TRACE_BG_AMPI_LOG(MPI_ALLREDUCE, ddt_type->getSize(count));
3085 #endif
3086
3087   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3088
3089 #if AMPIMSGLOG
3090   ampiParent* pptr = getAmpiParent();
3091   if(msgLogRead){
3092     (*(pptr->fromPUPer))|(pptr->pupBytes);
3093     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3094     //    CkExit();
3095     return 0;
3096   }
3097 #endif
3098
3099   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
3100   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
3101
3102   CkReductionMsg *msg=makeRednMsg(ddt_type, inbuf, count, type, op);
3103   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3104   msg->setCallback(allreduceCB);
3105   ptr->contribute(msg);
3106
3107   /*HACK: Use recv() to block until the reduction data comes back*/
3108   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3109     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
3110 #if AMPI_COUNTER
3111   getAmpiParent()->counters.allreduce++;
3112 #endif
3113
3114 #if AMPIMSGLOG
3115   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3116     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3117     (*(pptr->toPUPer))|(pptr->pupBytes);
3118     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3119     //    CkExit();
3120   }
3121 #endif
3122
3123   return 0;
3124 }
3125
3126   CDECL
3127 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
3128     MPI_Op op, MPI_Comm comm, MPI_Request* request)
3129 {
3130   AMPIAPI("AMPI_Iallreduce");
3131
3132 #ifndef CMK_OPTIMIZE
3133   int ret;
3134   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1);
3135   if(ret != MPI_SUCCESS)
3136   {
3137     *request = MPI_REQUEST_NULL;
3138     return ret;
3139   }
3140 #endif
3141
3142   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3143
3144   checkRequest(*request);
3145   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
3146   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
3147   ampi *ptr = getAmpiInstance(comm);
3148
3149   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3150   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3151   msg->setCallback(allreduceCB);
3152   ptr->contribute(msg);
3153
3154   // using irecv instead recv to non-block the call and get request pointer
3155   AmpiRequestList* reqs = getReqs();
3156   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
3157   *request = reqs->insert(newreq);
3158   return 0;
3159 }
3160
3161   CDECL
3162 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
3163     MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
3164 {
3165   AMPIAPI("AMPI_Reduce_scatter");
3166
3167 #ifndef CMK_OPTIMIZE
3168   int ret;
3169   ret = errorCheck(comm, 1, 0, 0, datatype, 1, 0, 0, 0, 0, sendbuf, 1);
3170   if(ret != MPI_SUCCESS)
3171     return ret;
3172 #endif
3173
3174   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
3175   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
3176   ampi *ptr = getAmpiInstance(comm);
3177   int size = ptr->getSize(comm);
3178   int count=0;
3179   int *displs = new int [size];
3180   int len;
3181   void *tmpbuf;
3182
3183   //under construction
3184   for(int i=0;i<size;i++){
3185     displs[i] = count;
3186     count+= recvcounts[i];
3187   }
3188   len = ptr->getDDT()->getType(datatype)->getSize(count);
3189   tmpbuf = malloc(len);
3190   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
3191   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
3192       recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
3193   free(tmpbuf);
3194   delete [] displs;     // < memory leak ! // gzheng
3195   return 0;
3196 }
3197
3198 /***** MPI_Scan algorithm (from MPICH) *******
3199   recvbuf = sendbuf;
3200   partial_scan = sendbuf;
3201   mask = 0x1;
3202   while (mask < size) {
3203   dst = rank^mask;
3204   if (dst < size) {
3205   send partial_scan to dst;
3206   recv from dst into tmp_buf;
3207   if (rank > dst) {
3208   partial_scan = tmp_buf + partial_scan;
3209   recvbuf = tmp_buf + recvbuf;
3210   }
3211   else {
3212   if (op is commutative)
3213   partial_scan = tmp_buf + partial_scan;
3214   else {
3215   tmp_buf = partial_scan + tmp_buf;
3216   partial_scan = tmp_buf;
3217   }
3218   }
3219   }
3220   mask <<= 1;
3221   }
3222  ***** MPI_Scan algorithm (from MPICH) *******/
3223
3224 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
3225   (op)(invec,inoutvec,&count,&datatype);
3226 }
3227 CDECL
3228 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
3229   AMPIAPI("AMPI_Scan");
3230
3231 #ifndef CMK_OPTIMIZE
3232   int ret;
3233   ret = errorCheck(comm, 1, count, 1, datatype, 1, 0, 0, 0, 0, sendbuf, 1);
3234   if(ret != MPI_SUCCESS)
3235     return ret;
3236 #endif
3237
3238   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
3239   MPI_Status sts;
3240   ampi *ptr = getAmpiInstance(comm);
3241   int size = ptr->getSize(comm);
3242   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
3243   int rank = ptr->getRank(comm);
3244   int mask = 0x1;
3245   int dst;
3246   void* tmp_buf = malloc(blklen);
3247   void* partial_scan = malloc(blklen);
3248
3249   memcpy(recvbuf, sendbuf, blklen);
3250   memcpy(partial_scan, sendbuf, blklen);
3251   while(mask < size){
3252     dst = rank^mask;
3253     if(dst < size){
3254       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
3255           tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
3256       if(rank > dst){
3257         (op)(tmp_buf,partial_scan,&count,&datatype);
3258         (op)(tmp_buf,recvbuf,&count,&datatype);
3259       }else {
3260         (op)(partial_scan,tmp_buf,&count,&datatype);
3261         memcpy(partial_scan,tmp_buf,blklen);
3262       }
3263     }
3264     mask <<= 1;
3265
3266   }
3267
3268   free(tmp_buf);
3269   free(partial_scan);
3270 #if AMPI_COUNTER
3271   getAmpiParent()->counters.scan++;
3272 #endif
3273   return 0;
3274 }
3275
3276 CDECL
3277 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
3278   //AMPIAPI("AMPI_Op_create");
3279   *op = function;
3280   return 0;
3281 }
3282
3283 CDECL
3284 int AMPI_Op_free(MPI_Op *op){
3285   //AMPIAPI("AMPI_Op_free");
3286   *op = MPI_OP_NULL;
3287   return 0;
3288 }
3289
3290
3291   CDECL
3292 double AMPI_Wtime(void)
3293 {
3294   //  AMPIAPI("AMPI_Wtime");
3295
3296 #if AMPIMSGLOG
3297   double ret=TCHARM_Wall_timer();
3298   ampiParent* pptr = getAmpiParent();
3299   if(msgLogRead){
3300     (*(pptr->fromPUPer))|ret;
3301     return ret;
3302   }
3303
3304   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3305     (*(pptr->toPUPer))|ret;
3306   }
3307 #endif
3308
3309 #if CMK_BIGSIM_CHARM
3310   return BgGetTime();
3311 #else
3312   return TCHARM_Wall_timer();
3313 #endif
3314 }
3315
3316 CDECL
3317 double AMPI_Wtick(void){
3318   //AMPIAPI("AMPI_Wtick");
3319   return 1e-6;
3320 }
3321
3322
3323 int PersReq::start(){
3324   if(sndrcv == 1 || sndrcv == 3) { // send or ssend request
3325     ampi *ptr=getAmpiInstance(comm);
3326     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm, sndrcv==3?1:0);
3327   }
3328   return 0;
3329 }
3330
3331   CDECL
3332 int AMPI_Start(MPI_Request *request)
3333 {
3334   AMPIAPI("AMPI_Start");
3335   checkRequest(*request);
3336   AmpiRequestList *reqs = getReqs();
3337   if(-1==(*reqs)[*request]->start()) {
3338     CkAbort("MPI_Start could be used only on persistent communication requests!");
3339   }
3340   return 0;
3341 }
3342
3343 CDECL
3344 int AMPI_Startall(int count, MPI_Request *requests){
3345   AMPIAPI("AMPI_Startall");
3346   checkRequests(count,requests);
3347   AmpiRequestList *reqs = getReqs();
3348   for(int i=0;i<count;i++){
3349     if(-1==(*reqs)[requests[i]]->start())
3350       CkAbort("MPI_Start could be used only on persistent communication requests!");
3351   }
3352   return 0;
3353 }
3354
3355 /* organize the indices of requests into a vector of a vector: 
3356  * level 1 is different msg envelope matches
3357  * level 2 is (posting) ordered requests of with envelope
3358  * each time multiple completion call loop over first elem of level 1
3359  * and move the matched to the NULL request slot.   
3360  * warning: this does not work with I-Alltoall requests */
3361 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
3362   for(int i=0;i<count;i++){
3363     if(reqs[i]!=MPI_REQUEST_NULL)
3364       return 0;
3365   }
3366   return 1;
3367 }
3368 inline int matchReq(MPI_Request ia, MPI_Request ib){
3369   checkRequest(ia);  
3370   checkRequest(ib);
3371   AmpiRequestList* reqs = getReqs();
3372   AmpiRequest *a, *b;
3373   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
3374   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
3375   a=(*reqs)[ia];  b=(*reqs)[ib];
3376   if(a->tag != b->tag) return 0;
3377   if(a->src != b->src) return 0;
3378   if(a->comm != b->comm) return 0;
3379   return 1;
3380 }
3381 inline void swapInt(int& a,int& b){
3382   int tmp;
3383   tmp=a; a=b; b=tmp;
3384 }
3385 inline void sortedIndex(int n, int* arr, int* idx){
3386   int i,j;
3387   for(i=0;i<n;i++) 
3388     idx[i]=i;
3389   for (i=0; i<n-1; i++) 
3390     for (j=0; j<n-1-i; j++)
3391       if (arr[idx[j+1]] < arr[idx[j]]) 
3392         swapInt(idx[j+1],idx[j]);
3393 }
3394 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
3395   CkAssert(count!=0);
3396   int *newidx = new int [count];
3397   int flag;
3398   sortedIndex(count,arr,newidx);
3399   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
3400   CkVec<int> slot;
3401   vec->push_back(slot);
3402   (*vec)[0].push_back(newidx[0]);
3403   for(int i=1;i<count;i++){
3404     flag=0;
3405     for(int j=0;j<vec->size();j++){
3406       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
3407         ((*vec)[j]).push_back(newidx[i]);
3408         flag++;
3409       }
3410     }
3411     if(!flag){
3412       CkVec<int> newslot;
3413       newslot.push_back(newidx[i]);
3414       vec->push_back(newslot);
3415     }else{
3416       CkAssert(flag==1);
3417     }
3418   }
3419   delete [] newidx;
3420   return vec;
3421 }
3422 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
3423   printf("vec content: ");
3424   for(int i=0;i<vec.size();i++){
3425     printf("{");
3426     for(int j=0;j<(vec[i]).size();j++){
3427       printf(" %d ",arr[(vec[i])[j]]);
3428     }
3429     printf("} ");
3430   }
3431   printf("\n");
3432 }
3433
3434 int PersReq::wait(MPI_Status *sts){
3435   if(sndrcv == 2) {
3436     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3437       CkAbort("AMPI> Error in persistent request wait");
3438 #if CMK_BIGSIM_CHARM
3439     _TRACE_BG_TLINE_END(&event);
3440 #endif
3441   }
3442   return 0;
3443 }
3444
3445 int IReq::wait(MPI_Status *sts){
3446   if(CpvAccess(CmiPICMethod) == 2) {
3447     AMPI_DEBUG("In weird clause of IReq::wait\n");
3448     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3449       CkAbort("AMPI> Error in non-blocking request wait");
3450
3451     return 0;
3452   }
3453
3454   //Copy "this" to a local variable in the case that "this" pointer
3455   //is updated during the out-of-core emulation.
3456
3457   // optimization for Irecv
3458   // generic() writes directly to the buffer, so the only thing we
3459   // do here is to wait
3460   ampi *ptr = getAmpiInstance(comm);
3461
3462   //BIGSIM_OOC DEBUGGING
3463   //int ooccnt=0;
3464   //int ampiIndex = ptr->thisIndex;
3465   //CmiPrintf("%d: IReq's status=%d\n", ampiIndex, statusIreq);
3466
3467   while (statusIreq == false) {
3468     //BIGSIM_OOC DEBUGGING
3469     //CmiPrintf("Before blocking: %dth time: %d: in Ireq::wait\n", ++ooccnt, ptr->thisIndex);
3470     //print();
3471
3472     ptr->resumeOnRecv=true;
3473     ptr->block();
3474
3475     //BIGSIM_OOC DEBUGGING
3476     //CmiPrintf("[%d] After blocking: in Ireq::wait\n", ptr->thisIndex);
3477     //CmiPrintf("IReq's this pointer: %p\n", this);
3478     //print();
3479
3480 #if CMK_BIGSIM_CHARM
3481     //Because of the out-of-core emulation, this pointer is changed after in-out
3482     //memory operation. So we need to return from this function and do the while loop
3483     //in the outer function call.       
3484     if(_BgInOutOfCoreMode)
3485       return -1;
3486 #endif  
3487   }   // end of while
3488   ptr->resumeOnRecv=false;
3489
3490   AMPI_DEBUG("IReq::wait has resumed\n");
3491
3492   if(sts) {
3493     AMPI_DEBUG("Setting sts->MPI_TAG to this->tag=%d in IReq::wait  this=%p\n", (int)this->tag, this);
3494     sts->MPI_TAG = tag;
3495     sts->MPI_SOURCE = src;
3496     sts->MPI_COMM = comm;
3497     sts->MPI_LENGTH = length;
3498   }
3499
3500   return 0;
3501 }
3502
3503 int ATAReq::wait(MPI_Status *sts){
3504   int i;
3505   for(i=0;i<count;i++){
3506     if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3507           myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
3508       CkAbort("AMPI> Error in alltoall request wait");
3509 #if CMK_BIGSIM_CHARM
3510     _TRACE_BG_TLINE_END(&myreqs[i].event);
3511 #endif
3512   }
3513 #if CMK_BIGSIM_CHARM
3514   //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
3515   TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0, 1);
3516   for (i=0; i<count; i++)
3517     _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
3518   _TRACE_BG_TLINE_END(&event);
3519 #endif
3520   return 0;
3521 }
3522
3523 int SReq::wait(MPI_Status *sts){
3524   ampi *ptr = getAmpiInstance(comm);
3525   while (statusIreq == false) {
3526     ptr->resumeOnRecv = true;
3527     ptr->block();
3528     ptr = getAmpiInstance(comm);
3529     ptr->resumeOnRecv = false;
3530   }
3531   return 0;
3532 }
3533
3534   CDECL
3535 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
3536 {
3537   AMPIAPI("AMPI_Wait");
3538   if(*request == MPI_REQUEST_NULL){
3539     stsempty(*sts);
3540     return 0;
3541   }
3542   checkRequest(*request);
3543   AmpiRequestList* reqs = getReqs();
3544
3545 #if AMPIMSGLOG
3546   ampiParent* pptr = getAmpiParent();
3547   if(msgLogRead){
3548     (*(pptr->fromPUPer))|(pptr->pupBytes);
3549     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3550     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
3551     return 0;
3552   }
3553 #endif
3554
3555   AMPI_DEBUG("AMPI_Wait request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3556   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3557   //(*reqs)[*request]->wait(sts);
3558   int waitResult = -1;
3559   do{
3560     AmpiRequest *waitReq = (*reqs)[*request];
3561     waitResult = waitReq->wait(sts);
3562     if(_BgInOutOfCoreMode){
3563       reqs = getReqs();
3564     }
3565   }while(waitResult==-1);
3566
3567
3568   AMPI_DEBUG("AMPI_Wait after calling wait, request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3569
3570
3571 #if AMPIMSGLOG
3572   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3573     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
3574     (*(pptr->toPUPer))|(pptr->pupBytes);
3575     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3576     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
3577   }
3578 #endif
3579
3580   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3581     reqs->free(*request);
3582     *request = MPI_REQUEST_NULL;
3583   }
3584
3585   AMPI_DEBUG("End of AMPI_Wait\n");
3586
3587   return 0;
3588 }
3589
3590   CDECL
3591 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
3592 {
3593   AMPIAPI("AMPI_Waitall");
3594   if(count==0) return MPI_SUCCESS;
3595   checkRequests(count,request);
3596   int i,j,oldPe;
3597   AmpiRequestList* reqs = getReqs();
3598   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3599
3600 #if AMPIMSGLOG
3601   ampiParent* pptr = getAmpiParent();
3602   if(msgLogRead){
3603     for(i=0;i<reqvec->size();i++){
3604       for(j=0;j<((*reqvec)[i]).size();j++){
3605         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3606           stsempty(sts[((*reqvec)[i])[j]]);
3607           continue;
3608         }
3609         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3610         (*(pptr->fromPUPer))|(pptr->pupBytes);
3611         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3612         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3613       }
3614     }
3615     return 0;
3616   }
3617 #endif
3618
3619 #if CMK_BIGSIM_CHARM
3620   void *curLog;         // store current log in timeline
3621   _TRACE_BG_TLINE_END(&curLog);
3622 #if 0
3623   TRACE_BG_AMPI_SUSPEND();
3624 #endif
3625 #endif
3626   for(i=0;i<reqvec->size();i++){
3627     for(j=0;j<((*reqvec)[i]).size();j++){
3628       //CkPrintf("[%d] in loop [%d, %d]\n", pptr->thisIndex,i, j);
3629       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3630         stsempty(sts[((*reqvec)[i])[j]]);
3631         continue;
3632       }
3633       oldPe = CkMyPe();
3634
3635       int waitResult = -1;
3636       do{       
3637         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3638         waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
3639         if(_BgInOutOfCoreMode){
3640           reqs = getReqs();
3641           reqvec = vecIndex(count, request);
3642         }
3643
3644 #if AMPIMSGLOG
3645         if(msgLogWrite && record_msglog(pptr->thisIndex)){
3646           (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
3647           (*(pptr->toPUPer))|(pptr->pupBytes);
3648           PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3649           PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3650         }
3651 #endif
3652
3653       }while(waitResult==-1);
3654
3655 #if 1
3656 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3657       //for fault evacuation
3658       if(oldPe != CkMyPe()){
3659 #endif
3660         reqs = getReqs();
3661         reqvec  = vecIndex(count,request);
3662 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3663       }
3664 #endif
3665 #endif
3666     }
3667   }
3668 #if CMK_BIGSIM_CHARM
3669   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
3670 #endif
3671   // free memory of requests
3672   for(i=0;i<count;i++){ 
3673     if(request[i] == MPI_REQUEST_NULL)
3674       continue;
3675     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
3676       reqs->free(request[i]);
3677       request[i] = MPI_REQUEST_NULL;
3678     }
3679   }
3680   delete reqvec;
3681   return 0;
3682 }
3683
3684   CDECL
3685 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
3686 {
3687   AMPIAPI("AMPI_Waitany");
3688
3689   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
3690   if(count == 0) return MPI_SUCCESS;
3691   checkRequests(count,request);
3692   if(areInactiveReqs(count,request)){
3693     *idx=MPI_UNDEFINED;
3694     stsempty(*sts);
3695     return MPI_SUCCESS;
3696   }
3697   int flag=0;
3698   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3699   while(count>0){ /* keep looping until some request finishes: */
3700     for(int i=0;i<reqvec->size();i++){
3701       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
3702       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
3703         *idx = ((*reqvec)[i])[0];
3704         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
3705         return 0;
3706       }
3707     }
3708     /* no requests have finished yet-- schedule and try again */
3709     AMPI_Yield(MPI_COMM_WORLD);
3710   }
3711   *idx = MPI_UNDEFINED;
3712   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
3713   delete reqvec;
3714   return 0;
3715 }
3716
3717   CDECL
3718 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
3719     int *array_of_indices, MPI_Status *array_of_statuses)
3720 {
3721   AMPIAPI("AMPI_Waitsome");
3722   checkRequests(incount,array_of_requests);
3723   if(areInactiveReqs(incount,array_of_requests)){
3724     *outcount=MPI_UNDEFINED;
3725     return MPI_SUCCESS;
3726   }
3727   MPI_Status sts;
3728   int i;
3729   int flag=0, realflag=0;
3730   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3731   *outcount = 0;
3732   while(1){
3733     for(i=0;i<reqvec->size();i++){
3734       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3735       if(flag == 1){ 
3736         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3737         array_of_statuses[(*outcount)++]=sts;
3738         if(sts.MPI_COMM != 0)
3739           realflag=1; // there is real(non null) request
3740       }
3741     }
3742     if(realflag && outcount>0) break;
3743   }
3744   delete reqvec;
3745   return 0;
3746 }
3747
3748   CmiBool PersReq::test(MPI_Status *sts){
3749     if(sndrcv == 2)     // recv request
3750       return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3751     else                        // send request
3752       return 1;
3753
3754   }
3755   void PersReq::complete(MPI_Status *sts){
3756     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3757       CkAbort("AMPI> Error in persistent request complete");
3758   }
3759
3760 CmiBool IReq::test(MPI_Status *sts){
3761   if (statusIreq == true) {           
3762     if(sts)
3763       sts->MPI_LENGTH = length;