AMPI_Sendrecv: Reject MPI_IN_PLACE where the standard doesn't call for it
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1
2 #define AMPIMSGLOG    0
3
4 #define exit exit /*Supress definition of exit in ampi.h*/
5 #include "ampiimpl.h"
6 #include "tcharm.h"
7 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
8 #include "ampiEvents.h" /*** for trace generation for projector *****/
9 #include "ampiProjections.h"
10 #endif
11
12 #if CMK_BIGSIM_CHARM
13 #include "bigsim_logs.h"
14 #endif
15
16 #define CART_TOPOL 1
17 #define AMPI_PRINT_IDLE 0
18
19 /* change this define to "x" to trace all send/recv's */
20 #define MSG_ORDER_DEBUG(x)  //x /* empty */
21 /* change this define to "x" to trace user calls */
22 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
23 #define STARTUP_DEBUG(x)  //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
24 #define FUNCCALL_DEBUG(x) //x /* empty */
25
26 static CkDDT *getDDT(void) {
27   return getAmpiParent()->myDDT;
28 }
29
30   inline int checkCommunicator(MPI_Comm comm) {
31     if(comm == MPI_COMM_NULL)
32       return MPI_ERR_COMM;
33     return MPI_SUCCESS;
34   }
35
36   inline int checkCount(int count) {
37     if(count < 0)
38       return MPI_ERR_COUNT;
39     return MPI_SUCCESS;
40   }
41
42   inline int checkData(MPI_Datatype data) {
43     if(data == MPI_DATATYPE_NULL)
44       return MPI_ERR_TYPE;
45     return MPI_SUCCESS;
46   }
47
48   inline int checkTag(int tag) {
49     if(tag != MPI_ANY_TAG && tag < 0)
50       return MPI_ERR_TAG;
51     return MPI_SUCCESS;
52   }
53
54 inline int checkRank(int rank, MPI_Comm comm) {
55   int size;
56   AMPI_Comm_size(comm, &size);
57   if(((rank >= 0) && (rank < size)) || (rank == MPI_ANY_SOURCE) || (rank ==
58         MPI_PROC_NULL))
59     return MPI_SUCCESS;
60   return MPI_ERR_RANK;
61 }
62
63   inline int checkBuf(void *buf, int count) {
64     if((count != 0 && buf == NULL))
65       return MPI_ERR_BUFFER;
66     return MPI_SUCCESS;
67   }
68
69 inline int errorCheck(MPI_Comm comm, int ifComm, int count, int ifCount,
70                       MPI_Datatype data, int ifData, int tag, int ifTag,
71                       int rank, int ifRank,
72                       void *buf1, int ifBuf1, void *buf2 = 0, int ifBuf2 = 0) {
73   int ret;
74   if(ifComm) { 
75     ret = checkCommunicator(comm);
76     if(ret != MPI_SUCCESS)
77       return ret;
78   }
79   if(ifCount) {
80     ret = checkCount(count);
81     if(ret != MPI_SUCCESS)
82       return ret;
83   }
84   if(ifData) {
85     ret = checkData(data);
86     if(ret != MPI_SUCCESS)
87       return ret;
88   }
89   if(ifTag) {
90     ret = checkTag(tag);
91     if(ret != MPI_SUCCESS)
92       return ret;
93   }
94   if(ifRank) {
95     ret = checkRank(rank,comm);
96     if(ret != MPI_SUCCESS)
97       return ret;
98   }
99   if(ifBuf1) {
100     ret = checkBuf(buf1,count);
101     if(ret != MPI_SUCCESS)
102       return ret;
103   }
104   if(ifBuf2) {
105     ret = checkBuf(buf2,count);
106     if(ret != MPI_SUCCESS)
107       return ret;
108   }
109   return MPI_SUCCESS;
110 }
111
112 //------------- startup -------------
113 static mpi_comm_worlds mpi_worlds;
114
115 int _mpi_nworlds; /*Accessed by ampif*/
116 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
117
118 /* ampiReducer: AMPI's generic reducer type 
119    MPI_Op is function pointer to MPI_User_function
120    so that it can be packed into AmpiOpHeader, shipped 
121    with the reduction message, and then plugged into 
122    the ampiReducer. 
123    One little trick is the ampi::recv which receives
124    the final reduction message will see additional
125    sizeof(AmpiOpHeader) bytes in the buffer before
126    any user data.                             */
127 class AmpiComplex { 
128   public: 
129     double re, im; 
130     void operator+=(const AmpiComplex &a) {
131       re+=a.re;
132       im+=a.im;
133     }
134     void operator*=(const AmpiComplex &a) {
135       double nu_re=re*a.re-im*a.im;
136       im=re*a.im+im*a.re;
137       re=nu_re;
138     }
139     int operator>(const AmpiComplex &a) {
140       CkAbort("Cannot compare complex numbers with MPI_MAX");
141       return 0;
142     }
143     int operator<(const AmpiComplex &a) {
144       CkAbort("Cannot compare complex numbers with MPI_MIN");
145       return 0;
146     }
147 };
148 typedef struct { float val; int idx; } FloatInt;
149 typedef struct { double val; int idx; } DoubleInt;
150 typedef struct { long val; int idx; } LongInt;
151 typedef struct { int val; int idx; } IntInt;
152 typedef struct { short val; int idx; } ShortInt;
153 typedef struct { long double val; int idx; } LongdoubleInt;
154 typedef struct { float val; float idx; } FloatFloat;
155 typedef struct { double val; double idx; } DoubleDouble;
156
157
158 #define MPI_OP_SWITCH(OPNAME) \
159   int i; \
160 switch (*datatype) { \
161   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
162   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
163   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
164   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
165   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
166   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
167   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
168   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
169   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
170   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
171   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
172   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
173   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
174   default: \
175            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
176   CmiAbort("Unsupported MPI datatype for MPI Op"); \
177 };\
178
179 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
180 #define MPI_OP_IMPL(type) \
181   if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
182   MPI_OP_SWITCH(MPI_MAX)
183 #undef MPI_OP_IMPL
184 }
185
186 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
187 #define MPI_OP_IMPL(type) \
188   if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
189   MPI_OP_SWITCH(MPI_MIN)
190 #undef MPI_OP_IMPL
191 }
192
193 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
194 #define MPI_OP_IMPL(type) \
195   ((type *)inoutvec)[i] += ((type *)invec)[i];
196   MPI_OP_SWITCH(MPI_SUM)
197 #undef MPI_OP_IMPL
198 }
199
200 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
201 #define MPI_OP_IMPL(type) \
202   ((type *)inoutvec)[i] *= ((type *)invec)[i];
203   MPI_OP_SWITCH(MPI_PROD)
204 #undef MPI_OP_IMPL
205 }
206
207 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
208   int i;  
209   switch (*datatype) {
210     case MPI_INT:
211     case MPI_LOGICAL:
212       for(i=0;i<(*len);i++)
213         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
214       break;
215     default:
216       ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
217       CmiAbort("exiting");
218   }
219 }
220 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
221   int i; 
222   switch (*datatype) {
223     case MPI_INT:
224       for(i=0;i<(*len);i++)
225         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
226       break;
227     case MPI_BYTE:
228       for(i=0;i<(*len);i++)
229         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
230       break;
231     default:
232       ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
233       CmiAbort("exiting");
234   }
235 }
236 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
237   int i;  
238   switch (*datatype) {
239     case MPI_INT:
240     case MPI_LOGICAL:
241       for(i=0;i<(*len);i++)
242         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
243       break;
244     default:
245       ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
246       CmiAbort("exiting");
247   }
248 }
249 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
250   int i;  
251   switch (*datatype) {
252     case MPI_INT:
253       for(i=0;i<(*len);i++)
254         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
255       break;
256     case MPI_BYTE:
257       for(i=0;i<(*len);i++)
258         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
259       break;
260     default:
261       ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
262       CmiAbort("exiting");
263   }
264 }
265 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
266   int i;  
267   switch (*datatype) {
268     case MPI_INT:
269     case MPI_LOGICAL:
270       for(i=0;i<(*len);i++)
271         ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
272       break;
273     default:
274       ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
275       CmiAbort("exiting");
276   }
277 }
278 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
279   int i;  
280   switch (*datatype) {
281     case MPI_INT:
282       for(i=0;i<(*len);i++)
283         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
284       break;
285     case MPI_BYTE:
286       for(i=0;i<(*len);i++)
287         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
288       break;
289     case MPI_UNSIGNED:
290       for(i=0;i<(*len);i++)
291         ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
292       break;
293     default:
294       ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
295       CmiAbort("exiting");
296   }
297 }
298
299 #ifndef MIN
300 #define MIN(a,b) (a < b ? a : b)
301 #endif
302
303 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
304   int i;  
305
306   switch (*datatype) {
307     case MPI_FLOAT_INT:
308       for(i=0;i<(*len);i++)
309         if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
310           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
311         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
312           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
313       break;
314     case MPI_DOUBLE_INT:
315       for(i=0;i<(*len);i++)
316         if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
317           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
318         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
319           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
320
321       break;
322     case MPI_LONG_INT:
323       for(i=0;i<(*len);i++)
324         if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
325           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
326         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
327           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
328       break;
329     case MPI_2INT:
330       for(i=0;i<(*len);i++)
331         if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
332           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
333         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
334           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
335       break;
336     case MPI_SHORT_INT:
337       for(i=0;i<(*len);i++)
338         if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
339           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
340         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
341           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
342       break;
343     case MPI_LONG_DOUBLE_INT:
344       for(i=0;i<(*len);i++)
345         if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
346           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
347         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
348           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
349       break;
350     case MPI_2FLOAT:
351       for(i=0;i<(*len);i++)
352         if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
353           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
354         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
355           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
356       break;
357     case MPI_2DOUBLE:
358       for(i=0;i<(*len);i++)
359         if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
360           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
361         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
362           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
363       break;
364     default:
365       ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
366       CmiAbort("exiting");
367   }
368 }
369 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
370   int i;  
371   switch (*datatype) {
372     case MPI_FLOAT_INT:
373       for(i=0;i<(*len);i++)
374         if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
375           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
376         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
377           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
378       break;
379     case MPI_DOUBLE_INT:
380       for(i=0;i<(*len);i++)
381         if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
382           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
383         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
384           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
385       break;
386     case MPI_LONG_INT:
387       for(i=0;i<(*len);i++)
388         if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
389           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
390         else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
391           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
392       break;
393     case MPI_2INT:
394       for(i=0;i<(*len);i++)
395         if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
396           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
397         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
398           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
399       break;
400     case MPI_SHORT_INT:
401       for(i=0;i<(*len);i++)
402         if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
403           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
404         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
405           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
406       break;
407     case MPI_LONG_DOUBLE_INT:
408       for(i=0;i<(*len);i++)
409         if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
410           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
411         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
412           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
413       break;
414     case MPI_2FLOAT:
415       for(i=0;i<(*len);i++)
416         if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
417           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
418         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
419           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
420       break;
421     case MPI_2DOUBLE:
422       for(i=0;i<(*len);i++)
423         if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
424           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
425         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
426           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
427       break;
428     default:
429       ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
430       CmiAbort("exiting");
431   }
432 }
433
434 // every msg contains a AmpiOpHeader structure before user data
435 // FIXME: non-commutative operations require messages be ordered by rank
436 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
437   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
438   MPI_Datatype dtype;
439   int szhdr, szdata, len;
440   MPI_User_function* func;
441   func = hdr->func;
442   dtype = hdr->dtype;  
443   szdata = hdr->szdata;
444   len = hdr->len;  
445   szhdr = sizeof(AmpiOpHeader);
446
447   //Assuming extent == size
448   void *ret = malloc(szhdr+szdata);
449   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
450   for(int i=1;i<nMsg;i++){
451     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
452   }
453   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
454   free(ret);
455   return retmsg;
456 }
457
458 CkReduction::reducerType AmpiReducer;
459
460 class Builtin_kvs{
461   public:
462     int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
463     Builtin_kvs(){
464       tag_ub = MPI_TAG_UB_VALUE; 
465       host = MPI_PROC_NULL;
466       io = 0;
467       wtime_is_global = 0;
468       keyval_mype = CkMyPe();
469       keyval_numpes = CkNumPes();
470       keyval_mynode = CkMyNode();
471       keyval_numnodes = CkNumNodes();
472     }
473 };
474
475 // ------------ startup support -----------
476 int _ampi_fallback_setup_count;
477 CDECL void AMPI_Setup(void);
478 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
479
480 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
481
482 /*Main routine used when missing MPI_Setup routine*/
483 CDECL void AMPI_Fallback_Main(int argc,char **argv)
484 {
485   AMPI_Main_cpp(argc,argv);
486   AMPI_Main(argc,argv);
487   FTN_NAME(MPI_MAIN,mpi_main)();
488 }
489
490 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
491 /*Startup routine used if user *doesn't* write
492   a TCHARM_User_setup routine.
493  */
494 CDECL void AMPI_Setup_Switch(void) {
495   _ampi_fallback_setup_count=0;
496   FTN_NAME(AMPI_SETUP,ampi_setup)();
497   AMPI_Setup();
498   if (_ampi_fallback_setup_count==2)
499   { //Missing AMPI_Setup in both C and Fortran:
500     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
501   }
502 }
503
504 static int nodeinit_has_been_called=0;
505 CtvDeclare(ampiParent*, ampiPtr);
506 CtvDeclare(int, ampiInitDone);
507 CtvDeclare(void*,stackBottom);
508 CtvDeclare(int, ampiFinalized);
509 CkpvDeclare(Builtin_kvs, bikvs);
510 CkpvDeclare(int,argvExtracted);
511 static int enableStreaming = 0;
512
513 CDECL long ampiCurrentStackUsage(){
514   int localVariable;
515
516   unsigned long p1 =  (unsigned long)((void*)&localVariable);
517   unsigned long p2 =  (unsigned long)(CtvAccess(stackBottom));
518
519
520   if(p1 > p2)
521     return p1 - p2;
522   else
523     return  p2 - p1;
524
525 }
526
527 FDECL void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
528   long usage = ampiCurrentStackUsage();
529   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
530 }
531
532
533 CDECL void AMPI_threadstart(void *data);
534 static int AMPI_threadstart_idx = -1;
535
536 static void ampiNodeInit(void)
537 {
538   _mpi_nworlds=0;
539   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
540   {
541     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
542   }
543   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
544
545   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
546
547   CmiAssert(AMPI_threadstart_idx == -1);    // only initialize once
548   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
549
550   nodeinit_has_been_called=1;
551
552    // ASSUME NO ANYTIME MIGRATION and STATIC INSERTON
553   _isAnytimeMigration = false;
554   _isStaticInsertion = true;
555 }
556
557 #if PRINT_IDLE
558 static double totalidle=0.0, startT=0.0;
559 static int beginHandle, endHandle;
560 static void BeginIdle(void *dummy,double curWallTime)
561 {
562   startT = curWallTime;
563 }
564 static void EndIdle(void *dummy,double curWallTime)
565 {
566   totalidle += curWallTime - startT;
567 }
568 #endif
569
570 /* for fortran reduction operation table to handle mapping */
571 typedef MPI_Op  MPI_Op_Array[128];
572 CtvDeclare(int, mpi_opc);
573 CtvDeclare(MPI_Op_Array, mpi_ops);
574
575 static void ampiProcInit(void){
576   CtvInitialize(ampiParent*, ampiPtr);
577   CtvInitialize(int,ampiInitDone);
578   CtvInitialize(int,ampiFinalized);
579   CtvInitialize(void*,stackBottom);
580
581
582   CtvInitialize(MPI_Op_Array, mpi_ops);
583   CtvInitialize(int, mpi_opc);
584
585   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
586   CkpvAccess(bikvs) = Builtin_kvs();
587
588   CkpvInitialize(int, argvExtracted);
589   CkpvAccess(argvExtracted) = 0;
590
591 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
592   REGISTER_AMPI
593 #endif
594   initAmpiProjections();
595
596   char **argv=CkGetArgv();
597 #if AMPI_COMLIB  
598   if(CkpvAccess(argvExtracted)==0){
599     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
600   }
601 #endif
602
603 #if AMPIMSGLOG
604   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
605   //msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
606   if (CmiGetArgIntDesc(argv,"+msgLogRead", &msgLogRank, "Re-play message processing order for AMPI")) {
607     msgLogRead = 1;
608   }
609   //CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
610   char *procs = NULL;
611   if (CmiGetArgStringDesc(argv, "+msgLogRanks", &procs, "A list of AMPI processors to record , e.g. 0,10,20-30")) {
612     msgLogRanks.set(procs);
613   }
614   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
615   if (CkMyPe() == 0) {
616     if (msgLogWrite) CmiPrintf("Writing AMPI messages of rank %s to log: %s\n", procs?procs:"", msgLogFilename);
617     if (msgLogRead) CmiPrintf("Reading AMPI messages of rank %s from log: %s\n", procs?procs:"", msgLogFilename);
618   }
619 #endif
620
621   // initBigSimTrace(1,outtiming);
622 }
623
624 #if AMPIMSGLOG
625 static inline int record_msglog(int rank){
626   return msgLogRanks.includes(rank);
627 }
628 #endif
629
630 void AMPI_Install_Idle_Timer(){
631 #if AMPI_PRINT_IDLE
632   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
633   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
634 #endif
635 }
636
637 void AMPI_Uninstall_Idle_Timer(){
638 #if AMPI_PRINT_IDLE
639   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
640   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
641 #endif
642 }
643
644 PUPfunctionpointer(MPI_MainFn)
645
646   class MPI_threadstart_t {
647     public:
648       MPI_MainFn fn;
649       MPI_threadstart_t() {}
650       MPI_threadstart_t(MPI_MainFn fn_)
651         :fn(fn_) {}
652       void start(void) {
653         char **argv=CmiCopyArgs(CkGetArgv());
654         int argc=CkGetArgc();
655
656         // Set a pointer to somewhere close to the bottom of the stack.
657         // This is used for roughly estimating the stack usage later.
658         CtvAccess(stackBottom) = &argv;
659
660 #if CMK_AMPI_FNPTR_HACK
661         AMPI_Fallback_Main(argc,argv);
662 #else
663         (fn)(argc,argv);
664 #endif
665       }
666       void pup(PUP::er &p) {
667         p|fn;
668       }
669   };
670 PUPmarshall(MPI_threadstart_t)
671
672 CDECL void AMPI_threadstart(void *data)
673 {
674   STARTUP_DEBUG("MPI_threadstart")
675     MPI_threadstart_t t;
676   pupFromBuf(data,t);
677 #if CMK_TRACE_IN_CHARM
678   if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
679 #endif
680   t.start();
681 }
682
683 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
684 {
685   STARTUP_DEBUG("ampiCreateMain")
686     int _nchunks=TCHARM_Get_num_chunks();
687   //Make a new threads array:
688   MPI_threadstart_t s(mainFn);
689   memBuf b; pupIntoBuf(b,s);
690   TCHARM_Create_data( _nchunks,AMPI_threadstart_idx,
691       b.getData(), b.getSize());
692 }
693
694 /* TCharm Semaphore ID's for AMPI startup */
695 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
696 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
697
698 static CProxy_ampiWorlds ampiWorldsGroup;
699
700 static void init_operations()
701 {
702   CtvInitialize(MPI_Op_Array, mpi_ops);
703   int i = 0;
704   MPI_Op *tab = CtvAccess(mpi_ops);
705   tab[i++] = MPI_MAX;
706   tab[i++] = MPI_MIN;
707   tab[i++] = MPI_SUM;
708   tab[i++] = MPI_PROD;
709   tab[i++] = MPI_LAND;
710   tab[i++] = MPI_BAND;
711   tab[i++] = MPI_LOR;
712   tab[i++] = MPI_BOR;
713   tab[i++] = MPI_LXOR;
714   tab[i++] = MPI_BXOR;
715   tab[i++] = MPI_MAXLOC;
716   tab[i++] = MPI_MINLOC;
717
718   CtvInitialize(int, mpi_opc);
719   CtvAccess(mpi_opc) = i;
720 }
721
722 /*
723    Called from MPI_Init, a collective initialization call:
724    creates a new AMPI array and attaches it to the current
725    set of TCHARM threads.
726  */
727 static ampi *ampiInit(char **argv)
728 {
729   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
730     if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
731   STARTUP_DEBUG("ampiInit> begin")
732
733     MPI_Comm new_world;
734   int _nchunks;
735   CkArrayOptions opts;
736   CProxy_ampiParent parent;
737   if (TCHARM_Element()==0) //the rank of a tcharm object
738   { /* I'm responsible for building the arrays: */
739     STARTUP_DEBUG("ampiInit> creating arrays")
740
741       // FIXME: Need to serialize global communicator allocation in one place.
742       //Allocate the next communicator
743       if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
744       {
745         CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
746       }
747     int new_idx=_mpi_nworlds;
748     new_world=MPI_COMM_WORLD+new_idx; // Isaac guessed there shouldn't be a +1 here
749
750     //Create and attach the ampiParent array
751     CkArrayID threads;
752     opts=TCHARM_Attach_start(&threads,&_nchunks);
753     parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
754     STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
755   }
756   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
757
758   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
759
760     if (TCHARM_Element()==0)
761     {
762       //Make a new ampi array
763       CkArrayID empty;
764
765       ampiCommStruct worldComm(new_world,empty,_nchunks);
766       CProxy_ampi arr;
767
768
769 #if AMPI_COMLIB
770
771       ComlibInstanceHandle ciStreaming = 1;
772       ComlibInstanceHandle ciBcast = 2;
773       ComlibInstanceHandle ciAllgather = 3;
774       ComlibInstanceHandle ciAlltoall = 4;
775
776       arr=CProxy_ampi::ckNew(parent, worldComm, ciStreaming, ciBcast, ciAllgather, ciAlltoall, opts);
777
778
779       CkPrintf("Using untested comlib code in ampi.C\n");
780
781       Strategy *sStreaming = new StreamingStrategy(1,10);
782       CkAssert(ciStreaming == ComlibRegister(sStreaming));
783
784       Strategy *sBcast = new BroadcastStrategy(USE_HYPERCUBE);
785       CkAssert(ciBcast = ComlibRegister(sBcast));
786
787       Strategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
788       CkAssert(ciAllgather = ComlibRegister(sAllgather));
789
790       Strategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
791       CkAssert(ciAlltoall = ComlibRegister(sAlltoall));
792
793       CmiPrintf("Created AMPI comlib strategies in new manner\n");
794
795       // FIXME: Propogate the comlib table here
796       CkpvAccess(conv_com_object).doneCreating();
797 #else
798       arr=CProxy_ampi::ckNew(parent,worldComm,opts);
799 #endif
800
801       //Broadcast info. to the mpi_worlds array
802       // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
803       ampiCommStruct newComm(new_world,arr,_nchunks);
804       //CkPrintf("In ampiInit: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
805       if (ampiWorldsGroup.ckGetGroupID().isZero())
806         ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
807       else
808         ampiWorldsGroup.add(newComm);
809       STARTUP_DEBUG("ampiInit> arrays created")
810
811     }
812
813   // Find our ampi object:
814   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
815   CtvAccess(ampiInitDone)=1;
816   CtvAccess(ampiFinalized)=0;
817   STARTUP_DEBUG("ampiInit> complete")
818 #if CMK_BIGSIM_CHARM
819     //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
820     TRACE_BG_ADD_TAG("AMPI_START");
821 #endif
822
823   init_operations();     // initialize fortran reduction operation table
824
825   getAmpiParent()->ampiInitCallDone = 0;
826
827   CProxy_ampi cbproxy = ptr->getProxy();
828   CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
829   ptr->contribute(0, NULL, CkReduction::sum_int, cb);
830
831   ampiParent *thisParent = getAmpiParent(); 
832   while(thisParent->ampiInitCallDone!=1){
833     //CkPrintf("In checking ampiInitCallDone(%d) loop at parent %d!\n", thisParent->ampiInitCallDone, thisParent->thisIndex);
834     thisParent->getTCharmThread()->stop();
835     /* 
836      * thisParent needs to be updated in case of the parent is being pupped.
837      * In such case, thisParent got changed
838      */
839     thisParent = getAmpiParent();
840   }
841
842 #ifdef CMK_BIGSIM_CHARM
843   BgSetStartOutOfCore();
844 #endif
845
846   return ptr;
847 }
848
849 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
850 class ampiWorlds : public CBase_ampiWorlds {
851   public:
852     ampiWorlds(const ampiCommStruct &nextWorld) {
853       ampiWorldsGroup=thisgroup;
854       //CkPrintf("In constructor: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
855       add(nextWorld);
856     }
857     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
858     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
859     void add(const ampiCommStruct &nextWorld) {
860       int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD); // Isaac guessed there shouldn't be a +1 after the MPI_COMM_WORLD
861       mpi_worlds[new_idx].comm=nextWorld;
862       if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
863       STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
864     }
865 };
866
867 //-------------------- ampiParent -------------------------
868   ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
869 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
870 {
871   int barrier = 0x1234;
872   STARTUP_DEBUG("ampiParent> starting up")
873     thread=NULL;
874   worldPtr=NULL;
875   myDDT=&myDDTsto;
876   prepareCtv();
877
878   init();
879
880   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
881   AsyncEvacuate(CmiFalse);
882 }
883
884 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
885   thread=NULL;
886   worldPtr=NULL;
887   myDDT=&myDDTsto;
888
889   init();
890
891   AsyncEvacuate(CmiFalse);
892 }
893
894 void ampiParent::pup(PUP::er &p) {
895   ArrayElement1D::pup(p);
896   p|threads;
897   p|worldNo;           // why it was missing from here before??
898   p|worldStruct;
899   myDDT->pup(p);
900   p|splitComm;
901   p|groupComm;
902   p|groups;
903
904   //BIGSIM_OOC DEBUGGING
905   //if(!p.isUnpacking()){
906   //    CmiPrintf("ampiParent[%d] packing ampiRequestList: \n", thisIndex);
907   //    ampiReqs.print();
908   //}
909
910   p|ampiReqs;
911
912   //BIGSIM_OOC DEBUGGING
913   //if(p.isUnpacking()){
914   //    CmiPrintf("ampiParent[%d] unpacking ampiRequestList: \n", thisIndex);
915   //    ampiReqs.print();
916   //}
917
918   p|RProxyCnt;
919   p|tmpRProxy;
920   p|winStructList;
921   p|infos;
922
923   p|ampiInitCallDone;
924 }
925 void ampiParent::prepareCtv(void) {
926   thread=threads[thisIndex].ckLocal();
927   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
928   CtvAccessOther(thread->getThread(),ampiPtr) = this;
929   STARTUP_DEBUG("ampiParent> found TCharm")
930 }
931
932 void ampiParent::init(){
933   CkAssert(groups.size() == 0);
934   groups.push_back(new groupStruct);
935
936 #if AMPIMSGLOG
937   if(msgLogWrite && record_msglog(thisIndex)){
938     char fname[128];
939     sprintf(fname, "%s.%d", msgLogFilename,thisIndex);
940 #if CMK_PROJECTIONS_USE_ZLIB && 0
941     fMsgLog = gzopen(fname,"wb");
942     toPUPer = new PUP::tozDisk(fMsgLog);
943 #else
944     fMsgLog = fopen(fname,"wb");
945     CmiAssert(fMsgLog != NULL);
946     toPUPer = new PUP::toDisk(fMsgLog);
947 #endif
948   }else if(msgLogRead){
949     char fname[128];
950     sprintf(fname, "%s.%d", msgLogFilename,msgLogRank);
951 #if CMK_PROJECTIONS_USE_ZLIB && 0
952     fMsgLog = gzopen(fname,"rb");
953     fromPUPer = new PUP::fromzDisk(fMsgLog);
954 #else
955     fMsgLog = fopen(fname,"rb");
956     CmiAssert(fMsgLog != NULL);
957     fromPUPer = new PUP::fromDisk(fMsgLog);
958 #endif
959     CkPrintf("AMPI> opened message log file: %s for replay\n", fname);
960   }
961 #endif
962 }
963
964 void ampiParent::finalize(){
965 #if AMPIMSGLOG
966   if(msgLogWrite && record_msglog(thisIndex)){
967     delete toPUPer;
968 #if CMK_PROJECTIONS_USE_ZLIB && 0
969     gzclose(fMsgLog);
970 #else
971     fclose(fMsgLog);
972 #endif
973   }else if(msgLogRead){
974     delete fromPUPer;
975 #if CMK_PROJECTIONS_USE_ZLIB && 0
976     gzclose(fMsgLog);
977 #else
978     fclose(fMsgLog);
979 #endif
980   }
981 #endif
982 }
983
984 void ampiParent::ckJustMigrated(void) {
985   ArrayElement1D::ckJustMigrated();
986   prepareCtv();
987 }
988
989 void ampiParent::ckJustRestored(void) {
990   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
991     ArrayElement1D::ckJustRestored();
992   prepareCtv();
993
994   //BIGSIM_OOC DEBUGGING
995   //CkPrintf("In ampiParent[%d] with TCharm thread=%p:   ",thisIndex, thread);
996   //CthPrintThdMagic(thread->getTid()); 
997 }
998
999 ampiParent::~ampiParent() {
1000   STARTUP_DEBUG("ampiParent> destructor called");
1001   finalize();
1002 }
1003
1004 //Children call this when they are first created or just migrated
1005 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
1006 {
1007   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
1008
1009   if (s.getComm()>=MPI_COMM_WORLD)
1010   { //We now have our COMM_WORLD-- register it
1011     //Note that split communicators don't keep a raw pointer, so
1012     //they don't need to re-register on migration.
1013     if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
1014     worldPtr=ptr;
1015     worldStruct=s;
1016
1017     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
1018     CkVec<int> _indices;
1019     _indices.push_back(thisIndex);
1020     selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
1021   }
1022
1023   if (!forMigration)
1024   { //Register the new communicator:
1025     MPI_Comm comm = s.getComm();
1026     STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
1027       if (comm>=MPI_COMM_WORLD) { 
1028         // Pass the new ampi to the waiting ampiInit
1029         thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
1030       } else if (isSplit(comm)) {
1031         splitChildRegister(s);
1032       } else if (isGroup(comm)) {
1033         groupChildRegister(s);
1034       } else if (isCart(comm)) {
1035         cartChildRegister(s);
1036       } else if (isGraph(comm)) {
1037         graphChildRegister(s);
1038       } else if (isInter(comm)) {
1039         interChildRegister(s);
1040       } else if (isIntra(comm)) {
1041         intraChildRegister(s);
1042       }else
1043         CkAbort("ampiParent recieved child with bad communicator");
1044   }
1045
1046   return thread;
1047 }
1048
1049 //BIGSIM_OOC DEBUGGING
1050 //Move the comm2ampi from inline to normal function for the sake of debugging
1051 /*ampi *ampiParent::comm2ampi(MPI_Comm comm){
1052 //BIGSIM_OOC DEBUGGING
1053 //CmiPrintf("%d, in ampiParent::comm2ampi, comm=%d\n", thisIndex, comm);
1054 if (comm==MPI_COMM_WORLD) return worldPtr;
1055 if (comm==MPI_COMM_SELF) return worldPtr;
1056 if (comm==worldNo) return worldPtr;
1057 if (isSplit(comm)) {
1058 const ampiCommStruct &st=getSplit(comm);
1059 return st.getProxy()[thisIndex].ckLocal();
1060 }
1061 if (isGroup(comm)) {
1062 const ampiCommStruct &st=getGroup(comm);
1063 return st.getProxy()[thisIndex].ckLocal();
1064 }
1065 if (isCart(comm)) {
1066 const ampiCommStruct &st = getCart(comm);
1067 return st.getProxy()[thisIndex].ckLocal();
1068 }
1069 if (isGraph(comm)) {
1070 const ampiCommStruct &st = getGraph(comm);
1071 return st.getProxy()[thisIndex].ckLocal();
1072 }
1073 if (isInter(comm)) {
1074 const ampiCommStruct &st=getInter(comm);
1075 return st.getProxy()[thisIndex].ckLocal();
1076 }
1077 if (isIntra(comm)) {
1078 const ampiCommStruct &st=getIntra(comm);
1079 return st.getProxy()[thisIndex].ckLocal();
1080 }
1081 if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
1082 CkAbort("Invalid communicator used!");
1083 return NULL;
1084 }*/
1085
1086 // reduction client data - preparation for checkpointing
1087 class ckptClientStruct {
1088   public:
1089     const char *dname;
1090     ampiParent *ampiPtr;
1091     ckptClientStruct(const char *s, ampiParent *a): dname(s), ampiPtr(a) {}
1092 };
1093
1094 static void checkpointClient(void *param,void *msg)
1095 {
1096   ckptClientStruct *client = (ckptClientStruct*)param;
1097   const char *dname = client->dname;
1098   ampiParent *ampiPtr = client->ampiPtr;
1099   ampiPtr->Checkpoint(strlen(dname), dname);
1100   delete client;
1101 }
1102
1103 void ampiParent::startCheckpoint(const char* dname){
1104   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
1105   if (thisIndex==0) {
1106     ckptClientStruct *clientData = new ckptClientStruct(dname, this);
1107     CkCallback cb(checkpointClient, clientData);
1108     contribute(0, NULL, CkReduction::sum_int, cb);
1109   }
1110   else
1111     contribute(0, NULL, CkReduction::sum_int);
1112
1113 #if 0
1114 #if CMK_BIGSIM_CHARM
1115   void *curLog;         // store current log in timeline
1116   _TRACE_BG_TLINE_END(&curLog);
1117   TRACE_BG_AMPI_SUSPEND();
1118 #endif
1119 #endif
1120
1121   thread->stop();
1122
1123 #if CMK_BIGSIM_CHARM
1124   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
1125   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
1126 #endif
1127 }
1128
1129 void ampiParent::Checkpoint(int len, const char* dname){
1130   if (len == 0) {
1131     // memory checkpoint
1132     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1133     CkStartMemCheckpoint(cb);
1134   }
1135   else {
1136     char dirname[256];
1137     strncpy(dirname,dname,len);
1138     dirname[len]='\0';
1139     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1140     CkStartCheckpoint(dirname,cb);
1141   }
1142 }
1143 void ampiParent::ResumeThread(void){
1144   thread->resume();
1145 }
1146
1147 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
1148     int *keyval, void* extra_state){
1149   KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1150   int idx = kvlist.size();
1151   kvlist.resize(idx+1);
1152   kvlist[idx] = newnode;
1153   *keyval = idx;
1154   return 0;
1155 }
1156   int ampiParent::freeKeyval(int *keyval){
1157     if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
1158       return -1;
1159     delete kvlist[*keyval];
1160     kvlist[*keyval] = NULL;
1161     *keyval = MPI_KEYVAL_INVALID;
1162     return 0;
1163   }
1164
1165   int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
1166     if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1167       return -1;
1168     ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1169     // Enlarge the keyval list:
1170     while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
1171     cs.getKeyvals()[keyval]=attribute_val;
1172     return 0;
1173   }
1174
1175 int ampiParent::kv_is_builtin(int keyval) {
1176   switch(keyval) {
1177     case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
1178     case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
1179     case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
1180     case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
1181     case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
1182     case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
1183     case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
1184     case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
1185     default: return 0;
1186   };
1187 }
1188
1189 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
1190   *flag = false;
1191   if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
1192     *flag=true;
1193     *(int **)attribute_val = kv_builtin_storage;  // all default tags are ints
1194     return 0;
1195   }
1196   if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1197     return -1; /* invalid keyval */
1198
1199   ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1200   if (keyval>=cs.getKeyvals().size())  
1201     return 0; /* we don't have a value yet */
1202   if (cs.getKeyvals()[keyval]==0)
1203     return 0; /* we had a value, but now it's zero */
1204   /* Otherwise, we have a good value */
1205   *flag = true;
1206   *(void **)attribute_val = cs.getKeyvals()[keyval];
1207   return 0;
1208 }
1209 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
1210   /* no way to delete an attribute: just overwrite it with 0 */
1211   return putAttr(comm,keyval,0);
1212 }
1213
1214 //----------------------- ampi -------------------------
1215 void ampi::init(void) {
1216   parent=NULL;
1217   thread=NULL;
1218   msgs=NULL;
1219   posted_ireqs=NULL;
1220   resumeOnRecv=false;
1221   AsyncEvacuate(CmiFalse);
1222 }
1223
1224 ampi::ampi()
1225 {
1226   /* this constructor only exists so we can create an empty array during split */
1227   CkAbort("Default ampi constructor should never be called");
1228 }
1229
1230   ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
1231 :parentProxy(parent_)
1232 {
1233   init();
1234
1235   myComm=s; myComm.setArrayID(thisArrayID);
1236   myRank=myComm.getRankForIndex(thisIndex);
1237
1238   findParent(false);
1239
1240   msgs = CmmNew();
1241   posted_ireqs = CmmNew();
1242   nbcasts = 0;
1243
1244 #if AMPI_COMLIB
1245   comlibProxy = thisProxy; // Will later be associated with comlib
1246 #endif
1247
1248   seqEntries=parent->ckGetArraySize();
1249   oorder.init (seqEntries);
1250 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1251   if(thisIndex == 0){
1252     /*      CkAssert(CkMyPe() == 0);
1253      *              CkGroupID _myManagerGID = thisProxy.ckGetArrayID();     
1254      *                      CkAssert(numElements);
1255      *                              printf("ampi::ampi setting numInitial to %d on manager at gid %d \n",numElements,_myManagerGID.idx);
1256      *                                      CkArray *_myManager = thisProxy.ckLocalBranch();
1257      *                                              _myManager->setNumInitial(numElements);*/
1258   }
1259 #endif
1260 }
1261
1262 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1263     ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1264 :parentProxy(parent_)
1265 {
1266 #if AMPI_COMLIB
1267   ciStreaming = ciStreaming_;
1268   ciBcast = ciBcast_;
1269   ciAllgather = ciAllgather_;
1270   ciAlltoall = ciAlltoall_;
1271
1272   init();
1273
1274   myComm=s; myComm.setArrayID(thisArrayID);
1275   myRank=myComm.getRankForIndex(thisIndex);
1276
1277   findParent(false);
1278
1279   msgs = CmmNew();
1280   posted_ireqs = CmmNew();
1281   nbcasts = 0;
1282
1283   comlibProxy = thisProxy;
1284   CmiPrintf("comlibProxy created as a copy of thisProxy, no associate call\n");
1285
1286 #if AMPI_COMLIB
1287   //  ComlibAssociateProxy(ciAlltoall, comlibProxy);
1288 #endif
1289
1290   seqEntries=parent->ckGetArraySize();
1291   oorder.init (seqEntries);
1292 #endif
1293 }
1294
1295 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1296 {
1297   init();
1298
1299   seqEntries=-1;
1300 }
1301
1302 void ampi::ckJustMigrated(void)
1303 {
1304   findParent(true);
1305   ArrayElement1D::ckJustMigrated();
1306 }
1307
1308 void ampi::ckJustRestored(void)
1309 {
1310   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
1311     findParent(true);
1312   ArrayElement1D::ckJustRestored();
1313
1314   //BIGSIM_OOC DEBUGGING
1315   //CkPrintf("In ampi[%d] thread[%p]:   ", thisIndex, thread);
1316   //CthPrintThdMagic(thread->getTid()); 
1317 }
1318
1319 void ampi::findParent(bool forMigration) {
1320   STARTUP_DEBUG("ampi> finding my parent")
1321     parent=parentProxy[thisIndex].ckLocal();
1322   if (parent==NULL) CkAbort("AMPI can't find its parent!");
1323   thread=parent->registerAmpi(this,myComm,forMigration);
1324   if (thread==NULL) CkAbort("AMPI can't find its thread!");
1325   //    printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1326 }
1327
1328 //The following method should be called on the first element of the
1329 //ampi array
1330 void ampi::allInitDone(CkReductionMsg *m){
1331   FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
1332     thisProxy.setInitDoneFlag();
1333   delete m;
1334 }
1335
1336 void ampi::setInitDoneFlag(){
1337   //CkPrintf("ampi[%d]::setInitDone called!\n", thisIndex);
1338   parent->ampiInitCallDone=1;
1339   parent->getTCharmThread()->start();
1340 }
1341
1342 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1343   CkPupMessage(*(PUP::er *)p,msg,1);
1344   if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1345   //    printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1346 }
1347
1348 static void cmm_pup_posted_ireq(pup_er p,void **msg) {
1349
1350   pup_int(p, (int *)msg);
1351
1352   /*    if(pup_isUnpacking(p)){
1353   // *msg = new IReq;
1354   //when unpacking, nothing is needed to do since *msg is an index
1355   //(of type integer) to the ampiParent::ampiReqs (the AmpiRequestList)
1356   }
1357   if(!pup_isUnpacking(p)){
1358   AmpiRequestList *reqL = getReqs();
1359   int retIdx = reqL->findRequestIndex((IReq *)*msg);
1360   if(retIdx==-1){
1361   CmiAbort("An AmpiRequest instance should be found for an instance in posted_ireq!\n");
1362   }
1363   pup_int(p, retIdx)
1364   }
1365    */
1366   //    ((IReq *)*msg)->pup(*(PUP::er *)p);
1367
1368   //    if (pup_isDeleting(p)) delete (IReq *)*msg;
1369   //    printf("[%d] pupping postd irequests %p \n",CkMyPe(),*msg);
1370 }
1371
1372 void ampi::pup(PUP::er &p)
1373 {
1374   if(!p.isUserlevel())
1375     ArrayElement1D::pup(p);//Pack superclass
1376   p|parentProxy;
1377   p|myComm;
1378   p|myRank;
1379   p|nbcasts;
1380   p|tmpVec;
1381   p|remoteProxy;
1382   p|resumeOnRecv;
1383 #if AMPI_COMLIB
1384   p|comlibProxy;
1385   p|ciStreaming;
1386   p|ciBcast;
1387   p|ciAllgather;
1388   p|ciAlltoall;
1389
1390   if(p.isUnpacking()){
1391     //    ciStreaming.setSourcePe();
1392     //    ciBcast.setSourcePe();
1393     //    ciAllgather.setSourcePe();
1394     //    ciAlltoall.setSourcePe();
1395   }
1396 #endif
1397
1398   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1399
1400   //BIGSIM_OOC DEBUGGING
1401   //if(!p.isUnpacking()){
1402   //CkPrintf("ampi[%d]::packing: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1403   //}
1404
1405   posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);
1406
1407   //if(p.isUnpacking()){
1408   //CkPrintf("ampi[%d]::unpacking: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1409   //}
1410
1411   p|seqEntries;
1412   p|oorder;
1413 }
1414
1415 ampi::~ampi()
1416 {
1417   if (CkInRestarting() || _BgOutOfCoreFlag==1) {
1418     // in restarting, we need to flush messages
1419     int tags[3], sts[3];
1420     tags[0] = tags[1] = tags[2] = CmmWildCard;
1421     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1422     while (msg) {
1423       delete msg;
1424       msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1425     }
1426   }
1427
1428   CmmFree(msgs);
1429   CmmFreeAll(posted_ireqs);
1430 }
1431
1432 //------------------------ Communicator Splitting ---------------------
1433 class ampiSplitKey {
1434   public:
1435     int nextSplitComm;
1436     int color; //New class of processes we'll belong to
1437     int key; //To determine rank in new ordering
1438     int rank; //Rank in old ordering
1439     ampiSplitKey() {}
1440     ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1441       :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1442 };
1443
1444 /* "type" may indicate whether call is for a cartesian topology etc. */
1445
1446 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1447 {
1448 #if CMK_BIGSIM_CHARM
1449   void *curLog;         // store current log in timeline
1450   _TRACE_BG_TLINE_END(&curLog);
1451   //  TRACE_BG_AMPI_SUSPEND();
1452 #endif
1453   if (type == CART_TOPOL) {
1454     ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1455     int rootIdx=myComm.getIndexForRank(0);
1456     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1457     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1458
1459     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1460     MPI_Comm newComm=parent->getNextCart()-1;
1461     *dest=newComm;
1462   } else {
1463     ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1464     int rootIdx=myComm.getIndexForRank(0);
1465     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1466     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1467
1468     thread->suspend(); //Resumed by ampiParent::splitChildRegister
1469     MPI_Comm newComm=parent->getNextSplit()-1;
1470     *dest=newComm;
1471   }
1472 #if CMK_BIGSIM_CHARM
1473   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1474   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1475   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1476 #endif
1477 }
1478
1479 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1480   const ampiSplitKey *a=(const ampiSplitKey *)a_;
1481   const ampiSplitKey *b=(const ampiSplitKey *)b_;
1482   if (a->color!=b->color) return a->color-b->color;
1483   if (a->key!=b->key) return a->key-b->key;
1484   return a->rank-b->rank;
1485 }
1486
1487 void ampi::splitPhase1(CkReductionMsg *msg)
1488 {
1489   //Order the keys, which orders the ranks properly:
1490   int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1491   ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1492   if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1493   qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1494
1495   MPI_Comm newComm = -1;
1496   for(int i=0;i<nKeys;i++)
1497     if(keys[i].nextSplitComm>newComm)
1498       newComm = keys[i].nextSplitComm;
1499
1500   //Loop over the sorted keys, which gives us the new arrays:
1501   int lastColor=keys[0].color-1; //The color we're building an array for
1502   CProxy_ampi lastAmpi; //The array for lastColor
1503   int lastRoot=0; //C value for new rank 0 process for latest color
1504   ampiCommStruct lastComm; //Communicator info. for latest color
1505   for (int c=0;c<nKeys;c++) {
1506     if (keys[c].color!=lastColor)
1507     { //Hit a new color-- need to build a new communicator and array
1508       lastColor=keys[c].color;
1509       lastRoot=c;
1510       CkArrayOptions opts;
1511       opts.bindTo(parentProxy);
1512       opts.setNumInitial(0);
1513       CkArrayID unusedAID; ampiCommStruct unusedComm;
1514       lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1515       lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1516
1517       CkVec<int> indices; //Maps rank to array indices for new arrau
1518       for (int i=c;i<nKeys;i++) {
1519         if (keys[i].color!=lastColor) break; //Done with this color
1520         int idx=myComm.getIndexForRank(keys[i].rank);
1521         indices.push_back(idx);
1522       }
1523
1524       //FIXME: create a new communicator for each color, instead of
1525       // (confusingly) re-using the same MPI_Comm number for each.
1526       lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1527     }
1528     int newRank=c-lastRoot;
1529     int newIdx=lastComm.getIndexForRank(newRank);
1530
1531     //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1532     lastAmpi[newIdx].insert(parentProxy,lastComm);
1533   }
1534
1535   delete msg;
1536 }
1537
1538 //...newly created array elements register with the parent, which calls:
1539 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1540   int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1541   if (splitComm.size()<=idx) splitComm.resize(idx+1);
1542   splitComm[idx]=new ampiCommStruct(s);
1543   thread->resume(); //Matches suspend at end of ampi::split
1544 }
1545
1546 //-----------------create communicator from group--------------
1547 // The procedure is like that of comm_split very much,
1548 // so the code is shamelessly copied from above
1549 //   1. reduction to make sure all members have called
1550 //   2. the root in the old communicator create the new array
1551 //   3. ampiParent::register is called to register new array as new comm
1552 class vecStruct {
1553   public:
1554     int nextgroup;
1555     groupStruct vec;
1556     vecStruct():nextgroup(-1){}
1557     vecStruct(int nextgroup_, groupStruct vec_)
1558       : nextgroup(nextgroup_), vec(vec_) { }
1559 };
1560
1561 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1562   int rootIdx=vec[0];
1563   tmpVec = vec;
1564   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1565   MPI_Comm nextgroup = parent->getNextGroup();
1566   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1567
1568   if(getPosOp(thisIndex,vec)>=0){
1569     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1570     MPI_Comm retcomm = parent->getNextGroup()-1;
1571     *newcomm = retcomm;
1572   }else{
1573     *newcomm = MPI_COMM_NULL;
1574   }
1575 }
1576
1577 void ampi::commCreatePhase1(CkReductionMsg *msg){
1578   MPI_Comm *nextGroupComm = (int *)msg->getData();
1579
1580   CkArrayOptions opts;
1581   opts.bindTo(parentProxy);
1582   opts.setNumInitial(0);
1583   CkArrayID unusedAID;
1584   ampiCommStruct unusedComm;
1585   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1586   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1587
1588   groupStruct indices = tmpVec;
1589   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1590   for(int i=0;i<indices.size();i++){
1591     int newIdx=indices[i];
1592     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1593   }
1594   delete msg;
1595 }
1596
1597 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1598   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1599   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1600   groupComm[idx]=new ampiCommStruct(s);
1601   thread->resume(); //Matches suspend at end of ampi::split
1602 }
1603
1604 /* Virtual topology communicator creation */
1605 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1606   int rootIdx=vec[0];
1607   tmpVec = vec;
1608   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1609
1610   MPI_Comm nextcart = parent->getNextCart();
1611   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1612
1613   if(getPosOp(thisIndex,vec)>=0){
1614     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1615     MPI_Comm retcomm = parent->getNextCart()-1;
1616     *newcomm = retcomm;
1617   }else
1618     *newcomm = MPI_COMM_NULL;
1619 }
1620
1621 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1622   MPI_Comm *nextCartComm = (int *)msg->getData();
1623
1624   CkArrayOptions opts;
1625   opts.bindTo(parentProxy);
1626   opts.setNumInitial(0);
1627   CkArrayID unusedAID;
1628   ampiCommStruct unusedComm;
1629   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1630   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1631
1632   groupStruct indices = tmpVec;
1633   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1634       size(),indices);
1635   for(int i=0;i<indices.size();i++){
1636     int newIdx=indices[i];
1637     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1638   }
1639   delete msg;
1640 }
1641
1642 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1643   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1644   if (cartComm.size()<=idx) {
1645     cartComm.resize(idx+1);
1646     cartComm.length()=idx+1;
1647   }
1648   cartComm[idx]=new ampiCommStruct(s);
1649   thread->resume(); //Matches suspend at end of ampi::cartCreate
1650 }
1651
1652 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1653   int rootIdx=vec[0];
1654   tmpVec = vec;
1655   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1656       myComm.getProxy());
1657   MPI_Comm nextgraph = parent->getNextGraph();
1658   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1659
1660   if(getPosOp(thisIndex,vec)>=0){
1661     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1662     MPI_Comm retcomm = parent->getNextGraph()-1;
1663     *newcomm = retcomm;
1664   }else
1665     *newcomm = MPI_COMM_NULL;
1666 }
1667
1668 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1669   MPI_Comm *nextGraphComm = (int *)msg->getData();
1670
1671   CkArrayOptions opts;
1672   opts.bindTo(parentProxy);
1673   opts.setNumInitial(0);
1674   CkArrayID unusedAID;
1675   ampiCommStruct unusedComm;
1676   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1677   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1678
1679   groupStruct indices = tmpVec;
1680   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1681       .size(),indices);
1682   for(int i=0;i<indices.size();i++){
1683     int newIdx=indices[i];
1684     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1685   }
1686   delete msg;
1687 }
1688
1689 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1690   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1691   if (graphComm.size()<=idx) {
1692     graphComm.resize(idx+1);
1693     graphComm.length()=idx+1;
1694   }
1695   graphComm[idx]=new ampiCommStruct(s);
1696   thread->resume(); //Matches suspend at end of ampi::graphCreate
1697 }
1698
1699 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1700   if(thisIndex==root) { // not everybody gets the valid rvec
1701     tmpVec = rvec;
1702   }
1703   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1704   MPI_Comm nextinter = parent->getNextInter();
1705   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1706
1707   thread->suspend(); //Resumed by ampiParent::interChildRegister
1708   MPI_Comm newcomm=parent->getNextInter()-1;
1709   *ncomm=newcomm;
1710 }
1711
1712 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1713   MPI_Comm *nextInterComm = (int *)msg->getData();
1714
1715   groupStruct lgroup = myComm.getIndices();
1716   CkArrayOptions opts;
1717   opts.bindTo(parentProxy);
1718   opts.setNumInitial(0);
1719   CkArrayID unusedAID;
1720   ampiCommStruct unusedComm;
1721   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1722   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1723
1724   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1725   for(int i=0;i<lgroup.size();i++){
1726     int newIdx=lgroup[i];
1727     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1728   }
1729
1730   parentProxy[0].ExchangeProxy(newAmpi);
1731   delete msg;
1732 }
1733
1734 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1735   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1736   if (interComm.size()<=idx) interComm.resize(idx+1);
1737   interComm[idx]=new ampiCommStruct(s);
1738   //thread->resume(); // don't resume it yet, till parent set remote proxy
1739 }
1740
1741 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1742   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1743     groupStruct lvec = myComm.getIndices();
1744     groupStruct rvec = myComm.getRemoteIndices();
1745     int rsize = rvec.size();
1746     tmpVec = lvec;
1747     for(int i=0;i<rsize;i++)
1748       tmpVec.push_back(rvec[i]);
1749     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1750   }else{
1751     tmpVec.resize(0);
1752   }
1753
1754   int rootIdx=myComm.getIndexForRank(0);
1755   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1756   MPI_Comm nextintra = parent->getNextIntra();
1757   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1758
1759   thread->suspend(); //Resumed by ampiParent::interChildRegister
1760   MPI_Comm newcomm=parent->getNextIntra()-1;
1761   *ncomm=newcomm;
1762 }
1763
1764 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1765   if(tmpVec.size()==0) { delete msg; return; }
1766   MPI_Comm *nextIntraComm = (int *)msg->getData();
1767   CkArrayOptions opts;
1768   opts.bindTo(parentProxy);
1769   opts.setNumInitial(0);
1770   CkArrayID unusedAID;
1771   ampiCommStruct unusedComm;
1772   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1773   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1774
1775   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1776   for(int i=0;i<tmpVec.size();i++){
1777     int newIdx=tmpVec[i];
1778     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1779   }
1780   delete msg;
1781 }
1782
1783 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1784   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1785   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1786   intraComm[idx]=new ampiCommStruct(s);
1787   thread->resume(); //Matches suspend at end of ampi::split
1788 }
1789
1790 //------------------------ communication -----------------------
1791 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1792 {
1793   if (universeNo>MPI_COMM_WORLD) {
1794     int worldDex=universeNo-MPI_COMM_WORLD-1;
1795     if (worldDex>=_mpi_nworlds)
1796       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1797     return mpi_worlds[worldDex].comm;
1798   }
1799   CkAbort("Bad communicator passed to universeComm2CommStruct");
1800   return mpi_worlds[0].comm; // meaningless return
1801 }
1802
1803 void ampi::block(void){
1804   thread->suspend();
1805 }
1806
1807 void ampi::yield(void){
1808   thread->schedule();
1809 }
1810
1811 void ampi::unblock(void){
1812   thread->resume();
1813 }
1814
1815   void ampi::ssend_ack(int sreq_idx){
1816     if (sreq_idx == 1)
1817       thread->resume();           // MPI_Ssend
1818     else {
1819       sreq_idx -= 2;              // start from 2
1820       AmpiRequestList *reqs = &(parent->ampiReqs);
1821       SReq *sreq = (SReq *)(*reqs)[sreq_idx];
1822       sreq->statusIreq = true;
1823       if (resumeOnRecv) {
1824         thread->resume();
1825       }
1826     }
1827   }
1828
1829   void
1830 ampi::generic(AmpiMsg* msg)
1831 {
1832   MSG_ORDER_DEBUG(
1833       CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1834         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1835       )
1836 #if CMK_BIGSIM_CHARM
1837     TRACE_BG_ADD_TAG("AMPI_generic");
1838   msg->event = NULL;
1839 #endif
1840
1841   int sync = UsrToEnv(msg)->getRef();
1842   int srcIdx;
1843   if (sync)  srcIdx = msg->srcIdx;
1844
1845   //    AmpiMsg *msgcopy = msg;
1846   if(msg->seq != -1) {
1847     int srcIdx=msg->srcIdx;
1848     int n=oorder.put(srcIdx,msg);
1849     if (n>0) { // This message was in-order
1850       inorder(msg);
1851       if (n>1) { // It enables other, previously out-of-order messages
1852         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1853           inorder(msg);
1854         }
1855       }
1856     }
1857   } else { //Cross-world or system messages are unordered
1858     inorder(msg);
1859   }
1860
1861   // msg may be free'ed from calling inorder()
1862   if (sync>0) {         // send an ack to sender
1863     CProxy_ampi pa(thisArrayID);
1864     pa[srcIdx].ssend_ack(sync);
1865   }
1866
1867   if(resumeOnRecv){
1868     //CkPrintf("Calling TCharm::resume at ampi::generic!\n");
1869     thread->resume();
1870   }
1871 }
1872
1873 inline static AmpiRequestList *getReqs(void); 
1874
1875   void
1876 ampi::inorder(AmpiMsg* msg)
1877 {
1878   MSG_ORDER_DEBUG(
1879       CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1880         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1881       )
1882     // check posted recvs
1883     int tags[3], sts[3];
1884   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1885   IReq *ireq = NULL;
1886   if (CpvAccess(CmiPICMethod) != 2) {
1887 #if 0
1888     //IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1889     ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1890 #else
1891 #if CMK_BIGSIM_CHARM
1892     _TRACE_BG_TLINE_END(&msg->event);    // store current log
1893     msg->eventPe = CmiMyPe();
1894 #endif
1895     //in case ampi has not initialized and posted_ireqs are only inserted 
1896     //at AMPI_Irecv (MPI_Irecv)
1897     AmpiRequestList *reqL = &(parent->ampiReqs);
1898     //When storing the req index, it's 1-based. The reason is stated in the comments
1899     //in AMPI_Irecv function.
1900     int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
1901     if(reqL->size()>0 && ireqIdx>0)
1902       ireq = (IReq *)(*reqL)[ireqIdx-1];
1903     //CkPrintf("[%d] ampi::inorder, ireqIdx=%d\n", thisIndex, ireqIdx);
1904 #endif
1905     //CkPrintf("[%d] ampi::inorder, ireq=%p\n", thisIndex, ireq);
1906     if (ireq) { // receive posted
1907       ireq->receive(this, msg);
1908       // Isaac changed this so that the IReq stores the tag when receiving the message, 
1909       // instead of using this user supplied tag which could be MPI_ANY_TAG
1910       // Formerly the following line was not commented out:
1911       //ireq->tag = sts[0];         
1912       //ireq->src = sts[1];
1913       //ireq->comm = sts[2];
1914     } else {
1915       CmmPut(msgs, 3, tags, msg);
1916     }
1917   }
1918   else
1919     CmmPut(msgs, 3, tags, msg);
1920 }
1921
1922 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1923 {
1924   int tags[3];
1925   tags[0] = t; tags[1] = s; tags[2] = comm;
1926   AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1927   return msg;
1928 }
1929
1930 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1931     int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm, int sync)
1932 {
1933   CkDDT_DataType *ddt = getDDT()->getType(type);
1934   int len = ddt->getSize(count);
1935   int sIdx=thisIndex;
1936   int seq = -1;
1937   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1938     seq = oorder.nextOutgoing(destIdx);
1939   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1940   if (sync) UsrToEnv(msg)->setRef(sync);
1941   TCharm::activateVariable(buf);
1942   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1943   TCharm::deactivateVariable(buf);
1944   return msg;
1945 }
1946
1947 #if AMPI_COMLIB
1948   void
1949 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1950 {
1951   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1952 }
1953 #endif
1954
1955   void
1956 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, int sync)
1957 {
1958 #if CMK_TRACE_IN_CHARM
1959   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
1960 #endif
1961
1962 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1963   MPI_Comm disComm = myComm.getComm();
1964   ampi *dis = getAmpiInstance(disComm);
1965   CpvAccess(_currentObj) = dis;
1966 #endif
1967
1968   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1969   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),sync);
1970
1971 #if CMK_TRACE_IN_CHARM
1972   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
1973 #endif
1974
1975   if (sync == 1) {
1976     // waiting for receiver side
1977     resumeOnRecv = false;            // so no one else awakes it
1978     block();
1979   }
1980 }
1981
1982   void
1983 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1984 {
1985   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1986   memcpy(msg->data, buf, len);
1987   CProxy_ampi pa(aid);
1988   pa[idx].generic(msg);
1989 }
1990
1991   void
1992 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy, int sync)
1993 {
1994   if(rank==MPI_PROC_NULL) return;
1995   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1996   int destIdx = dest.getIndexForRank(rank);
1997   if(isInter()){
1998     sRank = parent->thisIndex;
1999     destcomm = MPI_COMM_FIRST_INTER;
2000     destIdx = dest.getIndexForRemoteRank(rank);
2001     arrproxy = remoteProxy;
2002   }
2003   MSG_ORDER_DEBUG(
2004       CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
2005       )
2006
2007     arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm,sync));
2008
2009 #if 0
2010 #if CMK_TRACE_ENABLED
2011   int size=0;
2012   MPI_Type_size(type,&size);
2013   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
2014 #endif
2015 #endif
2016 }
2017
2018   int
2019 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
2020 {
2021   CkDDT_DataType *ddt = getDDT()->getType(type);
2022   int len = ddt->getSize(count);
2023
2024   if(msg->length < len){ // only at rare case shall we reset count by using divide
2025     count = msg->length/(ddt->getSize(1));
2026   }
2027
2028   TCharm::activateVariable(buf);
2029   if (t==MPI_REDUCE_TAG) {      // reduction msg
2030     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
2031   } else {
2032     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
2033   }
2034   TCharm::deactivateVariable(buf);
2035   return 0;
2036 }
2037
2038   int
2039 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
2040 {
2041   MPI_Comm disComm = myComm.getComm();
2042   if(s==MPI_PROC_NULL) {
2043     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
2044     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
2045     ((MPI_Status *)sts)->MPI_LENGTH = 0;
2046     return 0;
2047   }
2048 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
2049   _LOG_E_END_AMPI_PROCESSING(thisIndex)
2050 #endif
2051 #if CMK_BIGSIM_CHARM
2052    void *curLog;                // store current log in timeline
2053   _TRACE_BG_TLINE_END(&curLog);
2054   //  TRACE_BG_AMPI_SUSPEND();
2055 #if CMK_TRACE_IN_CHARM
2056   if(CpvAccess(traceOn)) traceSuspend();
2057 #endif
2058 #endif
2059
2060   if(isInter()){
2061     s = myComm.getIndexForRemoteRank(s);
2062     comm = MPI_COMM_FIRST_INTER;
2063   }
2064
2065   int tags[3];
2066   AmpiMsg *msg = 0;
2067
2068   MSG_ORDER_DEBUG(
2069       CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
2070       )
2071
2072   ampi *dis = getAmpiInstance(disComm);
2073 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2074   //  dis->yield();
2075   //  processRemoteMlogMessages();
2076 #endif
2077   int dosuspend = 0;
2078   while(1) {
2079     //This is done to take into account the case in which an ampi 
2080     // thread has migrated while waiting for a message
2081     tags[0] = t; tags[1] = s; tags[2] = comm;
2082     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
2083     if (msg) break;
2084     dis->resumeOnRecv=true;
2085     dis->thread->suspend();
2086     dosuspend = 1;
2087     dis = getAmpiInstance(disComm);
2088   }
2089
2090 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2091   CpvAccess(_currentObj) = dis;
2092   MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
2093 #endif
2094
2095     dis->resumeOnRecv=false;
2096
2097   if(sts)
2098     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2099   int status = dis->processMessage(msg, t, s, buf, count, type);
2100   if (status != 0) return status;
2101
2102 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
2103   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
2104 #endif
2105
2106 #if CMK_BIGSIM_CHARM
2107 #if CMK_TRACE_IN_CHARM
2108     //if(CpvAccess(traceOn)) CthTraceResume(thread->getThread());
2109     //Due to the reason mentioned the in the while loop above, we need to 
2110     //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
2111     if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
2112 #endif
2113   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
2114   //TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0);
2115   //_TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
2116 #if 0
2117 #if 1
2118   if (!dosuspend) {
2119     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 1);
2120     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2121   }
2122   else
2123 #endif
2124     TRACE_BG_ADD_TAG("RECV_RESUME_THREAD");
2125 #else
2126   TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
2127   if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2128 #endif
2129 #endif
2130
2131   delete msg;
2132   return 0;
2133 }
2134
2135   void
2136 ampi::probe(int t, int s, int comm, int *sts)
2137 {
2138   int tags[3];
2139 #if CMK_BIGSIM_CHARM
2140   void *curLog;         // store current log in timeline
2141   _TRACE_BG_TLINE_END(&curLog);
2142   //  TRACE_BG_AMPI_SUSPEND();
2143 #endif
2144
2145   AmpiMsg *msg = 0;
2146   resumeOnRecv=true;
2147   while(1) {
2148     tags[0] = t; tags[1] = s; tags[2] = comm;
2149     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2150     if (msg) break;
2151     thread->suspend();
2152   }
2153   resumeOnRecv=false;
2154   if(sts)
2155     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2156 #if CMK_BIGSIM_CHARM
2157   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
2158   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
2159 #endif
2160 }
2161
2162   int
2163 ampi::iprobe(int t, int s, int comm, int *sts)
2164 {
2165   int tags[3];
2166   AmpiMsg *msg = 0;
2167   tags[0] = t; tags[1] = s; tags[2] = comm;
2168   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2169   if (msg) {
2170     if(sts)
2171       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2172     return 1;
2173   }
2174 #if CMK_BIGSIM_CHARM
2175   void *curLog;         // store current log in timeline
2176   _TRACE_BG_TLINE_END(&curLog);
2177   //  TRACE_BG_AMPI_SUSPEND();
2178 #endif
2179   thread->schedule();
2180 #if CMK_BIGSIM_CHARM
2181   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
2182   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
2183 #endif
2184   return 0;
2185 }
2186
2187
2188 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
2189   void
2190 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
2191 {
2192   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2193   int rootIdx=dest.getIndexForRank(root);
2194   if(rootIdx==thisIndex) {
2195 #if 0//AMPI_COMLIB
2196     ciBcast.beginIteration();
2197     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2198 #else
2199 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2200     CpvAccess(_currentObj) = this;
2201 #endif
2202     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2203 #endif
2204   }
2205   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
2206   nbcasts++;
2207 }
2208
2209   void
2210 ampi::bcastraw(void* buf, int len, CkArrayID aid)
2211 {
2212   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, MPI_COMM_WORLD);
2213   memcpy(msg->data, buf, len);
2214   CProxy_ampi pa(aid);
2215   pa.generic(msg);
2216 }
2217
2218
2219   AmpiMsg* 
2220 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
2221 {
2222   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
2223   int unit;
2224   CkDDT_DataType *ddt = getDDT()->getType(type);
2225   unit = ddt->getSize(1);
2226   int totalsize = unit*cnt;
2227
2228   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
2229   char* addr = (char*)Alltoallbuff+disp*unit;
2230   ddt->serialize((char*)msg->data, addr, cnt, (-1));
2231   return msg;
2232 }
2233
2234 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
2235     void *attr_in, void *attr_out, int *flag){
2236   (*flag) = 0;
2237   return (MPI_SUCCESS);
2238 }
2239 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
2240     void *attr_in, void *attr_out, int *flag){
2241   (*(void **)attr_out) = attr_in;
2242   (*flag) = 1;
2243   return (MPI_SUCCESS);
2244 }
2245 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
2246   return (MPI_SUCCESS);
2247 }
2248
2249
2250 void AmpiSeqQ::init(int numP) 
2251 {
2252   elements.init(numP);
2253 }
2254
2255 AmpiSeqQ::~AmpiSeqQ () {
2256 }
2257
2258 void AmpiSeqQ::pup(PUP::er &p) {
2259   p|out;
2260   p|elements;
2261 }
2262
2263 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
2264 {
2265   AmpiOtherElement &el=elements[srcIdx];
2266 #if CMK_ERROR_CHECKING
2267   if (msg->seq<el.seqIncoming)
2268     CkAbort("AMPI Logic error: received late out-of-order message!\n");
2269 #endif
2270   out.enq(msg);
2271   el.nOut++; // We have another message in the out-of-order queue
2272 }
2273
2274 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
2275 {
2276   AmpiOtherElement &el=elements[srcIdx];
2277   if (el.nOut==0) return 0; // No more out-of-order left.
2278   // Walk through our out-of-order queue, searching for our next message:
2279   for (int i=0;i<out.length();i++) {
2280     AmpiMsg *msg=out.deq();
2281     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
2282       el.seqIncoming++;
2283       el.nOut--; // We have one less message out-of-order
2284       return msg;
2285     }
2286     else
2287       out.enq(msg);
2288   }
2289   // We walked the whole queue-- ours is not there.
2290   return 0;
2291 }
2292
2293 //BIGSIM_OOC DEBUGGING: Output for AmpiRequest and its children classes
2294 void AmpiRequest::print(){
2295   CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
2296 }
2297
2298 void PersReq::print(){
2299   AmpiRequest::print();
2300   CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
2301 }
2302
2303 void IReq::print(){
2304   AmpiRequest::print();
2305   CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
2306 }
2307
2308 void ATAReq::print(){ //not complete for myreqs
2309   AmpiRequest::print();
2310   CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
2311
2312
2313 void SReq::print(){
2314   AmpiRequest::print();
2315   CmiPrintf("In SReq: this=%p, status=%d\n", this, statusIreq);
2316 }
2317
2318 void AmpiRequestList::pup(PUP::er &p) { 
2319   if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
2320     return;
2321   }
2322
2323   p(blklen); //Allocated size of block
2324   p(len); //Number of used elements in block
2325   if(p.isUnpacking()){
2326     makeBlock(blklen,len);
2327   }
2328   int count=0;
2329   for(int i=0;i<len;i++){
2330     char nonnull;
2331     if(!p.isUnpacking()){
2332       if(block[i] == NULL){
2333         nonnull = 0;
2334       }else{
2335         nonnull = block[i]->getType();
2336       }
2337     }   
2338     p(nonnull);
2339     if(nonnull != 0){
2340       if(p.isUnpacking()){
2341         switch(nonnull){
2342           case 1:
2343             block[i] = new PersReq;
2344             break;
2345           case 2:       
2346             block[i] = new IReq;
2347             break;
2348           case 3:       
2349             block[i] = new ATAReq;
2350             break;
2351         }
2352       } 
2353       block[i]->pup(p);
2354       count++;
2355     }else{
2356       block[i] = 0;
2357     }
2358   }
2359   if(p.isDeleting()){
2360     freeBlock();
2361   }
2362 }
2363
2364 //------------------ External Interface -----------------
2365 ampiParent *getAmpiParent(void) {
2366   ampiParent *p = CtvAccess(ampiPtr);
2367 #if CMK_ERROR_CHECKING
2368   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
2369 #endif
2370   return p;
2371 }
2372
2373 ampi *getAmpiInstance(MPI_Comm comm) {
2374   ampi *ptr=getAmpiParent()->comm2ampi(comm);
2375 #if CMK_ERROR_CHECKING
2376   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
2377 #endif
2378   return ptr;
2379 }
2380
2381 inline static AmpiRequestList *getReqs(void) {
2382   return &(getAmpiParent()->ampiReqs);
2383 }
2384
2385 inline void checkComm(MPI_Comm comm){
2386 #if CMK_ERROR_CHECKING
2387   getAmpiParent()->checkComm(comm);
2388 #endif
2389 }
2390
2391 inline void checkRequest(MPI_Request req){
2392 #if CMK_ERROR_CHECKING
2393   getReqs()->checkRequest(req);
2394 #endif
2395 }
2396
2397 inline void checkRequests(int n, MPI_Request* reqs){
2398 #if CMK_ERROR_CHECKING
2399   AmpiRequestList* reqlist = getReqs();
2400   for(int i=0;i<n;i++)
2401     reqlist->checkRequest(reqs[i]);
2402 #endif
2403 }
2404
2405 CDECL void AMPI_Migrate(void)
2406 {
2407   //  AMPIAPI("AMPI_Migrate");
2408 #if 0
2409 #if CMK_BIGSIM_CHARM
2410   TRACE_BG_AMPI_SUSPEND();
2411 #endif
2412 #endif
2413   TCHARM_Migrate();
2414
2415 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2416   ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
2417   CpvAccess(_currentObj) = currentAmpi;
2418 #endif
2419
2420 #if CMK_BIGSIM_CHARM
2421   //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2422   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
2423 #endif
2424 }
2425
2426
2427 CDECL void AMPI_Evacuate(void)
2428 {
2429   TCHARM_Evacuate();
2430 }
2431
2432
2433
2434 CDECL void AMPI_Migrateto(int destPE)
2435 {
2436   AMPIAPI("AMPI_MigrateTo");
2437 #if 0
2438 #if CMK_BIGSIM_CHARM
2439   TRACE_BG_AMPI_SUSPEND();
2440 #endif
2441 #endif
2442   TCHARM_Migrate_to(destPE);
2443 #if CMK_BIGSIM_CHARM
2444   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
2445   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
2446 #endif
2447 }
2448
2449 CDECL void AMPI_MigrateTo(int destPE)
2450 {
2451   AMPI_Migrateto(destPE);
2452 }
2453
2454 CDECL void AMPI_Async_Migrate(void)
2455 {
2456   AMPIAPI("AMPI_Async_Migrate");
2457 #if 0
2458 #if CMK_BIGSIM_CHARM
2459   TRACE_BG_AMPI_SUSPEND();
2460 #endif
2461 #endif
2462   TCHARM_Async_Migrate();
2463 #if CMK_BIGSIM_CHARM
2464   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2465   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2466 #endif
2467 }
2468
2469 CDECL void AMPI_Allow_Migrate(void)
2470 {
2471   AMPIAPI("AMPI_Allow_Migrate");
2472 #if 0
2473 #if CMK_BIGSIM_CHARM
2474   TRACE_BG_AMPI_SUSPEND();
2475 #endif
2476 #endif
2477   TCHARM_Allow_Migrate();
2478 #if CMK_BIGSIM_CHARM
2479   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2480 #endif
2481 }
2482
2483 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2484 #if CMK_LBDB_ON
2485   //AMPIAPI("AMPI_Setmigratable");
2486   ampi *ptr=getAmpiInstance(comm);
2487   ptr->setMigratable(mig);
2488 #else
2489   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2490 #endif
2491 }
2492
2493 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2494 {
2495   //AMPIAPI("AMPI_Init");
2496   if (nodeinit_has_been_called) {
2497     AMPIAPI("AMPI_Init");
2498     char **argv;
2499     if (p_argv) argv=*p_argv;
2500     else argv=CkGetArgv();
2501     ampiInit(argv);
2502     if (p_argc) *p_argc=CmiGetArgc(argv);
2503   }
2504   else
2505   { /* Charm hasn't been started yet! */
2506     CkAbort("AMPI_Init> Charm is not initialized!");
2507   }
2508
2509   return 0;
2510 }
2511
2512 CDECL int AMPI_Initialized(int *isInit)
2513 {
2514   if (nodeinit_has_been_called) {
2515     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2516     *isInit=CtvAccess(ampiInitDone);
2517   }
2518   else /* !nodeinit_has_been_called */ {
2519     *isInit=nodeinit_has_been_called;
2520   }
2521   return 0;
2522 }
2523
2524 CDECL int AMPI_Finalized(int *isFinalized)
2525 {
2526   AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2527   *isFinalized=CtvAccess(ampiFinalized);
2528   return 0;
2529 }
2530
2531 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2532 {
2533   //AMPIAPI("AMPI_Comm_rank");
2534
2535 #if CMK_ERROR_CHECKING 
2536   if(checkCommunicator(comm) != MPI_SUCCESS)
2537     return checkCommunicator(comm);
2538 #endif
2539
2540 #if AMPIMSGLOG
2541   ampiParent* pptr = getAmpiParent();
2542   if(msgLogRead){
2543     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2544     return 0;
2545   }
2546 #endif
2547
2548   *rank = getAmpiInstance(comm)->getRank(comm);
2549
2550 #if AMPIMSGLOG
2551   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2552     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2553   }
2554 #endif
2555   return 0;
2556 }
2557
2558   CDECL
2559 int AMPI_Comm_size(MPI_Comm comm, int *size)
2560 {
2561   //AMPIAPI("AMPI_Comm_size");
2562
2563 #if CMK_ERROR_CHECKING 
2564   if(checkCommunicator(comm) != MPI_SUCCESS)
2565     return checkCommunicator(comm);
2566 #endif
2567
2568 #if AMPIMSGLOG
2569   ampiParent* pptr = getAmpiParent();
2570   if(msgLogRead){
2571     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2572     return 0;
2573   }
2574 #endif
2575
2576   *size = getAmpiInstance(comm)->getSize(comm);
2577
2578 #if AMPIMSGLOG
2579   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2580     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2581   }
2582 #endif
2583
2584   return 0;
2585 }
2586
2587   CDECL
2588 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2589 {
2590
2591 #if CMK_ERROR_CHECKING 
2592   if(checkCommunicator(comm1) != MPI_SUCCESS)
2593     return checkCommunicator(comm1);
2594   if(checkCommunicator(comm2) != MPI_SUCCESS)
2595     return checkCommunicator(comm2);
2596 #endif
2597
2598   AMPIAPI("AMPI_Comm_compare");
2599   if(comm1==comm2) *result=MPI_IDENT;
2600   else{
2601     int equal=1;
2602     CkVec<int> ind1, ind2;
2603     ind1 = getAmpiInstance(comm1)->getIndices();
2604     ind2 = getAmpiInstance(comm2)->getIndices();
2605     if(ind1.size()==ind2.size()){
2606       for(int i=0;i<ind1.size();i++)
2607         if(ind1[i] != ind2[i]) { equal=0; break; }
2608     }
2609     if(equal==1) *result=MPI_CONGRUENT;
2610     else *result=MPI_UNEQUAL;
2611   }
2612   return 0;
2613 }
2614
2615 CDECL void AMPI_Exit(int /*exitCode*/)
2616 {
2617   AMPIAPI("AMPI_Exit");
2618   //finalizeBigSimTrace();
2619   TCHARM_Done();
2620 }
2621 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2622 {
2623   AMPI_Exit(*exitCode);
2624 }
2625
2626   CDECL
2627 int AMPI_Finalize(void)
2628 {
2629   AMPIAPI("AMPI_Finalize");
2630 #if PRINT_IDLE
2631   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2632 #endif
2633 #if AMPI_COUNTER
2634   getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2635 #endif
2636   CtvAccess(ampiFinalized)=1;
2637
2638 #if CMK_BIGSIM_CHARM
2639 #if 0
2640   TRACE_BG_AMPI_SUSPEND();
2641 #endif
2642 #if CMK_TRACE_IN_CHARM
2643   if(CpvAccess(traceOn)) traceSuspend();
2644 #endif
2645 #endif
2646
2647   //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2648   AMPI_Exit(0);
2649   return 0;
2650 }
2651
2652 CDECL
2653 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2654     int tag, MPI_Comm comm) {
2655
2656 #if CMK_ERROR_CHECKING
2657   int ret;
2658   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2659   if(ret != MPI_SUCCESS)
2660     return ret;
2661 #endif
2662
2663   AMPIAPI("AMPI_Send");
2664 #if AMPIMSGLOG
2665   if(msgLogRead){
2666     return 0;
2667   }
2668 #endif
2669
2670   ampi *ptr = getAmpiInstance(comm);
2671 #if AMPI_COMLIB
2672   if(enableStreaming){  
2673     //    ptr->getStreaming().beginIteration();
2674     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2675   } else
2676 #endif
2677     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2678 #if AMPI_COUNTER
2679   getAmpiParent()->counters.send++;
2680 #endif
2681 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2682   //  ptr->yield();
2683   //  //  processRemoteMlogMessages();
2684 #endif
2685   return 0;
2686 }
2687
2688   CDECL
2689 int AMPI_Ssend(void *msg, int count, MPI_Datatype type, int dest,
2690     int tag, MPI_Comm comm)
2691 {
2692 #if CMK_ERROR_CHECKING
2693   int ret;
2694   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2695   if(ret != MPI_SUCCESS)
2696     return ret;
2697 #endif
2698
2699   AMPIAPI("AMPI_Ssend");
2700 #if AMPIMSGLOG
2701   if(msgLogRead){
2702     return 0;
2703   }
2704 #endif
2705
2706   ampi *ptr = getAmpiInstance(comm);
2707 #if AMPI_COMLIB
2708   if(enableStreaming){
2709     ptr->getStreaming().beginIteration();
2710     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2711   } else
2712 #endif
2713     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm, 1);
2714 #if AMPI_COUNTER
2715   getAmpiParent()->counters.send++;
2716 #endif
2717
2718   return 0;
2719 }
2720
2721   CDECL
2722 int AMPI_Issend(void *buf, int count, MPI_Datatype type, int dest,
2723     int tag, MPI_Comm comm, MPI_Request *request)
2724 {
2725   AMPIAPI("AMPI_Issend");
2726
2727 #if CMK_ERROR_CHECKING
2728   int ret;
2729   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
2730   if(ret != MPI_SUCCESS)
2731   {
2732     *request = MPI_REQUEST_NULL;
2733     return ret;
2734   }
2735 #endif
2736
2737 #if AMPIMSGLOG
2738   ampiParent* pptr = getAmpiParent();
2739   if(msgLogRead){
2740     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
2741     return 0;
2742   }
2743 #endif
2744
2745   USER_CALL_DEBUG("AMPI_Issend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
2746   ampi *ptr = getAmpiInstance(comm);
2747   AmpiRequestList* reqs = getReqs();
2748   SReq *newreq = new SReq(comm);
2749   *request = reqs->insert(newreq);
2750   // 1:  blocking now  - used by MPI_Ssend
2751   // >=2:  the index of the requests - used by MPI_Issend
2752   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm, *request+2);
2753 #if AMPI_COUNTER
2754   getAmpiParent()->counters.isend++;
2755 #endif
2756
2757 #if AMPIMSGLOG
2758   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2759     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
2760   }
2761 #endif
2762
2763   return 0;
2764 }
2765
2766   CDECL
2767 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2768     MPI_Comm comm, MPI_Status *status)
2769 {
2770   AMPIAPI("AMPI_Recv");
2771
2772 #if CMK_ERROR_CHECKING
2773   int ret;
2774   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, msg, 1);
2775   if(ret != MPI_SUCCESS)
2776     return ret;
2777 #endif
2778
2779 #if AMPIMSGLOG
2780   ampiParent* pptr = getAmpiParent();
2781   if(msgLogRead){
2782     (*(pptr->fromPUPer))|(pptr->pupBytes);
2783     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2784     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2785     return 0;
2786   }
2787 #endif
2788
2789   ampi *ptr = getAmpiInstance(comm);
2790   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2791
2792 #if AMPI_COUNTER
2793   getAmpiParent()->counters.recv++;
2794 #endif
2795
2796 #if AMPIMSGLOG
2797   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2798     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2799     (*(pptr->toPUPer))|(pptr->pupBytes);
2800     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2801     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2802   }
2803 #endif
2804
2805   return 0;
2806 }
2807
2808   CDECL
2809 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2810 {
2811
2812 #if CMK_ERROR_CHECKING
2813   int ret;
2814   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2815   if(ret != MPI_SUCCESS)
2816     return ret;
2817 #endif
2818
2819   AMPIAPI("AMPI_Probe");
2820   ampi *ptr = getAmpiInstance(comm);
2821   ptr->probe(tag,src, comm, (int*) status);
2822   return 0;
2823 }
2824
2825   CDECL
2826 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2827 {
2828   AMPIAPI("AMPI_Iprobe");
2829
2830 #if CMK_ERROR_CHECKING
2831   int ret;
2832   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2833   if(ret != MPI_SUCCESS)
2834     return ret;
2835 #endif
2836
2837   ampi *ptr = getAmpiInstance(comm);
2838   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2839   return 0;
2840 }
2841
2842   CDECL
2843 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2844     int stag, void *rbuf, int rcount, int rtype,
2845     int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2846 {
2847   AMPIAPI("AMPI_Sendrecv");
2848
2849 #if CMK_ERROR_CHECKING
2850   if (sbuf == MPI_IN_PLACE || rbuf == MPI_IN_PLACE)
2851     CmiAbort("MPI_sendrecv does not accept MPI_IN_PLACE; use MPI_Sendrecv_replace instead");
2852
2853   int ret;
2854   ret = errorCheck(comm, 1, scount, 1, stype, 1, stag, 1, dest, 1, sbuf, 1);
2855   if(ret != MPI_SUCCESS)
2856     return ret;
2857   ret = errorCheck(comm, 1, rcount, 1, rtype, 1, rtag, 1, src, 1, rbuf, 1);
2858   if(ret != MPI_SUCCESS)
2859     return ret;
2860 #endif
2861
2862   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2863   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2864   if (se) return se;
2865   else return re;
2866 }
2867
2868   CDECL
2869 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2870     int dest, int sendtag, int source, int recvtag,
2871     MPI_Comm comm, MPI_Status *status)
2872 {
2873   AMPIAPI("AMPI_Sendrecv_replace");
2874   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2875       buf, count, datatype, source, recvtag, comm, status);
2876 }
2877
2878
2879   CDECL
2880 int AMPI_Barrier(MPI_Comm comm)
2881 {
2882   AMPIAPI("AMPI_Barrier");
2883
2884 #if CMK_ERROR_CHECKING
2885   if(checkCommunicator(comm) != MPI_SUCCESS)
2886     return checkCommunicator(comm);
2887 #endif
2888
2889   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2890
2891   TRACE_BG_AMPI_LOG(MPI_BARRIER, 0);
2892
2893   //HACK: Use collective operation as a barrier.
2894   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2895
2896   //BIGSIM_OOC DEBUGGING
2897   //CkPrintf("%d: in AMPI_Barrier, after AMPI_Allreduce\n", getAmpiParent()->thisIndex);
2898 #if AMPI_COUNTER
2899   getAmpiParent()->counters.barrier++;
2900 #endif
2901   return 0;
2902 }
2903
2904   CDECL
2905 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2906     MPI_Comm comm)
2907 {
2908   AMPIAPI("AMPI_Bcast");
2909
2910 #if CMK_ERROR_CHECKING
2911   int ret;
2912   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, buf, 1);
2913   if(ret != MPI_SUCCESS)
2914     return ret;
2915 #endif
2916
2917   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2918   if(comm==MPI_COMM_SELF) return 0;
2919
2920 #if AMPIMSGLOG
2921   ampiParent* pptr = getAmpiParent();
2922   if(msgLogRead){
2923     (*(pptr->fromPUPer))|(pptr->pupBytes);
2924     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2925     return 0;
2926   }
2927 #endif
2928
2929   ampi* ptr = getAmpiInstance(comm);
2930   ptr->bcast(root, buf, count, type,comm);
2931 #if AMPI_COUNTER
2932   getAmpiParent()->counters.bcast++;
2933 #endif
2934
2935 #if AMPIMSGLOG
2936   if(msgLogWrite && record_msglog(pptr->thisIndex)) {
2937     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2938     (*(pptr->toPUPer))|(pptr->pupBytes);
2939     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2940   }
2941 #endif
2942 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2943   //  ptr->yield();
2944   //  //  processRemoteMlogMessages();
2945 #endif
2946
2947   return 0;
2948 }
2949
2950 /// This routine is called with the results of a Reduce or AllReduce
2951 const int MPI_REDUCE_SOURCE=0;
2952 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2953 void ampi::reduceResult(CkReductionMsg *msg)
2954 {
2955   MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2956   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2957       thisArrayID,thisIndex);
2958   delete msg;
2959 }
2960
2961 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2962 {
2963   int szdata = ddt->getSize(count);
2964   int szhdr = sizeof(AmpiOpHeader);
2965   AmpiOpHeader newhdr(op,type,count,szdata); 
2966   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2967   memcpy(msg->getData(),&newhdr,szhdr);
2968   if (count > 0) {
2969     TCharm::activateVariable(inbuf);
2970     ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2971     TCharm::deactivateVariable(inbuf);
2972   }
2973   return msg;
2974 }
2975
2976 // Copy the MPI datatype "type" from inbuf to outbuf
2977 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2978   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2979   //  deserializing into the output.
2980   ampi *ptr = getAmpiInstance(comm);
2981   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2982   int len=ddt->getSize(count);
2983   char *serialized=new char[len];
2984   TCharm::activateVariable(inbuf);
2985   TCharm::activateVariable(outbuf);
2986   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2987   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2988   TCharm::deactivateVariable(outbuf);
2989   TCharm::deactivateVariable(inbuf);
2990   delete [] serialized;         // < memory leak!  // gzheng 
2991
2992   return MPI_SUCCESS;
2993 }
2994
2995 static void handle_MPI_IN_PLACE(void* &inbuf, void* &outbuf)
2996 {
2997   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
2998   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
2999   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
3000 }
3001
3002 #define SYNCHRONOUS_REDUCE                           0
3003
3004   CDECL
3005 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
3006     int root, MPI_Comm comm)
3007 {
3008   AMPIAPI("AMPI_Reduce");
3009
3010   handle_MPI_IN_PLACE(inbuf, outbuf);
3011
3012 #if CMK_ERROR_CHECKING
3013   int ret;
3014   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, inbuf, 1,
3015                    outbuf, getAmpiInstance(comm)->getRank(comm) == root);
3016   if(ret != MPI_SUCCESS)
3017     return ret;
3018 #endif
3019
3020   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3021
3022 #if AMPIMSGLOG
3023   ampiParent* pptr = getAmpiParent();
3024   if(msgLogRead){
3025     (*(pptr->fromPUPer))|(pptr->pupBytes);
3026     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3027     return 0;
3028   }
3029 #endif
3030
3031   ampi *ptr = getAmpiInstance(comm);
3032   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
3033   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
3034   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
3035
3036   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3037
3038   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
3039   msg->setCallback(reduceCB);
3040   MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
3041   ptr->contribute(msg);
3042
3043   if (ptr->thisIndex == rootIdx){
3044     /*HACK: Use recv() to block until reduction data comes back*/
3045     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3046       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
3047
3048 #if SYNCHRONOUS_REDUCE
3049       AmpiMsg *msg = new (0, 0) AmpiMsg(-1, MPI_REDUCE_TAG, -1, rootIdx, 0, MPI_REDUCE_COMM);
3050       CProxy_ampi pa(ptr->getProxy());
3051       pa.generic(msg);
3052 #endif
3053   }
3054 #if SYNCHRONOUS_REDUCE
3055   ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, NULL, 0, type, MPI_REDUCE_COMM);
3056 #endif
3057
3058 #if AMPI_COUNTER
3059   getAmpiParent()->counters.reduce++;
3060 #endif
3061
3062 #if AMPIMSGLOG
3063   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3064     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3065     (*(pptr->toPUPer))|(pptr->pupBytes);
3066     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3067   }
3068 #endif
3069
3070   return 0;
3071 }
3072
3073   CDECL
3074 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
3075     MPI_Op op, MPI_Comm comm)
3076 {
3077   AMPIAPI("AMPI_Allreduce");
3078
3079   handle_MPI_IN_PLACE(inbuf, outbuf);
3080
3081 #if CMK_ERROR_CHECKING
3082   int ret;
3083   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1, outbuf, 1);
3084   if(ret != MPI_SUCCESS)
3085     return ret;
3086 #endif
3087
3088   ampi *ptr = getAmpiInstance(comm);
3089
3090   CkDDT_DataType *ddt_type = ptr->getDDT()->getType(type);
3091
3092 #if CMK_BIGSIM_CHARM
3093   TRACE_BG_AMPI_LOG(MPI_ALLREDUCE, ddt_type->getSize(count));
3094 #endif
3095
3096   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3097
3098 #if AMPIMSGLOG
3099   ampiParent* pptr = getAmpiParent();
3100   if(msgLogRead){
3101     (*(pptr->fromPUPer))|(pptr->pupBytes);
3102     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3103     //    CkExit();
3104     return 0;
3105   }
3106 #endif
3107
3108   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
3109   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
3110
3111   CkReductionMsg *msg=makeRednMsg(ddt_type, inbuf, count, type, op);
3112   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3113   msg->setCallback(allreduceCB);
3114   ptr->contribute(msg);
3115
3116   /*HACK: Use recv() to block until the reduction data comes back*/
3117   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3118     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
3119 #if AMPI_COUNTER
3120   getAmpiParent()->counters.allreduce++;
3121 #endif
3122
3123 #if AMPIMSGLOG
3124   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3125     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3126     (*(pptr->toPUPer))|(pptr->pupBytes);
3127     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3128     //    CkExit();
3129   }
3130 #endif
3131
3132   return 0;
3133 }
3134
3135   CDECL
3136 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
3137     MPI_Op op, MPI_Comm comm, MPI_Request* request)
3138 {
3139   AMPIAPI("AMPI_Iallreduce");
3140
3141   handle_MPI_IN_PLACE(inbuf, outbuf);
3142
3143 #if CMK_ERROR_CHECKING
3144   int ret;
3145   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1, outbuf, 1);
3146   if(ret != MPI_SUCCESS)
3147   {
3148     *request = MPI_REQUEST_NULL;
3149     return ret;
3150   }
3151 #endif
3152
3153   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3154
3155   checkRequest(*request);
3156   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
3157   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
3158   ampi *ptr = getAmpiInstance(comm);
3159
3160   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3161   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3162   msg->setCallback(allreduceCB);
3163   ptr->contribute(msg);
3164
3165   // using irecv instead recv to non-block the call and get request pointer
3166   AmpiRequestList* reqs = getReqs();
3167   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
3168   *request = reqs->insert(newreq);
3169   return 0;
3170 }
3171
3172   CDECL
3173 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
3174     MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
3175 {
3176   AMPIAPI("AMPI_Reduce_scatter");
3177
3178   handle_MPI_IN_PLACE(sendbuf, recvbuf);
3179
3180 #if CMK_ERROR_CHECKING
3181   int ret;
3182   ret = errorCheck(comm, 1, 0, 0, datatype, 1, 0, 0, 0, 0, sendbuf, 1, recvbuf, 1);
3183   if(ret != MPI_SUCCESS)
3184     return ret;
3185 #endif
3186
3187   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
3188   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
3189   ampi *ptr = getAmpiInstance(comm);
3190   int size = ptr->getSize(comm);
3191   int count=0;
3192   int *displs = new int [size];
3193   int len;
3194   void *tmpbuf;
3195
3196   //under construction
3197   for(int i=0;i<size;i++){
3198     displs[i] = count;
3199     count+= recvcounts[i];
3200   }
3201   len = ptr->getDDT()->getType(datatype)->getSize(count);
3202   tmpbuf = malloc(len);
3203   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
3204   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
3205       recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
3206   free(tmpbuf);
3207   delete [] displs;     // < memory leak ! // gzheng
3208   return 0;
3209 }
3210
3211 /***** MPI_Scan algorithm (from MPICH) *******
3212   recvbuf = sendbuf;
3213   partial_scan = sendbuf;
3214   mask = 0x1;
3215   while (mask < size) {
3216   dst = rank^mask;
3217   if (dst < size) {
3218   send partial_scan to dst;
3219   recv from dst into tmp_buf;
3220   if (rank > dst) {
3221   partial_scan = tmp_buf + partial_scan;
3222   recvbuf = tmp_buf + recvbuf;
3223   }
3224   else {
3225   if (op is commutative)
3226   partial_scan = tmp_buf + partial_scan;
3227   else {
3228   tmp_buf = partial_scan + tmp_buf;
3229   partial_scan = tmp_buf;
3230   }
3231   }
3232   }
3233   mask <<= 1;
3234   }
3235  ***** MPI_Scan algorithm (from MPICH) *******/
3236
3237 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
3238   (op)(invec,inoutvec,&count,&datatype);
3239 }
3240 CDECL
3241 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
3242   AMPIAPI("AMPI_Scan");
3243
3244 #if CMK_ERROR_CHECKING
3245   if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
3246     CmiAbort("AMPI_Scan does not implement MPI_IN_PLACE");
3247
3248   int ret;
3249   ret = errorCheck(comm, 1, count, 1, datatype, 1, 0, 0, 0, 0, sendbuf, 1, recvbuf, 1);
3250   if(ret != MPI_SUCCESS)
3251     return ret;
3252 #endif
3253
3254   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
3255   MPI_Status sts;
3256   ampi *ptr = getAmpiInstance(comm);
3257   int size = ptr->getSize(comm);
3258   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
3259   int rank = ptr->getRank(comm);
3260   int mask = 0x1;
3261   int dst;
3262   void* tmp_buf = malloc(blklen);
3263   void* partial_scan = malloc(blklen);
3264
3265   memcpy(recvbuf, sendbuf, blklen);
3266   memcpy(partial_scan, sendbuf, blklen);
3267   while(mask < size){
3268     dst = rank^mask;
3269     if(dst < size){
3270       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
3271           tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
3272       if(rank > dst){
3273         (op)(tmp_buf,partial_scan,&count,&datatype);
3274         (op)(tmp_buf,recvbuf,&count,&datatype);
3275       }else {
3276         (op)(partial_scan,tmp_buf,&count,&datatype);
3277         memcpy(partial_scan,tmp_buf,blklen);
3278       }
3279     }
3280     mask <<= 1;
3281
3282   }
3283
3284   free(tmp_buf);
3285   free(partial_scan);
3286 #if AMPI_COUNTER
3287   getAmpiParent()->counters.scan++;
3288 #endif
3289   return 0;
3290 }
3291
3292 CDECL
3293 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
3294   //AMPIAPI("AMPI_Op_create");
3295   *op = function;
3296   return 0;
3297 }
3298
3299 CDECL
3300 int AMPI_Op_free(MPI_Op *op){
3301   //AMPIAPI("AMPI_Op_free");
3302   *op = MPI_OP_NULL;
3303   return 0;
3304 }
3305
3306
3307   CDECL
3308 double AMPI_Wtime(void)
3309 {
3310   //  AMPIAPI("AMPI_Wtime");
3311
3312 #if AMPIMSGLOG
3313   double ret=TCHARM_Wall_timer();
3314   ampiParent* pptr = getAmpiParent();
3315   if(msgLogRead){
3316     (*(pptr->fromPUPer))|ret;
3317     return ret;
3318   }
3319
3320   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3321     (*(pptr->toPUPer))|ret;
3322   }
3323 #endif
3324
3325 #if CMK_BIGSIM_CHARM
3326   return BgGetTime();
3327 #else
3328   return TCHARM_Wall_timer();
3329 #endif
3330 }
3331
3332 CDECL
3333 double AMPI_Wtick(void){
3334   //AMPIAPI("AMPI_Wtick");
3335   return 1e-6;
3336 }
3337
3338
3339 int PersReq::start(){
3340   if(sndrcv == 1 || sndrcv == 3) { // send or ssend request
3341     ampi *ptr=getAmpiInstance(comm);
3342     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm, sndrcv==3?1:0);
3343   }
3344   return 0;
3345 }
3346
3347   CDECL
3348 int AMPI_Start(MPI_Request *request)
3349 {
3350   AMPIAPI("AMPI_Start");
3351   checkRequest(*request);
3352   AmpiRequestList *reqs = getReqs();
3353   if(-1==(*reqs)[*request]->start()) {
3354     CkAbort("MPI_Start could be used only on persistent communication requests!");
3355   }
3356   return 0;
3357 }
3358
3359 CDECL
3360 int AMPI_Startall(int count, MPI_Request *requests){
3361   AMPIAPI("AMPI_Startall");
3362   checkRequests(count,requests);
3363   AmpiRequestList *reqs = getReqs();
3364   for(int i=0;i<count;i++){
3365     if(-1==(*reqs)[requests[i]]->start())
3366       CkAbort("MPI_Start could be used only on persistent communication requests!");
3367   }
3368   return 0;
3369 }
3370
3371 /* organize the indices of requests into a vector of a vector: 
3372  * level 1 is different msg envelope matches
3373  * level 2 is (posting) ordered requests of with envelope
3374  * each time multiple completion call loop over first elem of level 1
3375  * and move the matched to the NULL request slot.   
3376  * warning: this does not work with I-Alltoall requests */
3377 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
3378   for(int i=0;i<count;i++){
3379     if(reqs[i]!=MPI_REQUEST_NULL)
3380       return 0;
3381   }
3382   return 1;
3383 }
3384 inline int matchReq(MPI_Request ia, MPI_Request ib){
3385   checkRequest(ia);  
3386   checkRequest(ib);
3387   AmpiRequestList* reqs = getReqs();
3388   AmpiRequest *a, *b;
3389   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
3390   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
3391   a=(*reqs)[ia];  b=(*reqs)[ib];
3392   if(a->tag != b->tag) return 0;
3393   if(a->src != b->src) return 0;
3394   if(a->comm != b->comm) return 0;
3395   return 1;
3396 }
3397 inline void swapInt(int& a,int& b){
3398   int tmp;
3399   tmp=a; a=b; b=tmp;
3400 }
3401 inline void sortedIndex(int n, int* arr, int* idx){
3402   int i,j;
3403   for(i=0;i<n;i++) 
3404     idx[i]=i;
3405   for (i=0; i<n-1; i++) 
3406     for (j=0; j<n-1-i; j++)
3407       if (arr[idx[j+1]] < arr[idx[j]]) 
3408         swapInt(idx[j+1],idx[j]);
3409 }
3410 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
3411   CkAssert(count!=0);
3412   int *newidx = new int [count];
3413   int flag;
3414   sortedIndex(count,arr,newidx);
3415   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
3416   CkVec<int> slot;
3417   vec->push_back(slot);
3418   (*vec)[0].push_back(newidx[0]);
3419   for(int i=1;i<count;i++){
3420     flag=0;
3421     for(int j=0;j<vec->size();j++){
3422       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
3423         ((*vec)[j]).push_back(newidx[i]);
3424         flag++;
3425       }
3426     }
3427     if(!flag){
3428       CkVec<int> newslot;
3429       newslot.push_back(newidx[i]);
3430       vec->push_back(newslot);
3431     }else{
3432       CkAssert(flag==1);
3433     }
3434   }
3435   delete [] newidx;
3436   return vec;
3437 }
3438 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
3439   printf("vec content: ");
3440   for(int i=0;i<vec.size();i++){
3441     printf("{");
3442     for(int j=0;j<(vec[i]).size();j++){
3443       printf(" %d ",arr[(vec[i])[j]]);
3444     }
3445     printf("} ");
3446   }
3447   printf("\n");
3448 }
3449
3450 int PersReq::wait(MPI_Status *sts){
3451   if(sndrcv == 2) {
3452     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3453       CkAbort("AMPI> Error in persistent request wait");
3454 #if CMK_BIGSIM_CHARM
3455     _TRACE_BG_TLINE_END(&event);
3456 #endif
3457   }
3458   return 0;
3459 }
3460
3461 int IReq::wait(MPI_Status *sts){
3462   if(CpvAccess(CmiPICMethod) == 2) {
3463     AMPI_DEBUG("In weird clause of IReq::wait\n");
3464     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3465       CkAbort("AMPI> Error in non-blocking request wait");
3466
3467     return 0;
3468   }
3469
3470   //Copy "this" to a local variable in the case that "this" pointer
3471   //is updated during the out-of-core emulation.
3472
3473   // optimization for Irecv
3474   // generic() writes directly to the buffer, so the only thing we
3475   // do here is to wait
3476   ampi *ptr = getAmpiInstance(comm);
3477
3478   //BIGSIM_OOC DEBUGGING
3479   //int ooccnt=0;
3480   //int ampiIndex = ptr->thisIndex;
3481   //CmiPrintf("%d: IReq's status=%d\n", ampiIndex, statusIreq);
3482
3483   while (statusIreq == false) {
3484     //BIGSIM_OOC DEBUGGING
3485     //CmiPrintf("Before blocking: %dth time: %d: in Ireq::wait\n", ++ooccnt, ptr->thisIndex);
3486     //print();
3487
3488     ptr->resumeOnRecv=true;
3489     ptr->block();
3490
3491     //BIGSIM_OOC DEBUGGING
3492     //CmiPrintf("[%d] After blocking: in Ireq::wait\n", ptr->thisIndex);
3493     //CmiPrintf("IReq's this pointer: %p\n", this);
3494     //print();
3495
3496 #if CMK_BIGSIM_CHARM
3497     //Because of the out-of-core emulation, this pointer is changed after in-out
3498     //memory operation. So we need to return from this function and do the while loop
3499     //in the outer function call.       
3500     if(_BgInOutOfCoreMode)
3501       return -1;
3502 #endif  
3503   }   // end of while
3504   ptr->resumeOnRecv=false;
3505
3506   AMPI_DEBUG("IReq::wait has resumed\n");
3507
3508   if(sts) {
3509     AMPI_DEBUG("Setting sts->MPI_TAG to this->tag=%d in IReq::wait  this=%p\n", (int)this->tag, this);
3510     sts->MPI_TAG = tag;
3511     sts->MPI_SOURCE = src;
3512     sts->MPI_COMM = comm;
3513     sts->MPI_LENGTH = length;
3514   }
3515
3516   return 0;
3517 }
3518
3519 int ATAReq::wait(MPI_Status *sts){
3520   int i;
3521   for(i=0;i<count;i++){
3522     if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3523           myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
3524       CkAbort("AMPI> Error in alltoall request wait");
3525 #if CMK_BIGSIM_CHARM
3526     _TRACE_BG_TLINE_END(&myreqs[i].event);
3527 #endif
3528   }
3529 #if CMK_BIGSIM_CHARM
3530   //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
3531   TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0, 1);
3532   for (i=0; i<count; i++)
3533     _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
3534   _TRACE_BG_TLINE_END(&event);
3535 #endif
3536   return 0;
3537 }
3538
3539 int SReq::wait(MPI_Status *sts){
3540   ampi *ptr = getAmpiInstance(comm);
3541   while (statusIreq == false) {
3542     ptr->resumeOnRecv = true;
3543     ptr->block();
3544     ptr = getAmpiInstance(comm);
3545     ptr->resumeOnRecv = false;
3546   }
3547   return 0;
3548 }
3549
3550   CDECL
3551 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
3552 {
3553   AMPIAPI("AMPI_Wait");
3554   if(*request == MPI_REQUEST_NULL){
3555     stsempty(*sts);
3556     return 0;
3557   }
3558   checkRequest(*request);
3559   AmpiRequestList* reqs = getReqs();
3560
3561 #if AMPIMSGLOG
3562   ampiParent* pptr = getAmpiParent();
3563   if(msgLogRead){
3564     (*(pptr->fromPUPer))|(pptr->pupBytes);
3565     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3566     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
3567     return 0;
3568   }
3569 #endif
3570
3571   AMPI_DEBUG("AMPI_Wait request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3572   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3573   //(*reqs)[*request]->wait(sts);
3574   int waitResult = -1;
3575   do{
3576     AmpiRequest *waitReq = (*reqs)[*request];
3577     waitResult = waitReq->wait(sts);
3578     if(_BgInOutOfCoreMode){
3579       reqs = getReqs();
3580     }
3581   }while(waitResult==-1);
3582
3583
3584   AMPI_DEBUG("AMPI_Wait after calling wait, request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3585
3586
3587 #if AMPIMSGLOG
3588   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3589     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
3590     (*(pptr->toPUPer))|(pptr->pupBytes);
3591     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3592     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
3593   }
3594 #endif
3595
3596   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3597     reqs->free(*request);
3598     *request = MPI_REQUEST_NULL;
3599   }
3600
3601   AMPI_DEBUG("End of AMPI_Wait\n");
3602
3603   return 0;
3604 }
3605
3606   CDECL
3607 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
3608 {
3609   AMPIAPI("AMPI_Waitall");
3610   if(count==0) return MPI_SUCCESS;
3611   checkRequests(count,request);
3612   int i,j,oldPe;
3613   AmpiRequestList* reqs = getReqs();
3614   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3615
3616 #if AMPIMSGLOG
3617   ampiParent* pptr = getAmpiParent();
3618   if(msgLogRead){
3619     for(i=0;i<reqvec->size();i++){
3620       for(j=0;j<((*reqvec)[i]).size();j++){
3621         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3622           stsempty(sts[((*reqvec)[i])[j]]);
3623           continue;
3624         }
3625         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3626         (*(pptr->fromPUPer))|(pptr->pupBytes);
3627         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3628         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3629       }
3630     }
3631     return 0;
3632   }
3633 #endif
3634
3635 #if CMK_BIGSIM_CHARM
3636   void *curLog;         // store current log in timeline
3637   _TRACE_BG_TLINE_END(&curLog);
3638 #if 0
3639   TRACE_BG_AMPI_SUSPEND();
3640 #endif
3641 #endif
3642   for(i=0;i<reqvec->size();i++){
3643     for(j=0;j<((*reqvec)[i]).size();j++){
3644       //CkPrintf("[%d] in loop [%d, %d]\n", pptr->thisIndex,i, j);
3645       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3646         stsempty(sts[((*reqvec)[i])[j]]);
3647         continue;
3648       }
3649       oldPe = CkMyPe();
3650
3651       int waitResult = -1;
3652       do{       
3653         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3654         waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
3655         if(_BgInOutOfCoreMode){
3656           reqs = getReqs();
3657           reqvec = vecIndex(count, request);
3658         }
3659
3660 #if AMPIMSGLOG
3661         if(msgLogWrite && record_msglog(pptr->thisIndex)){
3662           (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
3663           (*(pptr->toPUPer))|(pptr->pupBytes);
3664           PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3665           PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3666         }
3667 #endif
3668
3669       }while(waitResult==-1);
3670
3671 #if 1
3672 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3673       //for fault evacuation
3674       if(oldPe != CkMyPe()){
3675 #endif
3676         reqs = getReqs();
3677         reqvec  = vecIndex(count,request);
3678 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3679       }
3680 #endif
3681 #endif
3682     }
3683   }
3684 #if CMK_BIGSIM_CHARM
3685   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
3686 #endif
3687   // free memory of requests
3688   for(i=0;i<count;i++){ 
3689     if(request[i] == MPI_REQUEST_NULL)
3690       continue;
3691     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
3692       reqs->free(request[i]);
3693       request[i] = MPI_REQUEST_NULL;
3694     }
3695   }
3696   delete reqvec;
3697   return 0;
3698 }
3699
3700   CDECL
3701 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
3702 {
3703   AMPIAPI("AMPI_Waitany");
3704
3705   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
3706   if(count == 0) return MPI_SUCCESS;
3707   checkRequests(count,request);
3708   if(areInactiveReqs(count,request)){
3709     *idx=MPI_UNDEFINED;
3710     stsempty(*sts);
3711     return MPI_SUCCESS;
3712   }
3713   int flag=0;
3714   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3715   while(count>0){ /* keep looping until some request finishes: */
3716     for(int i=0;i<reqvec->size();i++){
3717       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
3718       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
3719         *idx = ((*reqvec)[i])[0];
3720         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
3721         return 0;
3722       }
3723     }
3724     /* no requests have finished yet-- schedule and try again */
3725     AMPI_Yield(MPI_COMM_WORLD);
3726   }
3727   *idx = MPI_UNDEFINED;
3728   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
3729   delete reqvec;
3730   return 0;
3731 }
3732
3733   CDECL
3734 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
3735     int *array_of_indices, MPI_Status *array_of_statuses)
3736 {
3737   AMPIAPI("AMPI_Waitsome");
3738   checkRequests(incount,array_of_requests);
3739   if(areInactiveReqs(incount,array_of_requests)){
3740     *outcount=MPI_UNDEFINED;
3741     return MPI_SUCCESS;
3742   }
3743   MPI_Status sts;
3744   int i;
3745   int flag=0, realflag=0;
3746   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3747   *outcount = 0;
3748   while(1){
3749     for(i=0;i<reqvec->size();i++){
3750       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3751       if(flag == 1){ 
3752         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3753         array_of_statuses[(*outcount)++]=sts;
3754         if(sts.MPI_COMM != 0)
3755           realflag=1; // there is real(non null) request
3756       }
3757     }
3758     if(realflag && outcount>0) break;
3759   }
3760   delete reqvec;
3761   return 0;
3762 }
3763
3764   CmiBool PersRe