08a134ab3b1869d4aad9d28dcf682021fa770863
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1
2 #define AMPIMSGLOG    0
3
4 #define exit exit /*Supress definition of exit in ampi.h*/
5 #include "ampiimpl.h"
6 #include "tcharm.h"
7 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
8 #include "ampiEvents.h" /*** for trace generation for projector *****/
9 #include "ampiProjections.h"
10 #endif
11
12 #if CMK_BIGSIM_CHARM
13 #include "bigsim_logs.h"
14 #endif
15
16 #define CART_TOPOL 1
17 #define AMPI_PRINT_IDLE 0
18
19 /* change this define to "x" to trace all send/recv's */
20 #define MSG_ORDER_DEBUG(x)  //x /* empty */
21 /* change this define to "x" to trace user calls */
22 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
23 #define STARTUP_DEBUG(x)  //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
24 #define FUNCCALL_DEBUG(x) //x /* empty */
25
26 static CkDDT *getDDT(void) {
27   return getAmpiParent()->myDDT;
28 }
29
30   inline int checkCommunicator(MPI_Comm comm) {
31     if(comm == MPI_COMM_NULL)
32       return MPI_ERR_COMM;
33     return MPI_SUCCESS;
34   }
35
36   inline int checkCount(int count) {
37     if(count < 0)
38       return MPI_ERR_COUNT;
39     return MPI_SUCCESS;
40   }
41
42   inline int checkData(MPI_Datatype data) {
43     if(data == MPI_DATATYPE_NULL)
44       return MPI_ERR_TYPE;
45     return MPI_SUCCESS;
46   }
47
48   inline int checkTag(int tag) {
49     if(tag != MPI_ANY_TAG && tag < 0)
50       return MPI_ERR_TAG;
51     return MPI_SUCCESS;
52   }
53
54 inline int checkRank(int rank, MPI_Comm comm) {
55   int size;
56   AMPI_Comm_size(comm, &size);
57   if(((rank >= 0) && (rank < size)) || (rank == MPI_ANY_SOURCE) || (rank ==
58         MPI_PROC_NULL))
59     return MPI_SUCCESS;
60   return MPI_ERR_RANK;
61 }
62
63   inline int checkBuf(void *buf, int count) {
64     if((count != 0 && buf == NULL))
65       return MPI_ERR_BUFFER;
66     return MPI_SUCCESS;
67   }
68
69 inline int errorCheck(MPI_Comm comm, int ifComm, int count, int ifCount,
70                       MPI_Datatype data, int ifData, int tag, int ifTag,
71                       int rank, int ifRank,
72                       void *buf1, int ifBuf1, void *buf2 = 0, int ifBuf2 = 0) {
73   int ret;
74   if(ifComm) { 
75     ret = checkCommunicator(comm);
76     if(ret != MPI_SUCCESS)
77       return ret;
78   }
79   if(ifCount) {
80     ret = checkCount(count);
81     if(ret != MPI_SUCCESS)
82       return ret;
83   }
84   if(ifData) {
85     ret = checkData(data);
86     if(ret != MPI_SUCCESS)
87       return ret;
88   }
89   if(ifTag) {
90     ret = checkTag(tag);
91     if(ret != MPI_SUCCESS)
92       return ret;
93   }
94   if(ifRank) {
95     ret = checkRank(rank,comm);
96     if(ret != MPI_SUCCESS)
97       return ret;
98   }
99   if(ifBuf1) {
100     ret = checkBuf(buf1,count);
101     if(ret != MPI_SUCCESS)
102       return ret;
103   }
104   if(ifBuf2) {
105     ret = checkBuf(buf2,count);
106     if(ret != MPI_SUCCESS)
107       return ret;
108   }
109   return MPI_SUCCESS;
110 }
111
112 //------------- startup -------------
113 static mpi_comm_worlds mpi_worlds;
114
115 int _mpi_nworlds; /*Accessed by ampif*/
116 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
117
118 /* ampiReducer: AMPI's generic reducer type 
119    MPI_Op is function pointer to MPI_User_function
120    so that it can be packed into AmpiOpHeader, shipped 
121    with the reduction message, and then plugged into 
122    the ampiReducer. 
123    One little trick is the ampi::recv which receives
124    the final reduction message will see additional
125    sizeof(AmpiOpHeader) bytes in the buffer before
126    any user data.                             */
127 class AmpiComplex { 
128   public: 
129     double re, im; 
130     void operator+=(const AmpiComplex &a) {
131       re+=a.re;
132       im+=a.im;
133     }
134     void operator*=(const AmpiComplex &a) {
135       double nu_re=re*a.re-im*a.im;
136       im=re*a.im+im*a.re;
137       re=nu_re;
138     }
139     int operator>(const AmpiComplex &a) {
140       CkAbort("Cannot compare complex numbers with MPI_MAX");
141       return 0;
142     }
143     int operator<(const AmpiComplex &a) {
144       CkAbort("Cannot compare complex numbers with MPI_MIN");
145       return 0;
146     }
147 };
148 typedef struct { float val; int idx; } FloatInt;
149 typedef struct { double val; int idx; } DoubleInt;
150 typedef struct { long val; int idx; } LongInt;
151 typedef struct { int val; int idx; } IntInt;
152 typedef struct { short val; int idx; } ShortInt;
153 typedef struct { long double val; int idx; } LongdoubleInt;
154 typedef struct { float val; float idx; } FloatFloat;
155 typedef struct { double val; double idx; } DoubleDouble;
156
157
158 #define MPI_OP_SWITCH(OPNAME) \
159   int i; \
160 switch (*datatype) { \
161   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
162   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
163   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
164   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
165   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
166   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
167   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
168   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
169   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
170   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
171   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
172   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
173   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
174   default: \
175            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
176   CmiAbort("Unsupported MPI datatype for MPI Op"); \
177 };\
178
179 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
180 #define MPI_OP_IMPL(type) \
181   if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
182   MPI_OP_SWITCH(MPI_MAX)
183 #undef MPI_OP_IMPL
184 }
185
186 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
187 #define MPI_OP_IMPL(type) \
188   if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
189   MPI_OP_SWITCH(MPI_MIN)
190 #undef MPI_OP_IMPL
191 }
192
193 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
194 #define MPI_OP_IMPL(type) \
195   ((type *)inoutvec)[i] += ((type *)invec)[i];
196   MPI_OP_SWITCH(MPI_SUM)
197 #undef MPI_OP_IMPL
198 }
199
200 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
201 #define MPI_OP_IMPL(type) \
202   ((type *)inoutvec)[i] *= ((type *)invec)[i];
203   MPI_OP_SWITCH(MPI_PROD)
204 #undef MPI_OP_IMPL
205 }
206
207 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
208   int i;  
209   switch (*datatype) {
210     case MPI_INT:
211     case MPI_LOGICAL:
212       for(i=0;i<(*len);i++)
213         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
214       break;
215     default:
216       ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
217       CmiAbort("exiting");
218   }
219 }
220 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
221   int i; 
222   switch (*datatype) {
223     case MPI_INT:
224       for(i=0;i<(*len);i++)
225         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
226       break;
227     case MPI_BYTE:
228       for(i=0;i<(*len);i++)
229         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
230       break;
231     default:
232       ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
233       CmiAbort("exiting");
234   }
235 }
236 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
237   int i;  
238   switch (*datatype) {
239     case MPI_INT:
240     case MPI_LOGICAL:
241       for(i=0;i<(*len);i++)
242         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
243       break;
244     default:
245       ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
246       CmiAbort("exiting");
247   }
248 }
249 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
250   int i;  
251   switch (*datatype) {
252     case MPI_INT:
253       for(i=0;i<(*len);i++)
254         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
255       break;
256     case MPI_BYTE:
257       for(i=0;i<(*len);i++)
258         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
259       break;
260     default:
261       ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
262       CmiAbort("exiting");
263   }
264 }
265 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
266   int i;  
267   switch (*datatype) {
268     case MPI_INT:
269     case MPI_LOGICAL:
270       for(i=0;i<(*len);i++)
271         ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
272       break;
273     default:
274       ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
275       CmiAbort("exiting");
276   }
277 }
278 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
279   int i;  
280   switch (*datatype) {
281     case MPI_INT:
282       for(i=0;i<(*len);i++)
283         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
284       break;
285     case MPI_BYTE:
286       for(i=0;i<(*len);i++)
287         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
288       break;
289     case MPI_UNSIGNED:
290       for(i=0;i<(*len);i++)
291         ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
292       break;
293     default:
294       ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
295       CmiAbort("exiting");
296   }
297 }
298
299 #ifndef MIN
300 #define MIN(a,b) (a < b ? a : b)
301 #endif
302
303 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
304   int i;  
305
306   switch (*datatype) {
307     case MPI_FLOAT_INT:
308       for(i=0;i<(*len);i++)
309         if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
310           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
311         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
312           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
313       break;
314     case MPI_DOUBLE_INT:
315       for(i=0;i<(*len);i++)
316         if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
317           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
318         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
319           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
320
321       break;
322     case MPI_LONG_INT:
323       for(i=0;i<(*len);i++)
324         if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
325           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
326         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
327           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
328       break;
329     case MPI_2INT:
330       for(i=0;i<(*len);i++)
331         if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
332           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
333         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
334           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
335       break;
336     case MPI_SHORT_INT:
337       for(i=0;i<(*len);i++)
338         if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
339           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
340         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
341           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
342       break;
343     case MPI_LONG_DOUBLE_INT:
344       for(i=0;i<(*len);i++)
345         if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
346           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
347         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
348           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
349       break;
350     case MPI_2FLOAT:
351       for(i=0;i<(*len);i++)
352         if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
353           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
354         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
355           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
356       break;
357     case MPI_2DOUBLE:
358       for(i=0;i<(*len);i++)
359         if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
360           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
361         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
362           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
363       break;
364     default:
365       ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
366       CmiAbort("exiting");
367   }
368 }
369 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
370   int i;  
371   switch (*datatype) {
372     case MPI_FLOAT_INT:
373       for(i=0;i<(*len);i++)
374         if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
375           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
376         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
377           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
378       break;
379     case MPI_DOUBLE_INT:
380       for(i=0;i<(*len);i++)
381         if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
382           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
383         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
384           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
385       break;
386     case MPI_LONG_INT:
387       for(i=0;i<(*len);i++)
388         if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
389           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
390         else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
391           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
392       break;
393     case MPI_2INT:
394       for(i=0;i<(*len);i++)
395         if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
396           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
397         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
398           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
399       break;
400     case MPI_SHORT_INT:
401       for(i=0;i<(*len);i++)
402         if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
403           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
404         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
405           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
406       break;
407     case MPI_LONG_DOUBLE_INT:
408       for(i=0;i<(*len);i++)
409         if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
410           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
411         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
412           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
413       break;
414     case MPI_2FLOAT:
415       for(i=0;i<(*len);i++)
416         if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
417           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
418         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
419           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
420       break;
421     case MPI_2DOUBLE:
422       for(i=0;i<(*len);i++)
423         if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
424           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
425         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
426           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
427       break;
428     default:
429       ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
430       CmiAbort("exiting");
431   }
432 }
433
434 // every msg contains a AmpiOpHeader structure before user data
435 // FIXME: non-commutative operations require messages be ordered by rank
436 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
437   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
438   MPI_Datatype dtype;
439   int szhdr, szdata, len;
440   MPI_User_function* func;
441   func = hdr->func;
442   dtype = hdr->dtype;  
443   szdata = hdr->szdata;
444   len = hdr->len;  
445   szhdr = sizeof(AmpiOpHeader);
446
447   //Assuming extent == size
448   void *ret = malloc(szhdr+szdata);
449   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
450   for(int i=1;i<nMsg;i++){
451     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
452   }
453   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
454   free(ret);
455   return retmsg;
456 }
457
458 CkReduction::reducerType AmpiReducer;
459
460 class Builtin_kvs{
461   public:
462     int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
463     Builtin_kvs(){
464       tag_ub = MPI_TAG_UB_VALUE; 
465       host = MPI_PROC_NULL;
466       io = 0;
467       wtime_is_global = 0;
468       keyval_mype = CkMyPe();
469       keyval_numpes = CkNumPes();
470       keyval_mynode = CkMyNode();
471       keyval_numnodes = CkNumNodes();
472     }
473 };
474
475 // ------------ startup support -----------
476 int _ampi_fallback_setup_count;
477 CDECL void AMPI_Setup(void);
478 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
479
480 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
481
482 /*Main routine used when missing MPI_Setup routine*/
483 CDECL void AMPI_Fallback_Main(int argc,char **argv)
484 {
485   AMPI_Main_cpp(argc,argv);
486   AMPI_Main(argc,argv);
487   FTN_NAME(MPI_MAIN,mpi_main)();
488 }
489
490 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
491 /*Startup routine used if user *doesn't* write
492   a TCHARM_User_setup routine.
493  */
494 CDECL void AMPI_Setup_Switch(void) {
495   _ampi_fallback_setup_count=0;
496   FTN_NAME(AMPI_SETUP,ampi_setup)();
497   AMPI_Setup();
498   if (_ampi_fallback_setup_count==2)
499   { //Missing AMPI_Setup in both C and Fortran:
500     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
501   }
502 }
503
504 static int nodeinit_has_been_called=0;
505 CtvDeclare(ampiParent*, ampiPtr);
506 CtvDeclare(int, ampiInitDone);
507 CtvDeclare(void*,stackBottom);
508 CtvDeclare(int, ampiFinalized);
509 CkpvDeclare(Builtin_kvs, bikvs);
510 CkpvDeclare(int,argvExtracted);
511 static int enableStreaming = 0;
512
513 CDECL long ampiCurrentStackUsage(){
514   int localVariable;
515
516   unsigned long p1 =  (unsigned long)((void*)&localVariable);
517   unsigned long p2 =  (unsigned long)(CtvAccess(stackBottom));
518
519
520   if(p1 > p2)
521     return p1 - p2;
522   else
523     return  p2 - p1;
524
525 }
526
527 FDECL void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
528   long usage = ampiCurrentStackUsage();
529   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
530 }
531
532
533 CDECL void AMPI_threadstart(void *data);
534 static int AMPI_threadstart_idx = -1;
535
536 static void ampiNodeInit(void)
537 {
538   _mpi_nworlds=0;
539   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
540   {
541     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
542   }
543   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
544
545   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
546
547   CmiAssert(AMPI_threadstart_idx == -1);    // only initialize once
548   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
549
550   nodeinit_has_been_called=1;
551
552    // ASSUME NO ANYTIME MIGRATION and STATIC INSERTON
553   _isAnytimeMigration = false;
554   _isStaticInsertion = true;
555 }
556
557 #if PRINT_IDLE
558 static double totalidle=0.0, startT=0.0;
559 static int beginHandle, endHandle;
560 static void BeginIdle(void *dummy,double curWallTime)
561 {
562   startT = curWallTime;
563 }
564 static void EndIdle(void *dummy,double curWallTime)
565 {
566   totalidle += curWallTime - startT;
567 }
568 #endif
569
570 /* for fortran reduction operation table to handle mapping */
571 typedef MPI_Op  MPI_Op_Array[128];
572 CtvDeclare(int, mpi_opc);
573 CtvDeclare(MPI_Op_Array, mpi_ops);
574
575 static void ampiProcInit(void){
576   CtvInitialize(ampiParent*, ampiPtr);
577   CtvInitialize(int,ampiInitDone);
578   CtvInitialize(int,ampiFinalized);
579   CtvInitialize(void*,stackBottom);
580
581
582   CtvInitialize(MPI_Op_Array, mpi_ops);
583   CtvInitialize(int, mpi_opc);
584
585   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
586   CkpvAccess(bikvs) = Builtin_kvs();
587
588   CkpvInitialize(int, argvExtracted);
589   CkpvAccess(argvExtracted) = 0;
590
591 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
592   REGISTER_AMPI
593 #endif
594   initAmpiProjections();
595
596   char **argv=CkGetArgv();
597 #if AMPI_COMLIB  
598   if(CkpvAccess(argvExtracted)==0){
599     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
600   }
601 #endif
602
603 #if AMPIMSGLOG
604   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
605   //msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
606   if (CmiGetArgIntDesc(argv,"+msgLogRead", &msgLogRank, "Re-play message processing order for AMPI")) {
607     msgLogRead = 1;
608   }
609   //CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
610   char *procs = NULL;
611   if (CmiGetArgStringDesc(argv, "+msgLogRanks", &procs, "A list of AMPI processors to record , e.g. 0,10,20-30")) {
612     msgLogRanks.set(procs);
613   }
614   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
615   if (CkMyPe() == 0) {
616     if (msgLogWrite) CmiPrintf("Writing AMPI messages of rank %s to log: %s\n", procs?procs:"", msgLogFilename);
617     if (msgLogRead) CmiPrintf("Reading AMPI messages of rank %s from log: %s\n", procs?procs:"", msgLogFilename);
618   }
619 #endif
620
621   // initBigSimTrace(1,outtiming);
622 }
623
624 #if AMPIMSGLOG
625 static inline int record_msglog(int rank){
626   return msgLogRanks.includes(rank);
627 }
628 #endif
629
630 void AMPI_Install_Idle_Timer(){
631 #if AMPI_PRINT_IDLE
632   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
633   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
634 #endif
635 }
636
637 void AMPI_Uninstall_Idle_Timer(){
638 #if AMPI_PRINT_IDLE
639   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
640   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
641 #endif
642 }
643
644 PUPfunctionpointer(MPI_MainFn)
645
646   class MPI_threadstart_t {
647     public:
648       MPI_MainFn fn;
649       MPI_threadstart_t() {}
650       MPI_threadstart_t(MPI_MainFn fn_)
651         :fn(fn_) {}
652       void start(void) {
653         char **argv=CmiCopyArgs(CkGetArgv());
654         int argc=CkGetArgc();
655
656         // Set a pointer to somewhere close to the bottom of the stack.
657         // This is used for roughly estimating the stack usage later.
658         CtvAccess(stackBottom) = &argv;
659
660 #if CMK_AMPI_FNPTR_HACK
661         AMPI_Fallback_Main(argc,argv);
662 #else
663         (fn)(argc,argv);
664 #endif
665       }
666       void pup(PUP::er &p) {
667         p|fn;
668       }
669   };
670 PUPmarshall(MPI_threadstart_t)
671
672 CDECL void AMPI_threadstart(void *data)
673 {
674   STARTUP_DEBUG("MPI_threadstart")
675     MPI_threadstart_t t;
676   pupFromBuf(data,t);
677 #if CMK_TRACE_IN_CHARM
678   if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
679 #endif
680   t.start();
681 }
682
683 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
684 {
685   STARTUP_DEBUG("ampiCreateMain")
686     int _nchunks=TCHARM_Get_num_chunks();
687   //Make a new threads array:
688   MPI_threadstart_t s(mainFn);
689   memBuf b; pupIntoBuf(b,s);
690   TCHARM_Create_data( _nchunks,AMPI_threadstart_idx,
691       b.getData(), b.getSize());
692 }
693
694 /* TCharm Semaphore ID's for AMPI startup */
695 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
696 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
697
698 static CProxy_ampiWorlds ampiWorldsGroup;
699
700 static void init_operations()
701 {
702   CtvInitialize(MPI_Op_Array, mpi_ops);
703   int i = 0;
704   MPI_Op *tab = CtvAccess(mpi_ops);
705   tab[i++] = MPI_MAX;
706   tab[i++] = MPI_MIN;
707   tab[i++] = MPI_SUM;
708   tab[i++] = MPI_PROD;
709   tab[i++] = MPI_LAND;
710   tab[i++] = MPI_BAND;
711   tab[i++] = MPI_LOR;
712   tab[i++] = MPI_BOR;
713   tab[i++] = MPI_LXOR;
714   tab[i++] = MPI_BXOR;
715   tab[i++] = MPI_MAXLOC;
716   tab[i++] = MPI_MINLOC;
717
718   CtvInitialize(int, mpi_opc);
719   CtvAccess(mpi_opc) = i;
720 }
721
722 /*
723    Called from MPI_Init, a collective initialization call:
724    creates a new AMPI array and attaches it to the current
725    set of TCHARM threads.
726  */
727 static ampi *ampiInit(char **argv)
728 {
729   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
730     if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
731   STARTUP_DEBUG("ampiInit> begin")
732
733     MPI_Comm new_world;
734   int _nchunks;
735   CkArrayOptions opts;
736   CProxy_ampiParent parent;
737   if (TCHARM_Element()==0) //the rank of a tcharm object
738   { /* I'm responsible for building the arrays: */
739     STARTUP_DEBUG("ampiInit> creating arrays")
740
741       // FIXME: Need to serialize global communicator allocation in one place.
742       //Allocate the next communicator
743       if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
744       {
745         CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
746       }
747     int new_idx=_mpi_nworlds;
748     new_world=MPI_COMM_WORLD+new_idx; // Isaac guessed there shouldn't be a +1 here
749
750     //Create and attach the ampiParent array
751     CkArrayID threads;
752     opts=TCHARM_Attach_start(&threads,&_nchunks);
753     parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
754     STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
755   }
756   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
757
758   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
759
760     if (TCHARM_Element()==0)
761     {
762       //Make a new ampi array
763       CkArrayID empty;
764
765       ampiCommStruct worldComm(new_world,empty,_nchunks);
766       CProxy_ampi arr;
767
768
769 #if AMPI_COMLIB
770
771       ComlibInstanceHandle ciStreaming = 1;
772       ComlibInstanceHandle ciBcast = 2;
773       ComlibInstanceHandle ciAllgather = 3;
774       ComlibInstanceHandle ciAlltoall = 4;
775
776       arr=CProxy_ampi::ckNew(parent, worldComm, ciStreaming, ciBcast, ciAllgather, ciAlltoall, opts);
777
778
779       CkPrintf("Using untested comlib code in ampi.C\n");
780
781       Strategy *sStreaming = new StreamingStrategy(1,10);
782       CkAssert(ciStreaming == ComlibRegister(sStreaming));
783
784       Strategy *sBcast = new BroadcastStrategy(USE_HYPERCUBE);
785       CkAssert(ciBcast = ComlibRegister(sBcast));
786
787       Strategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
788       CkAssert(ciAllgather = ComlibRegister(sAllgather));
789
790       Strategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
791       CkAssert(ciAlltoall = ComlibRegister(sAlltoall));
792
793       CmiPrintf("Created AMPI comlib strategies in new manner\n");
794
795       // FIXME: Propogate the comlib table here
796       CkpvAccess(conv_com_object).doneCreating();
797 #else
798       arr=CProxy_ampi::ckNew(parent,worldComm,opts);
799 #endif
800
801       //Broadcast info. to the mpi_worlds array
802       // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
803       ampiCommStruct newComm(new_world,arr,_nchunks);
804       //CkPrintf("In ampiInit: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
805       if (ampiWorldsGroup.ckGetGroupID().isZero())
806         ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
807       else
808         ampiWorldsGroup.add(newComm);
809       STARTUP_DEBUG("ampiInit> arrays created")
810
811     }
812
813   // Find our ampi object:
814   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
815   CtvAccess(ampiInitDone)=1;
816   CtvAccess(ampiFinalized)=0;
817   STARTUP_DEBUG("ampiInit> complete")
818 #if CMK_BIGSIM_CHARM
819     //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
820     TRACE_BG_ADD_TAG("AMPI_START");
821 #endif
822
823   init_operations();     // initialize fortran reduction operation table
824
825   getAmpiParent()->ampiInitCallDone = 0;
826
827   CProxy_ampi cbproxy = ptr->getProxy();
828   CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
829   ptr->contribute(0, NULL, CkReduction::sum_int, cb);
830
831   ampiParent *thisParent = getAmpiParent(); 
832   while(thisParent->ampiInitCallDone!=1){
833     //CkPrintf("In checking ampiInitCallDone(%d) loop at parent %d!\n", thisParent->ampiInitCallDone, thisParent->thisIndex);
834     thisParent->getTCharmThread()->stop();
835     /* 
836      * thisParent needs to be updated in case of the parent is being pupped.
837      * In such case, thisParent got changed
838      */
839     thisParent = getAmpiParent();
840   }
841
842 #ifdef CMK_BIGSIM_CHARM
843   BgSetStartOutOfCore();
844 #endif
845
846   return ptr;
847 }
848
849 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
850 class ampiWorlds : public CBase_ampiWorlds {
851   public:
852     ampiWorlds(const ampiCommStruct &nextWorld) {
853       ampiWorldsGroup=thisgroup;
854       //CkPrintf("In constructor: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
855       add(nextWorld);
856     }
857     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
858     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
859     void add(const ampiCommStruct &nextWorld) {
860       int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD); // Isaac guessed there shouldn't be a +1 after the MPI_COMM_WORLD
861       mpi_worlds[new_idx].comm=nextWorld;
862       if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
863       STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
864     }
865 };
866
867 //-------------------- ampiParent -------------------------
868   ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
869 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
870 {
871   int barrier = 0x1234;
872   STARTUP_DEBUG("ampiParent> starting up")
873     thread=NULL;
874   worldPtr=NULL;
875   myDDT=&myDDTsto;
876   prepareCtv();
877
878   init();
879
880   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
881   AsyncEvacuate(CmiFalse);
882 }
883
884 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
885   thread=NULL;
886   worldPtr=NULL;
887   myDDT=&myDDTsto;
888
889   init();
890
891   AsyncEvacuate(CmiFalse);
892 }
893
894 void ampiParent::pup(PUP::er &p) {
895   ArrayElement1D::pup(p);
896   p|threads;
897   p|worldNo;           // why it was missing from here before??
898   p|worldStruct;
899   myDDT->pup(p);
900   p|splitComm;
901   p|groupComm;
902   p|groups;
903
904   //BIGSIM_OOC DEBUGGING
905   //if(!p.isUnpacking()){
906   //    CmiPrintf("ampiParent[%d] packing ampiRequestList: \n", thisIndex);
907   //    ampiReqs.print();
908   //}
909
910   p|ampiReqs;
911
912   //BIGSIM_OOC DEBUGGING
913   //if(p.isUnpacking()){
914   //    CmiPrintf("ampiParent[%d] unpacking ampiRequestList: \n", thisIndex);
915   //    ampiReqs.print();
916   //}
917
918   p|RProxyCnt;
919   p|tmpRProxy;
920   p|winStructList;
921   p|infos;
922
923   p|ampiInitCallDone;
924 }
925 void ampiParent::prepareCtv(void) {
926   thread=threads[thisIndex].ckLocal();
927   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
928   CtvAccessOther(thread->getThread(),ampiPtr) = this;
929   STARTUP_DEBUG("ampiParent> found TCharm")
930 }
931
932 void ampiParent::init(){
933   CkAssert(groups.size() == 0);
934   groups.push_back(new groupStruct);
935
936 #if AMPIMSGLOG
937   if(msgLogWrite && record_msglog(thisIndex)){
938     char fname[128];
939     sprintf(fname, "%s.%d", msgLogFilename,thisIndex);
940 #if CMK_PROJECTIONS_USE_ZLIB && 0
941     fMsgLog = gzopen(fname,"wb");
942     toPUPer = new PUP::tozDisk(fMsgLog);
943 #else
944     fMsgLog = fopen(fname,"wb");
945     CmiAssert(fMsgLog != NULL);
946     toPUPer = new PUP::toDisk(fMsgLog);
947 #endif
948   }else if(msgLogRead){
949     char fname[128];
950     sprintf(fname, "%s.%d", msgLogFilename,msgLogRank);
951 #if CMK_PROJECTIONS_USE_ZLIB && 0
952     fMsgLog = gzopen(fname,"rb");
953     fromPUPer = new PUP::fromzDisk(fMsgLog);
954 #else
955     fMsgLog = fopen(fname,"rb");
956     CmiAssert(fMsgLog != NULL);
957     fromPUPer = new PUP::fromDisk(fMsgLog);
958 #endif
959     CkPrintf("AMPI> opened message log file: %s for replay\n", fname);
960   }
961 #endif
962 }
963
964 void ampiParent::finalize(){
965 #if AMPIMSGLOG
966   if(msgLogWrite && record_msglog(thisIndex)){
967     delete toPUPer;
968 #if CMK_PROJECTIONS_USE_ZLIB && 0
969     gzclose(fMsgLog);
970 #else
971     fclose(fMsgLog);
972 #endif
973   }else if(msgLogRead){
974     delete fromPUPer;
975 #if CMK_PROJECTIONS_USE_ZLIB && 0
976     gzclose(fMsgLog);
977 #else
978     fclose(fMsgLog);
979 #endif
980   }
981 #endif
982 }
983
984 void ampiParent::ckJustMigrated(void) {
985   ArrayElement1D::ckJustMigrated();
986   prepareCtv();
987 }
988
989 void ampiParent::ckJustRestored(void) {
990   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
991     ArrayElement1D::ckJustRestored();
992   prepareCtv();
993
994   //BIGSIM_OOC DEBUGGING
995   //CkPrintf("In ampiParent[%d] with TCharm thread=%p:   ",thisIndex, thread);
996   //CthPrintThdMagic(thread->getTid()); 
997 }
998
999 ampiParent::~ampiParent() {
1000   STARTUP_DEBUG("ampiParent> destructor called");
1001   finalize();
1002 }
1003
1004 //Children call this when they are first created or just migrated
1005 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
1006 {
1007   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
1008
1009   if (s.getComm()>=MPI_COMM_WORLD)
1010   { //We now have our COMM_WORLD-- register it
1011     //Note that split communicators don't keep a raw pointer, so
1012     //they don't need to re-register on migration.
1013     if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
1014     worldPtr=ptr;
1015     worldStruct=s;
1016
1017     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
1018     CkVec<int> _indices;
1019     _indices.push_back(thisIndex);
1020     selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
1021   }
1022
1023   if (!forMigration)
1024   { //Register the new communicator:
1025     MPI_Comm comm = s.getComm();
1026     STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
1027       if (comm>=MPI_COMM_WORLD) { 
1028         // Pass the new ampi to the waiting ampiInit
1029         thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
1030       } else if (isSplit(comm)) {
1031         splitChildRegister(s);
1032       } else if (isGroup(comm)) {
1033         groupChildRegister(s);
1034       } else if (isCart(comm)) {
1035         cartChildRegister(s);
1036       } else if (isGraph(comm)) {
1037         graphChildRegister(s);
1038       } else if (isInter(comm)) {
1039         interChildRegister(s);
1040       } else if (isIntra(comm)) {
1041         intraChildRegister(s);
1042       }else
1043         CkAbort("ampiParent recieved child with bad communicator");
1044   }
1045
1046   return thread;
1047 }
1048
1049 //BIGSIM_OOC DEBUGGING
1050 //Move the comm2ampi from inline to normal function for the sake of debugging
1051 /*ampi *ampiParent::comm2ampi(MPI_Comm comm){
1052 //BIGSIM_OOC DEBUGGING
1053 //CmiPrintf("%d, in ampiParent::comm2ampi, comm=%d\n", thisIndex, comm);
1054 if (comm==MPI_COMM_WORLD) return worldPtr;
1055 if (comm==MPI_COMM_SELF) return worldPtr;
1056 if (comm==worldNo) return worldPtr;
1057 if (isSplit(comm)) {
1058 const ampiCommStruct &st=getSplit(comm);
1059 return st.getProxy()[thisIndex].ckLocal();
1060 }
1061 if (isGroup(comm)) {
1062 const ampiCommStruct &st=getGroup(comm);
1063 return st.getProxy()[thisIndex].ckLocal();
1064 }
1065 if (isCart(comm)) {
1066 const ampiCommStruct &st = getCart(comm);
1067 return st.getProxy()[thisIndex].ckLocal();
1068 }
1069 if (isGraph(comm)) {
1070 const ampiCommStruct &st = getGraph(comm);
1071 return st.getProxy()[thisIndex].ckLocal();
1072 }
1073 if (isInter(comm)) {
1074 const ampiCommStruct &st=getInter(comm);
1075 return st.getProxy()[thisIndex].ckLocal();
1076 }
1077 if (isIntra(comm)) {
1078 const ampiCommStruct &st=getIntra(comm);
1079 return st.getProxy()[thisIndex].ckLocal();
1080 }
1081 if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
1082 CkAbort("Invalid communicator used!");
1083 return NULL;
1084 }*/
1085
1086 // reduction client data - preparation for checkpointing
1087 class ckptClientStruct {
1088   public:
1089     const char *dname;
1090     ampiParent *ampiPtr;
1091     ckptClientStruct(const char *s, ampiParent *a): dname(s), ampiPtr(a) {}
1092 };
1093
1094 static void checkpointClient(void *param,void *msg)
1095 {
1096   ckptClientStruct *client = (ckptClientStruct*)param;
1097   const char *dname = client->dname;
1098   ampiParent *ampiPtr = client->ampiPtr;
1099   ampiPtr->Checkpoint(strlen(dname), dname);
1100   delete client;
1101 }
1102
1103 void ampiParent::startCheckpoint(const char* dname){
1104   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
1105   if (thisIndex==0) {
1106     ckptClientStruct *clientData = new ckptClientStruct(dname, this);
1107     CkCallback cb(checkpointClient, clientData);
1108     contribute(0, NULL, CkReduction::sum_int, cb);
1109   }
1110   else
1111     contribute(0, NULL, CkReduction::sum_int);
1112
1113 #if 0
1114 #if CMK_BIGSIM_CHARM
1115   void *curLog;         // store current log in timeline
1116   _TRACE_BG_TLINE_END(&curLog);
1117   TRACE_BG_AMPI_SUSPEND();
1118 #endif
1119 #endif
1120
1121   thread->stop();
1122
1123 #if CMK_BIGSIM_CHARM
1124   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
1125   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
1126 #endif
1127 }
1128
1129 void ampiParent::Checkpoint(int len, const char* dname){
1130   if (len == 0) {
1131     // memory checkpoint
1132     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1133     CkStartMemCheckpoint(cb);
1134   }
1135   else {
1136     char dirname[256];
1137     strncpy(dirname,dname,len);
1138     dirname[len]='\0';
1139     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1140     CkStartCheckpoint(dirname,cb);
1141   }
1142 }
1143 void ampiParent::ResumeThread(void){
1144   thread->resume();
1145 }
1146
1147 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
1148     int *keyval, void* extra_state){
1149   KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1150   int idx = kvlist.size();
1151   kvlist.resize(idx+1);
1152   kvlist[idx] = newnode;
1153   *keyval = idx;
1154   return 0;
1155 }
1156   int ampiParent::freeKeyval(int *keyval){
1157     if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
1158       return -1;
1159     delete kvlist[*keyval];
1160     kvlist[*keyval] = NULL;
1161     *keyval = MPI_KEYVAL_INVALID;
1162     return 0;
1163   }
1164
1165   int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
1166     if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1167       return -1;
1168     ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1169     // Enlarge the keyval list:
1170     while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
1171     cs.getKeyvals()[keyval]=attribute_val;
1172     return 0;
1173   }
1174
1175 int ampiParent::kv_is_builtin(int keyval) {
1176   switch(keyval) {
1177     case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
1178     case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
1179     case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
1180     case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
1181     case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
1182     case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
1183     case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
1184     case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
1185     default: return 0;
1186   };
1187 }
1188
1189 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
1190   *flag = false;
1191   if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
1192     *flag=true;
1193     *(int **)attribute_val = kv_builtin_storage;  // all default tags are ints
1194     return 0;
1195   }
1196   if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1197     return -1; /* invalid keyval */
1198
1199   ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1200   if (keyval>=cs.getKeyvals().size())  
1201     return 0; /* we don't have a value yet */
1202   if (cs.getKeyvals()[keyval]==0)
1203     return 0; /* we had a value, but now it's zero */
1204   /* Otherwise, we have a good value */
1205   *flag = true;
1206   *(void **)attribute_val = cs.getKeyvals()[keyval];
1207   return 0;
1208 }
1209 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
1210   /* no way to delete an attribute: just overwrite it with 0 */
1211   return putAttr(comm,keyval,0);
1212 }
1213
1214 //----------------------- ampi -------------------------
1215 void ampi::init(void) {
1216   parent=NULL;
1217   thread=NULL;
1218   msgs=NULL;
1219   posted_ireqs=NULL;
1220   resumeOnRecv=false;
1221   AsyncEvacuate(CmiFalse);
1222 }
1223
1224 ampi::ampi()
1225 {
1226   /* this constructor only exists so we can create an empty array during split */
1227   CkAbort("Default ampi constructor should never be called");
1228 }
1229
1230   ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
1231 :parentProxy(parent_)
1232 {
1233   init();
1234
1235   myComm=s; myComm.setArrayID(thisArrayID);
1236   myRank=myComm.getRankForIndex(thisIndex);
1237
1238   findParent(false);
1239
1240   msgs = CmmNew();
1241   posted_ireqs = CmmNew();
1242   nbcasts = 0;
1243
1244 #if AMPI_COMLIB
1245   comlibProxy = thisProxy; // Will later be associated with comlib
1246 #endif
1247
1248   seqEntries=parent->ckGetArraySize();
1249   oorder.init (seqEntries);
1250 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1251   if(thisIndex == 0){
1252     /*      CkAssert(CkMyPe() == 0);
1253      *              CkGroupID _myManagerGID = thisProxy.ckGetArrayID();     
1254      *                      CkAssert(numElements);
1255      *                              printf("ampi::ampi setting numInitial to %d on manager at gid %d \n",numElements,_myManagerGID.idx);
1256      *                                      CkArray *_myManager = thisProxy.ckLocalBranch();
1257      *                                              _myManager->setNumInitial(numElements);*/
1258   }
1259 #endif
1260 }
1261
1262 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1263     ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1264 :parentProxy(parent_)
1265 {
1266 #if AMPI_COMLIB
1267   ciStreaming = ciStreaming_;
1268   ciBcast = ciBcast_;
1269   ciAllgather = ciAllgather_;
1270   ciAlltoall = ciAlltoall_;
1271
1272   init();
1273
1274   myComm=s; myComm.setArrayID(thisArrayID);
1275   myRank=myComm.getRankForIndex(thisIndex);
1276
1277   findParent(false);
1278
1279   msgs = CmmNew();
1280   posted_ireqs = CmmNew();
1281   nbcasts = 0;
1282
1283   comlibProxy = thisProxy;
1284   CmiPrintf("comlibProxy created as a copy of thisProxy, no associate call\n");
1285
1286 #if AMPI_COMLIB
1287   //  ComlibAssociateProxy(ciAlltoall, comlibProxy);
1288 #endif
1289
1290   seqEntries=parent->ckGetArraySize();
1291   oorder.init (seqEntries);
1292 #endif
1293 }
1294
1295 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1296 {
1297   init();
1298
1299   seqEntries=-1;
1300 }
1301
1302 void ampi::ckJustMigrated(void)
1303 {
1304   findParent(true);
1305   ArrayElement1D::ckJustMigrated();
1306 }
1307
1308 void ampi::ckJustRestored(void)
1309 {
1310   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
1311     findParent(true);
1312   ArrayElement1D::ckJustRestored();
1313
1314   //BIGSIM_OOC DEBUGGING
1315   //CkPrintf("In ampi[%d] thread[%p]:   ", thisIndex, thread);
1316   //CthPrintThdMagic(thread->getTid()); 
1317 }
1318
1319 void ampi::findParent(bool forMigration) {
1320   STARTUP_DEBUG("ampi> finding my parent")
1321     parent=parentProxy[thisIndex].ckLocal();
1322   if (parent==NULL) CkAbort("AMPI can't find its parent!");
1323   thread=parent->registerAmpi(this,myComm,forMigration);
1324   if (thread==NULL) CkAbort("AMPI can't find its thread!");
1325   //    printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1326 }
1327
1328 //The following method should be called on the first element of the
1329 //ampi array
1330 void ampi::allInitDone(CkReductionMsg *m){
1331   FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
1332     thisProxy.setInitDoneFlag();
1333   delete m;
1334 }
1335
1336 void ampi::setInitDoneFlag(){
1337   //CkPrintf("ampi[%d]::setInitDone called!\n", thisIndex);
1338   parent->ampiInitCallDone=1;
1339   parent->getTCharmThread()->start();
1340 }
1341
1342 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1343   CkPupMessage(*(PUP::er *)p,msg,1);
1344   if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1345   //    printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1346 }
1347
1348 static void cmm_pup_posted_ireq(pup_er p,void **msg) {
1349
1350   pup_int(p, (int *)msg);
1351
1352   /*    if(pup_isUnpacking(p)){
1353   // *msg = new IReq;
1354   //when unpacking, nothing is needed to do since *msg is an index
1355   //(of type integer) to the ampiParent::ampiReqs (the AmpiRequestList)
1356   }
1357   if(!pup_isUnpacking(p)){
1358   AmpiRequestList *reqL = getReqs();
1359   int retIdx = reqL->findRequestIndex((IReq *)*msg);
1360   if(retIdx==-1){
1361   CmiAbort("An AmpiRequest instance should be found for an instance in posted_ireq!\n");
1362   }
1363   pup_int(p, retIdx)
1364   }
1365    */
1366   //    ((IReq *)*msg)->pup(*(PUP::er *)p);
1367
1368   //    if (pup_isDeleting(p)) delete (IReq *)*msg;
1369   //    printf("[%d] pupping postd irequests %p \n",CkMyPe(),*msg);
1370 }
1371
1372 void ampi::pup(PUP::er &p)
1373 {
1374   if(!p.isUserlevel())
1375     ArrayElement1D::pup(p);//Pack superclass
1376   p|parentProxy;
1377   p|myComm;
1378   p|myRank;
1379   p|nbcasts;
1380   p|tmpVec;
1381   p|remoteProxy;
1382   p|resumeOnRecv;
1383 #if AMPI_COMLIB
1384   p|comlibProxy;
1385   p|ciStreaming;
1386   p|ciBcast;
1387   p|ciAllgather;
1388   p|ciAlltoall;
1389
1390   if(p.isUnpacking()){
1391     //    ciStreaming.setSourcePe();
1392     //    ciBcast.setSourcePe();
1393     //    ciAllgather.setSourcePe();
1394     //    ciAlltoall.setSourcePe();
1395   }
1396 #endif
1397
1398   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1399
1400   //BIGSIM_OOC DEBUGGING
1401   //if(!p.isUnpacking()){
1402   //CkPrintf("ampi[%d]::packing: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1403   //}
1404
1405   posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);
1406
1407   //if(p.isUnpacking()){
1408   //CkPrintf("ampi[%d]::unpacking: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1409   //}
1410
1411   p|seqEntries;
1412   p|oorder;
1413 }
1414
1415 ampi::~ampi()
1416 {
1417   if (CkInRestarting() || _BgOutOfCoreFlag==1) {
1418     // in restarting, we need to flush messages
1419     int tags[3], sts[3];
1420     tags[0] = tags[1] = tags[2] = CmmWildCard;
1421     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1422     while (msg) {
1423       delete msg;
1424       msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1425     }
1426   }
1427
1428   CmmFree(msgs);
1429   CmmFreeAll(posted_ireqs);
1430 }
1431
1432 //------------------------ Communicator Splitting ---------------------
1433 class ampiSplitKey {
1434   public:
1435     int nextSplitComm;
1436     int color; //New class of processes we'll belong to
1437     int key; //To determine rank in new ordering
1438     int rank; //Rank in old ordering
1439     ampiSplitKey() {}
1440     ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1441       :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1442 };
1443
1444 /* "type" may indicate whether call is for a cartesian topology etc. */
1445
1446 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1447 {
1448 #if CMK_BIGSIM_CHARM
1449   void *curLog;         // store current log in timeline
1450   _TRACE_BG_TLINE_END(&curLog);
1451   //  TRACE_BG_AMPI_SUSPEND();
1452 #endif
1453   if (type == CART_TOPOL) {
1454     ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1455     int rootIdx=myComm.getIndexForRank(0);
1456     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1457     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1458
1459     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1460     MPI_Comm newComm=parent->getNextCart()-1;
1461     *dest=newComm;
1462   } else {
1463     ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1464     int rootIdx=myComm.getIndexForRank(0);
1465     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1466     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1467
1468     thread->suspend(); //Resumed by ampiParent::splitChildRegister
1469     MPI_Comm newComm=parent->getNextSplit()-1;
1470     *dest=newComm;
1471   }
1472 #if CMK_BIGSIM_CHARM
1473   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1474   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1475   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1476 #endif
1477 }
1478
1479 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1480   const ampiSplitKey *a=(const ampiSplitKey *)a_;
1481   const ampiSplitKey *b=(const ampiSplitKey *)b_;
1482   if (a->color!=b->color) return a->color-b->color;
1483   if (a->key!=b->key) return a->key-b->key;
1484   return a->rank-b->rank;
1485 }
1486
1487 void ampi::splitPhase1(CkReductionMsg *msg)
1488 {
1489   //Order the keys, which orders the ranks properly:
1490   int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1491   ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1492   if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1493   qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1494
1495   MPI_Comm newComm = -1;
1496   for(int i=0;i<nKeys;i++)
1497     if(keys[i].nextSplitComm>newComm)
1498       newComm = keys[i].nextSplitComm;
1499
1500   //Loop over the sorted keys, which gives us the new arrays:
1501   int lastColor=keys[0].color-1; //The color we're building an array for
1502   CProxy_ampi lastAmpi; //The array for lastColor
1503   int lastRoot=0; //C value for new rank 0 process for latest color
1504   ampiCommStruct lastComm; //Communicator info. for latest color
1505   for (int c=0;c<nKeys;c++) {
1506     if (keys[c].color!=lastColor)
1507     { //Hit a new color-- need to build a new communicator and array
1508       lastColor=keys[c].color;
1509       lastRoot=c;
1510       CkArrayOptions opts;
1511       opts.bindTo(parentProxy);
1512       opts.setNumInitial(0);
1513       CkArrayID unusedAID; ampiCommStruct unusedComm;
1514       lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1515       lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1516
1517       CkVec<int> indices; //Maps rank to array indices for new arrau
1518       for (int i=c;i<nKeys;i++) {
1519         if (keys[i].color!=lastColor) break; //Done with this color
1520         int idx=myComm.getIndexForRank(keys[i].rank);
1521         indices.push_back(idx);
1522       }
1523
1524       //FIXME: create a new communicator for each color, instead of
1525       // (confusingly) re-using the same MPI_Comm number for each.
1526       lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1527     }
1528     int newRank=c-lastRoot;
1529     int newIdx=lastComm.getIndexForRank(newRank);
1530
1531     //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1532     lastAmpi[newIdx].insert(parentProxy,lastComm);
1533   }
1534
1535   delete msg;
1536 }
1537
1538 //...newly created array elements register with the parent, which calls:
1539 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1540   int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1541   if (splitComm.size()<=idx) splitComm.resize(idx+1);
1542   splitComm[idx]=new ampiCommStruct(s);
1543   thread->resume(); //Matches suspend at end of ampi::split
1544 }
1545
1546 //-----------------create communicator from group--------------
1547 // The procedure is like that of comm_split very much,
1548 // so the code is shamelessly copied from above
1549 //   1. reduction to make sure all members have called
1550 //   2. the root in the old communicator create the new array
1551 //   3. ampiParent::register is called to register new array as new comm
1552 class vecStruct {
1553   public:
1554     int nextgroup;
1555     groupStruct vec;
1556     vecStruct():nextgroup(-1){}
1557     vecStruct(int nextgroup_, groupStruct vec_)
1558       : nextgroup(nextgroup_), vec(vec_) { }
1559 };
1560
1561 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1562   int rootIdx=vec[0];
1563   tmpVec = vec;
1564   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1565   MPI_Comm nextgroup = parent->getNextGroup();
1566   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1567
1568   if(getPosOp(thisIndex,vec)>=0){
1569     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1570     MPI_Comm retcomm = parent->getNextGroup()-1;
1571     *newcomm = retcomm;
1572   }else{
1573     *newcomm = MPI_COMM_NULL;
1574   }
1575 }
1576
1577 void ampi::commCreatePhase1(CkReductionMsg *msg){
1578   MPI_Comm *nextGroupComm = (int *)msg->getData();
1579
1580   CkArrayOptions opts;
1581   opts.bindTo(parentProxy);
1582   opts.setNumInitial(0);
1583   CkArrayID unusedAID;
1584   ampiCommStruct unusedComm;
1585   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1586   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1587
1588   groupStruct indices = tmpVec;
1589   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1590   for(int i=0;i<indices.size();i++){
1591     int newIdx=indices[i];
1592     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1593   }
1594   delete msg;
1595 }
1596
1597 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1598   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1599   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1600   groupComm[idx]=new ampiCommStruct(s);
1601   thread->resume(); //Matches suspend at end of ampi::split
1602 }
1603
1604 /* Virtual topology communicator creation */
1605 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1606   int rootIdx=vec[0];
1607   tmpVec = vec;
1608   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1609
1610   MPI_Comm nextcart = parent->getNextCart();
1611   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1612
1613   if(getPosOp(thisIndex,vec)>=0){
1614     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1615     MPI_Comm retcomm = parent->getNextCart()-1;
1616     *newcomm = retcomm;
1617   }else
1618     *newcomm = MPI_COMM_NULL;
1619 }
1620
1621 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1622   MPI_Comm *nextCartComm = (int *)msg->getData();
1623
1624   CkArrayOptions opts;
1625   opts.bindTo(parentProxy);
1626   opts.setNumInitial(0);
1627   CkArrayID unusedAID;
1628   ampiCommStruct unusedComm;
1629   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1630   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1631
1632   groupStruct indices = tmpVec;
1633   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1634       size(),indices);
1635   for(int i=0;i<indices.size();i++){
1636     int newIdx=indices[i];
1637     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1638   }
1639   delete msg;
1640 }
1641
1642 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1643   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1644   if (cartComm.size()<=idx) {
1645     cartComm.resize(idx+1);
1646     cartComm.length()=idx+1;
1647   }
1648   cartComm[idx]=new ampiCommStruct(s);
1649   thread->resume(); //Matches suspend at end of ampi::cartCreate
1650 }
1651
1652 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1653   int rootIdx=vec[0];
1654   tmpVec = vec;
1655   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1656       myComm.getProxy());
1657   MPI_Comm nextgraph = parent->getNextGraph();
1658   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1659
1660   if(getPosOp(thisIndex,vec)>=0){
1661     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1662     MPI_Comm retcomm = parent->getNextGraph()-1;
1663     *newcomm = retcomm;
1664   }else
1665     *newcomm = MPI_COMM_NULL;
1666 }
1667
1668 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1669   MPI_Comm *nextGraphComm = (int *)msg->getData();
1670
1671   CkArrayOptions opts;
1672   opts.bindTo(parentProxy);
1673   opts.setNumInitial(0);
1674   CkArrayID unusedAID;
1675   ampiCommStruct unusedComm;
1676   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1677   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1678
1679   groupStruct indices = tmpVec;
1680   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1681       .size(),indices);
1682   for(int i=0;i<indices.size();i++){
1683     int newIdx=indices[i];
1684     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1685   }
1686   delete msg;
1687 }
1688
1689 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1690   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1691   if (graphComm.size()<=idx) {
1692     graphComm.resize(idx+1);
1693     graphComm.length()=idx+1;
1694   }
1695   graphComm[idx]=new ampiCommStruct(s);
1696   thread->resume(); //Matches suspend at end of ampi::graphCreate
1697 }
1698
1699 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1700   if(thisIndex==root) { // not everybody gets the valid rvec
1701     tmpVec = rvec;
1702   }
1703   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1704   MPI_Comm nextinter = parent->getNextInter();
1705   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1706
1707   thread->suspend(); //Resumed by ampiParent::interChildRegister
1708   MPI_Comm newcomm=parent->getNextInter()-1;
1709   *ncomm=newcomm;
1710 }
1711
1712 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1713   MPI_Comm *nextInterComm = (int *)msg->getData();
1714
1715   groupStruct lgroup = myComm.getIndices();
1716   CkArrayOptions opts;
1717   opts.bindTo(parentProxy);
1718   opts.setNumInitial(0);
1719   CkArrayID unusedAID;
1720   ampiCommStruct unusedComm;
1721   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1722   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1723
1724   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1725   for(int i=0;i<lgroup.size();i++){
1726     int newIdx=lgroup[i];
1727     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1728   }
1729
1730   parentProxy[0].ExchangeProxy(newAmpi);
1731   delete msg;
1732 }
1733
1734 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1735   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1736   if (interComm.size()<=idx) interComm.resize(idx+1);
1737   interComm[idx]=new ampiCommStruct(s);
1738   //thread->resume(); // don't resume it yet, till parent set remote proxy
1739 }
1740
1741 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1742   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1743     groupStruct lvec = myComm.getIndices();
1744     groupStruct rvec = myComm.getRemoteIndices();
1745     int rsize = rvec.size();
1746     tmpVec = lvec;
1747     for(int i=0;i<rsize;i++)
1748       tmpVec.push_back(rvec[i]);
1749     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1750   }else{
1751     tmpVec.resize(0);
1752   }
1753
1754   int rootIdx=myComm.getIndexForRank(0);
1755   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1756   MPI_Comm nextintra = parent->getNextIntra();
1757   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1758
1759   thread->suspend(); //Resumed by ampiParent::interChildRegister
1760   MPI_Comm newcomm=parent->getNextIntra()-1;
1761   *ncomm=newcomm;
1762 }
1763
1764 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1765   if(tmpVec.size()==0) { delete msg; return; }
1766   MPI_Comm *nextIntraComm = (int *)msg->getData();
1767   CkArrayOptions opts;
1768   opts.bindTo(parentProxy);
1769   opts.setNumInitial(0);
1770   CkArrayID unusedAID;
1771   ampiCommStruct unusedComm;
1772   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1773   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1774
1775   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1776   for(int i=0;i<tmpVec.size();i++){
1777     int newIdx=tmpVec[i];
1778     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1779   }
1780   delete msg;
1781 }
1782
1783 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1784   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1785   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1786   intraComm[idx]=new ampiCommStruct(s);
1787   thread->resume(); //Matches suspend at end of ampi::split
1788 }
1789
1790 //------------------------ communication -----------------------
1791 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1792 {
1793   if (universeNo>MPI_COMM_WORLD) {
1794     int worldDex=universeNo-MPI_COMM_WORLD-1;
1795     if (worldDex>=_mpi_nworlds)
1796       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1797     return mpi_worlds[worldDex].comm;
1798   }
1799   CkAbort("Bad communicator passed to universeComm2CommStruct");
1800   return mpi_worlds[0].comm; // meaningless return
1801 }
1802
1803 void ampi::block(void){
1804   thread->suspend();
1805 }
1806
1807 void ampi::yield(void){
1808   thread->schedule();
1809 }
1810
1811 void ampi::unblock(void){
1812   thread->resume();
1813 }
1814
1815   void ampi::ssend_ack(int sreq_idx){
1816     if (sreq_idx == 1)
1817       thread->resume();           // MPI_Ssend
1818     else {
1819       sreq_idx -= 2;              // start from 2
1820       AmpiRequestList *reqs = &(parent->ampiReqs);
1821       SReq *sreq = (SReq *)(*reqs)[sreq_idx];
1822       sreq->statusIreq = true;
1823       if (resumeOnRecv) {
1824         thread->resume();
1825       }
1826     }
1827   }
1828
1829   void
1830 ampi::generic(AmpiMsg* msg)
1831 {
1832   MSG_ORDER_DEBUG(
1833       CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1834         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1835       )
1836 #if CMK_BIGSIM_CHARM
1837     TRACE_BG_ADD_TAG("AMPI_generic");
1838   msg->event = NULL;
1839 #endif
1840
1841   int sync = UsrToEnv(msg)->getRef();
1842   int srcIdx;
1843   if (sync)  srcIdx = msg->srcIdx;
1844
1845   //    AmpiMsg *msgcopy = msg;
1846   if(msg->seq != -1) {
1847     int srcIdx=msg->srcIdx;
1848     int n=oorder.put(srcIdx,msg);
1849     if (n>0) { // This message was in-order
1850       inorder(msg);
1851       if (n>1) { // It enables other, previously out-of-order messages
1852         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1853           inorder(msg);
1854         }
1855       }
1856     }
1857   } else { //Cross-world or system messages are unordered
1858     inorder(msg);
1859   }
1860
1861   // msg may be free'ed from calling inorder()
1862   if (sync>0) {         // send an ack to sender
1863     CProxy_ampi pa(thisArrayID);
1864     pa[srcIdx].ssend_ack(sync);
1865   }
1866
1867   if(resumeOnRecv){
1868     //CkPrintf("Calling TCharm::resume at ampi::generic!\n");
1869     thread->resume();
1870   }
1871 }
1872
1873 inline static AmpiRequestList *getReqs(void); 
1874
1875   void
1876 ampi::inorder(AmpiMsg* msg)
1877 {
1878   MSG_ORDER_DEBUG(
1879       CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1880         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1881       )
1882     // check posted recvs
1883     int tags[3], sts[3];
1884   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1885   IReq *ireq = NULL;
1886   if (CpvAccess(CmiPICMethod) != 2) {
1887 #if 0
1888     //IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1889     ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1890 #else
1891 #if CMK_BIGSIM_CHARM
1892     _TRACE_BG_TLINE_END(&msg->event);    // store current log
1893     msg->eventPe = CmiMyPe();
1894 #endif
1895     //in case ampi has not initialized and posted_ireqs are only inserted 
1896     //at AMPI_Irecv (MPI_Irecv)
1897     AmpiRequestList *reqL = &(parent->ampiReqs);
1898     //When storing the req index, it's 1-based. The reason is stated in the comments
1899     //in AMPI_Irecv function.
1900     int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
1901     if(reqL->size()>0 && ireqIdx>0)
1902       ireq = (IReq *)(*reqL)[ireqIdx-1];
1903     //CkPrintf("[%d] ampi::inorder, ireqIdx=%d\n", thisIndex, ireqIdx);
1904 #endif
1905     //CkPrintf("[%d] ampi::inorder, ireq=%p\n", thisIndex, ireq);
1906     if (ireq) { // receive posted
1907       ireq->receive(this, msg);
1908       // Isaac changed this so that the IReq stores the tag when receiving the message, 
1909       // instead of using this user supplied tag which could be MPI_ANY_TAG
1910       // Formerly the following line was not commented out:
1911       //ireq->tag = sts[0];         
1912       //ireq->src = sts[1];
1913       //ireq->comm = sts[2];
1914     } else {
1915       CmmPut(msgs, 3, tags, msg);
1916     }
1917   }
1918   else
1919     CmmPut(msgs, 3, tags, msg);
1920 }
1921
1922 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1923 {
1924   int tags[3];
1925   tags[0] = t; tags[1] = s; tags[2] = comm;
1926   AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1927   return msg;
1928 }
1929
1930 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1931     int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm, int sync)
1932 {
1933   CkDDT_DataType *ddt = getDDT()->getType(type);
1934   int len = ddt->getSize(count);
1935   int sIdx=thisIndex;
1936   int seq = -1;
1937   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1938     seq = oorder.nextOutgoing(destIdx);
1939   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1940   if (sync) UsrToEnv(msg)->setRef(sync);
1941   TCharm::activateVariable(buf);
1942   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1943   TCharm::deactivateVariable(buf);
1944   return msg;
1945 }
1946
1947 #if AMPI_COMLIB
1948   void
1949 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1950 {
1951   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1952 }
1953 #endif
1954
1955   void
1956 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, int sync)
1957 {
1958 #if CMK_TRACE_IN_CHARM
1959   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
1960 #endif
1961
1962 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1963   MPI_Comm disComm = myComm.getComm();
1964   ampi *dis = getAmpiInstance(disComm);
1965   CpvAccess(_currentObj) = dis;
1966 #endif
1967
1968   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1969   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),sync);
1970
1971 #if CMK_TRACE_IN_CHARM
1972   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
1973 #endif
1974
1975   if (sync == 1) {
1976     // waiting for receiver side
1977     resumeOnRecv = false;            // so no one else awakes it
1978     block();
1979   }
1980 }
1981
1982   void
1983 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1984 {
1985   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1986   memcpy(msg->data, buf, len);
1987   CProxy_ampi pa(aid);
1988   pa[idx].generic(msg);
1989 }
1990
1991   void
1992 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy, int sync)
1993 {
1994   if(rank==MPI_PROC_NULL) return;
1995   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1996   int destIdx = dest.getIndexForRank(rank);
1997   if(isInter()){
1998     sRank = parent->thisIndex;
1999     destcomm = MPI_COMM_FIRST_INTER;
2000     destIdx = dest.getIndexForRemoteRank(rank);
2001     arrproxy = remoteProxy;
2002   }
2003   MSG_ORDER_DEBUG(
2004       CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
2005       )
2006
2007     arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm,sync));
2008
2009 #if 0
2010 #if CMK_TRACE_ENABLED
2011   int size=0;
2012   MPI_Type_size(type,&size);
2013   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
2014 #endif
2015 #endif
2016 }
2017
2018   int
2019 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
2020 {
2021   CkDDT_DataType *ddt = getDDT()->getType(type);
2022   int len = ddt->getSize(count);
2023
2024   if(msg->length < len){ // only at rare case shall we reset count by using divide
2025     count = msg->length/(ddt->getSize(1));
2026   }
2027
2028   TCharm::activateVariable(buf);
2029   if (t==MPI_REDUCE_TAG) {      // reduction msg
2030     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
2031   } else {
2032     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
2033   }
2034   TCharm::deactivateVariable(buf);
2035   return 0;
2036 }
2037
2038   int
2039 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
2040 {
2041   MPI_Comm disComm = myComm.getComm();
2042   if(s==MPI_PROC_NULL) {
2043     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
2044     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
2045     ((MPI_Status *)sts)->MPI_LENGTH = 0;
2046     return 0;
2047   }
2048 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
2049   _LOG_E_END_AMPI_PROCESSING(thisIndex)
2050 #endif
2051 #if CMK_BIGSIM_CHARM
2052    void *curLog;                // store current log in timeline
2053   _TRACE_BG_TLINE_END(&curLog);
2054   //  TRACE_BG_AMPI_SUSPEND();
2055 #if CMK_TRACE_IN_CHARM
2056   if(CpvAccess(traceOn)) traceSuspend();
2057 #endif
2058 #endif
2059
2060   if(isInter()){
2061     s = myComm.getIndexForRemoteRank(s);
2062     comm = MPI_COMM_FIRST_INTER;
2063   }
2064
2065   int tags[3];
2066   AmpiMsg *msg = 0;
2067
2068   MSG_ORDER_DEBUG(
2069       CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
2070       )
2071
2072   ampi *dis = getAmpiInstance(disComm);
2073 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2074   //  dis->yield();
2075   //  processRemoteMlogMessages();
2076 #endif
2077   int dosuspend = 0;
2078   while(1) {
2079     //This is done to take into account the case in which an ampi 
2080     // thread has migrated while waiting for a message
2081     tags[0] = t; tags[1] = s; tags[2] = comm;
2082     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
2083     if (msg) break;
2084     dis->resumeOnRecv=true;
2085     dis->thread->suspend();
2086     dosuspend = 1;
2087     dis = getAmpiInstance(disComm);
2088   }
2089
2090 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2091   CpvAccess(_currentObj) = dis;
2092   MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
2093 #endif
2094
2095     dis->resumeOnRecv=false;
2096
2097   if(sts)
2098     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2099   int status = dis->processMessage(msg, t, s, buf, count, type);
2100   if (status != 0) return status;
2101
2102 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
2103   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
2104 #endif
2105
2106 #if CMK_BIGSIM_CHARM
2107 #if CMK_TRACE_IN_CHARM
2108     //if(CpvAccess(traceOn)) CthTraceResume(thread->getThread());
2109     //Due to the reason mentioned the in the while loop above, we need to 
2110     //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
2111     if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
2112 #endif
2113   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
2114   //TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0);
2115   //_TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
2116 #if 0
2117 #if 1
2118   if (!dosuspend) {
2119     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 1);
2120     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2121   }
2122   else
2123 #endif
2124     TRACE_BG_ADD_TAG("RECV_RESUME_THREAD");
2125 #else
2126   TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
2127   if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2128 #endif
2129 #endif
2130
2131   delete msg;
2132   return 0;
2133 }
2134
2135   void
2136 ampi::probe(int t, int s, int comm, int *sts)
2137 {
2138   int tags[3];
2139 #if CMK_BIGSIM_CHARM
2140   void *curLog;         // store current log in timeline
2141   _TRACE_BG_TLINE_END(&curLog);
2142   //  TRACE_BG_AMPI_SUSPEND();
2143 #endif
2144
2145   AmpiMsg *msg = 0;
2146   resumeOnRecv=true;
2147   while(1) {
2148     tags[0] = t; tags[1] = s; tags[2] = comm;
2149     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2150     if (msg) break;
2151     thread->suspend();
2152   }
2153   resumeOnRecv=false;
2154   if(sts)
2155     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2156 #if CMK_BIGSIM_CHARM
2157   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
2158   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
2159 #endif
2160 }
2161
2162   int
2163 ampi::iprobe(int t, int s, int comm, int *sts)
2164 {
2165   int tags[3];
2166   AmpiMsg *msg = 0;
2167   tags[0] = t; tags[1] = s; tags[2] = comm;
2168   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2169   if (msg) {
2170     if(sts)
2171       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2172     return 1;
2173   }
2174 #if CMK_BIGSIM_CHARM
2175   void *curLog;         // store current log in timeline
2176   _TRACE_BG_TLINE_END(&curLog);
2177   //  TRACE_BG_AMPI_SUSPEND();
2178 #endif
2179   thread->schedule();
2180 #if CMK_BIGSIM_CHARM
2181   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
2182   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
2183 #endif
2184   return 0;
2185 }
2186
2187
2188 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
2189   void
2190 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
2191 {
2192   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2193   int rootIdx=dest.getIndexForRank(root);
2194   if(rootIdx==thisIndex) {
2195 #if 0//AMPI_COMLIB
2196     ciBcast.beginIteration();
2197     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2198 #else
2199 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2200     CpvAccess(_currentObj) = this;
2201 #endif
2202     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2203 #endif
2204   }
2205   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
2206   nbcasts++;
2207 }
2208
2209   void
2210 ampi::bcastraw(void* buf, int len, CkArrayID aid)
2211 {
2212   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, MPI_COMM_WORLD);
2213   memcpy(msg->data, buf, len);
2214   CProxy_ampi pa(aid);
2215   pa.generic(msg);
2216 }
2217
2218
2219   AmpiMsg* 
2220 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
2221 {
2222   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
2223   int unit;
2224   CkDDT_DataType *ddt = getDDT()->getType(type);
2225   unit = ddt->getSize(1);
2226   int totalsize = unit*cnt;
2227
2228   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
2229   char* addr = (char*)Alltoallbuff+disp*unit;
2230   ddt->serialize((char*)msg->data, addr, cnt, (-1));
2231   return msg;
2232 }
2233
2234 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
2235     void *attr_in, void *attr_out, int *flag){
2236   (*flag) = 0;
2237   return (MPI_SUCCESS);
2238 }
2239 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
2240     void *attr_in, void *attr_out, int *flag){
2241   (*(void **)attr_out) = attr_in;
2242   (*flag) = 1;
2243   return (MPI_SUCCESS);
2244 }
2245 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
2246   return (MPI_SUCCESS);
2247 }
2248
2249
2250 void AmpiSeqQ::init(int numP) 
2251 {
2252   elements.init(numP);
2253 }
2254
2255 AmpiSeqQ::~AmpiSeqQ () {
2256 }
2257
2258 void AmpiSeqQ::pup(PUP::er &p) {
2259   p|out;
2260   p|elements;
2261 }
2262
2263 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
2264 {
2265   AmpiOtherElement &el=elements[srcIdx];
2266 #if CMK_ERROR_CHECKING
2267   if (msg->seq<el.seqIncoming)
2268     CkAbort("AMPI Logic error: received late out-of-order message!\n");
2269 #endif
2270   out.enq(msg);
2271   el.nOut++; // We have another message in the out-of-order queue
2272 }
2273
2274 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
2275 {
2276   AmpiOtherElement &el=elements[srcIdx];
2277   if (el.nOut==0) return 0; // No more out-of-order left.
2278   // Walk through our out-of-order queue, searching for our next message:
2279   for (int i=0;i<out.length();i++) {
2280     AmpiMsg *msg=out.deq();
2281     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
2282       el.seqIncoming++;
2283       el.nOut--; // We have one less message out-of-order
2284       return msg;
2285     }
2286     else
2287       out.enq(msg);
2288   }
2289   // We walked the whole queue-- ours is not there.
2290   return 0;
2291 }
2292
2293 //BIGSIM_OOC DEBUGGING: Output for AmpiRequest and its children classes
2294 void AmpiRequest::print(){
2295   CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
2296 }
2297
2298 void PersReq::print(){
2299   AmpiRequest::print();
2300   CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
2301 }
2302
2303 void IReq::print(){
2304   AmpiRequest::print();
2305   CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
2306 }
2307
2308 void ATAReq::print(){ //not complete for myreqs
2309   AmpiRequest::print();
2310   CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
2311
2312
2313 void SReq::print(){
2314   AmpiRequest::print();
2315   CmiPrintf("In SReq: this=%p, status=%d\n", this, statusIreq);
2316 }
2317
2318 void AmpiRequestList::pup(PUP::er &p) { 
2319   if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
2320     return;
2321   }
2322
2323   p(blklen); //Allocated size of block
2324   p(len); //Number of used elements in block
2325   if(p.isUnpacking()){
2326     makeBlock(blklen,len);
2327   }
2328   int count=0;
2329   for(int i=0;i<len;i++){
2330     char nonnull;
2331     if(!p.isUnpacking()){
2332       if(block[i] == NULL){
2333         nonnull = 0;
2334       }else{
2335         nonnull = block[i]->getType();
2336       }
2337     }   
2338     p(nonnull);
2339     if(nonnull != 0){
2340       if(p.isUnpacking()){
2341         switch(nonnull){
2342           case 1:
2343             block[i] = new PersReq;
2344             break;
2345           case 2:       
2346             block[i] = new IReq;
2347             break;
2348           case 3:       
2349             block[i] = new ATAReq;
2350             break;
2351         }
2352       } 
2353       block[i]->pup(p);
2354       count++;
2355     }else{
2356       block[i] = 0;
2357     }
2358   }
2359   if(p.isDeleting()){
2360     freeBlock();
2361   }
2362 }
2363
2364 //------------------ External Interface -----------------
2365 ampiParent *getAmpiParent(void) {
2366   ampiParent *p = CtvAccess(ampiPtr);
2367 #if CMK_ERROR_CHECKING
2368   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
2369 #endif
2370   return p;
2371 }
2372
2373 ampi *getAmpiInstance(MPI_Comm comm) {
2374   ampi *ptr=getAmpiParent()->comm2ampi(comm);
2375 #if CMK_ERROR_CHECKING
2376   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
2377 #endif
2378   return ptr;
2379 }
2380
2381 inline static AmpiRequestList *getReqs(void) {
2382   return &(getAmpiParent()->ampiReqs);
2383 }
2384
2385 inline void checkComm(MPI_Comm comm){
2386 #if CMK_ERROR_CHECKING
2387   getAmpiParent()->checkComm(comm);
2388 #endif
2389 }
2390
2391 inline void checkRequest(MPI_Request req){
2392 #if CMK_ERROR_CHECKING
2393   getReqs()->checkRequest(req);
2394 #endif
2395 }
2396
2397 inline void checkRequests(int n, MPI_Request* reqs){
2398 #if CMK_ERROR_CHECKING
2399   AmpiRequestList* reqlist = getReqs();
2400   for(int i=0;i<n;i++)
2401     reqlist->checkRequest(reqs[i]);
2402 #endif
2403 }
2404
2405 CDECL void AMPI_Migrate(void)
2406 {
2407   //  AMPIAPI("AMPI_Migrate");
2408 #if 0
2409 #if CMK_BIGSIM_CHARM
2410   TRACE_BG_AMPI_SUSPEND();
2411 #endif
2412 #endif
2413   TCHARM_Migrate();
2414
2415 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2416   ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
2417   CpvAccess(_currentObj) = currentAmpi;
2418 #endif
2419
2420 #if CMK_BIGSIM_CHARM
2421   //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2422   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
2423 #endif
2424 }
2425
2426
2427 CDECL void AMPI_Evacuate(void)
2428 {
2429   TCHARM_Evacuate();
2430 }
2431
2432
2433
2434 CDECL void AMPI_Migrateto(int destPE)
2435 {
2436   AMPIAPI("AMPI_MigrateTo");
2437 #if 0
2438 #if CMK_BIGSIM_CHARM
2439   TRACE_BG_AMPI_SUSPEND();
2440 #endif
2441 #endif
2442   TCHARM_Migrate_to(destPE);
2443 #if CMK_BIGSIM_CHARM
2444   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
2445   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
2446 #endif
2447 }
2448
2449 CDECL void AMPI_MigrateTo(int destPE)
2450 {
2451   AMPI_Migrateto(destPE);
2452 }
2453
2454 CDECL void AMPI_Async_Migrate(void)
2455 {
2456   AMPIAPI("AMPI_Async_Migrate");
2457 #if 0
2458 #if CMK_BIGSIM_CHARM
2459   TRACE_BG_AMPI_SUSPEND();
2460 #endif
2461 #endif
2462   TCHARM_Async_Migrate();
2463 #if CMK_BIGSIM_CHARM
2464   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2465   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2466 #endif
2467 }
2468
2469 CDECL void AMPI_Allow_Migrate(void)
2470 {
2471   AMPIAPI("AMPI_Allow_Migrate");
2472 #if 0
2473 #if CMK_BIGSIM_CHARM
2474   TRACE_BG_AMPI_SUSPEND();
2475 #endif
2476 #endif
2477   TCHARM_Allow_Migrate();
2478 #if CMK_BIGSIM_CHARM
2479   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2480 #endif
2481 }
2482
2483 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2484 #if CMK_LBDB_ON
2485   //AMPIAPI("AMPI_Setmigratable");
2486   ampi *ptr=getAmpiInstance(comm);
2487   ptr->setMigratable(mig);
2488 #else
2489   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2490 #endif
2491 }
2492
2493 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2494 {
2495   //AMPIAPI("AMPI_Init");
2496   if (nodeinit_has_been_called) {
2497     AMPIAPI("AMPI_Init");
2498     char **argv;
2499     if (p_argv) argv=*p_argv;
2500     else argv=CkGetArgv();
2501     ampiInit(argv);
2502     if (p_argc) *p_argc=CmiGetArgc(argv);
2503   }
2504   else
2505   { /* Charm hasn't been started yet! */
2506     CkAbort("AMPI_Init> Charm is not initialized!");
2507   }
2508
2509   return 0;
2510 }
2511
2512 CDECL int AMPI_Initialized(int *isInit)
2513 {
2514   if (nodeinit_has_been_called) {
2515     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2516     *isInit=CtvAccess(ampiInitDone);
2517   }
2518   else /* !nodeinit_has_been_called */ {
2519     *isInit=nodeinit_has_been_called;
2520   }
2521   return 0;
2522 }
2523
2524 CDECL int AMPI_Finalized(int *isFinalized)
2525 {
2526   AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2527   *isFinalized=CtvAccess(ampiFinalized);
2528   return 0;
2529 }
2530
2531 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2532 {
2533   //AMPIAPI("AMPI_Comm_rank");
2534
2535 #if CMK_ERROR_CHECKING 
2536   if(checkCommunicator(comm) != MPI_SUCCESS)
2537     return checkCommunicator(comm);
2538 #endif
2539
2540 #if AMPIMSGLOG
2541   ampiParent* pptr = getAmpiParent();
2542   if(msgLogRead){
2543     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2544     return 0;
2545   }
2546 #endif
2547
2548   *rank = getAmpiInstance(comm)->getRank(comm);
2549
2550 #if AMPIMSGLOG
2551   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2552     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2553   }
2554 #endif
2555   return 0;
2556 }
2557
2558   CDECL
2559 int AMPI_Comm_size(MPI_Comm comm, int *size)
2560 {
2561   //AMPIAPI("AMPI_Comm_size");
2562
2563 #if CMK_ERROR_CHECKING 
2564   if(checkCommunicator(comm) != MPI_SUCCESS)
2565     return checkCommunicator(comm);
2566 #endif
2567
2568 #if AMPIMSGLOG
2569   ampiParent* pptr = getAmpiParent();
2570   if(msgLogRead){
2571     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2572     return 0;
2573   }
2574 #endif
2575
2576   *size = getAmpiInstance(comm)->getSize(comm);
2577
2578 #if AMPIMSGLOG
2579   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2580     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2581   }
2582 #endif
2583
2584   return 0;
2585 }
2586
2587   CDECL
2588 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2589 {
2590
2591 #if CMK_ERROR_CHECKING 
2592   if(checkCommunicator(comm1) != MPI_SUCCESS)
2593     return checkCommunicator(comm1);
2594   if(checkCommunicator(comm2) != MPI_SUCCESS)
2595     return checkCommunicator(comm2);
2596 #endif
2597
2598   AMPIAPI("AMPI_Comm_compare");
2599   if(comm1==comm2) *result=MPI_IDENT;
2600   else{
2601     int equal=1;
2602     CkVec<int> ind1, ind2;
2603     ind1 = getAmpiInstance(comm1)->getIndices();
2604     ind2 = getAmpiInstance(comm2)->getIndices();
2605     if(ind1.size()==ind2.size()){
2606       for(int i=0;i<ind1.size();i++)
2607         if(ind1[i] != ind2[i]) { equal=0; break; }
2608     }
2609     if(equal==1) *result=MPI_CONGRUENT;
2610     else *result=MPI_UNEQUAL;
2611   }
2612   return 0;
2613 }
2614
2615 CDECL void AMPI_Exit(int /*exitCode*/)
2616 {
2617   AMPIAPI("AMPI_Exit");
2618   //finalizeBigSimTrace();
2619   TCHARM_Done();
2620 }
2621 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2622 {
2623   AMPI_Exit(*exitCode);
2624 }
2625
2626   CDECL
2627 int AMPI_Finalize(void)
2628 {
2629   AMPIAPI("AMPI_Finalize");
2630 #if PRINT_IDLE
2631   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2632 #endif
2633 #if AMPI_COUNTER
2634   getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2635 #endif
2636   CtvAccess(ampiFinalized)=1;
2637
2638 #if CMK_BIGSIM_CHARM
2639 #if 0
2640   TRACE_BG_AMPI_SUSPEND();
2641 #endif
2642 #if CMK_TRACE_IN_CHARM
2643   if(CpvAccess(traceOn)) traceSuspend();
2644 #endif
2645 #endif
2646
2647   //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2648   AMPI_Exit(0);
2649   return 0;
2650 }
2651
2652 CDECL
2653 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2654     int tag, MPI_Comm comm) {
2655
2656 #if CMK_ERROR_CHECKING
2657   int ret;
2658   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2659   if(ret != MPI_SUCCESS)
2660     return ret;
2661 #endif
2662
2663   AMPIAPI("AMPI_Send");
2664 #if AMPIMSGLOG
2665   if(msgLogRead){
2666     return 0;
2667   }
2668 #endif
2669
2670   ampi *ptr = getAmpiInstance(comm);
2671 #if AMPI_COMLIB
2672   if(enableStreaming){  
2673     //    ptr->getStreaming().beginIteration();
2674     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2675   } else
2676 #endif
2677     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2678 #if AMPI_COUNTER
2679   getAmpiParent()->counters.send++;
2680 #endif
2681 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2682   //  ptr->yield();
2683   //  //  processRemoteMlogMessages();
2684 #endif
2685   return 0;
2686 }
2687
2688   CDECL
2689 int AMPI_Ssend(void *msg, int count, MPI_Datatype type, int dest,
2690     int tag, MPI_Comm comm)
2691 {
2692 #if CMK_ERROR_CHECKING
2693   int ret;
2694   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2695   if(ret != MPI_SUCCESS)
2696     return ret;
2697 #endif
2698
2699   AMPIAPI("AMPI_Ssend");
2700 #if AMPIMSGLOG
2701   if(msgLogRead){
2702     return 0;
2703   }
2704 #endif
2705
2706   ampi *ptr = getAmpiInstance(comm);
2707 #if AMPI_COMLIB
2708   if(enableStreaming){
2709     ptr->getStreaming().beginIteration();
2710     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2711   } else
2712 #endif
2713     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm, 1);
2714 #if AMPI_COUNTER
2715   getAmpiParent()->counters.send++;
2716 #endif
2717
2718   return 0;
2719 }
2720
2721   CDECL
2722 int AMPI_Issend(void *buf, int count, MPI_Datatype type, int dest,
2723     int tag, MPI_Comm comm, MPI_Request *request)
2724 {
2725   AMPIAPI("AMPI_Issend");
2726
2727 #if CMK_ERROR_CHECKING
2728   int ret;
2729   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
2730   if(ret != MPI_SUCCESS)
2731   {
2732     *request = MPI_REQUEST_NULL;
2733     return ret;
2734   }
2735 #endif
2736
2737 #if AMPIMSGLOG
2738   ampiParent* pptr = getAmpiParent();
2739   if(msgLogRead){
2740     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
2741     return 0;
2742   }
2743 #endif
2744
2745   USER_CALL_DEBUG("AMPI_Issend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
2746   ampi *ptr = getAmpiInstance(comm);
2747   AmpiRequestList* reqs = getReqs();
2748   SReq *newreq = new SReq(comm);
2749   *request = reqs->insert(newreq);
2750   // 1:  blocking now  - used by MPI_Ssend
2751   // >=2:  the index of the requests - used by MPI_Issend
2752   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm, *request+2);
2753 #if AMPI_COUNTER
2754   getAmpiParent()->counters.isend++;
2755 #endif
2756
2757 #if AMPIMSGLOG
2758   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2759     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
2760   }
2761 #endif
2762
2763   return 0;
2764 }
2765
2766   CDECL
2767 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2768     MPI_Comm comm, MPI_Status *status)
2769 {
2770   AMPIAPI("AMPI_Recv");
2771
2772 #if CMK_ERROR_CHECKING
2773   int ret;
2774   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, msg, 1);
2775   if(ret != MPI_SUCCESS)
2776     return ret;
2777 #endif
2778
2779 #if AMPIMSGLOG
2780   ampiParent* pptr = getAmpiParent();
2781   if(msgLogRead){
2782     (*(pptr->fromPUPer))|(pptr->pupBytes);
2783     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2784     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2785     return 0;
2786   }
2787 #endif
2788
2789   ampi *ptr = getAmpiInstance(comm);
2790   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2791
2792 #if AMPI_COUNTER
2793   getAmpiParent()->counters.recv++;
2794 #endif
2795
2796 #if AMPIMSGLOG
2797   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2798     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2799     (*(pptr->toPUPer))|(pptr->pupBytes);
2800     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2801     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2802   }
2803 #endif
2804
2805   return 0;
2806 }
2807
2808   CDECL
2809 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2810 {
2811
2812 #if CMK_ERROR_CHECKING
2813   int ret;
2814   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2815   if(ret != MPI_SUCCESS)
2816     return ret;
2817 #endif
2818
2819   AMPIAPI("AMPI_Probe");
2820   ampi *ptr = getAmpiInstance(comm);
2821   ptr->probe(tag,src, comm, (int*) status);
2822   return 0;
2823 }
2824
2825   CDECL
2826 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2827 {
2828   AMPIAPI("AMPI_Iprobe");
2829
2830 #if CMK_ERROR_CHECKING
2831   int ret;
2832   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2833   if(ret != MPI_SUCCESS)
2834     return ret;
2835 #endif
2836
2837   ampi *ptr = getAmpiInstance(comm);
2838   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2839   return 0;
2840 }
2841
2842   CDECL
2843 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2844     int stag, void *rbuf, int rcount, int rtype,
2845     int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2846 {
2847   AMPIAPI("AMPI_Sendrecv");
2848
2849 #if CMK_ERROR_CHECKING
2850   int ret;
2851   ret = errorCheck(comm, 1, scount, 1, stype, 1, stag, 1, dest, 1, sbuf, 1);
2852   if(ret != MPI_SUCCESS)
2853     return ret;
2854   ret = errorCheck(comm, 1, rcount, 1, rtype, 1, rtag, 1, src, 1, rbuf, 1);
2855   if(ret != MPI_SUCCESS)
2856     return ret;
2857 #endif
2858
2859   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2860   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2861   if (se) return se;
2862   else return re;
2863 }
2864
2865   CDECL
2866 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2867     int dest, int sendtag, int source, int recvtag,
2868     MPI_Comm comm, MPI_Status *status)
2869 {
2870   AMPIAPI("AMPI_Sendrecv_replace");
2871   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2872       buf, count, datatype, source, recvtag, comm, status);
2873 }
2874
2875
2876   CDECL
2877 int AMPI_Barrier(MPI_Comm comm)
2878 {
2879   AMPIAPI("AMPI_Barrier");
2880
2881 #if CMK_ERROR_CHECKING
2882   if(checkCommunicator(comm) != MPI_SUCCESS)
2883     return checkCommunicator(comm);
2884 #endif
2885
2886   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2887
2888   TRACE_BG_AMPI_LOG(MPI_BARRIER, 0);
2889
2890   //HACK: Use collective operation as a barrier.
2891   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2892
2893   //BIGSIM_OOC DEBUGGING
2894   //CkPrintf("%d: in AMPI_Barrier, after AMPI_Allreduce\n", getAmpiParent()->thisIndex);
2895 #if AMPI_COUNTER
2896   getAmpiParent()->counters.barrier++;
2897 #endif
2898   return 0;
2899 }
2900
2901   CDECL
2902 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2903     MPI_Comm comm)
2904 {
2905   AMPIAPI("AMPI_Bcast");
2906
2907 #if CMK_ERROR_CHECKING
2908   int ret;
2909   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, buf, 1);
2910   if(ret != MPI_SUCCESS)
2911     return ret;
2912 #endif
2913
2914   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2915   if(comm==MPI_COMM_SELF) return 0;
2916
2917 #if AMPIMSGLOG
2918   ampiParent* pptr = getAmpiParent();
2919   if(msgLogRead){
2920     (*(pptr->fromPUPer))|(pptr->pupBytes);
2921     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2922     return 0;
2923   }
2924 #endif
2925
2926   ampi* ptr = getAmpiInstance(comm);
2927   ptr->bcast(root, buf, count, type,comm);
2928 #if AMPI_COUNTER
2929   getAmpiParent()->counters.bcast++;
2930 #endif
2931
2932 #if AMPIMSGLOG
2933   if(msgLogWrite && record_msglog(pptr->thisIndex)) {
2934     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2935     (*(pptr->toPUPer))|(pptr->pupBytes);
2936     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2937   }
2938 #endif
2939 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2940   //  ptr->yield();
2941   //  //  processRemoteMlogMessages();
2942 #endif
2943
2944   return 0;
2945 }
2946
2947 /// This routine is called with the results of a Reduce or AllReduce
2948 const int MPI_REDUCE_SOURCE=0;
2949 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2950 void ampi::reduceResult(CkReductionMsg *msg)
2951 {
2952   MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2953   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2954       thisArrayID,thisIndex);
2955   delete msg;
2956 }
2957
2958 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2959 {
2960   int szdata = ddt->getSize(count);
2961   int szhdr = sizeof(AmpiOpHeader);
2962   AmpiOpHeader newhdr(op,type,count,szdata); 
2963   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2964   memcpy(msg->getData(),&newhdr,szhdr);
2965   if (count > 0) {
2966     TCharm::activateVariable(inbuf);
2967     ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2968     TCharm::deactivateVariable(inbuf);
2969   }
2970   return msg;
2971 }
2972
2973 // Copy the MPI datatype "type" from inbuf to outbuf
2974 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2975   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2976   //  deserializing into the output.
2977   ampi *ptr = getAmpiInstance(comm);
2978   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2979   int len=ddt->getSize(count);
2980   char *serialized=new char[len];
2981   TCharm::activateVariable(inbuf);
2982   TCharm::activateVariable(outbuf);
2983   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2984   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2985   TCharm::deactivateVariable(outbuf);
2986   TCharm::deactivateVariable(inbuf);
2987   delete [] serialized;         // < memory leak!  // gzheng 
2988
2989   return MPI_SUCCESS;
2990 }
2991
2992 static void handle_MPI_IN_PLACE(void* &inbuf, void* &outbuf)
2993 {
2994   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
2995   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
2996   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
2997 }
2998
2999 #define SYNCHRONOUS_REDUCE                           0
3000
3001   CDECL
3002 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
3003     int root, MPI_Comm comm)
3004 {
3005   AMPIAPI("AMPI_Reduce");
3006
3007   handle_MPI_IN_PLACE(inbuf, outbuf);
3008
3009 #if CMK_ERROR_CHECKING
3010   int ret;
3011   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, inbuf, 1,
3012                    outbuf, getAmpiInstance(comm)->getRank(comm) == root);
3013   if(ret != MPI_SUCCESS)
3014     return ret;
3015 #endif
3016
3017   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3018
3019 #if AMPIMSGLOG
3020   ampiParent* pptr = getAmpiParent();
3021   if(msgLogRead){
3022     (*(pptr->fromPUPer))|(pptr->pupBytes);
3023     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3024     return 0;
3025   }
3026 #endif
3027
3028   ampi *ptr = getAmpiInstance(comm);
3029   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
3030   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
3031   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
3032
3033   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3034
3035   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
3036   msg->setCallback(reduceCB);
3037   MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
3038   ptr->contribute(msg);
3039
3040   if (ptr->thisIndex == rootIdx){
3041     /*HACK: Use recv() to block until reduction data comes back*/
3042     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3043       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
3044
3045 #if SYNCHRONOUS_REDUCE
3046       AmpiMsg *msg = new (0, 0) AmpiMsg(-1, MPI_REDUCE_TAG, -1, rootIdx, 0, MPI_REDUCE_COMM);
3047       CProxy_ampi pa(ptr->getProxy());
3048       pa.generic(msg);
3049 #endif
3050   }
3051 #if SYNCHRONOUS_REDUCE
3052   ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, NULL, 0, type, MPI_REDUCE_COMM);
3053 #endif
3054
3055 #if AMPI_COUNTER
3056   getAmpiParent()->counters.reduce++;
3057 #endif
3058
3059 #if AMPIMSGLOG
3060   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3061     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3062     (*(pptr->toPUPer))|(pptr->pupBytes);
3063     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3064   }
3065 #endif
3066
3067   return 0;
3068 }
3069
3070   CDECL
3071 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
3072     MPI_Op op, MPI_Comm comm)
3073 {
3074   AMPIAPI("AMPI_Allreduce");
3075
3076   handle_MPI_IN_PLACE(inbuf, outbuf);
3077
3078 #if CMK_ERROR_CHECKING
3079   int ret;
3080   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1, outbuf, 1);
3081   if(ret != MPI_SUCCESS)
3082     return ret;
3083 #endif
3084
3085   ampi *ptr = getAmpiInstance(comm);
3086
3087   CkDDT_DataType *ddt_type = ptr->getDDT()->getType(type);
3088
3089 #if CMK_BIGSIM_CHARM
3090   TRACE_BG_AMPI_LOG(MPI_ALLREDUCE, ddt_type->getSize(count));
3091 #endif
3092
3093   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3094
3095 #if AMPIMSGLOG
3096   ampiParent* pptr = getAmpiParent();
3097   if(msgLogRead){
3098     (*(pptr->fromPUPer))|(pptr->pupBytes);
3099     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3100     //    CkExit();
3101     return 0;
3102   }
3103 #endif
3104
3105   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
3106   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
3107
3108   CkReductionMsg *msg=makeRednMsg(ddt_type, inbuf, count, type, op);
3109   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3110   msg->setCallback(allreduceCB);
3111   ptr->contribute(msg);
3112
3113   /*HACK: Use recv() to block until the reduction data comes back*/
3114   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3115     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
3116 #if AMPI_COUNTER
3117   getAmpiParent()->counters.allreduce++;
3118 #endif
3119
3120 #if AMPIMSGLOG
3121   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3122     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3123     (*(pptr->toPUPer))|(pptr->pupBytes);
3124     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3125     //    CkExit();
3126   }
3127 #endif
3128
3129   return 0;
3130 }
3131
3132   CDECL
3133 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
3134     MPI_Op op, MPI_Comm comm, MPI_Request* request)
3135 {
3136   AMPIAPI("AMPI_Iallreduce");
3137
3138   handle_MPI_IN_PLACE(inbuf, outbuf);
3139
3140 #if CMK_ERROR_CHECKING
3141   int ret;
3142   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1, outbuf, 1);
3143   if(ret != MPI_SUCCESS)
3144   {
3145     *request = MPI_REQUEST_NULL;
3146     return ret;
3147   }
3148 #endif
3149
3150   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3151
3152   checkRequest(*request);
3153   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
3154   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
3155   ampi *ptr = getAmpiInstance(comm);
3156
3157   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3158   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3159   msg->setCallback(allreduceCB);
3160   ptr->contribute(msg);
3161
3162   // using irecv instead recv to non-block the call and get request pointer
3163   AmpiRequestList* reqs = getReqs();
3164   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
3165   *request = reqs->insert(newreq);
3166   return 0;
3167 }
3168
3169   CDECL
3170 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
3171     MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
3172 {
3173   AMPIAPI("AMPI_Reduce_scatter");
3174
3175   handle_MPI_IN_PLACE(sendbuf, recvbuf);
3176
3177 #if CMK_ERROR_CHECKING
3178   int ret;
3179   ret = errorCheck(comm, 1, 0, 0, datatype, 1, 0, 0, 0, 0, sendbuf, 1, recvbuf, 1);
3180   if(ret != MPI_SUCCESS)
3181     return ret;
3182 #endif
3183
3184   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
3185   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
3186   ampi *ptr = getAmpiInstance(comm);
3187   int size = ptr->getSize(comm);
3188   int count=0;
3189   int *displs = new int [size];
3190   int len;
3191   void *tmpbuf;
3192
3193   //under construction
3194   for(int i=0;i<size;i++){
3195     displs[i] = count;
3196     count+= recvcounts[i];
3197   }
3198   len = ptr->getDDT()->getType(datatype)->getSize(count);
3199   tmpbuf = malloc(len);
3200   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
3201   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
3202       recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
3203   free(tmpbuf);
3204   delete [] displs;     // < memory leak ! // gzheng
3205   return 0;
3206 }
3207
3208 /***** MPI_Scan algorithm (from MPICH) *******
3209   recvbuf = sendbuf;
3210   partial_scan = sendbuf;
3211   mask = 0x1;
3212   while (mask < size) {
3213   dst = rank^mask;
3214   if (dst < size) {
3215   send partial_scan to dst;
3216   recv from dst into tmp_buf;
3217   if (rank > dst) {
3218   partial_scan = tmp_buf + partial_scan;
3219   recvbuf = tmp_buf + recvbuf;
3220   }
3221   else {
3222   if (op is commutative)
3223   partial_scan = tmp_buf + partial_scan;
3224   else {
3225   tmp_buf = partial_scan + tmp_buf;
3226   partial_scan = tmp_buf;
3227   }
3228   }
3229   }
3230   mask <<= 1;
3231   }
3232  ***** MPI_Scan algorithm (from MPICH) *******/
3233
3234 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
3235   (op)(invec,inoutvec,&count,&datatype);
3236 }
3237 CDECL
3238 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
3239   AMPIAPI("AMPI_Scan");
3240
3241 #if CMK_ERROR_CHECKING
3242   if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
3243     CmiAbort("AMPI_Scan does not implement MPI_IN_PLACE");
3244
3245   int ret;
3246   ret = errorCheck(comm, 1, count, 1, datatype, 1, 0, 0, 0, 0, sendbuf, 1, recvbuf, 1);
3247   if(ret != MPI_SUCCESS)
3248     return ret;
3249 #endif
3250
3251   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
3252   MPI_Status sts;
3253   ampi *ptr = getAmpiInstance(comm);
3254   int size = ptr->getSize(comm);
3255   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
3256   int rank = ptr->getRank(comm);
3257   int mask = 0x1;
3258   int dst;
3259   void* tmp_buf = malloc(blklen);
3260   void* partial_scan = malloc(blklen);
3261
3262   memcpy(recvbuf, sendbuf, blklen);
3263   memcpy(partial_scan, sendbuf, blklen);
3264   while(mask < size){
3265     dst = rank^mask;
3266     if(dst < size){
3267       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
3268           tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
3269       if(rank > dst){
3270         (op)(tmp_buf,partial_scan,&count,&datatype);
3271         (op)(tmp_buf,recvbuf,&count,&datatype);
3272       }else {
3273         (op)(partial_scan,tmp_buf,&count,&datatype);
3274         memcpy(partial_scan,tmp_buf,blklen);
3275       }
3276     }
3277     mask <<= 1;
3278
3279   }
3280
3281   free(tmp_buf);
3282   free(partial_scan);
3283 #if AMPI_COUNTER
3284   getAmpiParent()->counters.scan++;
3285 #endif
3286   return 0;
3287 }
3288
3289 CDECL
3290 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
3291   //AMPIAPI("AMPI_Op_create");
3292   *op = function;
3293   return 0;
3294 }
3295
3296 CDECL
3297 int AMPI_Op_free(MPI_Op *op){
3298   //AMPIAPI("AMPI_Op_free");
3299   *op = MPI_OP_NULL;
3300   return 0;
3301 }
3302
3303
3304   CDECL
3305 double AMPI_Wtime(void)
3306 {
3307   //  AMPIAPI("AMPI_Wtime");
3308
3309 #if AMPIMSGLOG
3310   double ret=TCHARM_Wall_timer();
3311   ampiParent* pptr = getAmpiParent();
3312   if(msgLogRead){
3313     (*(pptr->fromPUPer))|ret;
3314     return ret;
3315   }
3316
3317   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3318     (*(pptr->toPUPer))|ret;
3319   }
3320 #endif
3321
3322 #if CMK_BIGSIM_CHARM
3323   return BgGetTime();
3324 #else
3325   return TCHARM_Wall_timer();
3326 #endif
3327 }
3328
3329 CDECL
3330 double AMPI_Wtick(void){
3331   //AMPIAPI("AMPI_Wtick");
3332   return 1e-6;
3333 }
3334
3335
3336 int PersReq::start(){
3337   if(sndrcv == 1 || sndrcv == 3) { // send or ssend request
3338     ampi *ptr=getAmpiInstance(comm);
3339     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm, sndrcv==3?1:0);
3340   }
3341   return 0;
3342 }
3343
3344   CDECL
3345 int AMPI_Start(MPI_Request *request)
3346 {
3347   AMPIAPI("AMPI_Start");
3348   checkRequest(*request);
3349   AmpiRequestList *reqs = getReqs();
3350   if(-1==(*reqs)[*request]->start()) {
3351     CkAbort("MPI_Start could be used only on persistent communication requests!");
3352   }
3353   return 0;
3354 }
3355
3356 CDECL
3357 int AMPI_Startall(int count, MPI_Request *requests){
3358   AMPIAPI("AMPI_Startall");
3359   checkRequests(count,requests);
3360   AmpiRequestList *reqs = getReqs();
3361   for(int i=0;i<count;i++){
3362     if(-1==(*reqs)[requests[i]]->start())
3363       CkAbort("MPI_Start could be used only on persistent communication requests!");
3364   }
3365   return 0;
3366 }
3367
3368 /* organize the indices of requests into a vector of a vector: 
3369  * level 1 is different msg envelope matches
3370  * level 2 is (posting) ordered requests of with envelope
3371  * each time multiple completion call loop over first elem of level 1
3372  * and move the matched to the NULL request slot.   
3373  * warning: this does not work with I-Alltoall requests */
3374 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
3375   for(int i=0;i<count;i++){
3376     if(reqs[i]!=MPI_REQUEST_NULL)
3377       return 0;
3378   }
3379   return 1;
3380 }
3381 inline int matchReq(MPI_Request ia, MPI_Request ib){
3382   checkRequest(ia);  
3383   checkRequest(ib);
3384   AmpiRequestList* reqs = getReqs();
3385   AmpiRequest *a, *b;
3386   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
3387   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
3388   a=(*reqs)[ia];  b=(*reqs)[ib];
3389   if(a->tag != b->tag) return 0;
3390   if(a->src != b->src) return 0;
3391   if(a->comm != b->comm) return 0;
3392   return 1;
3393 }
3394 inline void swapInt(int& a,int& b){
3395   int tmp;
3396   tmp=a; a=b; b=tmp;
3397 }
3398 inline void sortedIndex(int n, int* arr, int* idx){
3399   int i,j;
3400   for(i=0;i<n;i++) 
3401     idx[i]=i;
3402   for (i=0; i<n-1; i++) 
3403     for (j=0; j<n-1-i; j++)
3404       if (arr[idx[j+1]] < arr[idx[j]]) 
3405         swapInt(idx[j+1],idx[j]);
3406 }
3407 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
3408   CkAssert(count!=0);
3409   int *newidx = new int [count];
3410   int flag;
3411   sortedIndex(count,arr,newidx);
3412   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
3413   CkVec<int> slot;
3414   vec->push_back(slot);
3415   (*vec)[0].push_back(newidx[0]);
3416   for(int i=1;i<count;i++){
3417     flag=0;
3418     for(int j=0;j<vec->size();j++){
3419       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
3420         ((*vec)[j]).push_back(newidx[i]);
3421         flag++;
3422       }
3423     }
3424     if(!flag){
3425       CkVec<int> newslot;
3426       newslot.push_back(newidx[i]);
3427       vec->push_back(newslot);
3428     }else{
3429       CkAssert(flag==1);
3430     }
3431   }
3432   delete [] newidx;
3433   return vec;
3434 }
3435 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
3436   printf("vec content: ");
3437   for(int i=0;i<vec.size();i++){
3438     printf("{");
3439     for(int j=0;j<(vec[i]).size();j++){
3440       printf(" %d ",arr[(vec[i])[j]]);
3441     }
3442     printf("} ");
3443   }
3444   printf("\n");
3445 }
3446
3447 int PersReq::wait(MPI_Status *sts){
3448   if(sndrcv == 2) {
3449     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3450       CkAbort("AMPI> Error in persistent request wait");
3451 #if CMK_BIGSIM_CHARM
3452     _TRACE_BG_TLINE_END(&event);
3453 #endif
3454   }
3455   return 0;
3456 }
3457
3458 int IReq::wait(MPI_Status *sts){
3459   if(CpvAccess(CmiPICMethod) == 2) {
3460     AMPI_DEBUG("In weird clause of IReq::wait\n");
3461     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3462       CkAbort("AMPI> Error in non-blocking request wait");
3463
3464     return 0;
3465   }
3466
3467   //Copy "this" to a local variable in the case that "this" pointer
3468   //is updated during the out-of-core emulation.
3469
3470   // optimization for Irecv
3471   // generic() writes directly to the buffer, so the only thing we
3472   // do here is to wait
3473   ampi *ptr = getAmpiInstance(comm);
3474
3475   //BIGSIM_OOC DEBUGGING
3476   //int ooccnt=0;
3477   //int ampiIndex = ptr->thisIndex;
3478   //CmiPrintf("%d: IReq's status=%d\n", ampiIndex, statusIreq);
3479
3480   while (statusIreq == false) {
3481     //BIGSIM_OOC DEBUGGING
3482     //CmiPrintf("Before blocking: %dth time: %d: in Ireq::wait\n", ++ooccnt, ptr->thisIndex);
3483     //print();
3484
3485     ptr->resumeOnRecv=true;
3486     ptr->block();
3487
3488     //BIGSIM_OOC DEBUGGING
3489     //CmiPrintf("[%d] After blocking: in Ireq::wait\n", ptr->thisIndex);
3490     //CmiPrintf("IReq's this pointer: %p\n", this);
3491     //print();
3492
3493 #if CMK_BIGSIM_CHARM
3494     //Because of the out-of-core emulation, this pointer is changed after in-out
3495     //memory operation. So we need to return from this function and do the while loop
3496     //in the outer function call.       
3497     if(_BgInOutOfCoreMode)
3498       return -1;
3499 #endif  
3500   }   // end of while
3501   ptr->resumeOnRecv=false;
3502
3503   AMPI_DEBUG("IReq::wait has resumed\n");
3504
3505   if(sts) {
3506     AMPI_DEBUG("Setting sts->MPI_TAG to this->tag=%d in IReq::wait  this=%p\n", (int)this->tag, this);
3507     sts->MPI_TAG = tag;
3508     sts->MPI_SOURCE = src;
3509     sts->MPI_COMM = comm;
3510     sts->MPI_LENGTH = length;
3511   }
3512
3513   return 0;
3514 }
3515
3516 int ATAReq::wait(MPI_Status *sts){
3517   int i;
3518   for(i=0;i<count;i++){
3519     if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3520           myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
3521       CkAbort("AMPI> Error in alltoall request wait");
3522 #if CMK_BIGSIM_CHARM
3523     _TRACE_BG_TLINE_END(&myreqs[i].event);
3524 #endif
3525   }
3526 #if CMK_BIGSIM_CHARM
3527   //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
3528   TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0, 1);
3529   for (i=0; i<count; i++)
3530     _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
3531   _TRACE_BG_TLINE_END(&event);
3532 #endif
3533   return 0;
3534 }
3535
3536 int SReq::wait(MPI_Status *sts){
3537   ampi *ptr = getAmpiInstance(comm);
3538   while (statusIreq == false) {
3539     ptr->resumeOnRecv = true;
3540     ptr->block();
3541     ptr = getAmpiInstance(comm);
3542     ptr->resumeOnRecv = false;
3543   }
3544   return 0;
3545 }
3546
3547   CDECL
3548 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
3549 {
3550   AMPIAPI("AMPI_Wait");
3551   if(*request == MPI_REQUEST_NULL){
3552     stsempty(*sts);
3553     return 0;
3554   }
3555   checkRequest(*request);
3556   AmpiRequestList* reqs = getReqs();
3557
3558 #if AMPIMSGLOG
3559   ampiParent* pptr = getAmpiParent();
3560   if(msgLogRead){
3561     (*(pptr->fromPUPer))|(pptr->pupBytes);
3562     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3563     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
3564     return 0;
3565   }
3566 #endif
3567
3568   AMPI_DEBUG("AMPI_Wait request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3569   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3570   //(*reqs)[*request]->wait(sts);
3571   int waitResult = -1;
3572   do{
3573     AmpiRequest *waitReq = (*reqs)[*request];
3574     waitResult = waitReq->wait(sts);
3575     if(_BgInOutOfCoreMode){
3576       reqs = getReqs();
3577     }
3578   }while(waitResult==-1);
3579
3580
3581   AMPI_DEBUG("AMPI_Wait after calling wait, request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3582
3583
3584 #if AMPIMSGLOG
3585   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3586     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
3587     (*(pptr->toPUPer))|(pptr->pupBytes);
3588     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3589     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
3590   }
3591 #endif
3592
3593   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3594     reqs->free(*request);
3595     *request = MPI_REQUEST_NULL;
3596   }
3597
3598   AMPI_DEBUG("End of AMPI_Wait\n");
3599
3600   return 0;
3601 }
3602
3603   CDECL
3604 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
3605 {
3606   AMPIAPI("AMPI_Waitall");
3607   if(count==0) return MPI_SUCCESS;
3608   checkRequests(count,request);
3609   int i,j,oldPe;
3610   AmpiRequestList* reqs = getReqs();
3611   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3612
3613 #if AMPIMSGLOG
3614   ampiParent* pptr = getAmpiParent();
3615   if(msgLogRead){
3616     for(i=0;i<reqvec->size();i++){
3617       for(j=0;j<((*reqvec)[i]).size();j++){
3618         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3619           stsempty(sts[((*reqvec)[i])[j]]);
3620           continue;
3621         }
3622         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3623         (*(pptr->fromPUPer))|(pptr->pupBytes);
3624         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3625         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3626       }
3627     }
3628     return 0;
3629   }
3630 #endif
3631
3632 #if CMK_BIGSIM_CHARM
3633   void *curLog;         // store current log in timeline
3634   _TRACE_BG_TLINE_END(&curLog);
3635 #if 0
3636   TRACE_BG_AMPI_SUSPEND();
3637 #endif
3638 #endif
3639   for(i=0;i<reqvec->size();i++){
3640     for(j=0;j<((*reqvec)[i]).size();j++){
3641       //CkPrintf("[%d] in loop [%d, %d]\n", pptr->thisIndex,i, j);
3642       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3643         stsempty(sts[((*reqvec)[i])[j]]);
3644         continue;
3645       }
3646       oldPe = CkMyPe();
3647
3648       int waitResult = -1;
3649       do{       
3650         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3651         waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
3652         if(_BgInOutOfCoreMode){
3653           reqs = getReqs();
3654           reqvec = vecIndex(count, request);
3655         }
3656
3657 #if AMPIMSGLOG
3658         if(msgLogWrite && record_msglog(pptr->thisIndex)){
3659           (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
3660           (*(pptr->toPUPer))|(pptr->pupBytes);
3661           PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3662           PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3663         }
3664 #endif
3665
3666       }while(waitResult==-1);
3667
3668 #if 1
3669 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3670       //for fault evacuation
3671       if(oldPe != CkMyPe()){
3672 #endif
3673         reqs = getReqs();
3674         reqvec  = vecIndex(count,request);
3675 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3676       }
3677 #endif
3678 #endif
3679     }
3680   }
3681 #if CMK_BIGSIM_CHARM
3682   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
3683 #endif
3684   // free memory of requests
3685   for(i=0;i<count;i++){ 
3686     if(request[i] == MPI_REQUEST_NULL)
3687       continue;
3688     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
3689       reqs->free(request[i]);
3690       request[i] = MPI_REQUEST_NULL;
3691     }
3692   }
3693   delete reqvec;
3694   return 0;
3695 }
3696
3697   CDECL
3698 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
3699 {
3700   AMPIAPI("AMPI_Waitany");
3701
3702   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
3703   if(count == 0) return MPI_SUCCESS;
3704   checkRequests(count,request);
3705   if(areInactiveReqs(count,request)){
3706     *idx=MPI_UNDEFINED;
3707     stsempty(*sts);
3708     return MPI_SUCCESS;
3709   }
3710   int flag=0;
3711   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3712   while(count>0){ /* keep looping until some request finishes: */
3713     for(int i=0;i<reqvec->size();i++){
3714       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
3715       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
3716         *idx = ((*reqvec)[i])[0];
3717         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
3718         return 0;
3719       }
3720     }
3721     /* no requests have finished yet-- schedule and try again */
3722     AMPI_Yield(MPI_COMM_WORLD);
3723   }
3724   *idx = MPI_UNDEFINED;
3725   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
3726   delete reqvec;
3727   return 0;
3728 }
3729
3730   CDECL
3731 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
3732     int *array_of_indices, MPI_Status *array_of_statuses)
3733 {
3734   AMPIAPI("AMPI_Waitsome");
3735   checkRequests(incount,array_of_requests);
3736   if(areInactiveReqs(incount,array_of_requests)){
3737     *outcount=MPI_UNDEFINED;
3738     return MPI_SUCCESS;
3739   }
3740   MPI_Status sts;
3741   int i;
3742   int flag=0, realflag=0;
3743   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3744   *outcount = 0;
3745   while(1){
3746     for(i=0;i<reqvec->size();i++){
3747       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3748       if(flag == 1){ 
3749         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3750         array_of_statuses[(*outcount)++]=sts;
3751         if(sts.MPI_COMM != 0)
3752           realflag=1; // there is real(non null) request
3753       }
3754     }
3755     if(realflag && outcount>0) break;
3756   }
3757   delete reqvec;
3758   return 0;
3759 }
3760
3761   CmiBool PersReq::test(MPI_Status *sts){
3762     if(sndrcv == 2)     // recv request
3763       return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3764     else                        // send request
3765       return 1;
3766
3767   }
3768   void PersReq::complete(MPI_Status *sts){
3769     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3770       CkAbort("AMPI> Error in persistent request complete");
3771   }
3772
3773 CmiBool IReq::test(MPI_Status *sts){
3774   if (statusIreq == true) {           
3775     if(sts)
3776       sts->MPI_LENGTH = length;           
3777     return true;
3778   }
3779   else {
3780     getAmpiInstance(comm)->yield();
3781     return false;
3782   }
3783   /*
3784      return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3785    */
3786 }
3787
3788 CmiBool SReq::test(MPI_Status *sts){
3789   if (statusIreq == true) {
3790     return true;
3791   }
3792   else {
3793     getAmpiInstance(comm)->yield();
3794     return false;
3795   }
3796 }
3797
3798 void IReq::complete(MPI_Status *sts){
3799   wait(sts);
3800   /*
3801      if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3802      CkAbort("AMPI> Error in non-blocking request complete");
3803    */
3804 }
3805
3806 void SReq::complete(MPI_Status *sts){
3807   wait(sts);
3808 }
3809
3810 void IReq::receive(ampi *ptr, AmpiMsg *msg)
3811 {
3812   int sts = ptr->processMessage(msg, tag, src, buf, count, type);
3813   statusIreq = (sts == 0);
3814   length = msg->length;
3815   this->tag = msg->tag; // Although not required, we also extract tag from msg
3816   src = msg->srcRank; // Although not required, we also extract src from msg
3817   comm = msg->comm;
3818   AMPI_DEBUG("Setting this->tag to %d in IReq::receive this=%p\n", (int)this->tag, this);
3819 #if CMK_BIGSIM_CHARM
3820   event = msg->event; 
3821 #endif
3822   delete msg;
3823
3824   //BIGSIM_OOC DEBUGGING
3825   //CmiPrintf("In IReq::receive, this=%p ", this);
3826   //print();
3827 }
3828
3829 CmiBool ATAReq::test(MPI_Status *sts){
3830   int i, flag=1;
3831   for(i=0;i<count;i++){
3832     flag *= getAmpiInstance(myreqs[i].comm)->iprobe(myreqs[i].tag, myreqs[i].src,
3833         myreqs[i].comm, (int*) sts);
3834   }
3835   return flag;
3836 }
3837 void ATAReq::complete(MPI_Status *sts){
3838   int i;
3839   for(i=0;i<count;i++){
3840     if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3841           myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int*)sts))
3842       CkAbort("AMPI> Error in alltoall request complete");
3843   }
3844 }
3845
3846   CDECL
3847 int AMPI_Test(MPI_Request *request, int *flag, MPI_Status *sts)
3848 {
3849   AMPIAPI("AMPI_Test");
3850   if(*request==MPI_REQUEST_NULL) {
3851     *flag = 1;
3852     stsempty(*sts);
3853     return 0;
3854   }
3855   checkRequest(*request);
3856   AmpiRequestList* reqs = getReqs();
3857   if(1 == (*flag = (*reqs)[*request]->test(sts))){
3858     (*reqs)[*request]->complete(sts);
3859     if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3860       reqs->free(*request);
3861       *request = MPI_REQUEST_NULL;
3862     }
3863   }
3864   return 0;
3865 }
3866
3867 CDECL
3868 int AMPI_Testany(int count, MPI_Request *request, int *index, int *flag, MPI_Status *sts){
3869   AMPIAPI("AMPI_Testany");
3870   checkRequests(count,request);
3871   if(areInactiveReqs(count,request)){
3872     *flag=1;
3873     *index=MPI_UNDEFINED;
3874     stsempty(*sts);
3875     return MPI_SUCCESS;
3876   }
3877   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3878   *flag=0;
3879   for(int i=0;i<reqvec->size();i++){
3880     AMPI_Test(&request[((*reqvec)[i])[0]], flag, sts);
3881     if(*flag==1 && sts->MPI_COMM!=0){ // skip MPI_REQUEST_NULL
3882       *index = ((*reqvec)[i])[0];
3883       return 0;
3884     }
3885   }
3886   *index = MPI_UNDEFINED;
3887   delete reqvec;
3888   return 0;
3889 }
3890
3891   CDECL
3892 int AMPI_Testall(int count, MPI_Request *request, int *flag, MPI_Status *sts)
3893 {
3894   AMPIAPI("AMPI_Testall");
3895   if(count==0) return MPI_SUCCESS;
3896   checkRequests(count,request);
3897   int tmpflag;
3898   int i,j;
3899   AmpiRequestList* reqs = getReqs();
3900   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3901   *flag = 1;  
3902   for(i=0;i<reqvec->size();i++){
3903     for(j=0;j<((*reqvec)[i]).size();j++){
3904       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL)
3905         continue;
3906       tmpflag = (*reqs)[request[((*reqvec)[i])[j]]]->test(&sts[((*reqvec)[i])[j]]);
3907       *flag *= tmpflag;
3908     }
3909   }
3910   if(flag) 
3911     MPI_Waitall(count,request,sts);
3912   delete reqvec;        
3913   return 0;
3914 }
3915
3916   CDECL
3917 int AMPI_Testsome(int incount, MPI_Request *array_of_requests, int *outcount,
3918     int *array_of_indices, MPI_Status *array_of_statuses)
3919 {
3920   AMPIAPI("AMPI_Testsome");
3921   checkRequests(incount,array_of_requests);
3922   if(areInactiveReqs(incount,array_of_requests)){
3923     *outcount=MPI_UNDEFINED;
3924     return MPI_SUCCESS;
3925   }
3926   MPI_Status sts;
3927   int flag;
3928   int i;
3929   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3930   *outcount = 0;
3931   for(i=0;i<reqvec->size();i++){
3932     AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3933     if(flag == 1){
3934       array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3935       array_of_statuses[(*outcount)++]=sts;
3936     }
3937   }
3938   delete reqvec;
3939   return 0;
3940 }
3941
3942 CDECL
3943 int AMPI_Request_free(MPI_Request *request){
3944   AMPIAPI("AMPI_Request_free");
3945   if(*request==MPI_REQUEST_NULL) return 0;
3946   checkRequest(*request);
3947   AmpiRequestList* reqs = getReqs();
3948   reqs->free(*request);
3949   *request = MPI_REQUEST_NULL;
3950   return 0;
3951 }
3952
3953 CDECL
3954 int AMPI_Cancel(MPI_Request *request){
3955   AMPIAPI("AMPI_Cancel");
3956   return AMPI_Request_free(request);
3957 }
3958
3959 CDECL
3960 int AMPI_Test_cancelled(MPI_Status* status, int* flag) {
3961   /* FIXME: always returns success */
3962   *flag = 1;
3963   return 0;
3964 }
3965
3966   CDECL
3967 int AMPI_Recv_init(void *buf, int count, int type, int src, int tag,
3968     MPI_Comm comm, MPI_Request *req)
3969 {
3970   AMPIAPI("AMPI_Recv_init");
3971
3972 #if CMK_ERROR_CHECKING
3973   int ret;
3974   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, buf, 1);
3975   if(ret != MPI_SUCCESS)
3976   {
3977     *req = MPI_REQUEST_NULL;
3978     return ret;
3979   }
3980 #endif
3981
3982   AmpiRequestList* reqs = getReqs();
3983   PersReq *newreq = new PersReq(buf,count,type,src,tag,comm,2);
3984   *req = reqs->insert(newreq);
3985   return 0;
3986 }
3987
3988   CDECL
3989 int AMPI_Send_init(void *buf, int count, int type, int dest, int tag,
3990     MPI_Comm comm, MPI_Request *req)
3991 {
3992   AMPIAPI("AMPI_Send_init");
3993
3994 #if CMK_ERROR_CHECKING
3995   int ret;
3996   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
3997   if(ret != MPI_SUCCESS)
3998   {
3999     *req = MPI_REQUEST_NULL;
4000     return ret;
4001   }
4002 #endif
4003
4004   AmpiRequestList* reqs = getReqs();
4005   PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,1);
4006   *req = reqs->insert(newreq);
4007   return 0;
4008 }
4009
4010   CDECL
4011 int AMPI_Ssend_init(void *buf, int count, int type, int dest, int tag,
4012     MPI_Comm comm, MPI_Request *req)
4013 {
4014   AMPIAPI("AMPI_Ssend_init");
4015
4016 #if CMK_ERROR_CHECKING
4017   int ret;
4018   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
4019   if(ret != MPI_SUCCESS)
4020   {
4021     *req = MPI_REQUEST_NULL;  
4022     return ret;
4023   }
4024 #endif
4025
4026   AmpiRequestList* reqs = getReqs();
4027   PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,3);
4028   *req = reqs->insert(newreq);
4029   return 0;
4030 }
4031
4032   CDECL
4033 int AMPI_Type_contiguous(int count, MPI_Datatype oldtype,
4034     MPI_Datatype *newtype)
4035 {
4036   AMPIAPI("AMPI_Type_contiguous");
4037   getDDT()->newContiguous(count, oldtype, newtype);
4038   return 0;
4039 }
4040
4041   CDECL
4042 int AMPI_Type_vector(int count, int blocklength, int stride,
4043     MPI_Datatype oldtype, MPI_Datatype*  newtype)
4044 {
4045   AMPIAPI("AMPI_Type_vector");
4046   getDDT()->newVector(count, blocklength, stride, oldtype, newtype);
4047   return 0 ;
4048 }
4049
4050   CDECL
4051 int AMPI_Type_hvector(int count, int blocklength, MPI_Aint stride, 
4052     MPI_Datatype oldtype, MPI_Datatype*  newtype)
4053 {
4054   AMPIAPI("AMPI_Type_hvector");
4055   getDDT()->newHVector(count, blocklength, stride, oldtype, newtype);
4056   return 0 ;
4057 }
4058
4059   CDECL
4060 int AMPI_Type_indexed(int count, int* arrBlength, int* arrDisp, 
4061     MPI_Datatype oldtype, MPI_Datatype*  newtype)
4062 {
4063   AMPIAPI("AMPI_Type_indexed");
4064   getDDT()->newIndexed(count, arrBlength, arrDisp, oldtype, newtype);
4065   return 0 ;
4066 }
4067
4068   CDECL
4069 int AMPI_Type_hindexed(int count, int* arrBlength, MPI_Aint* arrDisp,
4070     MPI_Datatype oldtype, MPI_Datatype*  newtype)
4071 {
4072   AMPIAPI("AMPI_Type_hindexed");
4073   getDDT()->newHIndexed(count, arrBlength, arrDisp, oldtype, newtype);
4074   return 0 ;
4075 }
4076
4077   CDECL
4078 int AMPI_Type_struct(int count, int* arrBlength, int* arrDisp, 
4079     MPI_Datatype* oldtype, MPI_Datatype*  newtype)
4080 {
4081   AMPIAPI("AMPI_Type_struct");
4082   getDDT()->newStruct(count, arrBlength, arrDisp, oldtype, newtype);
4083   return 0 ;
4084 }
4085
4086   CDECL
4087 int AMPI_Type_commit(MPI_Datatype *datatype)
4088 {
4089   AMPIAPI("AMPI_Type_commit");
4090   return 0;
4091 }
4092
4093   CDECL
4094 int AMPI_Type_free(MPI_Datatype *datatype)
4095 {
4096   AMPIAPI("AMPI_Type_free");
4097   getDDT()->freeType(datatype);
4098   return 0;
4099 }
4100
4101
4102   CDECL
4103 int AMPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent)
4104 {
4105   AMPIAPI("AMPI_Type_extent");
4106   *extent = getDDT()->getExtent(datatype);
4107   return 0;
4108 }
4109
4110   CDECL
4111 int AMPI_Type_size(MPI_Datatype datatype, int *size)
4112 {
4113   AMPIAPI("AMPI_Type_size");
4114   *size=getDDT()->getSize(datatype);
4115   return 0;
4116 }
4117
4118   CDECL
4119 int AMPI_Isend(void *buf, int count, MPI_Datatype type, int dest,
4120     int tag, MPI_Comm comm, MPI_Request *request)
4121 {
4122   AMPIAPI("AMPI_Isend");
4123
4124 #if CMK_ERROR_CHECKING
4125   int ret;
4126   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
4127   if(ret != MPI_SUCCESS)
4128   {
4129     *request = MPI_REQUEST_NULL;
4130     return ret;
4131   }
4132 #endif
4133
4134 #if AMPIMSGLOG
4135   ampiParent* pptr = getAmpiParent();
4136   if(msgLogRead){
4137     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
4138     return 0;
4139   }
4140 #endif
4141
4142   USER_CALL_DEBUG("AMPI_Isend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
4143   ampi *ptr = getAmpiInstance(comm);
4144 #if AMPI_COMLIB
4145   if(enableStreaming){
4146     //    ptr->getStreaming().beginIteration();
4147     ptr->comlibsend(tag,ptr->getRank(comm),buf,count,type,dest,comm);
4148   } else
4149 #endif
4150     ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm);
4151   *request = MPI_REQUEST_NULL;
4152 #if AMPI_COUNTER
4153   getAmpiParent()->counters.isend++;
4154 #endif
4155
4156 #if AMPIMSGLOG
4157   if(msgLogWrite && record_msglog(pptr->thisIndex)){
4158     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
4159   }
4160 #endif
4161
4162   return 0;
4163 }
4164
4165   CDECL
4166 int AMPI_Irecv(void *buf, int count, MPI_Datatype type, int src,
4167     int tag, MPI_Comm comm, MPI_Request *request)
4168 {
4169   AMPIAPI("AMPI_Irecv");
4170
4171 #if CMK_ERROR_CHECKING
4172   int ret;
4173   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, buf, 1);
4174   if(ret != MPI_SUCCESS)
4175     return ret;
4176 #endif
4177
4178   if(src==MPI_PROC_NULL) { *request = MPI_REQUEST_NULL; return 0; }
4179   USER_CALL_DEBUG("AMPI_Irecv("<<type<<","<<src<<","<<tag<<","<<comm<<")");
4180   AmpiRequestList* reqs = getReqs();
4181   IReq *newreq = new IReq(buf,count,type,src,tag,comm);
4182   *request = reqs->insert(newreq);
4183
4184 #if AMPIMSGLOG
4185   ampiParent* pptr = getAmpiParent();
4186   if(msgLogRead){
4187     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
4188     return 0;
4189   }
4190 #endif
4191
4192   // if msg is here, shall we do receive right away ??
4193   // posted irecv
4194   // message already arraved?
4195   ampi *ptr = getAmpiInstance(comm);
4196   AmpiMsg *msg = NULL;
4197   if (CpvAccess(CmiPICMethod) != 2)       // not copyglobals 
4198   {
4199     msg = ptr->getMessage(tag, src, comm, &newreq->tag);
4200   }
4201   if (msg) {
4202     newreq->receive(ptr, msg);
4203   } else {
4204     // post receive
4205     int tags[3];
4206     tags[0] = tag; tags[1] = src; tags[2] = comm;
4207 #if 0
4208     CmmPut(ptr->posted_ireqs, 3, tags, newreq);
4209 #else    
4210     //just insert the index of the newreq in the ampiParent::ampiReqs
4211     //to posted_ireqs. Such change is due to the need for Out-of-core Emulation
4212     //in BigSim. Before this change, posted_ireqs and ampiReqs both hold pointers to
4213     //AmpiRequest instances. After going through the Pupping routines, both will have
4214     //pointers to different AmpiRequest instances and no longer refer to the same AmpiRequest
4215     //instance. Therefore, to keep both always accessing the same AmpiRequest instance,
4216     //posted_ireqs stores the index (an integer) to ampiReqs. 
4217     //The index is 1-based rather 0-based because when pulling entries from posted_ireqs,
4218     //if not found, a "0"(i.e. NULL) is returned, this confuses the indexing of ampiReqs. 
4219     CmmPut(ptr->posted_ireqs, 3, tags, (void *)(CmiIntPtr)((*request)+1));
4220 #endif
4221   }
4222
4223 #if AMPI_COUNTER
4224   getAmpiParent()->counters.irecv++;
4225 #endif
4226
4227 #if AMPIMSGLOG
4228   if(msgLogWrite && record_msglog(pptr->thisIndex)){
4229     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
4230   }
4231 #endif
4232
4233   return 0;
4234 }
4235
4236   CDECL
4237 int AMPI_Ireduce(void *sendbuf, void *recvbuf, int count, int type, MPI_Op op,
4238     int root, MPI_Comm comm, MPI_Request *request)
4239 {
4240   AMPIAPI("AMPI_Ireduce");
4241
4242   handle_MPI_IN_PLACE(sendbuf, recvbuf);
4243
4244 #if CMK_ERROR_CHECKING
4245   int ret;
4246   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, sendbuf, 1,
4247                    recvbuf, getAmpiInstance(comm)->getRank(comm) == root);
4248   if(ret != MPI_SUCCESS)
4249   {
4250     *request = MPI_REQUEST_NULL;
4251     return ret;
4252   }
4253 #endif
4254
4255   if(op == MPI_OP_NULL) CkAbort("MPI_Ireduce called with MPI_OP_NULL!!!");
4256   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,sendbuf,recvbuf);
4257   ampi *ptr = getAmpiInstance(comm);
4258   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),sendbuf,count,type,op);
4259   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
4260   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
4261   msg->setCallback(reduceCB);
4262   ptr->contribute(msg);
4263
4264   if (ptr->thisIndex == rootIdx){
4265     // using irecv instead recv to non-block the call and get request pointer
4266     AmpiRequestList* reqs = getReqs();
4267     IReq *newreq = new IReq(recvbuf,count,type,0,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
4268     *request = reqs->insert(newreq);
4269   }
4270   return 0;
4271 }
4272
4273   CDECL
4274 int AMPI_Allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
4275     void *recvbuf, int recvcount, MPI_Datatype recvtype,
4276     MPI_Comm comm)
4277 {
4278   AMPIAPI("AMPI_Allgather");
4279
4280   if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
4281     CmiAbort("AMPI_Allgather does not implement MPI_IN_PLACE");
4282
4283 #if CMK_ERROR_CHECKING
4284   int ret;
4285   ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
4286   if(ret != MPI_SUCCESS)
4287     return ret;
4288   ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
4289   if(ret != MPI_SUCCESS)
4290     return ret;
4291 #endif
4292
4293   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgather not allowed for Inter-communicator!");
4294   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
4295
4296   ampi *ptr = getAmpiInstance(comm);
4297   int size = ptr->getSize(comm);
4298   int i;
4299
4300 #if AMPI_COMLIB  
4301
4302   if(comm == MPI_COMM_WORLD) {
4303     // commlib support
4304
4305     for(i=0;i<size;i++) {
4306       ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4307           sendtype, i, comm, ptr->getComlibProxy());
4308     }
4309     ptr->getAllgatherStrategy()->doneInserting();
4310
4311   } else 
4312 #endif
4313
4314     for(i=0;i<size;i++) {
4315       ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4316           sendtype, i, comm);
4317     }
4318
4319
4320   MPI_Status status;
4321   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
4322   int itemsize = dttype->getSize(recvcount) ;
4323
4324   for(i=0;i<size;i++) {
4325     AMPI_Recv(((char*)recvbuf)+(itemsize*i), recvcount, recvtype,
4326         i, MPI_GATHER_TAG, comm, &status);
4327   }
4328 #if AMPI_COUNTER
4329   getAmpiParent()->counters.allgather++;
4330 #endif
4331   return 0;
4332 }
4333
4334   CDECL
4335 int AMPI_Iallgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
4336     void *recvbuf, int recvcount, MPI_Datatype recvtype,
4337     MPI_Comm comm, MPI_Request* request)
4338 {
4339   AMPIAPI("AMPI_Iallgather");
4340
4341   if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
4342     CmiAbort("AMPI_Iallgather does not implement MPI_IN_PLACE");
4343
4344 #if CMK_ERROR_CHECKING
4345   int ret;
4346   ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
4347   if(ret != MPI_SUCCESS)
4348   {
4349     *request = MPI_REQUEST_NULL;
4350     return ret;
4351   }
4352   ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
4353   if(ret != MPI_SUCCESS)
4354   {
4355     *request = MPI_REQUEST_NULL;
4356     return ret;
4357   }
4358 #endif
4359
4360   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallgather not allowed for Inter-communicator!");
4361   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
4362
4363   ampi *ptr = getAmpiInstance(comm);
4364   int size = ptr->getSize(comm);
4365   int i;
4366 #if AMPI_COMLIB
4367   if(comm == MPI_COMM_WORLD) {
4368     // commlib support
4369     //      ptr->getAllgather().beginIteration();
4370     for(i=0;i<size;i++) {
4371       ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4372           sendtype, i, comm, ptr->getComlibProxy());
4373     }
4374     ptr->getAllgatherStrategy()->doneInserting();
4375   } else 
4376 #endif
4377     for(i=0;i<size;i++) {
4378       ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4379           sendtype, i, comm);
4380     }
4381
4382   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
4383   int itemsize = dttype->getSize(recvcount) ;
4384
4385   // copy+paste from MPI_Irecv
4386   AmpiRequestList* reqs = getReqs();
4387   ATAReq *newreq = new ATAReq(size);
4388   for(i=0;i<size;i++){
4389     if(newreq->addReq(((char*)recvbuf)+(itemsize*i),recvcount,recvtype,i,MPI_GATHER_TAG,comm)!=(i+1))
4390       CkAbort("MPI_Iallgather: Error adding requests into ATAReq!");
4391   }
4392   *request = reqs->insert(newreq);
4393   AMPI_DEBUG("MPI_Iallgather: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
4394
4395   return 0;
4396 }
4397
4398   CDECL
4399 int AMPI_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
4400     void *recvbuf, int *recvcounts, int *displs,
4401     MPI_Datatype recvtype, MPI_Comm comm)
4402 {
4403   AMPIAPI("AMPI_Allgatherv");
4404
4405   if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
4406     CmiAbort("AMPI_Allgatherv does not implement MPI_IN_PLACE");
4407
4408 #if CMK_ERROR_CHECKING
4409   int ret;
4410   ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
4411   if(ret != MPI_SUCCESS)
4412     return ret;
4413 #endif
4414
4415   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgatherv not allowed for Inter-communicator!");
4416   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
4417
4418   ampi *ptr = getAmpiInstance(comm);
4419   int size = ptr->getSize(comm);
4420   int i;
4421 #if AMPI_COMLIB
4422   if(comm == MPI_COMM_WORLD) {
4423     // commlib support
4424     //      ptr->getAllgather().beginIteration();
4425     for(i=0;i<size;i++) {
4426       ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4427           sendtype, i, comm, ptr->getComlibProxy());
4428     }
4429     ptr->getAllgatherStrategy()->doneInserting();
4430   } else
4431 #endif 
4432     for(i=0;i<size;i++) {
4433       ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4434           sendtype, i, comm);
4435     }
4436
4437   MPI_Status status;
4438   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
4439   int itemsize = dttype->getSize() ;
4440
4441   for(i=0;i<size;i++) {
4442     AMPI_Recv(((char*)recvbuf)+(itemsize*displs[i]), recvcounts[i], recvtype,
4443         i, MPI_GATHER_TAG, comm, &status);
4444   }
4445 #if AMPI_COUNTER
4446   getAmpiParent()->counters.allgather++;
4447 #endif
4448   return 0;
4449 }
4450
4451   CDECL
4452 int AMPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
4453     void *recvbuf, int recvcount, MPI_Datatype recvtype,
4454     int root, MPI_Comm comm)
4455 {
4456   AMPIAPI("AMPI_Gather");
4457
4458   if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
4459     CmiAbort("AMPI_Gather does not implement MPI_IN_PLACE");
4460
4461 #if CMK_ERROR_CHECKING
4462   int ret;
4463   ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
4464   if(ret != MPI_SUCCESS)
4465     return ret;
4466 #endif
4467
4468   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
4469
4470 #if AMPIMSGLOG
4471   ampiParent* pptr = getAmpiParent();
4472   if(msgLogRead){
4473     (*(pptr->fromPUPer))|(pptr->pupBytes);
4474     PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
4475     return 0;
4476   }
4477 #endif
4478
4479   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gather not allowed for Inter-communicator!");
4480
4481   ampi *ptr = getAmpiInstance(comm);
4482
4483 #if CMK_ERROR_CHECKING
4484   ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0,
4485                    recvbuf, ptr->getRank(comm) == root);
4486   if(ret != MPI_SUCCESS)
4487     return ret;
4488 #endif
4489
4490   int size = ptr->getSize(comm);