AMPI: Widen int before casting it to a pointer (silence warning)
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1
2 #define AMPIMSGLOG    0
3
4 #define exit exit /*Supress definition of exit in ampi.h*/
5 #include "ampiimpl.h"
6 #include "tcharm.h"
7 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
8 #include "ampiEvents.h" /*** for trace generation for projector *****/
9 #include "ampiProjections.h"
10 #endif
11
12 #if CMK_BIGSIM_CHARM
13 #include "bigsim_logs.h"
14 #endif
15
16 #define CART_TOPOL 1
17 #define AMPI_PRINT_IDLE 0
18
19 /* change this define to "x" to trace all send/recv's */
20 #define MSG_ORDER_DEBUG(x)  //x /* empty */
21 /* change this define to "x" to trace user calls */
22 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
23 #define STARTUP_DEBUG(x)  //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
24 #define FUNCCALL_DEBUG(x) //x /* empty */
25
26 static CkDDT *getDDT(void) {
27   return getAmpiParent()->myDDT;
28 }
29
30 inline int errorCheck(MPI_Comm comm, int ifComm, int count, int ifCount, 
31     MPI_Datatype data, int ifData, int tag, int ifTag, 
32     int rank, int ifRank, void *buf, int ifBuf);
33
34   inline int checkCommunicator(MPI_Comm comm) {
35     if(comm == MPI_COMM_NULL)
36       return MPI_ERR_COMM;
37     return MPI_SUCCESS;
38   }
39
40   inline int checkCount(int count) {
41     if(count < 0)
42       return MPI_ERR_COUNT;
43     return MPI_SUCCESS;
44   }
45
46   inline int checkData(MPI_Datatype data) {
47     if(data == MPI_DATATYPE_NULL)
48       return MPI_ERR_TYPE;
49     return MPI_SUCCESS;
50   }
51
52   inline int checkTag(int tag) {
53     if(tag != MPI_ANY_TAG && tag < 0)
54       return MPI_ERR_TAG;
55     return MPI_SUCCESS;
56   }
57
58 inline int checkRank(int rank, MPI_Comm comm) {
59   int size;
60   AMPI_Comm_size(comm, &size);
61   if(((rank >= 0) && (rank < size)) || (rank == MPI_ANY_SOURCE) || (rank ==
62         MPI_PROC_NULL))
63     return MPI_SUCCESS;
64   return MPI_ERR_RANK;
65 }
66
67   inline int checkBuf(void *buf, int count) {
68     if((count != 0 && buf == NULL))
69       return MPI_ERR_BUFFER;
70     return MPI_SUCCESS;
71   }
72
73 inline int errorCheck(MPI_Comm comm, int ifComm, int count, int ifCount,
74     MPI_Datatype data, int ifData, int tag, int ifTag,
75     int rank, int ifRank, void *buf, int ifBuf) {
76   int ret;
77   if(ifComm) { 
78     ret = checkCommunicator(comm);
79     if(ret != MPI_SUCCESS)
80       return ret;
81   }
82   if(ifCount) {
83     ret = checkCount(count);
84     if(ret != MPI_SUCCESS)
85       return ret;
86   }
87   if(ifData) {
88     ret = checkData(data);
89     if(ret != MPI_SUCCESS)
90       return ret;
91   }
92   if(ifTag) {
93     ret = checkTag(tag);
94     if(ret != MPI_SUCCESS)
95       return ret;
96   }
97   if(ifRank) {
98     ret = checkRank(rank,comm);
99     if(ret != MPI_SUCCESS)
100       return ret;
101   }
102   if(ifBuf) {
103     ret = checkBuf(buf,count);
104     if(ret != MPI_SUCCESS)
105       return ret;
106   }
107   return MPI_SUCCESS;
108 }
109
110 //------------- startup -------------
111 static mpi_comm_worlds mpi_worlds;
112
113 int _mpi_nworlds; /*Accessed by ampif*/
114 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
115
116 /* ampiReducer: AMPI's generic reducer type 
117    MPI_Op is function pointer to MPI_User_function
118    so that it can be packed into AmpiOpHeader, shipped 
119    with the reduction message, and then plugged into 
120    the ampiReducer. 
121    One little trick is the ampi::recv which receives
122    the final reduction message will see additional
123    sizeof(AmpiOpHeader) bytes in the buffer before
124    any user data.                             */
125 class AmpiComplex { 
126   public: 
127     double re, im; 
128     void operator+=(const AmpiComplex &a) {
129       re+=a.re;
130       im+=a.im;
131     }
132     void operator*=(const AmpiComplex &a) {
133       double nu_re=re*a.re-im*a.im;
134       im=re*a.im+im*a.re;
135       re=nu_re;
136     }
137     int operator>(const AmpiComplex &a) {
138       CkAbort("Cannot compare complex numbers with MPI_MAX");
139       return 0;
140     }
141     int operator<(const AmpiComplex &a) {
142       CkAbort("Cannot compare complex numbers with MPI_MIN");
143       return 0;
144     }
145 };
146 typedef struct { float val; int idx; } FloatInt;
147 typedef struct { double val; int idx; } DoubleInt;
148 typedef struct { long val; int idx; } LongInt;
149 typedef struct { int val; int idx; } IntInt;
150 typedef struct { short val; int idx; } ShortInt;
151 typedef struct { long double val; int idx; } LongdoubleInt;
152 typedef struct { float val; float idx; } FloatFloat;
153 typedef struct { double val; double idx; } DoubleDouble;
154
155
156 #define MPI_OP_SWITCH(OPNAME) \
157   int i; \
158 switch (*datatype) { \
159   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
160   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
161   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
162   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
163   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
164   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
165   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
166   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
167   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
168   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
169   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
170   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
171   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
172   default: \
173            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
174   CmiAbort("Unsupported MPI datatype for MPI Op"); \
175 };\
176
177 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
178 #define MPI_OP_IMPL(type) \
179   if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
180   MPI_OP_SWITCH(MPI_MAX)
181 #undef MPI_OP_IMPL
182 }
183
184 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
185 #define MPI_OP_IMPL(type) \
186   if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
187   MPI_OP_SWITCH(MPI_MIN)
188 #undef MPI_OP_IMPL
189 }
190
191 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
192 #define MPI_OP_IMPL(type) \
193   ((type *)inoutvec)[i] += ((type *)invec)[i];
194   MPI_OP_SWITCH(MPI_SUM)
195 #undef MPI_OP_IMPL
196 }
197
198 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
199 #define MPI_OP_IMPL(type) \
200   ((type *)inoutvec)[i] *= ((type *)invec)[i];
201   MPI_OP_SWITCH(MPI_PROD)
202 #undef MPI_OP_IMPL
203 }
204
205 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
206   int i;  
207   switch (*datatype) {
208     case MPI_INT:
209     case MPI_LOGICAL:
210       for(i=0;i<(*len);i++)
211         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
212       break;
213     default:
214       ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
215       CmiAbort("exiting");
216   }
217 }
218 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
219   int i; 
220   switch (*datatype) {
221     case MPI_INT:
222       for(i=0;i<(*len);i++)
223         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
224       break;
225     case MPI_BYTE:
226       for(i=0;i<(*len);i++)
227         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
228       break;
229     default:
230       ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
231       CmiAbort("exiting");
232   }
233 }
234 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
235   int i;  
236   switch (*datatype) {
237     case MPI_INT:
238     case MPI_LOGICAL:
239       for(i=0;i<(*len);i++)
240         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
241       break;
242     default:
243       ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
244       CmiAbort("exiting");
245   }
246 }
247 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
248   int i;  
249   switch (*datatype) {
250     case MPI_INT:
251       for(i=0;i<(*len);i++)
252         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
253       break;
254     case MPI_BYTE:
255       for(i=0;i<(*len);i++)
256         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
257       break;
258     default:
259       ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
260       CmiAbort("exiting");
261   }
262 }
263 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
264   int i;  
265   switch (*datatype) {
266     case MPI_INT:
267     case MPI_LOGICAL:
268       for(i=0;i<(*len);i++)
269         ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
270       break;
271     default:
272       ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
273       CmiAbort("exiting");
274   }
275 }
276 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
277   int i;  
278   switch (*datatype) {
279     case MPI_INT:
280       for(i=0;i<(*len);i++)
281         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
282       break;
283     case MPI_BYTE:
284       for(i=0;i<(*len);i++)
285         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
286       break;
287     case MPI_UNSIGNED:
288       for(i=0;i<(*len);i++)
289         ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
290       break;
291     default:
292       ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
293       CmiAbort("exiting");
294   }
295 }
296
297 #ifndef MIN
298 #define MIN(a,b) (a < b ? a : b)
299 #endif
300
301 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
302   int i;  
303
304   switch (*datatype) {
305     case MPI_FLOAT_INT:
306       for(i=0;i<(*len);i++)
307         if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
308           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
309         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
310           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
311       break;
312     case MPI_DOUBLE_INT:
313       for(i=0;i<(*len);i++)
314         if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
315           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
316         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
317           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
318
319       break;
320     case MPI_LONG_INT:
321       for(i=0;i<(*len);i++)
322         if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
323           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
324         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
325           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
326       break;
327     case MPI_2INT:
328       for(i=0;i<(*len);i++)
329         if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
330           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
331         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
332           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
333       break;
334     case MPI_SHORT_INT:
335       for(i=0;i<(*len);i++)
336         if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
337           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
338         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
339           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
340       break;
341     case MPI_LONG_DOUBLE_INT:
342       for(i=0;i<(*len);i++)
343         if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
344           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
345         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
346           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
347       break;
348     case MPI_2FLOAT:
349       for(i=0;i<(*len);i++)
350         if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
351           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
352         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
353           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
354       break;
355     case MPI_2DOUBLE:
356       for(i=0;i<(*len);i++)
357         if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
358           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
359         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
360           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
361       break;
362     default:
363       ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
364       CmiAbort("exiting");
365   }
366 }
367 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
368   int i;  
369   switch (*datatype) {
370     case MPI_FLOAT_INT:
371       for(i=0;i<(*len);i++)
372         if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
373           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
374         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
375           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
376       break;
377     case MPI_DOUBLE_INT:
378       for(i=0;i<(*len);i++)
379         if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
380           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
381         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
382           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
383       break;
384     case MPI_LONG_INT:
385       for(i=0;i<(*len);i++)
386         if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
387           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
388         else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
389           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
390       break;
391     case MPI_2INT:
392       for(i=0;i<(*len);i++)
393         if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
394           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
395         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
396           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
397       break;
398     case MPI_SHORT_INT:
399       for(i=0;i<(*len);i++)
400         if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
401           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
402         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
403           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
404       break;
405     case MPI_LONG_DOUBLE_INT:
406       for(i=0;i<(*len);i++)
407         if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
408           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
409         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
410           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
411       break;
412     case MPI_2FLOAT:
413       for(i=0;i<(*len);i++)
414         if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
415           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
416         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
417           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
418       break;
419     case MPI_2DOUBLE:
420       for(i=0;i<(*len);i++)
421         if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
422           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
423         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
424           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
425       break;
426     default:
427       ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
428       CmiAbort("exiting");
429   }
430 }
431
432 // every msg contains a AmpiOpHeader structure before user data
433 // FIXME: non-commutative operations require messages be ordered by rank
434 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
435   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
436   MPI_Datatype dtype;
437   int szhdr, szdata, len;
438   MPI_User_function* func;
439   func = hdr->func;
440   dtype = hdr->dtype;  
441   szdata = hdr->szdata;
442   len = hdr->len;  
443   szhdr = sizeof(AmpiOpHeader);
444
445   //Assuming extent == size
446   void *ret = malloc(szhdr+szdata);
447   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
448   for(int i=1;i<nMsg;i++){
449     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
450   }
451   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
452   free(ret);
453   return retmsg;
454 }
455
456 CkReduction::reducerType AmpiReducer;
457
458 class Builtin_kvs{
459   public:
460     int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
461     Builtin_kvs(){
462       tag_ub = MPI_TAG_UB_VALUE; 
463       host = MPI_PROC_NULL;
464       io = 0;
465       wtime_is_global = 0;
466       keyval_mype = CkMyPe();
467       keyval_numpes = CkNumPes();
468       keyval_mynode = CkMyNode();
469       keyval_numnodes = CkNumNodes();
470     }
471 };
472
473 // ------------ startup support -----------
474 int _ampi_fallback_setup_count;
475 CDECL void AMPI_Setup(void);
476 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
477
478 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
479
480 /*Main routine used when missing MPI_Setup routine*/
481 CDECL void AMPI_Fallback_Main(int argc,char **argv)
482 {
483   AMPI_Main_cpp(argc,argv);
484   AMPI_Main(argc,argv);
485   FTN_NAME(MPI_MAIN,mpi_main)();
486 }
487
488 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
489 /*Startup routine used if user *doesn't* write
490   a TCHARM_User_setup routine.
491  */
492 CDECL void AMPI_Setup_Switch(void) {
493   _ampi_fallback_setup_count=0;
494   FTN_NAME(AMPI_SETUP,ampi_setup)();
495   AMPI_Setup();
496   if (_ampi_fallback_setup_count==2)
497   { //Missing AMPI_Setup in both C and Fortran:
498     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
499   }
500 }
501
502 static int nodeinit_has_been_called=0;
503 CtvDeclare(ampiParent*, ampiPtr);
504 CtvDeclare(int, ampiInitDone);
505 CtvDeclare(void*,stackBottom);
506 CtvDeclare(int, ampiFinalized);
507 CkpvDeclare(Builtin_kvs, bikvs);
508 CkpvDeclare(int,argvExtracted);
509 static int enableStreaming = 0;
510
511 CDECL long ampiCurrentStackUsage(){
512   int localVariable;
513
514   unsigned long p1 =  (unsigned long)((void*)&localVariable);
515   unsigned long p2 =  (unsigned long)(CtvAccess(stackBottom));
516
517
518   if(p1 > p2)
519     return p1 - p2;
520   else
521     return  p2 - p1;
522
523 }
524
525 FDECL void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
526   long usage = ampiCurrentStackUsage();
527   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
528 }
529
530
531 CDECL void AMPI_threadstart(void *data);
532 static int AMPI_threadstart_idx = -1;
533
534 static void ampiNodeInit(void)
535 {
536   _mpi_nworlds=0;
537   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
538   {
539     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
540   }
541   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
542
543   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
544
545   CmiAssert(AMPI_threadstart_idx == -1);    // only initialize once
546   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
547
548   nodeinit_has_been_called=1;
549 }
550
551 #if PRINT_IDLE
552 static double totalidle=0.0, startT=0.0;
553 static int beginHandle, endHandle;
554 static void BeginIdle(void *dummy,double curWallTime)
555 {
556   startT = curWallTime;
557 }
558 static void EndIdle(void *dummy,double curWallTime)
559 {
560   totalidle += curWallTime - startT;
561 }
562 #endif
563
564 /* for fortran reduction operation table to handle mapping */
565 typedef MPI_Op  MPI_Op_Array[128];
566 CtvDeclare(int, mpi_opc);
567 CtvDeclare(MPI_Op_Array, mpi_ops);
568
569 static void ampiProcInit(void){
570   CtvInitialize(ampiParent*, ampiPtr);
571   CtvInitialize(int,ampiInitDone);
572   CtvInitialize(int,ampiFinalized);
573   CtvInitialize(void*,stackBottom);
574
575
576   CtvInitialize(MPI_Op_Array, mpi_ops);
577   CtvInitialize(int, mpi_opc);
578
579   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
580   CkpvAccess(bikvs) = Builtin_kvs();
581
582   CkpvInitialize(int, argvExtracted);
583   CkpvAccess(argvExtracted) = 0;
584
585 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
586   REGISTER_AMPI
587 #endif
588   initAmpiProjections();
589
590   char **argv=CkGetArgv();
591 #if AMPI_COMLIB  
592   if(CkpvAccess(argvExtracted)==0){
593     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
594   }
595 #endif
596
597 #if AMPIMSGLOG
598   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
599   //msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
600   if (CmiGetArgIntDesc(argv,"+msgLogRead", &msgLogRank, "Re-play message processing order for AMPI")) {
601     msgLogRead = 1;
602   }
603   //CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
604   char *procs = NULL;
605   if (CmiGetArgStringDesc(argv, "+msgLogRanks", &procs, "A list of AMPI processors to record , e.g. 0,10,20-30")) {
606     msgLogRanks.set(procs);
607   }
608   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
609   if (CkMyPe() == 0) {
610     if (msgLogWrite) CmiPrintf("Writing AMPI messages of rank %s to log: %s\n", procs?procs:"", msgLogFilename);
611     if (msgLogRead) CmiPrintf("Reading AMPI messages of rank %s from log: %s\n", procs?procs:"", msgLogFilename);
612   }
613 #endif
614
615   // initBigSimTrace(1,outtiming);
616 }
617
618 #if AMPIMSGLOG
619 static inline int record_msglog(int rank){
620   return msgLogRanks.includes(rank);
621 }
622 #endif
623
624 void AMPI_Install_Idle_Timer(){
625 #if AMPI_PRINT_IDLE
626   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
627   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
628 #endif
629 }
630
631 void AMPI_Uninstall_Idle_Timer(){
632 #if AMPI_PRINT_IDLE
633   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
634   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
635 #endif
636 }
637
638 PUPfunctionpointer(MPI_MainFn)
639
640   class MPI_threadstart_t {
641     public:
642       MPI_MainFn fn;
643       MPI_threadstart_t() {}
644       MPI_threadstart_t(MPI_MainFn fn_)
645         :fn(fn_) {}
646       void start(void) {
647         char **argv=CmiCopyArgs(CkGetArgv());
648         int argc=CkGetArgc();
649
650         // Set a pointer to somewhere close to the bottom of the stack.
651         // This is used for roughly estimating the stack usage later.
652         CtvAccess(stackBottom) = &argv;
653
654 #if CMK_AMPI_FNPTR_HACK
655         AMPI_Fallback_Main(argc,argv);
656 #else
657         (fn)(argc,argv);
658 #endif
659       }
660       void pup(PUP::er &p) {
661         p|fn;
662       }
663   };
664 PUPmarshall(MPI_threadstart_t)
665
666 CDECL void AMPI_threadstart(void *data)
667 {
668   STARTUP_DEBUG("MPI_threadstart")
669     MPI_threadstart_t t;
670   pupFromBuf(data,t);
671 #if CMK_TRACE_IN_CHARM
672   if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
673 #endif
674   t.start();
675 }
676
677 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
678 {
679   STARTUP_DEBUG("ampiCreateMain")
680     int _nchunks=TCHARM_Get_num_chunks();
681   //Make a new threads array:
682   MPI_threadstart_t s(mainFn);
683   memBuf b; pupIntoBuf(b,s);
684   TCHARM_Create_data( _nchunks,AMPI_threadstart_idx,
685       b.getData(), b.getSize());
686 }
687
688 /* TCharm Semaphore ID's for AMPI startup */
689 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
690 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
691
692 static CProxy_ampiWorlds ampiWorldsGroup;
693
694 static void init_operations()
695 {
696   CtvInitialize(MPI_Op_Array, mpi_ops);
697   int i = 0;
698   MPI_Op *tab = CtvAccess(mpi_ops);
699   tab[i++] = MPI_MAX;
700   tab[i++] = MPI_MIN;
701   tab[i++] = MPI_SUM;
702   tab[i++] = MPI_PROD;
703   tab[i++] = MPI_LAND;
704   tab[i++] = MPI_BAND;
705   tab[i++] = MPI_LOR;
706   tab[i++] = MPI_BOR;
707   tab[i++] = MPI_LXOR;
708   tab[i++] = MPI_BXOR;
709   tab[i++] = MPI_MAXLOC;
710   tab[i++] = MPI_MINLOC;
711
712   CtvInitialize(int, mpi_opc);
713   CtvAccess(mpi_opc) = i;
714 }
715
716 /*
717    Called from MPI_Init, a collective initialization call:
718    creates a new AMPI array and attaches it to the current
719    set of TCHARM threads.
720  */
721 static ampi *ampiInit(char **argv)
722 {
723   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
724     if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
725   STARTUP_DEBUG("ampiInit> begin")
726
727     MPI_Comm new_world;
728   int _nchunks;
729   CkArrayOptions opts;
730   CProxy_ampiParent parent;
731   if (TCHARM_Element()==0) //the rank of a tcharm object
732   { /* I'm responsible for building the arrays: */
733     STARTUP_DEBUG("ampiInit> creating arrays")
734
735       // FIXME: Need to serialize global communicator allocation in one place.
736       //Allocate the next communicator
737       if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
738       {
739         CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
740       }
741     int new_idx=_mpi_nworlds;
742     new_world=MPI_COMM_WORLD+new_idx; // Isaac guessed there shouldn't be a +1 here
743
744     //Create and attach the ampiParent array
745     CkArrayID threads;
746     opts=TCHARM_Attach_start(&threads,&_nchunks);
747     parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
748     STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
749   }
750   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
751
752   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
753
754     if (TCHARM_Element()==0)
755     {
756       //Make a new ampi array
757       CkArrayID empty;
758
759       ampiCommStruct worldComm(new_world,empty,_nchunks);
760       CProxy_ampi arr;
761
762
763 #if AMPI_COMLIB
764
765       ComlibInstanceHandle ciStreaming = 1;
766       ComlibInstanceHandle ciBcast = 2;
767       ComlibInstanceHandle ciAllgather = 3;
768       ComlibInstanceHandle ciAlltoall = 4;
769
770       arr=CProxy_ampi::ckNew(parent, worldComm, ciStreaming, ciBcast, ciAllgather, ciAlltoall, opts);
771
772
773       CkPrintf("Using untested comlib code in ampi.C\n");
774
775       Strategy *sStreaming = new StreamingStrategy(1,10);
776       CkAssert(ciStreaming == ComlibRegister(sStreaming));
777
778       Strategy *sBcast = new BroadcastStrategy(USE_HYPERCUBE);
779       CkAssert(ciBcast = ComlibRegister(sBcast));
780
781       Strategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
782       CkAssert(ciAllgather = ComlibRegister(sAllgather));
783
784       Strategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
785       CkAssert(ciAlltoall = ComlibRegister(sAlltoall));
786
787       CmiPrintf("Created AMPI comlib strategies in new manner\n");
788
789       // FIXME: Propogate the comlib table here
790       CkpvAccess(conv_com_object).doneCreating();
791 #else
792       arr=CProxy_ampi::ckNew(parent,worldComm,opts);
793
794 #endif
795
796       //Broadcast info. to the mpi_worlds array
797       // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
798       ampiCommStruct newComm(new_world,arr,_nchunks);
799       //CkPrintf("In ampiInit: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
800       if (ampiWorldsGroup.ckGetGroupID().isZero())
801         ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
802       else
803         ampiWorldsGroup.add(newComm);
804       STARTUP_DEBUG("ampiInit> arrays created")
805
806     }
807
808   // Find our ampi object:
809   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
810   CtvAccess(ampiInitDone)=1;
811   CtvAccess(ampiFinalized)=0;
812   STARTUP_DEBUG("ampiInit> complete")
813 #if CMK_BIGSIM_CHARM
814     //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
815     TRACE_BG_ADD_TAG("AMPI_START");
816 #endif
817
818   init_operations();     // initialize fortran reduction operation table
819
820   getAmpiParent()->ampiInitCallDone = 0;
821
822   CProxy_ampi cbproxy = ptr->getProxy();
823   CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
824   ptr->contribute(0, NULL, CkReduction::sum_int, cb);
825
826   ampiParent *thisParent = getAmpiParent(); 
827   while(thisParent->ampiInitCallDone!=1){
828     //CkPrintf("In checking ampiInitCallDone(%d) loop at parent %d!\n", thisParent->ampiInitCallDone, thisParent->thisIndex);
829     thisParent->getTCharmThread()->stop();
830     /* 
831      * thisParent needs to be updated in case of the parent is being pupped.
832      * In such case, thisParent got changed
833      */
834     thisParent = getAmpiParent();
835   }
836
837 #ifdef CMK_BIGSIM_CHARM
838   BgSetStartOutOfCore();
839 #endif
840
841   return ptr;
842 }
843
844 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
845 class ampiWorlds : public CBase_ampiWorlds {
846   public:
847     ampiWorlds(const ampiCommStruct &nextWorld) {
848       ampiWorldsGroup=thisgroup;
849       //CkPrintf("In constructor: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
850       add(nextWorld);
851     }
852     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
853     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
854     void add(const ampiCommStruct &nextWorld) {
855       int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD); // Isaac guessed there shouldn't be a +1 after the MPI_COMM_WORLD
856       mpi_worlds[new_idx].comm=nextWorld;
857       if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
858       STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
859     }
860 };
861
862 //-------------------- ampiParent -------------------------
863   ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
864 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
865 {
866   int barrier = 0x1234;
867   STARTUP_DEBUG("ampiParent> starting up")
868     thread=NULL;
869   worldPtr=NULL;
870   myDDT=&myDDTsto;
871   prepareCtv();
872
873   init();
874
875   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
876   AsyncEvacuate(CmiFalse);
877 }
878
879 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
880   thread=NULL;
881   worldPtr=NULL;
882   myDDT=&myDDTsto;
883
884   init();
885
886   AsyncEvacuate(CmiFalse);
887 }
888
889 void ampiParent::pup(PUP::er &p) {
890   ArrayElement1D::pup(p);
891   p|threads;
892   p|worldNo;           // why it was missing from here before??
893   p|worldStruct;
894   myDDT->pup(p);
895   p|splitComm;
896   p|groupComm;
897   p|groups;
898
899   //BIGSIM_OOC DEBUGGING
900   //if(!p.isUnpacking()){
901   //    CmiPrintf("ampiParent[%d] packing ampiRequestList: \n", thisIndex);
902   //    ampiReqs.print();
903   //}
904
905   p|ampiReqs;
906
907   //BIGSIM_OOC DEBUGGING
908   //if(p.isUnpacking()){
909   //    CmiPrintf("ampiParent[%d] unpacking ampiRequestList: \n", thisIndex);
910   //    ampiReqs.print();
911   //}
912
913   p|RProxyCnt;
914   p|tmpRProxy;
915   p|winStructList;
916   p|infos;
917
918   p|ampiInitCallDone;
919 }
920 void ampiParent::prepareCtv(void) {
921   thread=threads[thisIndex].ckLocal();
922   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
923   CtvAccessOther(thread->getThread(),ampiPtr) = this;
924   STARTUP_DEBUG("ampiParent> found TCharm")
925 }
926
927 void ampiParent::init(){
928   CkAssert(groups.size() == 0);
929   groups.push_back(new groupStruct);
930
931 #if AMPIMSGLOG
932   if(msgLogWrite && record_msglog(thisIndex)){
933     char fname[128];
934     sprintf(fname, "%s.%d", msgLogFilename,thisIndex);
935 #if CMK_PROJECTIONS_USE_ZLIB && 0
936     fMsgLog = gzopen(fname,"wb");
937     toPUPer = new PUP::tozDisk(fMsgLog);
938 #else
939     fMsgLog = fopen(fname,"wb");
940     CmiAssert(fMsgLog != NULL);
941     toPUPer = new PUP::toDisk(fMsgLog);
942 #endif
943   }else if(msgLogRead){
944     char fname[128];
945     sprintf(fname, "%s.%d", msgLogFilename,msgLogRank);
946 #if CMK_PROJECTIONS_USE_ZLIB && 0
947     fMsgLog = gzopen(fname,"rb");
948     fromPUPer = new PUP::fromzDisk(fMsgLog);
949 #else
950     fMsgLog = fopen(fname,"rb");
951     CmiAssert(fMsgLog != NULL);
952     fromPUPer = new PUP::fromDisk(fMsgLog);
953 #endif
954     CkPrintf("AMPI> opened message log file: %s for replay\n", fname);
955   }
956 #endif
957 }
958
959 void ampiParent::finalize(){
960 #if AMPIMSGLOG
961   if(msgLogWrite && record_msglog(thisIndex)){
962     delete toPUPer;
963 #if CMK_PROJECTIONS_USE_ZLIB && 0
964     gzclose(fMsgLog);
965 #else
966     fclose(fMsgLog);
967 #endif
968   }else if(msgLogRead){
969     delete fromPUPer;
970 #if CMK_PROJECTIONS_USE_ZLIB && 0
971     gzclose(fMsgLog);
972 #else
973     fclose(fMsgLog);
974 #endif
975   }
976 #endif
977 }
978
979 void ampiParent::ckJustMigrated(void) {
980   ArrayElement1D::ckJustMigrated();
981   prepareCtv();
982 }
983
984 void ampiParent::ckJustRestored(void) {
985   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
986     ArrayElement1D::ckJustRestored();
987   prepareCtv();
988
989   //BIGSIM_OOC DEBUGGING
990   //CkPrintf("In ampiParent[%d] with TCharm thread=%p:   ",thisIndex, thread);
991   //CthPrintThdMagic(thread->getTid()); 
992 }
993
994 ampiParent::~ampiParent() {
995   STARTUP_DEBUG("ampiParent> destructor called");
996   finalize();
997 }
998
999 //Children call this when they are first created or just migrated
1000 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
1001 {
1002   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
1003
1004   if (s.getComm()>=MPI_COMM_WORLD)
1005   { //We now have our COMM_WORLD-- register it
1006     //Note that split communicators don't keep a raw pointer, so
1007     //they don't need to re-register on migration.
1008     if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
1009     worldPtr=ptr;
1010     worldStruct=s;
1011
1012     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
1013     CkVec<int> _indices;
1014     _indices.push_back(thisIndex);
1015     selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
1016   }
1017
1018   if (!forMigration)
1019   { //Register the new communicator:
1020     MPI_Comm comm = s.getComm();
1021     STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
1022       if (comm>=MPI_COMM_WORLD) { 
1023         // Pass the new ampi to the waiting ampiInit
1024         thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
1025       } else if (isSplit(comm)) {
1026         splitChildRegister(s);
1027       } else if (isGroup(comm)) {
1028         groupChildRegister(s);
1029       } else if (isCart(comm)) {
1030         cartChildRegister(s);
1031       } else if (isGraph(comm)) {
1032         graphChildRegister(s);
1033       } else if (isInter(comm)) {
1034         interChildRegister(s);
1035       } else if (isIntra(comm)) {
1036         intraChildRegister(s);
1037       }else
1038         CkAbort("ampiParent recieved child with bad communicator");
1039   }
1040
1041   return thread;
1042 }
1043
1044 //BIGSIM_OOC DEBUGGING
1045 //Move the comm2ampi from inline to normal function for the sake of debugging
1046 /*ampi *ampiParent::comm2ampi(MPI_Comm comm){
1047 //BIGSIM_OOC DEBUGGING
1048 //CmiPrintf("%d, in ampiParent::comm2ampi, comm=%d\n", thisIndex, comm);
1049 if (comm==MPI_COMM_WORLD) return worldPtr;
1050 if (comm==MPI_COMM_SELF) return worldPtr;
1051 if (comm==worldNo) return worldPtr;
1052 if (isSplit(comm)) {
1053 const ampiCommStruct &st=getSplit(comm);
1054 return st.getProxy()[thisIndex].ckLocal();
1055 }
1056 if (isGroup(comm)) {
1057 const ampiCommStruct &st=getGroup(comm);
1058 return st.getProxy()[thisIndex].ckLocal();
1059 }
1060 if (isCart(comm)) {
1061 const ampiCommStruct &st = getCart(comm);
1062 return st.getProxy()[thisIndex].ckLocal();
1063 }
1064 if (isGraph(comm)) {
1065 const ampiCommStruct &st = getGraph(comm);
1066 return st.getProxy()[thisIndex].ckLocal();
1067 }
1068 if (isInter(comm)) {
1069 const ampiCommStruct &st=getInter(comm);
1070 return st.getProxy()[thisIndex].ckLocal();
1071 }
1072 if (isIntra(comm)) {
1073 const ampiCommStruct &st=getIntra(comm);
1074 return st.getProxy()[thisIndex].ckLocal();
1075 }
1076 if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
1077 CkAbort("Invalid communicator used!");
1078 return NULL;
1079 }*/
1080
1081 // reduction client data - preparation for checkpointing
1082 class ckptClientStruct {
1083   public:
1084     const char *dname;
1085     ampiParent *ampiPtr;
1086     ckptClientStruct(const char *s, ampiParent *a): dname(s), ampiPtr(a) {}
1087 };
1088
1089 static void checkpointClient(void *param,void *msg)
1090 {
1091   ckptClientStruct *client = (ckptClientStruct*)param;
1092   const char *dname = client->dname;
1093   ampiParent *ampiPtr = client->ampiPtr;
1094   ampiPtr->Checkpoint(strlen(dname), dname);
1095   delete client;
1096 }
1097
1098 void ampiParent::startCheckpoint(const char* dname){
1099   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
1100   if (thisIndex==0) {
1101     ckptClientStruct *clientData = new ckptClientStruct(dname, this);
1102     CkCallback cb(checkpointClient, clientData);
1103     contribute(0, NULL, CkReduction::sum_int, cb);
1104   }
1105   else
1106     contribute(0, NULL, CkReduction::sum_int);
1107
1108 #if 0
1109 #if CMK_BIGSIM_CHARM
1110   void *curLog;         // store current log in timeline
1111   _TRACE_BG_TLINE_END(&curLog);
1112   TRACE_BG_AMPI_SUSPEND();
1113 #endif
1114 #endif
1115
1116   thread->stop();
1117
1118 #if CMK_BIGSIM_CHARM
1119   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
1120   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
1121 #endif
1122 }
1123
1124 void ampiParent::Checkpoint(int len, const char* dname){
1125   if (len == 0) {
1126     // memory checkpoint
1127     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1128     CkStartMemCheckpoint(cb);
1129   }
1130   else {
1131     char dirname[256];
1132     strncpy(dirname,dname,len);
1133     dirname[len]='\0';
1134     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1135     CkStartCheckpoint(dirname,cb);
1136   }
1137 }
1138 void ampiParent::ResumeThread(void){
1139   thread->resume();
1140 }
1141
1142 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
1143     int *keyval, void* extra_state){
1144   KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1145   int idx = kvlist.size();
1146   kvlist.resize(idx+1);
1147   kvlist[idx] = newnode;
1148   *keyval = idx;
1149   return 0;
1150 }
1151   int ampiParent::freeKeyval(int *keyval){
1152     if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
1153       return -1;
1154     delete kvlist[*keyval];
1155     kvlist[*keyval] = NULL;
1156     *keyval = MPI_KEYVAL_INVALID;
1157     return 0;
1158   }
1159
1160   int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
1161     if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1162       return -1;
1163     ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1164     // Enlarge the keyval list:
1165     while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
1166     cs.getKeyvals()[keyval]=attribute_val;
1167     return 0;
1168   }
1169
1170 int ampiParent::kv_is_builtin(int keyval) {
1171   switch(keyval) {
1172     case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
1173     case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
1174     case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
1175     case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
1176     case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
1177     case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
1178     case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
1179     case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
1180     default: return 0;
1181   };
1182 }
1183
1184 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
1185   *flag = false;
1186   if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
1187     *flag=true;
1188     *(int **)attribute_val = kv_builtin_storage;  // all default tags are ints
1189     return 0;
1190   }
1191   if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1192     return -1; /* invalid keyval */
1193
1194   ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1195   if (keyval>=cs.getKeyvals().size())  
1196     return 0; /* we don't have a value yet */
1197   if (cs.getKeyvals()[keyval]==0)
1198     return 0; /* we had a value, but now it's zero */
1199   /* Otherwise, we have a good value */
1200   *flag = true;
1201   *(void **)attribute_val = cs.getKeyvals()[keyval];
1202   return 0;
1203 }
1204 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
1205   /* no way to delete an attribute: just overwrite it with 0 */
1206   return putAttr(comm,keyval,0);
1207 }
1208
1209 //----------------------- ampi -------------------------
1210 void ampi::init(void) {
1211   parent=NULL;
1212   thread=NULL;
1213   msgs=NULL;
1214   posted_ireqs=NULL;
1215   resumeOnRecv=false;
1216   AsyncEvacuate(CmiFalse);
1217 }
1218
1219 ampi::ampi()
1220 {
1221   /* this constructor only exists so we can create an empty array during split */
1222   CkAbort("Default ampi constructor should never be called");
1223 }
1224
1225   ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
1226 :parentProxy(parent_)
1227 {
1228   init();
1229
1230   myComm=s; myComm.setArrayID(thisArrayID);
1231   myRank=myComm.getRankForIndex(thisIndex);
1232
1233   findParent(false);
1234
1235   msgs = CmmNew();
1236   posted_ireqs = CmmNew();
1237   nbcasts = 0;
1238
1239 #if AMPI_COMLIB
1240   comlibProxy = thisProxy; // Will later be associated with comlib
1241 #endif
1242
1243   seqEntries=parent->ckGetArraySize();
1244   oorder.init (seqEntries);
1245 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1246   if(thisIndex == 0){
1247     /*      CkAssert(CkMyPe() == 0);
1248      *              CkGroupID _myManagerGID = thisProxy.ckGetArrayID();     
1249      *                      CkAssert(numElements);
1250      *                              printf("ampi::ampi setting numInitial to %d on manager at gid %d \n",numElements,_myManagerGID.idx);
1251      *                                      CkArray *_myManager = thisProxy.ckLocalBranch();
1252      *                                              _myManager->setNumInitial(numElements);*/
1253   }
1254 #endif
1255 }
1256
1257 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1258     ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1259 :parentProxy(parent_)
1260 {
1261 #if AMPI_COMLIB
1262   ciStreaming = ciStreaming_;
1263   ciBcast = ciBcast_;
1264   ciAllgather = ciAllgather_;
1265   ciAlltoall = ciAlltoall_;
1266
1267   init();
1268
1269   myComm=s; myComm.setArrayID(thisArrayID);
1270   myRank=myComm.getRankForIndex(thisIndex);
1271
1272   findParent(false);
1273
1274   msgs = CmmNew();
1275   posted_ireqs = CmmNew();
1276   nbcasts = 0;
1277
1278   comlibProxy = thisProxy;
1279   CmiPrintf("comlibProxy created as a copy of thisProxy, no associate call\n");
1280
1281 #if AMPI_COMLIB
1282   //  ComlibAssociateProxy(ciAlltoall, comlibProxy);
1283 #endif
1284
1285   seqEntries=parent->ckGetArraySize();
1286   oorder.init (seqEntries);
1287 #endif
1288 }
1289
1290 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1291 {
1292   init();
1293
1294   seqEntries=-1;
1295 }
1296
1297 void ampi::ckJustMigrated(void)
1298 {
1299   findParent(true);
1300   ArrayElement1D::ckJustMigrated();
1301 }
1302
1303 void ampi::ckJustRestored(void)
1304 {
1305   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
1306     findParent(true);
1307   ArrayElement1D::ckJustRestored();
1308
1309   //BIGSIM_OOC DEBUGGING
1310   //CkPrintf("In ampi[%d] thread[%p]:   ", thisIndex, thread);
1311   //CthPrintThdMagic(thread->getTid()); 
1312 }
1313
1314 void ampi::findParent(bool forMigration) {
1315   STARTUP_DEBUG("ampi> finding my parent")
1316     parent=parentProxy[thisIndex].ckLocal();
1317   if (parent==NULL) CkAbort("AMPI can't find its parent!");
1318   thread=parent->registerAmpi(this,myComm,forMigration);
1319   if (thread==NULL) CkAbort("AMPI can't find its thread!");
1320   //    printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1321 }
1322
1323 //The following method should be called on the first element of the
1324 //ampi array
1325 void ampi::allInitDone(CkReductionMsg *m){
1326   FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
1327     thisProxy.setInitDoneFlag();
1328   delete m;
1329 }
1330
1331 void ampi::setInitDoneFlag(){
1332   //CkPrintf("ampi[%d]::setInitDone called!\n", thisIndex);
1333   parent->ampiInitCallDone=1;
1334   parent->getTCharmThread()->start();
1335 }
1336
1337 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1338   CkPupMessage(*(PUP::er *)p,msg,1);
1339   if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1340   //    printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1341 }
1342
1343 static void cmm_pup_posted_ireq(pup_er p,void **msg) {
1344
1345   pup_int(p, (int *)msg);
1346
1347   /*    if(pup_isUnpacking(p)){
1348   // *msg = new IReq;
1349   //when unpacking, nothing is needed to do since *msg is an index
1350   //(of type integer) to the ampiParent::ampiReqs (the AmpiRequestList)
1351   }
1352   if(!pup_isUnpacking(p)){
1353   AmpiRequestList *reqL = getReqs();
1354   int retIdx = reqL->findRequestIndex((IReq *)*msg);
1355   if(retIdx==-1){
1356   CmiAbort("An AmpiRequest instance should be found for an instance in posted_ireq!\n");
1357   }
1358   pup_int(p, retIdx)
1359   }
1360    */
1361   //    ((IReq *)*msg)->pup(*(PUP::er *)p);
1362
1363   //    if (pup_isDeleting(p)) delete (IReq *)*msg;
1364   //    printf("[%d] pupping postd irequests %p \n",CkMyPe(),*msg);
1365 }
1366
1367 void ampi::pup(PUP::er &p)
1368 {
1369   if(!p.isUserlevel())
1370     ArrayElement1D::pup(p);//Pack superclass
1371   p|parentProxy;
1372   p|myComm;
1373   p|myRank;
1374   p|nbcasts;
1375   p|tmpVec;
1376   p|remoteProxy;
1377   p|resumeOnRecv;
1378 #if AMPI_COMLIB
1379   p|comlibProxy;
1380   p|ciStreaming;
1381   p|ciBcast;
1382   p|ciAllgather;
1383   p|ciAlltoall;
1384
1385   if(p.isUnpacking()){
1386     //    ciStreaming.setSourcePe();
1387     //    ciBcast.setSourcePe();
1388     //    ciAllgather.setSourcePe();
1389     //    ciAlltoall.setSourcePe();
1390   }
1391 #endif
1392
1393   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1394
1395   //BIGSIM_OOC DEBUGGING
1396   //if(!p.isUnpacking()){
1397   //CkPrintf("ampi[%d]::packing: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1398   //}
1399
1400   posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);
1401
1402   //if(p.isUnpacking()){
1403   //CkPrintf("ampi[%d]::unpacking: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1404   //}
1405
1406   p|seqEntries;
1407   p|oorder;
1408 }
1409
1410 ampi::~ampi()
1411 {
1412   if (CkInRestarting() || _BgOutOfCoreFlag==1) {
1413     // in restarting, we need to flush messages
1414     int tags[3], sts[3];
1415     tags[0] = tags[1] = tags[2] = CmmWildCard;
1416     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1417     while (msg) {
1418       delete msg;
1419       msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1420     }
1421   }
1422
1423   CmmFree(msgs);
1424   CmmFreeAll(posted_ireqs);
1425 }
1426
1427 //------------------------ Communicator Splitting ---------------------
1428 class ampiSplitKey {
1429   public:
1430     int nextSplitComm;
1431     int color; //New class of processes we'll belong to
1432     int key; //To determine rank in new ordering
1433     int rank; //Rank in old ordering
1434     ampiSplitKey() {}
1435     ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1436       :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1437 };
1438
1439 /* "type" may indicate whether call is for a cartesian topology etc. */
1440
1441 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1442 {
1443 #if CMK_BIGSIM_CHARM
1444   void *curLog;         // store current log in timeline
1445   _TRACE_BG_TLINE_END(&curLog);
1446   //  TRACE_BG_AMPI_SUSPEND();
1447 #endif
1448   if (type == CART_TOPOL) {
1449     ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1450     int rootIdx=myComm.getIndexForRank(0);
1451     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1452     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1453
1454     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1455     MPI_Comm newComm=parent->getNextCart()-1;
1456     *dest=newComm;
1457   } else {
1458     ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1459     int rootIdx=myComm.getIndexForRank(0);
1460     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1461     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1462
1463     thread->suspend(); //Resumed by ampiParent::splitChildRegister
1464     MPI_Comm newComm=parent->getNextSplit()-1;
1465     *dest=newComm;
1466   }
1467 #if CMK_BIGSIM_CHARM
1468   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1469   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1470   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1471 #endif
1472 }
1473
1474 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1475   const ampiSplitKey *a=(const ampiSplitKey *)a_;
1476   const ampiSplitKey *b=(const ampiSplitKey *)b_;
1477   if (a->color!=b->color) return a->color-b->color;
1478   if (a->key!=b->key) return a->key-b->key;
1479   return a->rank-b->rank;
1480 }
1481
1482 void ampi::splitPhase1(CkReductionMsg *msg)
1483 {
1484   //Order the keys, which orders the ranks properly:
1485   int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1486   ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1487   if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1488   qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1489
1490   MPI_Comm newComm = -1;
1491   for(int i=0;i<nKeys;i++)
1492     if(keys[i].nextSplitComm>newComm)
1493       newComm = keys[i].nextSplitComm;
1494
1495   //Loop over the sorted keys, which gives us the new arrays:
1496   int lastColor=keys[0].color-1; //The color we're building an array for
1497   CProxy_ampi lastAmpi; //The array for lastColor
1498   int lastRoot=0; //C value for new rank 0 process for latest color
1499   ampiCommStruct lastComm; //Communicator info. for latest color
1500   for (int c=0;c<nKeys;c++) {
1501     if (keys[c].color!=lastColor)
1502     { //Hit a new color-- need to build a new communicator and array
1503       lastColor=keys[c].color;
1504       lastRoot=c;
1505       CkArrayOptions opts;
1506       opts.bindTo(parentProxy);
1507       opts.setNumInitial(0);
1508       CkArrayID unusedAID; ampiCommStruct unusedComm;
1509       lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1510       lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1511
1512       CkVec<int> indices; //Maps rank to array indices for new arrau
1513       for (int i=c;i<nKeys;i++) {
1514         if (keys[i].color!=lastColor) break; //Done with this color
1515         int idx=myComm.getIndexForRank(keys[i].rank);
1516         indices.push_back(idx);
1517       }
1518
1519       //FIXME: create a new communicator for each color, instead of
1520       // (confusingly) re-using the same MPI_Comm number for each.
1521       lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1522     }
1523     int newRank=c-lastRoot;
1524     int newIdx=lastComm.getIndexForRank(newRank);
1525
1526     //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1527     lastAmpi[newIdx].insert(parentProxy,lastComm);
1528   }
1529
1530   delete msg;
1531 }
1532
1533 //...newly created array elements register with the parent, which calls:
1534 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1535   int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1536   if (splitComm.size()<=idx) splitComm.resize(idx+1);
1537   splitComm[idx]=new ampiCommStruct(s);
1538   thread->resume(); //Matches suspend at end of ampi::split
1539 }
1540
1541 //-----------------create communicator from group--------------
1542 // The procedure is like that of comm_split very much,
1543 // so the code is shamelessly copied from above
1544 //   1. reduction to make sure all members have called
1545 //   2. the root in the old communicator create the new array
1546 //   3. ampiParent::register is called to register new array as new comm
1547 class vecStruct {
1548   public:
1549     int nextgroup;
1550     groupStruct vec;
1551     vecStruct():nextgroup(-1){}
1552     vecStruct(int nextgroup_, groupStruct vec_)
1553       : nextgroup(nextgroup_), vec(vec_) { }
1554 };
1555
1556 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1557   int rootIdx=vec[0];
1558   tmpVec = vec;
1559   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1560   MPI_Comm nextgroup = parent->getNextGroup();
1561   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1562
1563   if(getPosOp(thisIndex,vec)>=0){
1564     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1565     MPI_Comm retcomm = parent->getNextGroup()-1;
1566     *newcomm = retcomm;
1567   }else{
1568     *newcomm = MPI_COMM_NULL;
1569   }
1570 }
1571
1572 void ampi::commCreatePhase1(CkReductionMsg *msg){
1573   MPI_Comm *nextGroupComm = (int *)msg->getData();
1574
1575   CkArrayOptions opts;
1576   opts.bindTo(parentProxy);
1577   opts.setNumInitial(0);
1578   CkArrayID unusedAID;
1579   ampiCommStruct unusedComm;
1580   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1581   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1582
1583   groupStruct indices = tmpVec;
1584   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1585   for(int i=0;i<indices.size();i++){
1586     int newIdx=indices[i];
1587     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1588   }
1589   delete msg;
1590 }
1591
1592 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1593   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1594   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1595   groupComm[idx]=new ampiCommStruct(s);
1596   thread->resume(); //Matches suspend at end of ampi::split
1597 }
1598
1599 /* Virtual topology communicator creation */
1600 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1601   int rootIdx=vec[0];
1602   tmpVec = vec;
1603   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1604
1605   MPI_Comm nextcart = parent->getNextCart();
1606   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1607
1608   if(getPosOp(thisIndex,vec)>=0){
1609     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1610     MPI_Comm retcomm = parent->getNextCart()-1;
1611     *newcomm = retcomm;
1612   }else
1613     *newcomm = MPI_COMM_NULL;
1614 }
1615
1616 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1617   MPI_Comm *nextCartComm = (int *)msg->getData();
1618
1619   CkArrayOptions opts;
1620   opts.bindTo(parentProxy);
1621   opts.setNumInitial(0);
1622   CkArrayID unusedAID;
1623   ampiCommStruct unusedComm;
1624   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1625   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1626
1627   groupStruct indices = tmpVec;
1628   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1629       size(),indices);
1630   for(int i=0;i<indices.size();i++){
1631     int newIdx=indices[i];
1632     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1633   }
1634   delete msg;
1635 }
1636
1637 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1638   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1639   if (cartComm.size()<=idx) {
1640     cartComm.resize(idx+1);
1641     cartComm.length()=idx+1;
1642   }
1643   cartComm[idx]=new ampiCommStruct(s);
1644   thread->resume(); //Matches suspend at end of ampi::cartCreate
1645 }
1646
1647 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1648   int rootIdx=vec[0];
1649   tmpVec = vec;
1650   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1651       myComm.getProxy());
1652   MPI_Comm nextgraph = parent->getNextGraph();
1653   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1654
1655   if(getPosOp(thisIndex,vec)>=0){
1656     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1657     MPI_Comm retcomm = parent->getNextGraph()-1;
1658     *newcomm = retcomm;
1659   }else
1660     *newcomm = MPI_COMM_NULL;
1661 }
1662
1663 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1664   MPI_Comm *nextGraphComm = (int *)msg->getData();
1665
1666   CkArrayOptions opts;
1667   opts.bindTo(parentProxy);
1668   opts.setNumInitial(0);
1669   CkArrayID unusedAID;
1670   ampiCommStruct unusedComm;
1671   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1672   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1673
1674   groupStruct indices = tmpVec;
1675   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1676       .size(),indices);
1677   for(int i=0;i<indices.size();i++){
1678     int newIdx=indices[i];
1679     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1680   }
1681   delete msg;
1682 }
1683
1684 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1685   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1686   if (graphComm.size()<=idx) {
1687     graphComm.resize(idx+1);
1688     graphComm.length()=idx+1;
1689   }
1690   graphComm[idx]=new ampiCommStruct(s);
1691   thread->resume(); //Matches suspend at end of ampi::graphCreate
1692 }
1693
1694 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1695   if(thisIndex==root) { // not everybody gets the valid rvec
1696     tmpVec = rvec;
1697   }
1698   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1699   MPI_Comm nextinter = parent->getNextInter();
1700   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1701
1702   thread->suspend(); //Resumed by ampiParent::interChildRegister
1703   MPI_Comm newcomm=parent->getNextInter()-1;
1704   *ncomm=newcomm;
1705 }
1706
1707 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1708   MPI_Comm *nextInterComm = (int *)msg->getData();
1709
1710   groupStruct lgroup = myComm.getIndices();
1711   CkArrayOptions opts;
1712   opts.bindTo(parentProxy);
1713   opts.setNumInitial(0);
1714   CkArrayID unusedAID;
1715   ampiCommStruct unusedComm;
1716   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1717   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1718
1719   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1720   for(int i=0;i<lgroup.size();i++){
1721     int newIdx=lgroup[i];
1722     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1723   }
1724
1725   parentProxy[0].ExchangeProxy(newAmpi);
1726   delete msg;
1727 }
1728
1729 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1730   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1731   if (interComm.size()<=idx) interComm.resize(idx+1);
1732   interComm[idx]=new ampiCommStruct(s);
1733   //thread->resume(); // don't resume it yet, till parent set remote proxy
1734 }
1735
1736 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1737   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1738     groupStruct lvec = myComm.getIndices();
1739     groupStruct rvec = myComm.getRemoteIndices();
1740     int rsize = rvec.size();
1741     tmpVec = lvec;
1742     for(int i=0;i<rsize;i++)
1743       tmpVec.push_back(rvec[i]);
1744     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1745   }else{
1746     tmpVec.resize(0);
1747   }
1748
1749   int rootIdx=myComm.getIndexForRank(0);
1750   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1751   MPI_Comm nextintra = parent->getNextIntra();
1752   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1753
1754   thread->suspend(); //Resumed by ampiParent::interChildRegister
1755   MPI_Comm newcomm=parent->getNextIntra()-1;
1756   *ncomm=newcomm;
1757 }
1758
1759 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1760   if(tmpVec.size()==0) { delete msg; return; }
1761   MPI_Comm *nextIntraComm = (int *)msg->getData();
1762   CkArrayOptions opts;
1763   opts.bindTo(parentProxy);
1764   opts.setNumInitial(0);
1765   CkArrayID unusedAID;
1766   ampiCommStruct unusedComm;
1767   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1768   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1769
1770   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1771   for(int i=0;i<tmpVec.size();i++){
1772     int newIdx=tmpVec[i];
1773     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1774   }
1775   delete msg;
1776 }
1777
1778 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1779   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1780   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1781   intraComm[idx]=new ampiCommStruct(s);
1782   thread->resume(); //Matches suspend at end of ampi::split
1783 }
1784
1785 //------------------------ communication -----------------------
1786 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1787 {
1788   if (universeNo>MPI_COMM_WORLD) {
1789     int worldDex=universeNo-MPI_COMM_WORLD-1;
1790     if (worldDex>=_mpi_nworlds)
1791       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1792     return mpi_worlds[worldDex].comm;
1793   }
1794   CkAbort("Bad communicator passed to universeComm2CommStruct");
1795   return mpi_worlds[0].comm; // meaningless return
1796 }
1797
1798 void ampi::block(void){
1799   thread->suspend();
1800 }
1801
1802 void ampi::yield(void){
1803   thread->schedule();
1804 }
1805
1806 void ampi::unblock(void){
1807   thread->resume();
1808 }
1809
1810   void ampi::ssend_ack(int sreq_idx){
1811     if (sreq_idx == 1)
1812       thread->resume();           // MPI_Ssend
1813     else {
1814       sreq_idx -= 2;              // start from 2
1815       AmpiRequestList *reqs = &(parent->ampiReqs);
1816       SReq *sreq = (SReq *)(*reqs)[sreq_idx];
1817       sreq->statusIreq = true;
1818       if (resumeOnRecv) {
1819         thread->resume();
1820       }
1821     }
1822   }
1823
1824   void
1825 ampi::generic(AmpiMsg* msg)
1826 {
1827   MSG_ORDER_DEBUG(
1828       CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1829         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1830       )
1831 #if CMK_BIGSIM_CHARM
1832     TRACE_BG_ADD_TAG("AMPI_generic");
1833   msg->event = NULL;
1834 #endif
1835
1836   int sync = UsrToEnv(msg)->getRef();
1837   int srcIdx;
1838   if (sync)  srcIdx = msg->srcIdx;
1839
1840   //    AmpiMsg *msgcopy = msg;
1841   if(msg->seq != -1) {
1842     int srcIdx=msg->srcIdx;
1843     int n=oorder.put(srcIdx,msg);
1844     if (n>0) { // This message was in-order
1845       inorder(msg);
1846       if (n>1) { // It enables other, previously out-of-order messages
1847         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1848           inorder(msg);
1849         }
1850       }
1851     }
1852   } else { //Cross-world or system messages are unordered
1853     inorder(msg);
1854   }
1855
1856   // msg may be free'ed from calling inorder()
1857   if (sync>0) {         // send an ack to sender
1858     CProxy_ampi pa(thisArrayID);
1859     pa[srcIdx].ssend_ack(sync);
1860   }
1861
1862   if(resumeOnRecv){
1863     //CkPrintf("Calling TCharm::resume at ampi::generic!\n");
1864     thread->resume();
1865   }
1866 }
1867
1868 inline static AmpiRequestList *getReqs(void); 
1869
1870   void
1871 ampi::inorder(AmpiMsg* msg)
1872 {
1873   MSG_ORDER_DEBUG(
1874       CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1875         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1876       )
1877     // check posted recvs
1878     int tags[3], sts[3];
1879   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1880   IReq *ireq = NULL;
1881   if (CpvAccess(CmiPICMethod) != 2) {
1882 #if 0
1883     //IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1884     ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1885 #else
1886 #if CMK_BIGSIM_CHARM
1887     _TRACE_BG_TLINE_END(&msg->event);    // store current log
1888     msg->eventPe = CmiMyPe();
1889 #endif
1890     //in case ampi has not initialized and posted_ireqs are only inserted 
1891     //at AMPI_Irecv (MPI_Irecv)
1892     AmpiRequestList *reqL = &(parent->ampiReqs);
1893     //When storing the req index, it's 1-based. The reason is stated in the comments
1894     //in AMPI_Irecv function.
1895     int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
1896     if(reqL->size()>0 && ireqIdx>0)
1897       ireq = (IReq *)(*reqL)[ireqIdx-1];
1898     //CkPrintf("[%d] ampi::inorder, ireqIdx=%d\n", thisIndex, ireqIdx);
1899 #endif
1900     //CkPrintf("[%d] ampi::inorder, ireq=%p\n", thisIndex, ireq);
1901     if (ireq) { // receive posted
1902       ireq->receive(this, msg);
1903       // Isaac changed this so that the IReq stores the tag when receiving the message, 
1904       // instead of using this user supplied tag which could be MPI_ANY_TAG
1905       // Formerly the following line was not commented out:
1906       //ireq->tag = sts[0];         
1907       //ireq->src = sts[1];
1908       //ireq->comm = sts[2];
1909     } else {
1910       CmmPut(msgs, 3, tags, msg);
1911     }
1912   }
1913   else
1914     CmmPut(msgs, 3, tags, msg);
1915 }
1916
1917 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1918 {
1919   int tags[3];
1920   tags[0] = t; tags[1] = s; tags[2] = comm;
1921   AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1922   return msg;
1923 }
1924
1925 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1926     int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm, int sync)
1927 {
1928   CkDDT_DataType *ddt = getDDT()->getType(type);
1929   int len = ddt->getSize(count);
1930   int sIdx=thisIndex;
1931   int seq = -1;
1932   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1933     seq = oorder.nextOutgoing(destIdx);
1934   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1935   if (sync) UsrToEnv(msg)->setRef(sync);
1936   TCharm::activateVariable(buf);
1937   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1938   TCharm::deactivateVariable(buf);
1939   return msg;
1940 }
1941
1942 #if AMPI_COMLIB
1943   void
1944 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1945 {
1946   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1947 }
1948 #endif
1949
1950   void
1951 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, int sync)
1952 {
1953 #if CMK_TRACE_IN_CHARM
1954   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
1955 #endif
1956
1957 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1958   MPI_Comm disComm = myComm.getComm();
1959   ampi *dis = getAmpiInstance(disComm);
1960   CpvAccess(_currentObj) = dis;
1961 #endif
1962
1963   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1964   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),sync);
1965
1966 #if CMK_TRACE_IN_CHARM
1967   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
1968 #endif
1969
1970   if (sync == 1) {
1971     // waiting for receiver side
1972     resumeOnRecv = false;            // so no one else awakes it
1973     block();
1974   }
1975 }
1976
1977   void
1978 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1979 {
1980   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1981   memcpy(msg->data, buf, len);
1982   CProxy_ampi pa(aid);
1983   pa[idx].generic(msg);
1984 }
1985
1986   void
1987 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy, int sync)
1988 {
1989   if(rank==MPI_PROC_NULL) return;
1990   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1991   int destIdx = dest.getIndexForRank(rank);
1992   if(isInter()){
1993     sRank = parent->thisIndex;
1994     destcomm = MPI_COMM_FIRST_INTER;
1995     destIdx = dest.getIndexForRemoteRank(rank);
1996     arrproxy = remoteProxy;
1997   }
1998   MSG_ORDER_DEBUG(
1999       CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
2000       )
2001
2002     arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm,sync));
2003
2004 #if 0
2005 #if CMK_TRACE_ENABLED
2006   int size=0;
2007   MPI_Type_size(type,&size);
2008   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
2009 #endif
2010 #endif
2011 }
2012
2013   int
2014 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
2015 {
2016   CkDDT_DataType *ddt = getDDT()->getType(type);
2017   int len = ddt->getSize(count);
2018
2019   if(msg->length < len){ // only at rare case shall we reset count by using divide
2020     count = msg->length/(ddt->getSize(1));
2021   }
2022
2023   TCharm::activateVariable(buf);
2024   if (t==MPI_REDUCE_TAG) {      // reduction msg
2025     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
2026   } else {
2027     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
2028   }
2029   TCharm::deactivateVariable(buf);
2030   return 0;
2031 }
2032
2033   int
2034 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
2035 {
2036   MPI_Comm disComm = myComm.getComm();
2037   if(s==MPI_PROC_NULL) {
2038     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
2039     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
2040     ((MPI_Status *)sts)->MPI_LENGTH = 0;
2041     return 0;
2042   }
2043 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
2044   _LOG_E_END_AMPI_PROCESSING(thisIndex)
2045 #endif
2046 #if CMK_BIGSIM_CHARM
2047    void *curLog;                // store current log in timeline
2048   _TRACE_BG_TLINE_END(&curLog);
2049   //  TRACE_BG_AMPI_SUSPEND();
2050 #if CMK_TRACE_IN_CHARM
2051   if(CpvAccess(traceOn)) traceSuspend();
2052 #endif
2053 #endif
2054
2055   if(isInter()){
2056     s = myComm.getIndexForRemoteRank(s);
2057     comm = MPI_COMM_FIRST_INTER;
2058   }
2059
2060   int tags[3];
2061   AmpiMsg *msg = 0;
2062
2063   MSG_ORDER_DEBUG(
2064       CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
2065       )
2066
2067   ampi *dis = getAmpiInstance(disComm);
2068 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2069   //  dis->yield();
2070   //  processRemoteMlogMessages();
2071 #endif
2072   int dosuspend = 0;
2073   while(1) {
2074     //This is done to take into account the case in which an ampi 
2075     // thread has migrated while waiting for a message
2076     tags[0] = t; tags[1] = s; tags[2] = comm;
2077     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
2078     if (msg) break;
2079     dis->resumeOnRecv=true;
2080     dis->thread->suspend();
2081     dosuspend = 1;
2082     dis = getAmpiInstance(disComm);
2083   }
2084
2085 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2086   CpvAccess(_currentObj) = dis;
2087   MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
2088 #endif
2089
2090     dis->resumeOnRecv=false;
2091
2092   if(sts)
2093     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2094   int status = dis->processMessage(msg, t, s, buf, count, type);
2095   if (status != 0) return status;
2096
2097 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
2098   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
2099 #endif
2100
2101 #if CMK_BIGSIM_CHARM
2102 #if CMK_TRACE_IN_CHARM
2103     //if(CpvAccess(traceOn)) CthTraceResume(thread->getThread());
2104     //Due to the reason mentioned the in the while loop above, we need to 
2105     //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
2106     if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
2107 #endif
2108   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
2109   //TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0);
2110   //_TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
2111 #if 0
2112 #if 1
2113   if (!dosuspend) {
2114     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 1);
2115     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2116   }
2117   else
2118 #endif
2119     TRACE_BG_ADD_TAG("RECV_RESUME_THREAD");
2120 #else
2121   TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
2122   if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2123 #endif
2124 #endif
2125
2126   delete msg;
2127   return 0;
2128 }
2129
2130   void
2131 ampi::probe(int t, int s, int comm, int *sts)
2132 {
2133   int tags[3];
2134 #if CMK_BIGSIM_CHARM
2135   void *curLog;         // store current log in timeline
2136   _TRACE_BG_TLINE_END(&curLog);
2137   //  TRACE_BG_AMPI_SUSPEND();
2138 #endif
2139
2140   AmpiMsg *msg = 0;
2141   resumeOnRecv=true;
2142   while(1) {
2143     tags[0] = t; tags[1] = s; tags[2] = comm;
2144     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2145     if (msg) break;
2146     thread->suspend();
2147   }
2148   resumeOnRecv=false;
2149   if(sts)
2150     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2151 #if CMK_BIGSIM_CHARM
2152   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
2153   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
2154 #endif
2155 }
2156
2157   int
2158 ampi::iprobe(int t, int s, int comm, int *sts)
2159 {
2160   int tags[3];
2161   AmpiMsg *msg = 0;
2162   tags[0] = t; tags[1] = s; tags[2] = comm;
2163   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2164   if (msg) {
2165     if(sts)
2166       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2167     return 1;
2168   }
2169 #if CMK_BIGSIM_CHARM
2170   void *curLog;         // store current log in timeline
2171   _TRACE_BG_TLINE_END(&curLog);
2172   //  TRACE_BG_AMPI_SUSPEND();
2173 #endif
2174   thread->schedule();
2175 #if CMK_BIGSIM_CHARM
2176   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
2177   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
2178 #endif
2179   return 0;
2180 }
2181
2182
2183 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
2184   void
2185 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
2186 {
2187   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2188   int rootIdx=dest.getIndexForRank(root);
2189   if(rootIdx==thisIndex) {
2190 #if 0//AMPI_COMLIB
2191     ciBcast.beginIteration();
2192     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2193 #else
2194 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2195     CpvAccess(_currentObj) = this;
2196 #endif
2197     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2198 #endif
2199   }
2200   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
2201   nbcasts++;
2202 }
2203
2204   void
2205 ampi::bcastraw(void* buf, int len, CkArrayID aid)
2206 {
2207   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, MPI_COMM_WORLD);
2208   memcpy(msg->data, buf, len);
2209   CProxy_ampi pa(aid);
2210   pa.generic(msg);
2211 }
2212
2213
2214   AmpiMsg* 
2215 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
2216 {
2217   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
2218   int unit;
2219   CkDDT_DataType *ddt = getDDT()->getType(type);
2220   unit = ddt->getSize(1);
2221   int totalsize = unit*cnt;
2222
2223   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
2224   char* addr = (char*)Alltoallbuff+disp*unit;
2225   ddt->serialize((char*)msg->data, addr, cnt, (-1));
2226   return msg;
2227 }
2228
2229 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
2230     void *attr_in, void *attr_out, int *flag){
2231   (*flag) = 0;
2232   return (MPI_SUCCESS);
2233 }
2234 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
2235     void *attr_in, void *attr_out, int *flag){
2236   (*(void **)attr_out) = attr_in;
2237   (*flag) = 1;
2238   return (MPI_SUCCESS);
2239 }
2240 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
2241   return (MPI_SUCCESS);
2242 }
2243
2244
2245 void AmpiSeqQ::init(int numP) 
2246 {
2247   elements.init(numP);
2248 }
2249
2250 AmpiSeqQ::~AmpiSeqQ () {
2251 }
2252
2253 void AmpiSeqQ::pup(PUP::er &p) {
2254   p|out;
2255   p|elements;
2256 }
2257
2258 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
2259 {
2260   AmpiOtherElement &el=elements[srcIdx];
2261 #ifndef CMK_OPTIMIZE
2262   if (msg->seq<el.seqIncoming)
2263     CkAbort("AMPI Logic error: received late out-of-order message!\n");
2264 #endif
2265   out.enq(msg);
2266   el.nOut++; // We have another message in the out-of-order queue
2267 }
2268
2269 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
2270 {
2271   AmpiOtherElement &el=elements[srcIdx];
2272   if (el.nOut==0) return 0; // No more out-of-order left.
2273   // Walk through our out-of-order queue, searching for our next message:
2274   for (int i=0;i<out.length();i++) {
2275     AmpiMsg *msg=out.deq();
2276     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
2277       el.seqIncoming++;
2278       el.nOut--; // We have one less message out-of-order
2279       return msg;
2280     }
2281     else
2282       out.enq(msg);
2283   }
2284   // We walked the whole queue-- ours is not there.
2285   return 0;
2286 }
2287
2288 //BIGSIM_OOC DEBUGGING: Output for AmpiRequest and its children classes
2289 void AmpiRequest::print(){
2290   CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
2291 }
2292
2293 void PersReq::print(){
2294   AmpiRequest::print();
2295   CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
2296 }
2297
2298 void IReq::print(){
2299   AmpiRequest::print();
2300   CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
2301 }
2302
2303 void ATAReq::print(){ //not complete for myreqs
2304   AmpiRequest::print();
2305   CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
2306
2307
2308 void SReq::print(){
2309   AmpiRequest::print();
2310   CmiPrintf("In SReq: this=%p, status=%d\n", this, statusIreq);
2311 }
2312
2313 void AmpiRequestList::pup(PUP::er &p) { 
2314   if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
2315     return;
2316   }
2317
2318   p(blklen); //Allocated size of block
2319   p(len); //Number of used elements in block
2320   if(p.isUnpacking()){
2321     makeBlock(blklen,len);
2322   }
2323   int count=0;
2324   for(int i=0;i<len;i++){
2325     char nonnull;
2326     if(!p.isUnpacking()){
2327       if(block[i] == NULL){
2328         nonnull = 0;
2329       }else{
2330         nonnull = block[i]->getType();
2331       }
2332     }   
2333     p(nonnull);
2334     if(nonnull != 0){
2335       if(p.isUnpacking()){
2336         switch(nonnull){
2337           case 1:
2338             block[i] = new PersReq;
2339             break;
2340           case 2:       
2341             block[i] = new IReq;
2342             break;
2343           case 3:       
2344             block[i] = new ATAReq;
2345             break;
2346         }
2347       } 
2348       block[i]->pup(p);
2349       count++;
2350     }else{
2351       block[i] = 0;
2352     }
2353   }
2354   if(p.isDeleting()){
2355     freeBlock();
2356   }
2357 }
2358
2359 //------------------ External Interface -----------------
2360 ampiParent *getAmpiParent(void) {
2361   ampiParent *p = CtvAccess(ampiPtr);
2362 #ifndef CMK_OPTIMIZE
2363   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
2364 #endif
2365   return p;
2366 }
2367
2368 ampi *getAmpiInstance(MPI_Comm comm) {
2369   ampi *ptr=getAmpiParent()->comm2ampi(comm);
2370 #ifndef CMK_OPTIMIZE
2371   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
2372 #endif
2373   return ptr;
2374 }
2375
2376 inline static AmpiRequestList *getReqs(void) {
2377   return &(getAmpiParent()->ampiReqs);
2378 }
2379
2380 inline void checkComm(MPI_Comm comm){
2381 #ifndef CMK_OPTIMIZE
2382   getAmpiParent()->checkComm(comm);
2383 #endif
2384 }
2385
2386 inline void checkRequest(MPI_Request req){
2387 #ifndef CMK_OPTIMIZE
2388   getReqs()->checkRequest(req);
2389 #endif
2390 }
2391
2392 inline void checkRequests(int n, MPI_Request* reqs){
2393 #ifndef CMK_OPTIMIZE
2394   AmpiRequestList* reqlist = getReqs();
2395   for(int i=0;i<n;i++)
2396     reqlist->checkRequest(reqs[i]);
2397 #endif
2398 }
2399
2400 CDECL void AMPI_Migrate(void)
2401 {
2402   //  AMPIAPI("AMPI_Migrate");
2403 #if 0
2404 #if CMK_BIGSIM_CHARM
2405   TRACE_BG_AMPI_SUSPEND();
2406 #endif
2407 #endif
2408   TCHARM_Migrate();
2409
2410 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2411   ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
2412   CpvAccess(_currentObj) = currentAmpi;
2413 #endif
2414
2415 #if CMK_BIGSIM_CHARM
2416   //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2417   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
2418 #endif
2419 }
2420
2421
2422 CDECL void AMPI_Evacuate(void)
2423 {
2424   TCHARM_Evacuate();
2425 }
2426
2427
2428
2429 CDECL void AMPI_Migrateto(int destPE)
2430 {
2431   AMPIAPI("AMPI_MigrateTo");
2432 #if 0
2433 #if CMK_BIGSIM_CHARM
2434   TRACE_BG_AMPI_SUSPEND();
2435 #endif
2436 #endif
2437   TCHARM_Migrate_to(destPE);
2438 #if CMK_BIGSIM_CHARM
2439   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
2440   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
2441 #endif
2442 }
2443
2444 CDECL void AMPI_MigrateTo(int destPE)
2445 {
2446   AMPI_Migrateto(destPE);
2447 }
2448
2449 CDECL void AMPI_Async_Migrate(void)
2450 {
2451   AMPIAPI("AMPI_Async_Migrate");
2452 #if 0
2453 #if CMK_BIGSIM_CHARM
2454   TRACE_BG_AMPI_SUSPEND();
2455 #endif
2456 #endif
2457   TCHARM_Async_Migrate();
2458 #if CMK_BIGSIM_CHARM
2459   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2460   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2461 #endif
2462 }
2463
2464 CDECL void AMPI_Allow_Migrate(void)
2465 {
2466   AMPIAPI("AMPI_Allow_Migrate");
2467 #if 0
2468 #if CMK_BIGSIM_CHARM
2469   TRACE_BG_AMPI_SUSPEND();
2470 #endif
2471 #endif
2472   TCHARM_Allow_Migrate();
2473 #if CMK_BIGSIM_CHARM
2474   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2475 #endif
2476 }
2477
2478 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2479 #if CMK_LBDB_ON
2480   //AMPIAPI("AMPI_Setmigratable");
2481   ampi *ptr=getAmpiInstance(comm);
2482   ptr->setMigratable(mig);
2483 #else
2484   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2485 #endif
2486 }
2487
2488 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2489 {
2490   //AMPIAPI("AMPI_Init");
2491   if (nodeinit_has_been_called) {
2492     AMPIAPI("AMPI_Init");
2493     char **argv;
2494     if (p_argv) argv=*p_argv;
2495     else argv=CkGetArgv();
2496     ampiInit(argv);
2497     if (p_argc) *p_argc=CmiGetArgc(argv);
2498   }
2499   else
2500   { /* Charm hasn't been started yet! */
2501     CkAbort("AMPI_Init> Charm is not initialized!");
2502   }
2503
2504   return 0;
2505 }
2506
2507 CDECL int AMPI_Initialized(int *isInit)
2508 {
2509   if (nodeinit_has_been_called) {
2510     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2511     *isInit=CtvAccess(ampiInitDone);
2512   }
2513   else /* !nodeinit_has_been_called */ {
2514     *isInit=nodeinit_has_been_called;
2515   }
2516   return 0;
2517 }
2518
2519 CDECL int AMPI_Finalized(int *isFinalized)
2520 {
2521   AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2522   *isFinalized=CtvAccess(ampiFinalized);
2523   return 0;
2524 }
2525
2526 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2527 {
2528   //AMPIAPI("AMPI_Comm_rank");
2529
2530 #ifndef CMK_OPTIMIZE 
2531   if(checkCommunicator(comm) != MPI_SUCCESS)
2532     return checkCommunicator(comm);
2533 #endif
2534
2535 #if AMPIMSGLOG
2536   ampiParent* pptr = getAmpiParent();
2537   if(msgLogRead){
2538     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2539     return 0;
2540   }
2541 #endif
2542
2543   *rank = getAmpiInstance(comm)->getRank(comm);
2544
2545 #if AMPIMSGLOG
2546   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2547     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2548   }
2549 #endif
2550   return 0;
2551 }
2552
2553   CDECL
2554 int AMPI_Comm_size(MPI_Comm comm, int *size)
2555 {
2556   //AMPIAPI("AMPI_Comm_size");
2557
2558 #ifndef CMK_OPTIMIZE 
2559   if(checkCommunicator(comm) != MPI_SUCCESS)
2560     return checkCommunicator(comm);
2561 #endif
2562
2563 #if AMPIMSGLOG
2564   ampiParent* pptr = getAmpiParent();
2565   if(msgLogRead){
2566     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2567     return 0;
2568   }
2569 #endif
2570
2571   *size = getAmpiInstance(comm)->getSize(comm);
2572
2573 #if AMPIMSGLOG
2574   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2575     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2576   }
2577 #endif
2578
2579   return 0;
2580 }
2581
2582   CDECL
2583 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2584 {
2585
2586 #ifndef CMK_OPTIMIZE 
2587   if(checkCommunicator(comm1) != MPI_SUCCESS)
2588     return checkCommunicator(comm1);
2589   if(checkCommunicator(comm2) != MPI_SUCCESS)
2590     return checkCommunicator(comm2);
2591 #endif
2592
2593   AMPIAPI("AMPI_Comm_compare");
2594   if(comm1==comm2) *result=MPI_IDENT;
2595   else{
2596     int equal=1;
2597     CkVec<int> ind1, ind2;
2598     ind1 = getAmpiInstance(comm1)->getIndices();
2599     ind2 = getAmpiInstance(comm2)->getIndices();
2600     if(ind1.size()==ind2.size()){
2601       for(int i=0;i<ind1.size();i++)
2602         if(ind1[i] != ind2[i]) { equal=0; break; }
2603     }
2604     if(equal==1) *result=MPI_CONGRUENT;
2605     else *result=MPI_UNEQUAL;
2606   }
2607   return 0;
2608 }
2609
2610 CDECL void AMPI_Exit(int /*exitCode*/)
2611 {
2612   AMPIAPI("AMPI_Exit");
2613   //finalizeBigSimTrace();
2614   TCHARM_Done();
2615 }
2616 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2617 {
2618   AMPI_Exit(*exitCode);
2619 }
2620
2621   CDECL
2622 int AMPI_Finalize(void)
2623 {
2624   AMPIAPI("AMPI_Finalize");
2625 #if PRINT_IDLE
2626   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2627 #endif
2628 #if AMPI_COUNTER
2629   getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2630 #endif
2631   CtvAccess(ampiFinalized)=1;
2632
2633 #if CMK_BIGSIM_CHARM
2634 #if 0
2635   TRACE_BG_AMPI_SUSPEND();
2636 #endif
2637 #if CMK_TRACE_IN_CHARM
2638   if(CpvAccess(traceOn)) traceSuspend();
2639 #endif
2640 #endif
2641
2642   //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2643   AMPI_Exit(0);
2644   return 0;
2645 }
2646
2647 CDECL
2648 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2649     int tag, MPI_Comm comm) {
2650
2651 #ifndef CMK_OPTIMIZE
2652   int ret;
2653   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2654   if(ret != MPI_SUCCESS)
2655     return ret;
2656 #endif
2657
2658   AMPIAPI("AMPI_Send");
2659 #if AMPIMSGLOG
2660   if(msgLogRead){
2661     return 0;
2662   }
2663 #endif
2664
2665   ampi *ptr = getAmpiInstance(comm);
2666 #if AMPI_COMLIB
2667   if(enableStreaming){  
2668     //    ptr->getStreaming().beginIteration();
2669     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2670   } else
2671 #endif
2672     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2673 #if AMPI_COUNTER
2674   getAmpiParent()->counters.send++;
2675 #endif
2676 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2677   //  ptr->yield();
2678   //  //  processRemoteMlogMessages();
2679 #endif
2680   return 0;
2681 }
2682
2683   CDECL
2684 int AMPI_Ssend(void *msg, int count, MPI_Datatype type, int dest,
2685     int tag, MPI_Comm comm)
2686 {
2687 #ifndef CMK_OPTIMIZE
2688   int ret;
2689   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2690   if(ret != MPI_SUCCESS)
2691     return ret;
2692 #endif
2693
2694   AMPIAPI("AMPI_Ssend");
2695 #if AMPIMSGLOG
2696   if(msgLogRead){
2697     return 0;
2698   }
2699 #endif
2700
2701   ampi *ptr = getAmpiInstance(comm);
2702 #if AMPI_COMLIB
2703   if(enableStreaming){
2704     ptr->getStreaming().beginIteration();
2705     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2706   } else
2707 #endif
2708     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm, 1);
2709 #if AMPI_COUNTER
2710   getAmpiParent()->counters.send++;
2711 #endif
2712
2713   return 0;
2714 }
2715
2716   CDECL
2717 int AMPI_Issend(void *buf, int count, MPI_Datatype type, int dest,
2718     int tag, MPI_Comm comm, MPI_Request *request)
2719 {
2720   AMPIAPI("AMPI_Issend");
2721
2722 #ifndef CMK_OPTIMIZE
2723   int ret;
2724   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
2725   if(ret != MPI_SUCCESS)
2726   {
2727     *request = MPI_REQUEST_NULL;
2728     return ret;
2729   }
2730 #endif
2731
2732 #if AMPIMSGLOG
2733   ampiParent* pptr = getAmpiParent();
2734   if(msgLogRead){
2735     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
2736     return 0;
2737   }
2738 #endif
2739
2740   USER_CALL_DEBUG("AMPI_Issend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
2741   ampi *ptr = getAmpiInstance(comm);
2742   AmpiRequestList* reqs = getReqs();
2743   SReq *newreq = new SReq(comm);
2744   *request = reqs->insert(newreq);
2745   // 1:  blocking now  - used by MPI_Ssend
2746   // >=2:  the index of the requests - used by MPI_Issend
2747   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm, *request+2);
2748 #if AMPI_COUNTER
2749   getAmpiParent()->counters.isend++;
2750 #endif
2751
2752 #if AMPIMSGLOG
2753   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2754     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
2755   }
2756 #endif
2757
2758   return 0;
2759 }
2760
2761   CDECL
2762 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2763     MPI_Comm comm, MPI_Status *status)
2764 {
2765   AMPIAPI("AMPI_Recv");
2766
2767 #ifndef CMK_OPTIMIZE
2768   int ret;
2769   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, msg, 1);
2770   if(ret != MPI_SUCCESS)
2771     return ret;
2772 #endif
2773
2774 #if AMPIMSGLOG
2775   ampiParent* pptr = getAmpiParent();
2776   if(msgLogRead){
2777     (*(pptr->fromPUPer))|(pptr->pupBytes);
2778     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2779     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2780     return 0;
2781   }
2782 #endif
2783
2784   ampi *ptr = getAmpiInstance(comm);
2785   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2786
2787 #if AMPI_COUNTER
2788   getAmpiParent()->counters.recv++;
2789 #endif
2790
2791 #if AMPIMSGLOG
2792   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2793     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2794     (*(pptr->toPUPer))|(pptr->pupBytes);
2795     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2796     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2797   }
2798 #endif
2799
2800   return 0;
2801 }
2802
2803   CDECL
2804 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2805 {
2806
2807 #ifndef CMK_OPTIMIZE
2808   int ret;
2809   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2810   if(ret != MPI_SUCCESS)
2811     return ret;
2812 #endif
2813
2814   AMPIAPI("AMPI_Probe");
2815   ampi *ptr = getAmpiInstance(comm);
2816   ptr->probe(tag,src, comm, (int*) status);
2817   return 0;
2818 }
2819
2820   CDECL
2821 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2822 {
2823   AMPIAPI("AMPI_Iprobe");
2824
2825 #ifndef CMK_OPTIMIZE
2826   int ret;
2827   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2828   if(ret != MPI_SUCCESS)
2829     return ret;
2830 #endif
2831
2832   ampi *ptr = getAmpiInstance(comm);
2833   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2834   return 0;
2835 }
2836
2837   CDECL
2838 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2839     int stag, void *rbuf, int rcount, int rtype,
2840     int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2841 {
2842   AMPIAPI("AMPI_Sendrecv");
2843
2844 #ifndef CMK_OPTIMIZE
2845   int ret;
2846   ret = errorCheck(comm, 1, scount, 1, stype, 1, stag, 1, dest, 1, sbuf, 1);
2847   if(ret != MPI_SUCCESS)
2848     return ret;
2849   ret = errorCheck(comm, 1, rcount, 1, rtype, 1, rtag, 1, src, 1, rbuf, 1);
2850   if(ret != MPI_SUCCESS)
2851     return ret;
2852 #endif
2853
2854   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2855   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2856   if (se) return se;
2857   else return re;
2858 }
2859
2860   CDECL
2861 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2862     int dest, int sendtag, int source, int recvtag,
2863     MPI_Comm comm, MPI_Status *status)
2864 {
2865   AMPIAPI("AMPI_Sendrecv_replace");
2866   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2867       buf, count, datatype, source, recvtag, comm, status);
2868 }
2869
2870
2871   CDECL
2872 int AMPI_Barrier(MPI_Comm comm)
2873 {
2874   AMPIAPI("AMPI_Barrier");
2875
2876 #ifndef CMK_OPTIMIZE
2877   if(checkCommunicator(comm) != MPI_SUCCESS)
2878     return checkCommunicator(comm);
2879 #endif
2880
2881   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2882
2883   TRACE_BG_AMPI_LOG(MPI_BARRIER, 0);
2884
2885   //HACK: Use collective operation as a barrier.
2886   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2887
2888   //BIGSIM_OOC DEBUGGING
2889   //CkPrintf("%d: in AMPI_Barrier, after AMPI_Allreduce\n", getAmpiParent()->thisIndex);
2890 #if AMPI_COUNTER
2891   getAmpiParent()->counters.barrier++;
2892 #endif
2893   return 0;
2894 }
2895
2896   CDECL
2897 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2898     MPI_Comm comm)
2899 {
2900   AMPIAPI("AMPI_Bcast");
2901
2902 #ifndef CMK_OPTIMIZE
2903   int ret;
2904   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, buf, 1);
2905   if(ret != MPI_SUCCESS)
2906     return ret;
2907 #endif
2908
2909   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2910   if(comm==MPI_COMM_SELF) return 0;
2911
2912 #if AMPIMSGLOG
2913   ampiParent* pptr = getAmpiParent();
2914   if(msgLogRead){
2915     (*(pptr->fromPUPer))|(pptr->pupBytes);
2916     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2917     return 0;
2918   }
2919 #endif
2920
2921   ampi* ptr = getAmpiInstance(comm);
2922   ptr->bcast(root, buf, count, type,comm);
2923 #if AMPI_COUNTER
2924   getAmpiParent()->counters.bcast++;
2925 #endif
2926
2927 #if AMPIMSGLOG
2928   if(msgLogWrite && record_msglog(pptr->thisIndex)) {
2929     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2930     (*(pptr->toPUPer))|(pptr->pupBytes);
2931     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2932   }
2933 #endif
2934 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2935   //  ptr->yield();
2936   //  //  processRemoteMlogMessages();
2937 #endif
2938
2939   return 0;
2940 }
2941
2942 /// This routine is called with the results of a Reduce or AllReduce
2943 const int MPI_REDUCE_SOURCE=0;
2944 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2945 void ampi::reduceResult(CkReductionMsg *msg)
2946 {
2947   MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2948   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2949       thisArrayID,thisIndex);
2950   delete msg;
2951 }
2952
2953 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2954 {
2955   int szdata = ddt->getSize(count);
2956   int szhdr = sizeof(AmpiOpHeader);
2957   AmpiOpHeader newhdr(op,type,count,szdata); 
2958   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2959   memcpy(msg->getData(),&newhdr,szhdr);
2960   if (count > 0) {
2961     TCharm::activateVariable(inbuf);
2962     ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2963     TCharm::deactivateVariable(inbuf);
2964   }
2965   return msg;
2966 }
2967
2968 // Copy the MPI datatype "type" from inbuf to outbuf
2969 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2970   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2971   //  deserializing into the output.
2972   ampi *ptr = getAmpiInstance(comm);
2973   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2974   int len=ddt->getSize(count);
2975   char *serialized=new char[len];
2976   TCharm::activateVariable(inbuf);
2977   TCharm::activateVariable(outbuf);
2978   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2979   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2980   TCharm::deactivateVariable(outbuf);
2981   TCharm::deactivateVariable(inbuf);
2982   delete [] serialized;         // < memory leak!  // gzheng 
2983
2984   return MPI_SUCCESS;
2985 }
2986
2987 #define SYNCHRONOUS_REDUCE                           0
2988
2989   CDECL
2990 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
2991     int root, MPI_Comm comm)
2992 {
2993   AMPIAPI("AMPI_Reduce");
2994
2995 #ifndef CMK_OPTIMIZE
2996   int ret;
2997   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, 0, 0);
2998   if(ret != MPI_SUCCESS)
2999     return ret;
3000 #endif
3001
3002   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3003
3004 #if AMPIMSGLOG
3005   ampiParent* pptr = getAmpiParent();
3006   if(msgLogRead){
3007     (*(pptr->fromPUPer))|(pptr->pupBytes);
3008     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3009     return 0;
3010   }
3011 #endif
3012
3013   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
3014   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
3015   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
3016
3017   ampi *ptr = getAmpiInstance(comm);
3018   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
3019   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
3020   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
3021
3022   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3023
3024   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
3025   msg->setCallback(reduceCB);
3026   MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
3027   ptr->contribute(msg);
3028
3029   if (ptr->thisIndex == rootIdx){
3030     /*HACK: Use recv() to block until reduction data comes back*/
3031     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3032       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
3033
3034 #if SYNCHRONOUS_REDUCE
3035       AmpiMsg *msg = new (0, 0) AmpiMsg(-1, MPI_REDUCE_TAG, -1, rootIdx, 0, MPI_REDUCE_COMM);
3036       CProxy_ampi pa(ptr->getProxy());
3037       pa.generic(msg);
3038 #endif
3039   }
3040 #if SYNCHRONOUS_REDUCE
3041   ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, NULL, 0, type, MPI_REDUCE_COMM);
3042 #endif
3043
3044 #if AMPI_COUNTER
3045   getAmpiParent()->counters.reduce++;
3046 #endif
3047
3048 #if AMPIMSGLOG
3049   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3050     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3051     (*(pptr->toPUPer))|(pptr->pupBytes);
3052     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3053   }
3054 #endif
3055
3056   return 0;
3057 }
3058
3059   CDECL
3060 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
3061     MPI_Op op, MPI_Comm comm)
3062 {
3063   AMPIAPI("AMPI_Allreduce");
3064
3065 #ifndef CMK_OPTIMIZE
3066   int ret;
3067   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, 0, 0);
3068   if(ret != MPI_SUCCESS)
3069     return ret;
3070 #endif
3071
3072   ampi *ptr = getAmpiInstance(comm);
3073
3074   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
3075   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
3076   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
3077
3078   CkDDT_DataType *ddt_type = ptr->getDDT()->getType(type);
3079
3080 #if CMK_BIGSIM_CHARM
3081   TRACE_BG_AMPI_LOG(MPI_ALLREDUCE, ddt_type->getSize(count));
3082 #endif
3083
3084   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3085
3086 #if AMPIMSGLOG
3087   ampiParent* pptr = getAmpiParent();
3088   if(msgLogRead){
3089     (*(pptr->fromPUPer))|(pptr->pupBytes);
3090     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3091     //    CkExit();
3092     return 0;
3093   }
3094 #endif
3095
3096   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
3097   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
3098
3099   CkReductionMsg *msg=makeRednMsg(ddt_type, inbuf, count, type, op);
3100   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3101   msg->setCallback(allreduceCB);
3102   ptr->contribute(msg);
3103
3104   /*HACK: Use recv() to block until the reduction data comes back*/
3105   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3106     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
3107 #if AMPI_COUNTER
3108   getAmpiParent()->counters.allreduce++;
3109 #endif
3110
3111 #if AMPIMSGLOG
3112   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3113     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3114     (*(pptr->toPUPer))|(pptr->pupBytes);
3115     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3116     //    CkExit();
3117   }
3118 #endif
3119
3120   return 0;
3121 }
3122
3123   CDECL
3124 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
3125     MPI_Op op, MPI_Comm comm, MPI_Request* request)
3126 {
3127   AMPIAPI("AMPI_Iallreduce");
3128
3129 #ifndef CMK_OPTIMIZE
3130   int ret;
3131   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1);
3132   if(ret != MPI_SUCCESS)
3133   {
3134     *request = MPI_REQUEST_NULL;
3135     return ret;
3136   }
3137 #endif
3138
3139   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3140
3141   checkRequest(*request);
3142   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
3143   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
3144   ampi *ptr = getAmpiInstance(comm);
3145
3146   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3147   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3148   msg->setCallback(allreduceCB);
3149   ptr->contribute(msg);
3150
3151   // using irecv instead recv to non-block the call and get request pointer
3152   AmpiRequestList* reqs = getReqs();
3153   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
3154   *request = reqs->insert(newreq);
3155   return 0;
3156 }
3157
3158   CDECL
3159 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
3160     MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
3161 {
3162   AMPIAPI("AMPI_Reduce_scatter");
3163
3164 #ifndef CMK_OPTIMIZE
3165   int ret;
3166   ret = errorCheck(comm, 1, 0, 0, datatype, 1, 0, 0, 0, 0, sendbuf, 1);
3167   if(ret != MPI_SUCCESS)
3168     return ret;
3169 #endif
3170
3171   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
3172   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
3173   ampi *ptr = getAmpiInstance(comm);
3174   int size = ptr->getSize(comm);
3175   int count=0;
3176   int *displs = new int [size];
3177   int len;
3178   void *tmpbuf;
3179
3180   //under construction
3181   for(int i=0;i<size;i++){
3182     displs[i] = count;
3183     count+= recvcounts[i];
3184   }
3185   len = ptr->getDDT()->getType(datatype)->getSize(count);
3186   tmpbuf = malloc(len);
3187   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
3188   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
3189       recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
3190   free(tmpbuf);
3191   delete [] displs;     // < memory leak ! // gzheng
3192   return 0;
3193 }
3194
3195 /***** MPI_Scan algorithm (from MPICH) *******
3196   recvbuf = sendbuf;
3197   partial_scan = sendbuf;
3198   mask = 0x1;
3199   while (mask < size) {
3200   dst = rank^mask;
3201   if (dst < size) {
3202   send partial_scan to dst;
3203   recv from dst into tmp_buf;
3204   if (rank > dst) {
3205   partial_scan = tmp_buf + partial_scan;
3206   recvbuf = tmp_buf + recvbuf;
3207   }
3208   else {
3209   if (op is commutative)
3210   partial_scan = tmp_buf + partial_scan;
3211   else {
3212   tmp_buf = partial_scan + tmp_buf;
3213   partial_scan = tmp_buf;
3214   }
3215   }
3216   }
3217   mask <<= 1;
3218   }
3219  ***** MPI_Scan algorithm (from MPICH) *******/
3220
3221 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
3222   (op)(invec,inoutvec,&count,&datatype);
3223 }
3224 CDECL
3225 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
3226   AMPIAPI("AMPI_Scan");
3227
3228 #ifndef CMK_OPTIMIZE
3229   int ret;
3230   ret = errorCheck(comm, 1, count, 1, datatype, 1, 0, 0, 0, 0, sendbuf, 1);
3231   if(ret != MPI_SUCCESS)
3232     return ret;
3233 #endif
3234
3235   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
3236   MPI_Status sts;
3237   ampi *ptr = getAmpiInstance(comm);
3238   int size = ptr->getSize(comm);
3239   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
3240   int rank = ptr->getRank(comm);
3241   int mask = 0x1;
3242   int dst;
3243   void* tmp_buf = malloc(blklen);
3244   void* partial_scan = malloc(blklen);
3245
3246   memcpy(recvbuf, sendbuf, blklen);
3247   memcpy(partial_scan, sendbuf, blklen);
3248   while(mask < size){
3249     dst = rank^mask;
3250     if(dst < size){
3251       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
3252           tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
3253       if(rank > dst){
3254         (op)(tmp_buf,partial_scan,&count,&datatype);
3255         (op)(tmp_buf,recvbuf,&count,&datatype);
3256       }else {
3257         (op)(partial_scan,tmp_buf,&count,&datatype);
3258         memcpy(partial_scan,tmp_buf,blklen);
3259       }
3260     }
3261     mask <<= 1;
3262
3263   }
3264
3265   free(tmp_buf);
3266   free(partial_scan);
3267 #if AMPI_COUNTER
3268   getAmpiParent()->counters.scan++;
3269 #endif
3270   return 0;
3271 }
3272
3273 CDECL
3274 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
3275   //AMPIAPI("AMPI_Op_create");
3276   *op = function;
3277   return 0;
3278 }
3279
3280 CDECL
3281 int AMPI_Op_free(MPI_Op *op){
3282   //AMPIAPI("AMPI_Op_free");
3283   *op = MPI_OP_NULL;
3284   return 0;
3285 }
3286
3287
3288   CDECL
3289 double AMPI_Wtime(void)
3290 {
3291   //  AMPIAPI("AMPI_Wtime");
3292
3293 #if AMPIMSGLOG
3294   double ret=TCHARM_Wall_timer();
3295   ampiParent* pptr = getAmpiParent();
3296   if(msgLogRead){
3297     (*(pptr->fromPUPer))|ret;
3298     return ret;
3299   }
3300
3301   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3302     (*(pptr->toPUPer))|ret;
3303   }
3304 #endif
3305
3306 #if CMK_BIGSIM_CHARM
3307   return BgGetTime();
3308 #else
3309   return TCHARM_Wall_timer();
3310 #endif
3311 }
3312
3313 CDECL
3314 double AMPI_Wtick(void){
3315   //AMPIAPI("AMPI_Wtick");
3316   return 1e-6;
3317 }
3318
3319
3320 int PersReq::start(){
3321   if(sndrcv == 1 || sndrcv == 3) { // send or ssend request
3322     ampi *ptr=getAmpiInstance(comm);
3323     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm, sndrcv==3?1:0);
3324   }
3325   return 0;
3326 }
3327
3328   CDECL
3329 int AMPI_Start(MPI_Request *request)
3330 {
3331   AMPIAPI("AMPI_Start");
3332   checkRequest(*request);
3333   AmpiRequestList *reqs = getReqs();
3334   if(-1==(*reqs)[*request]->start()) {
3335     CkAbort("MPI_Start could be used only on persistent communication requests!");
3336   }
3337   return 0;
3338 }
3339
3340 CDECL
3341 int AMPI_Startall(int count, MPI_Request *requests){
3342   AMPIAPI("AMPI_Startall");
3343   checkRequests(count,requests);
3344   AmpiRequestList *reqs = getReqs();
3345   for(int i=0;i<count;i++){
3346     if(-1==(*reqs)[requests[i]]->start())
3347       CkAbort("MPI_Start could be used only on persistent communication requests!");
3348   }
3349   return 0;
3350 }
3351
3352 /* organize the indices of requests into a vector of a vector: 
3353  * level 1 is different msg envelope matches
3354  * level 2 is (posting) ordered requests of with envelope
3355  * each time multiple completion call loop over first elem of level 1
3356  * and move the matched to the NULL request slot.   
3357  * warning: this does not work with I-Alltoall requests */
3358 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
3359   for(int i=0;i<count;i++){
3360     if(reqs[i]!=MPI_REQUEST_NULL)
3361       return 0;
3362   }
3363   return 1;
3364 }
3365 inline int matchReq(MPI_Request ia, MPI_Request ib){
3366   checkRequest(ia);  
3367   checkRequest(ib);
3368   AmpiRequestList* reqs = getReqs();
3369   AmpiRequest *a, *b;
3370   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
3371   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
3372   a=(*reqs)[ia];  b=(*reqs)[ib];
3373   if(a->tag != b->tag) return 0;
3374   if(a->src != b->src) return 0;
3375   if(a->comm != b->comm) return 0;
3376   return 1;
3377 }
3378 inline void swapInt(int& a,int& b){
3379   int tmp;
3380   tmp=a; a=b; b=tmp;
3381 }
3382 inline void sortedIndex(int n, int* arr, int* idx){
3383   int i,j;
3384   for(i=0;i<n;i++) 
3385     idx[i]=i;
3386   for (i=0; i<n-1; i++) 
3387     for (j=0; j<n-1-i; j++)
3388       if (arr[idx[j+1]] < arr[idx[j]]) 
3389         swapInt(idx[j+1],idx[j]);
3390 }
3391 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
3392   CkAssert(count!=0);
3393   int *newidx = new int [count];
3394   int flag;
3395   sortedIndex(count,arr,newidx);
3396   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
3397   CkVec<int> slot;
3398   vec->push_back(slot);
3399   (*vec)[0].push_back(newidx[0]);
3400   for(int i=1;i<count;i++){
3401     flag=0;
3402     for(int j=0;j<vec->size();j++){
3403       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
3404         ((*vec)[j]).push_back(newidx[i]);
3405         flag++;
3406       }
3407     }
3408     if(!flag){
3409       CkVec<int> newslot;
3410       newslot.push_back(newidx[i]);
3411       vec->push_back(newslot);
3412     }else{
3413       CkAssert(flag==1);
3414     }
3415   }
3416   delete [] newidx;
3417   return vec;
3418 }
3419 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
3420   printf("vec content: ");
3421   for(int i=0;i<vec.size();i++){
3422     printf("{");
3423     for(int j=0;j<(vec[i]).size();j++){
3424       printf(" %d ",arr[(vec[i])[j]]);
3425     }
3426     printf("} ");
3427   }
3428   printf("\n");
3429 }
3430
3431 int PersReq::wait(MPI_Status *sts){
3432   if(sndrcv == 2) {
3433     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3434       CkAbort("AMPI> Error in persistent request wait");
3435 #if CMK_BIGSIM_CHARM
3436     _TRACE_BG_TLINE_END(&event);
3437 #endif
3438   }
3439   return 0;
3440 }
3441
3442 int IReq::wait(MPI_Status *sts){
3443   if(CpvAccess(CmiPICMethod) == 2) {
3444     AMPI_DEBUG("In weird clause of IReq::wait\n");
3445     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3446       CkAbort("AMPI> Error in non-blocking request wait");
3447
3448     return 0;
3449   }
3450
3451   //Copy "this" to a local variable in the case that "this" pointer
3452   //is updated during the out-of-core emulation.
3453
3454   // optimization for Irecv
3455   // generic() writes directly to the buffer, so the only thing we
3456   // do here is to wait
3457   ampi *ptr = getAmpiInstance(comm);
3458
3459   //BIGSIM_OOC DEBUGGING
3460   //int ooccnt=0;
3461   //int ampiIndex = ptr->thisIndex;
3462   //CmiPrintf("%d: IReq's status=%d\n", ampiIndex, statusIreq);
3463
3464   while (statusIreq == false) {
3465     //BIGSIM_OOC DEBUGGING
3466     //CmiPrintf("Before blocking: %dth time: %d: in Ireq::wait\n", ++ooccnt, ptr->thisIndex);
3467     //print();
3468
3469     ptr->resumeOnRecv=true;
3470     ptr->block();
3471
3472     //BIGSIM_OOC DEBUGGING
3473     //CmiPrintf("[%d] After blocking: in Ireq::wait\n", ptr->thisIndex);
3474     //CmiPrintf("IReq's this pointer: %p\n", this);
3475     //print();
3476
3477 #if CMK_BIGSIM_CHARM
3478     //Because of the out-of-core emulation, this pointer is changed after in-out
3479     //memory operation. So we need to return from this function and do the while loop
3480     //in the outer function call.       
3481     if(_BgInOutOfCoreMode)
3482       return -1;
3483 #endif  
3484   }   // end of while
3485   ptr->resumeOnRecv=false;
3486
3487   AMPI_DEBUG("IReq::wait has resumed\n");
3488
3489   if(sts) {
3490     AMPI_DEBUG("Setting sts->MPI_TAG to this->tag=%d in IReq::wait  this=%p\n", (int)this->tag, this);
3491     sts->MPI_TAG = tag;
3492     sts->MPI_SOURCE = src;
3493     sts->MPI_COMM = comm;
3494     sts->MPI_LENGTH = length;
3495   }
3496
3497   return 0;
3498 }
3499
3500 int ATAReq::wait(MPI_Status *sts){
3501   int i;
3502   for(i=0;i<count;i++){
3503     if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3504           myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
3505       CkAbort("AMPI> Error in alltoall request wait");
3506 #if CMK_BIGSIM_CHARM
3507     _TRACE_BG_TLINE_END(&myreqs[i].event);
3508 #endif
3509   }
3510 #if CMK_BIGSIM_CHARM
3511   //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
3512   TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0, 1);
3513   for (i=0; i<count; i++)
3514     _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
3515   _TRACE_BG_TLINE_END(&event);
3516 #endif
3517   return 0;
3518 }
3519
3520 int SReq::wait(MPI_Status *sts){
3521   ampi *ptr = getAmpiInstance(comm);
3522   while (statusIreq == false) {
3523     ptr->resumeOnRecv = true;
3524     ptr->block();
3525     ptr = getAmpiInstance(comm);
3526     ptr->resumeOnRecv = false;
3527   }
3528   return 0;
3529 }
3530
3531   CDECL
3532 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
3533 {
3534   AMPIAPI("AMPI_Wait");
3535   if(*request == MPI_REQUEST_NULL){
3536     stsempty(*sts);
3537     return 0;
3538   }
3539   checkRequest(*request);
3540   AmpiRequestList* reqs = getReqs();
3541
3542 #if AMPIMSGLOG
3543   ampiParent* pptr = getAmpiParent();
3544   if(msgLogRead){
3545     (*(pptr->fromPUPer))|(pptr->pupBytes);
3546     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3547     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
3548     return 0;
3549   }
3550 #endif
3551
3552   AMPI_DEBUG("AMPI_Wait request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3553   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3554   //(*reqs)[*request]->wait(sts);
3555   int waitResult = -1;
3556   do{
3557     AmpiRequest *waitReq = (*reqs)[*request];
3558     waitResult = waitReq->wait(sts);
3559     if(_BgInOutOfCoreMode){
3560       reqs = getReqs();
3561     }
3562   }while(waitResult==-1);
3563
3564
3565   AMPI_DEBUG("AMPI_Wait after calling wait, request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3566
3567
3568 #if AMPIMSGLOG
3569   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3570     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
3571     (*(pptr->toPUPer))|(pptr->pupBytes);
3572     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3573     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
3574   }
3575 #endif
3576
3577   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3578     reqs->free(*request);
3579     *request = MPI_REQUEST_NULL;
3580   }
3581
3582   AMPI_DEBUG("End of AMPI_Wait\n");
3583
3584   return 0;
3585 }
3586
3587   CDECL
3588 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
3589 {
3590   AMPIAPI("AMPI_Waitall");
3591   if(count==0) return MPI_SUCCESS;
3592   checkRequests(count,request);
3593   int i,j,oldPe;
3594   AmpiRequestList* reqs = getReqs();
3595   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3596
3597 #if AMPIMSGLOG
3598   ampiParent* pptr = getAmpiParent();
3599   if(msgLogRead){
3600     for(i=0;i<reqvec->size();i++){
3601       for(j=0;j<((*reqvec)[i]).size();j++){
3602         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3603           stsempty(sts[((*reqvec)[i])[j]]);
3604           continue;
3605         }
3606         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3607         (*(pptr->fromPUPer))|(pptr->pupBytes);
3608         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3609         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3610       }
3611     }
3612     return 0;
3613   }
3614 #endif
3615
3616 #if CMK_BIGSIM_CHARM
3617   void *curLog;         // store current log in timeline
3618   _TRACE_BG_TLINE_END(&curLog);
3619 #if 0
3620   TRACE_BG_AMPI_SUSPEND();
3621 #endif
3622 #endif
3623   for(i=0;i<reqvec->size();i++){
3624     for(j=0;j<((*reqvec)[i]).size();j++){
3625       //CkPrintf("[%d] in loop [%d, %d]\n", pptr->thisIndex,i, j);
3626       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3627         stsempty(sts[((*reqvec)[i])[j]]);
3628         continue;
3629       }
3630       oldPe = CkMyPe();
3631
3632       int waitResult = -1;
3633       do{       
3634         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3635         waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
3636         if(_BgInOutOfCoreMode){
3637           reqs = getReqs();
3638           reqvec = vecIndex(count, request);
3639         }
3640
3641 #if AMPIMSGLOG
3642         if(msgLogWrite && record_msglog(pptr->thisIndex)){
3643           (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
3644           (*(pptr->toPUPer))|(pptr->pupBytes);
3645           PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3646           PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3647         }
3648 #endif
3649
3650       }while(waitResult==-1);
3651
3652 #if 1
3653 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3654       //for fault evacuation
3655       if(oldPe != CkMyPe()){
3656 #endif
3657         reqs = getReqs();
3658         reqvec  = vecIndex(count,request);
3659 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3660       }
3661 #endif
3662 #endif
3663     }
3664   }
3665 #if CMK_BIGSIM_CHARM
3666   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
3667 #endif
3668   // free memory of requests
3669   for(i=0;i<count;i++){ 
3670     if(request[i] == MPI_REQUEST_NULL)
3671       continue;
3672     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
3673       reqs->free(request[i]);
3674       request[i] = MPI_REQUEST_NULL;
3675     }
3676   }
3677   delete reqvec;
3678   return 0;
3679 }
3680
3681   CDECL
3682 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
3683 {
3684   AMPIAPI("AMPI_Waitany");
3685
3686   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
3687   if(count == 0) return MPI_SUCCESS;
3688   checkRequests(count,request);
3689   if(areInactiveReqs(count,request)){
3690     *idx=MPI_UNDEFINED;
3691     stsempty(*sts);
3692     return MPI_SUCCESS;
3693   }
3694   int flag=0;
3695   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3696   while(count>0){ /* keep looping until some request finishes: */
3697     for(int i=0;i<reqvec->size();i++){
3698       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
3699       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
3700         *idx = ((*reqvec)[i])[0];
3701         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
3702         return 0;
3703       }
3704     }
3705     /* no requests have finished yet-- schedule and try again */
3706     AMPI_Yield(MPI_COMM_WORLD);
3707   }
3708   *idx = MPI_UNDEFINED;
3709   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
3710   delete reqvec;
3711   return 0;
3712 }
3713
3714   CDECL
3715 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
3716     int *array_of_indices, MPI_Status *array_of_statuses)
3717 {
3718   AMPIAPI("AMPI_Waitsome");
3719   checkRequests(incount,array_of_requests);
3720   if(areInactiveReqs(incount,array_of_requests)){
3721     *outcount=MPI_UNDEFINED;
3722     return MPI_SUCCESS;
3723   }
3724   MPI_Status sts;
3725   int i;
3726   int flag=0, realflag=0;
3727   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3728   *outcount = 0;
3729   while(1){
3730     for(i=0;i<reqvec->size();i++){
3731       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3732       if(flag == 1){ 
3733         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3734         array_of_statuses[(*outcount)++]=sts;
3735         if(sts.MPI_COMM != 0)
3736           realflag=1; // there is real(non null) request
3737       }
3738     }
3739     if(realflag && outcount>0) break;
3740   }
3741   delete reqvec;
3742   return 0;
3743 }
3744
3745   CmiBool PersReq::test(MPI_Status *sts){
3746     if(sndrcv == 2)     // recv request
3747       return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3748     else                        // send request
3749       return 1;
3750
3751   }
3752   void PersReq::complete(MPI_Status *sts){
3753     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3754       CkAbort("AMPI> Error in persistent request complete");
3755   }
3756
3757 CmiBool IReq::test(MPI_Status *sts){
3758   if (statusIreq == true) {           
3759     if(sts)
3760       sts->MPI_LENGTH = length;           
3761     return true;
3762   }
3763   else {
3764     getAmpiInstance(comm)-&