f44bffa7209b7858a14e1e6d26862f1442b4d39d
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1
2 #define AMPIMSGLOG    0
3
4 #define exit exit /*Supress definition of exit in ampi.h*/
5 #include "ampiimpl.h"
6 #include "tcharm.h"
7 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
8 #include "ampiEvents.h" /*** for trace generation for projector *****/
9 #include "ampiProjections.h"
10 #endif
11
12 #if CMK_BIGSIM_CHARM
13 #include "bigsim_logs.h"
14 #endif
15
16 #define CART_TOPOL 1
17 #define AMPI_PRINT_IDLE 0
18
19 /* change this define to "x" to trace all send/recv's */
20 #define MSG_ORDER_DEBUG(x)  //x /* empty */
21 /* change this define to "x" to trace user calls */
22 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
23 #define STARTUP_DEBUG(x)  //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
24 #define FUNCCALL_DEBUG(x) //x /* empty */
25
26 static CkDDT *getDDT(void) {
27   return getAmpiParent()->myDDT;
28 }
29
30   inline int checkCommunicator(MPI_Comm comm) {
31     if(comm == MPI_COMM_NULL)
32       return MPI_ERR_COMM;
33     return MPI_SUCCESS;
34   }
35
36   inline int checkCount(int count) {
37     if(count < 0)
38       return MPI_ERR_COUNT;
39     return MPI_SUCCESS;
40   }
41
42   inline int checkData(MPI_Datatype data) {
43     if(data == MPI_DATATYPE_NULL)
44       return MPI_ERR_TYPE;
45     return MPI_SUCCESS;
46   }
47
48   inline int checkTag(int tag) {
49     if(tag != MPI_ANY_TAG && tag < 0)
50       return MPI_ERR_TAG;
51     return MPI_SUCCESS;
52   }
53
54 inline int checkRank(int rank, MPI_Comm comm) {
55   int size;
56   AMPI_Comm_size(comm, &size);
57   if(((rank >= 0) && (rank < size)) || (rank == MPI_ANY_SOURCE) || (rank ==
58         MPI_PROC_NULL))
59     return MPI_SUCCESS;
60   return MPI_ERR_RANK;
61 }
62
63   inline int checkBuf(void *buf, int count) {
64     if((count != 0 && buf == NULL) || buf == MPI_IN_PLACE)
65       return MPI_ERR_BUFFER;
66     return MPI_SUCCESS;
67   }
68
69 inline int errorCheck(MPI_Comm comm, int ifComm, int count, int ifCount,
70                       MPI_Datatype data, int ifData, int tag, int ifTag,
71                       int rank, int ifRank,
72                       void *buf1, int ifBuf1, void *buf2 = 0, int ifBuf2 = 0) {
73   int ret;
74   if(ifComm) { 
75     ret = checkCommunicator(comm);
76     if(ret != MPI_SUCCESS)
77       return ret;
78   }
79   if(ifCount) {
80     ret = checkCount(count);
81     if(ret != MPI_SUCCESS)
82       return ret;
83   }
84   if(ifData) {
85     ret = checkData(data);
86     if(ret != MPI_SUCCESS)
87       return ret;
88   }
89   if(ifTag) {
90     ret = checkTag(tag);
91     if(ret != MPI_SUCCESS)
92       return ret;
93   }
94   if(ifRank) {
95     ret = checkRank(rank,comm);
96     if(ret != MPI_SUCCESS)
97       return ret;
98   }
99   if(ifBuf1) {
100     ret = checkBuf(buf1,count);
101     if(ret != MPI_SUCCESS)
102       return ret;
103   }
104   if(ifBuf2) {
105     ret = checkBuf(buf2,count);
106     if(ret != MPI_SUCCESS)
107       return ret;
108   }
109   return MPI_SUCCESS;
110 }
111
112 //------------- startup -------------
113 static mpi_comm_worlds mpi_worlds;
114
115 int _mpi_nworlds; /*Accessed by ampif*/
116 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
117
118 /* ampiReducer: AMPI's generic reducer type 
119    MPI_Op is function pointer to MPI_User_function
120    so that it can be packed into AmpiOpHeader, shipped 
121    with the reduction message, and then plugged into 
122    the ampiReducer. 
123    One little trick is the ampi::recv which receives
124    the final reduction message will see additional
125    sizeof(AmpiOpHeader) bytes in the buffer before
126    any user data.                             */
127 class AmpiComplex { 
128   public: 
129     double re, im; 
130     void operator+=(const AmpiComplex &a) {
131       re+=a.re;
132       im+=a.im;
133     }
134     void operator*=(const AmpiComplex &a) {
135       double nu_re=re*a.re-im*a.im;
136       im=re*a.im+im*a.re;
137       re=nu_re;
138     }
139     int operator>(const AmpiComplex &a) {
140       CkAbort("Cannot compare complex numbers with MPI_MAX");
141       return 0;
142     }
143     int operator<(const AmpiComplex &a) {
144       CkAbort("Cannot compare complex numbers with MPI_MIN");
145       return 0;
146     }
147 };
148 typedef struct { float val; int idx; } FloatInt;
149 typedef struct { double val; int idx; } DoubleInt;
150 typedef struct { long val; int idx; } LongInt;
151 typedef struct { int val; int idx; } IntInt;
152 typedef struct { short val; int idx; } ShortInt;
153 typedef struct { long double val; int idx; } LongdoubleInt;
154 typedef struct { float val; float idx; } FloatFloat;
155 typedef struct { double val; double idx; } DoubleDouble;
156
157
158 #define MPI_OP_SWITCH(OPNAME) \
159   int i; \
160 switch (*datatype) { \
161   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
162   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
163   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
164   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
165   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
166   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
167   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
168   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
169   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
170   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
171   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
172   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
173   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
174   default: \
175            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
176   CmiAbort("Unsupported MPI datatype for MPI Op"); \
177 };\
178
179 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
180 #define MPI_OP_IMPL(type) \
181   if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
182   MPI_OP_SWITCH(MPI_MAX)
183 #undef MPI_OP_IMPL
184 }
185
186 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
187 #define MPI_OP_IMPL(type) \
188   if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
189   MPI_OP_SWITCH(MPI_MIN)
190 #undef MPI_OP_IMPL
191 }
192
193 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
194 #define MPI_OP_IMPL(type) \
195   ((type *)inoutvec)[i] += ((type *)invec)[i];
196   MPI_OP_SWITCH(MPI_SUM)
197 #undef MPI_OP_IMPL
198 }
199
200 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
201 #define MPI_OP_IMPL(type) \
202   ((type *)inoutvec)[i] *= ((type *)invec)[i];
203   MPI_OP_SWITCH(MPI_PROD)
204 #undef MPI_OP_IMPL
205 }
206
207 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
208   int i;  
209   switch (*datatype) {
210     case MPI_INT:
211     case MPI_LOGICAL:
212       for(i=0;i<(*len);i++)
213         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
214       break;
215     default:
216       ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
217       CmiAbort("exiting");
218   }
219 }
220 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
221   int i; 
222   switch (*datatype) {
223     case MPI_INT:
224       for(i=0;i<(*len);i++)
225         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
226       break;
227     case MPI_BYTE:
228       for(i=0;i<(*len);i++)
229         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
230       break;
231     default:
232       ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
233       CmiAbort("exiting");
234   }
235 }
236 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
237   int i;  
238   switch (*datatype) {
239     case MPI_INT:
240     case MPI_LOGICAL:
241       for(i=0;i<(*len);i++)
242         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
243       break;
244     default:
245       ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
246       CmiAbort("exiting");
247   }
248 }
249 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
250   int i;  
251   switch (*datatype) {
252     case MPI_INT:
253       for(i=0;i<(*len);i++)
254         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
255       break;
256     case MPI_BYTE:
257       for(i=0;i<(*len);i++)
258         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
259       break;
260     default:
261       ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
262       CmiAbort("exiting");
263   }
264 }
265 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
266   int i;  
267   switch (*datatype) {
268     case MPI_INT:
269     case MPI_LOGICAL:
270       for(i=0;i<(*len);i++)
271         ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
272       break;
273     default:
274       ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
275       CmiAbort("exiting");
276   }
277 }
278 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
279   int i;  
280   switch (*datatype) {
281     case MPI_INT:
282       for(i=0;i<(*len);i++)
283         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
284       break;
285     case MPI_BYTE:
286       for(i=0;i<(*len);i++)
287         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
288       break;
289     case MPI_UNSIGNED:
290       for(i=0;i<(*len);i++)
291         ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
292       break;
293     default:
294       ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
295       CmiAbort("exiting");
296   }
297 }
298
299 #ifndef MIN
300 #define MIN(a,b) (a < b ? a : b)
301 #endif
302
303 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
304   int i;  
305
306   switch (*datatype) {
307     case MPI_FLOAT_INT:
308       for(i=0;i<(*len);i++)
309         if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
310           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
311         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
312           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
313       break;
314     case MPI_DOUBLE_INT:
315       for(i=0;i<(*len);i++)
316         if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
317           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
318         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
319           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
320
321       break;
322     case MPI_LONG_INT:
323       for(i=0;i<(*len);i++)
324         if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
325           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
326         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
327           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
328       break;
329     case MPI_2INT:
330       for(i=0;i<(*len);i++)
331         if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
332           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
333         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
334           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
335       break;
336     case MPI_SHORT_INT:
337       for(i=0;i<(*len);i++)
338         if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
339           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
340         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
341           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
342       break;
343     case MPI_LONG_DOUBLE_INT:
344       for(i=0;i<(*len);i++)
345         if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
346           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
347         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
348           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
349       break;
350     case MPI_2FLOAT:
351       for(i=0;i<(*len);i++)
352         if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
353           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
354         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
355           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
356       break;
357     case MPI_2DOUBLE:
358       for(i=0;i<(*len);i++)
359         if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
360           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
361         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
362           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
363       break;
364     default:
365       ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
366       CmiAbort("exiting");
367   }
368 }
369 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
370   int i;  
371   switch (*datatype) {
372     case MPI_FLOAT_INT:
373       for(i=0;i<(*len);i++)
374         if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
375           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
376         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
377           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
378       break;
379     case MPI_DOUBLE_INT:
380       for(i=0;i<(*len);i++)
381         if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
382           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
383         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
384           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
385       break;
386     case MPI_LONG_INT:
387       for(i=0;i<(*len);i++)
388         if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
389           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
390         else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
391           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
392       break;
393     case MPI_2INT:
394       for(i=0;i<(*len);i++)
395         if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
396           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
397         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
398           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
399       break;
400     case MPI_SHORT_INT:
401       for(i=0;i<(*len);i++)
402         if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
403           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
404         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
405           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
406       break;
407     case MPI_LONG_DOUBLE_INT:
408       for(i=0;i<(*len);i++)
409         if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
410           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
411         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
412           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
413       break;
414     case MPI_2FLOAT:
415       for(i=0;i<(*len);i++)
416         if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
417           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
418         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
419           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
420       break;
421     case MPI_2DOUBLE:
422       for(i=0;i<(*len);i++)
423         if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
424           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
425         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
426           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
427       break;
428     default:
429       ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
430       CmiAbort("exiting");
431   }
432 }
433
434 // every msg contains a AmpiOpHeader structure before user data
435 // FIXME: non-commutative operations require messages be ordered by rank
436 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
437   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
438   MPI_Datatype dtype;
439   int szhdr, szdata, len;
440   MPI_User_function* func;
441   func = hdr->func;
442   dtype = hdr->dtype;  
443   szdata = hdr->szdata;
444   len = hdr->len;  
445   szhdr = sizeof(AmpiOpHeader);
446
447   //Assuming extent == size
448   void *ret = malloc(szhdr+szdata);
449   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
450   for(int i=1;i<nMsg;i++){
451     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
452   }
453   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
454   free(ret);
455   return retmsg;
456 }
457
458 CkReduction::reducerType AmpiReducer;
459
460 class Builtin_kvs{
461   public:
462     int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
463     Builtin_kvs(){
464       tag_ub = MPI_TAG_UB_VALUE; 
465       host = MPI_PROC_NULL;
466       io = 0;
467       wtime_is_global = 0;
468       keyval_mype = CkMyPe();
469       keyval_numpes = CkNumPes();
470       keyval_mynode = CkMyNode();
471       keyval_numnodes = CkNumNodes();
472     }
473 };
474
475 // ------------ startup support -----------
476 int _ampi_fallback_setup_count;
477 CDECL void AMPI_Setup(void);
478 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
479
480 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
481
482 /*Main routine used when missing MPI_Setup routine*/
483 CDECL void AMPI_Fallback_Main(int argc,char **argv)
484 {
485   AMPI_Main_cpp(argc,argv);
486   AMPI_Main(argc,argv);
487   FTN_NAME(MPI_MAIN,mpi_main)();
488 }
489
490 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
491 /*Startup routine used if user *doesn't* write
492   a TCHARM_User_setup routine.
493  */
494 CDECL void AMPI_Setup_Switch(void) {
495   _ampi_fallback_setup_count=0;
496   FTN_NAME(AMPI_SETUP,ampi_setup)();
497   AMPI_Setup();
498   if (_ampi_fallback_setup_count==2)
499   { //Missing AMPI_Setup in both C and Fortran:
500     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
501   }
502 }
503
504 static int nodeinit_has_been_called=0;
505 CtvDeclare(ampiParent*, ampiPtr);
506 CtvDeclare(int, ampiInitDone);
507 CtvDeclare(void*,stackBottom);
508 CtvDeclare(int, ampiFinalized);
509 CkpvDeclare(Builtin_kvs, bikvs);
510 CkpvDeclare(int,argvExtracted);
511 static int enableStreaming = 0;
512
513 CDECL long ampiCurrentStackUsage(){
514   int localVariable;
515
516   unsigned long p1 =  (unsigned long)((void*)&localVariable);
517   unsigned long p2 =  (unsigned long)(CtvAccess(stackBottom));
518
519
520   if(p1 > p2)
521     return p1 - p2;
522   else
523     return  p2 - p1;
524
525 }
526
527 FDECL void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
528   long usage = ampiCurrentStackUsage();
529   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
530 }
531
532
533 CDECL void AMPI_threadstart(void *data);
534 static int AMPI_threadstart_idx = -1;
535
536 static void ampiNodeInit(void)
537 {
538   _mpi_nworlds=0;
539   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
540   {
541     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
542   }
543   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
544
545   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
546
547   CmiAssert(AMPI_threadstart_idx == -1);    // only initialize once
548   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
549
550   nodeinit_has_been_called=1;
551
552    // ASSUME NO ANYTIME MIGRATION and STATIC INSERTON
553   _isAnytimeMigration = false;
554   _isStaticInsertion = true;
555 }
556
557 #if PRINT_IDLE
558 static double totalidle=0.0, startT=0.0;
559 static int beginHandle, endHandle;
560 static void BeginIdle(void *dummy,double curWallTime)
561 {
562   startT = curWallTime;
563 }
564 static void EndIdle(void *dummy,double curWallTime)
565 {
566   totalidle += curWallTime - startT;
567 }
568 #endif
569
570 /* for fortran reduction operation table to handle mapping */
571 typedef MPI_Op  MPI_Op_Array[128];
572 CtvDeclare(int, mpi_opc);
573 CtvDeclare(MPI_Op_Array, mpi_ops);
574
575 static void ampiProcInit(void){
576   CtvInitialize(ampiParent*, ampiPtr);
577   CtvInitialize(int,ampiInitDone);
578   CtvInitialize(int,ampiFinalized);
579   CtvInitialize(void*,stackBottom);
580
581
582   CtvInitialize(MPI_Op_Array, mpi_ops);
583   CtvInitialize(int, mpi_opc);
584
585   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
586   CkpvAccess(bikvs) = Builtin_kvs();
587
588   CkpvInitialize(int, argvExtracted);
589   CkpvAccess(argvExtracted) = 0;
590
591 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
592   REGISTER_AMPI
593 #endif
594   initAmpiProjections();
595
596   char **argv=CkGetArgv();
597 #if AMPI_COMLIB  
598   if(CkpvAccess(argvExtracted)==0){
599     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
600   }
601 #endif
602
603 #if AMPIMSGLOG
604   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
605   //msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
606   if (CmiGetArgIntDesc(argv,"+msgLogRead", &msgLogRank, "Re-play message processing order for AMPI")) {
607     msgLogRead = 1;
608   }
609   //CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
610   char *procs = NULL;
611   if (CmiGetArgStringDesc(argv, "+msgLogRanks", &procs, "A list of AMPI processors to record , e.g. 0,10,20-30")) {
612     msgLogRanks.set(procs);
613   }
614   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
615   if (CkMyPe() == 0) {
616     if (msgLogWrite) CmiPrintf("Writing AMPI messages of rank %s to log: %s\n", procs?procs:"", msgLogFilename);
617     if (msgLogRead) CmiPrintf("Reading AMPI messages of rank %s from log: %s\n", procs?procs:"", msgLogFilename);
618   }
619 #endif
620
621   // initBigSimTrace(1,outtiming);
622 }
623
624 #if AMPIMSGLOG
625 static inline int record_msglog(int rank){
626   return msgLogRanks.includes(rank);
627 }
628 #endif
629
630 void AMPI_Install_Idle_Timer(){
631 #if AMPI_PRINT_IDLE
632   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
633   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
634 #endif
635 }
636
637 void AMPI_Uninstall_Idle_Timer(){
638 #if AMPI_PRINT_IDLE
639   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
640   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
641 #endif
642 }
643
644 PUPfunctionpointer(MPI_MainFn)
645
646   class MPI_threadstart_t {
647     public:
648       MPI_MainFn fn;
649       MPI_threadstart_t() {}
650       MPI_threadstart_t(MPI_MainFn fn_)
651         :fn(fn_) {}
652       void start(void) {
653         char **argv=CmiCopyArgs(CkGetArgv());
654         int argc=CkGetArgc();
655
656         // Set a pointer to somewhere close to the bottom of the stack.
657         // This is used for roughly estimating the stack usage later.
658         CtvAccess(stackBottom) = &argv;
659
660 #if CMK_AMPI_FNPTR_HACK
661         AMPI_Fallback_Main(argc,argv);
662 #else
663         (fn)(argc,argv);
664 #endif
665       }
666       void pup(PUP::er &p) {
667         p|fn;
668       }
669   };
670 PUPmarshall(MPI_threadstart_t)
671
672 CDECL void AMPI_threadstart(void *data)
673 {
674   STARTUP_DEBUG("MPI_threadstart")
675     MPI_threadstart_t t;
676   pupFromBuf(data,t);
677 #if CMK_TRACE_IN_CHARM
678   if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
679 #endif
680   t.start();
681 }
682
683 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
684 {
685   STARTUP_DEBUG("ampiCreateMain")
686     int _nchunks=TCHARM_Get_num_chunks();
687   //Make a new threads array:
688   MPI_threadstart_t s(mainFn);
689   memBuf b; pupIntoBuf(b,s);
690   TCHARM_Create_data( _nchunks,AMPI_threadstart_idx,
691       b.getData(), b.getSize());
692 }
693
694 /* TCharm Semaphore ID's for AMPI startup */
695 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
696 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
697
698 static CProxy_ampiWorlds ampiWorldsGroup;
699
700 static void init_operations()
701 {
702   CtvInitialize(MPI_Op_Array, mpi_ops);
703   int i = 0;
704   MPI_Op *tab = CtvAccess(mpi_ops);
705   tab[i++] = MPI_MAX;
706   tab[i++] = MPI_MIN;
707   tab[i++] = MPI_SUM;
708   tab[i++] = MPI_PROD;
709   tab[i++] = MPI_LAND;
710   tab[i++] = MPI_BAND;
711   tab[i++] = MPI_LOR;
712   tab[i++] = MPI_BOR;
713   tab[i++] = MPI_LXOR;
714   tab[i++] = MPI_BXOR;
715   tab[i++] = MPI_MAXLOC;
716   tab[i++] = MPI_MINLOC;
717
718   CtvInitialize(int, mpi_opc);
719   CtvAccess(mpi_opc) = i;
720 }
721
722 /*
723    Called from MPI_Init, a collective initialization call:
724    creates a new AMPI array and attaches it to the current
725    set of TCHARM threads.
726  */
727 static ampi *ampiInit(char **argv)
728 {
729   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
730     if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
731   STARTUP_DEBUG("ampiInit> begin")
732
733     MPI_Comm new_world;
734   int _nchunks;
735   CkArrayOptions opts;
736   CProxy_ampiParent parent;
737   if (TCHARM_Element()==0) //the rank of a tcharm object
738   { /* I'm responsible for building the arrays: */
739     STARTUP_DEBUG("ampiInit> creating arrays")
740
741       // FIXME: Need to serialize global communicator allocation in one place.
742       //Allocate the next communicator
743       if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
744       {
745         CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
746       }
747     int new_idx=_mpi_nworlds;
748     new_world=MPI_COMM_WORLD+new_idx; // Isaac guessed there shouldn't be a +1 here
749
750     //Create and attach the ampiParent array
751     CkArrayID threads;
752     opts=TCHARM_Attach_start(&threads,&_nchunks);
753     parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
754     STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
755   }
756   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
757
758   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
759
760     if (TCHARM_Element()==0)
761     {
762       //Make a new ampi array
763       CkArrayID empty;
764
765       ampiCommStruct worldComm(new_world,empty,_nchunks);
766       CProxy_ampi arr;
767
768
769 #if AMPI_COMLIB
770
771       ComlibInstanceHandle ciStreaming = 1;
772       ComlibInstanceHandle ciBcast = 2;
773       ComlibInstanceHandle ciAllgather = 3;
774       ComlibInstanceHandle ciAlltoall = 4;
775
776       arr=CProxy_ampi::ckNew(parent, worldComm, ciStreaming, ciBcast, ciAllgather, ciAlltoall, opts);
777
778
779       CkPrintf("Using untested comlib code in ampi.C\n");
780
781       Strategy *sStreaming = new StreamingStrategy(1,10);
782       CkAssert(ciStreaming == ComlibRegister(sStreaming));
783
784       Strategy *sBcast = new BroadcastStrategy(USE_HYPERCUBE);
785       CkAssert(ciBcast = ComlibRegister(sBcast));
786
787       Strategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
788       CkAssert(ciAllgather = ComlibRegister(sAllgather));
789
790       Strategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
791       CkAssert(ciAlltoall = ComlibRegister(sAlltoall));
792
793       CmiPrintf("Created AMPI comlib strategies in new manner\n");
794
795       // FIXME: Propogate the comlib table here
796       CkpvAccess(conv_com_object).doneCreating();
797 #else
798       arr=CProxy_ampi::ckNew(parent,worldComm,opts);
799 #endif
800
801       //Broadcast info. to the mpi_worlds array
802       // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
803       ampiCommStruct newComm(new_world,arr,_nchunks);
804       //CkPrintf("In ampiInit: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
805       if (ampiWorldsGroup.ckGetGroupID().isZero())
806         ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
807       else
808         ampiWorldsGroup.add(newComm);
809       STARTUP_DEBUG("ampiInit> arrays created")
810
811     }
812
813   // Find our ampi object:
814   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
815   CtvAccess(ampiInitDone)=1;
816   CtvAccess(ampiFinalized)=0;
817   STARTUP_DEBUG("ampiInit> complete")
818 #if CMK_BIGSIM_CHARM
819     //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
820     TRACE_BG_ADD_TAG("AMPI_START");
821 #endif
822
823   init_operations();     // initialize fortran reduction operation table
824
825   getAmpiParent()->ampiInitCallDone = 0;
826
827   CProxy_ampi cbproxy = ptr->getProxy();
828   CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
829   ptr->contribute(0, NULL, CkReduction::sum_int, cb);
830
831   ampiParent *thisParent = getAmpiParent(); 
832   while(thisParent->ampiInitCallDone!=1){
833     //CkPrintf("In checking ampiInitCallDone(%d) loop at parent %d!\n", thisParent->ampiInitCallDone, thisParent->thisIndex);
834     thisParent->getTCharmThread()->stop();
835     /* 
836      * thisParent needs to be updated in case of the parent is being pupped.
837      * In such case, thisParent got changed
838      */
839     thisParent = getAmpiParent();
840   }
841
842 #ifdef CMK_BIGSIM_CHARM
843   BgSetStartOutOfCore();
844 #endif
845
846   return ptr;
847 }
848
849 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
850 class ampiWorlds : public CBase_ampiWorlds {
851   public:
852     ampiWorlds(const ampiCommStruct &nextWorld) {
853       ampiWorldsGroup=thisgroup;
854       //CkPrintf("In constructor: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
855       add(nextWorld);
856     }
857     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
858     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
859     void add(const ampiCommStruct &nextWorld) {
860       int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD); // Isaac guessed there shouldn't be a +1 after the MPI_COMM_WORLD
861       mpi_worlds[new_idx].comm=nextWorld;
862       if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
863       STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
864     }
865 };
866
867 //-------------------- ampiParent -------------------------
868   ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
869 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
870 {
871   int barrier = 0x1234;
872   STARTUP_DEBUG("ampiParent> starting up")
873     thread=NULL;
874   worldPtr=NULL;
875   myDDT=&myDDTsto;
876   prepareCtv();
877
878   init();
879
880   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
881   AsyncEvacuate(CmiFalse);
882   //CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisProxy(thisIndex));
883   //setChkpResumeClient(cb);
884 }
885
886 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
887   thread=NULL;
888   worldPtr=NULL;
889   myDDT=&myDDTsto;
890
891   init();
892
893   AsyncEvacuate(CmiFalse);
894 }
895
896 void ampiParent::pup(PUP::er &p) {
897   ArrayElement1D::pup(p);
898   p|threads;
899   p|worldNo;           // why it was missing from here before??
900   p|worldStruct;
901   myDDT->pup(p);
902   p|splitComm;
903   p|groupComm;
904   p|groups;
905
906   //BIGSIM_OOC DEBUGGING
907   //if(!p.isUnpacking()){
908   //    CmiPrintf("ampiParent[%d] packing ampiRequestList: \n", thisIndex);
909   //    ampiReqs.print();
910   //}
911
912   p|ampiReqs;
913
914   //BIGSIM_OOC DEBUGGING
915   //if(p.isUnpacking()){
916   //    CmiPrintf("ampiParent[%d] unpacking ampiRequestList: \n", thisIndex);
917   //    ampiReqs.print();
918   //}
919
920   p|RProxyCnt;
921   p|tmpRProxy;
922   p|winStructList;
923   p|infos;
924
925   p|ampiInitCallDone;
926   if(p.isUnpacking()&&CkInRestarting()){
927     //CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisProxy(thisIndex));
928     //setChkpResumeClient(cb);
929   }
930 }
931 void ampiParent::prepareCtv(void) {
932   thread=threads[thisIndex].ckLocal();
933   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
934   CtvAccessOther(thread->getThread(),ampiPtr) = this;
935   STARTUP_DEBUG("ampiParent> found TCharm")
936 }
937
938 void ampiParent::init(){
939   CkAssert(groups.size() == 0);
940   groups.push_back(new groupStruct);
941
942 #if AMPIMSGLOG
943   if(msgLogWrite && record_msglog(thisIndex)){
944     char fname[128];
945     sprintf(fname, "%s.%d", msgLogFilename,thisIndex);
946 #if CMK_PROJECTIONS_USE_ZLIB && 0
947     fMsgLog = gzopen(fname,"wb");
948     toPUPer = new PUP::tozDisk(fMsgLog);
949 #else
950     fMsgLog = fopen(fname,"wb");
951     CmiAssert(fMsgLog != NULL);
952     toPUPer = new PUP::toDisk(fMsgLog);
953 #endif
954   }else if(msgLogRead){
955     char fname[128];
956     sprintf(fname, "%s.%d", msgLogFilename,msgLogRank);
957 #if CMK_PROJECTIONS_USE_ZLIB && 0
958     fMsgLog = gzopen(fname,"rb");
959     fromPUPer = new PUP::fromzDisk(fMsgLog);
960 #else
961     fMsgLog = fopen(fname,"rb");
962     CmiAssert(fMsgLog != NULL);
963     fromPUPer = new PUP::fromDisk(fMsgLog);
964 #endif
965     CkPrintf("AMPI> opened message log file: %s for replay\n", fname);
966   }
967 #endif
968 }
969
970 void ampiParent::finalize(){
971 #if AMPIMSGLOG
972   if(msgLogWrite && record_msglog(thisIndex)){
973     delete toPUPer;
974 #if CMK_PROJECTIONS_USE_ZLIB && 0
975     gzclose(fMsgLog);
976 #else
977     fclose(fMsgLog);
978 #endif
979   }else if(msgLogRead){
980     delete fromPUPer;
981 #if CMK_PROJECTIONS_USE_ZLIB && 0
982     gzclose(fMsgLog);
983 #else
984     fclose(fMsgLog);
985 #endif
986   }
987 #endif
988 }
989
990 void ampiParent::ckJustMigrated(void) {
991   ArrayElement1D::ckJustMigrated();
992   prepareCtv();
993 }
994
995 void ampiParent::ckJustRestored(void) {
996   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
997     ArrayElement1D::ckJustRestored();
998   prepareCtv();
999
1000   //BIGSIM_OOC DEBUGGING
1001   //CkPrintf("In ampiParent[%d] with TCharm thread=%p:   ",thisIndex, thread);
1002   //CthPrintThdMagic(thread->getTid()); 
1003 }
1004
1005 ampiParent::~ampiParent() {
1006   STARTUP_DEBUG("ampiParent> destructor called");
1007   finalize();
1008 }
1009
1010 //Children call this when they are first created or just migrated
1011 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
1012 {
1013   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
1014
1015   if (s.getComm()>=MPI_COMM_WORLD)
1016   { //We now have our COMM_WORLD-- register it
1017     //Note that split communicators don't keep a raw pointer, so
1018     //they don't need to re-register on migration.
1019     if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
1020     worldPtr=ptr;
1021         s.getIndices();
1022     worldStruct=s;
1023
1024     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
1025     CkVec<int> _indices;
1026     _indices.push_back(thisIndex);
1027     selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
1028   }
1029
1030   if (!forMigration)
1031   { //Register the new communicator:
1032     MPI_Comm comm = s.getComm();
1033     STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
1034       if (comm>=MPI_COMM_WORLD) { 
1035         // Pass the new ampi to the waiting ampiInit
1036         thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
1037       } else if (isSplit(comm)) {
1038         splitChildRegister(s);
1039       } else if (isGroup(comm)) {
1040         groupChildRegister(s);
1041       } else if (isCart(comm)) {
1042         cartChildRegister(s);
1043       } else if (isGraph(comm)) {
1044         graphChildRegister(s);
1045       } else if (isInter(comm)) {
1046         interChildRegister(s);
1047       } else if (isIntra(comm)) {
1048         intraChildRegister(s);
1049       }else
1050         CkAbort("ampiParent recieved child with bad communicator");
1051   }
1052
1053   return thread;
1054 }
1055
1056 //BIGSIM_OOC DEBUGGING
1057 //Move the comm2ampi from inline to normal function for the sake of debugging
1058 /*ampi *ampiParent::comm2ampi(MPI_Comm comm){
1059 //BIGSIM_OOC DEBUGGING
1060 //CmiPrintf("%d, in ampiParent::comm2ampi, comm=%d\n", thisIndex, comm);
1061 if (comm==MPI_COMM_WORLD) return worldPtr;
1062 if (comm==MPI_COMM_SELF) return worldPtr;
1063 if (comm==worldNo) return worldPtr;
1064 if (isSplit(comm)) {
1065 const ampiCommStruct &st=getSplit(comm);
1066 return st.getProxy()[thisIndex].ckLocal();
1067 }
1068 if (isGroup(comm)) {
1069 const ampiCommStruct &st=getGroup(comm);
1070 return st.getProxy()[thisIndex].ckLocal();
1071 }
1072 if (isCart(comm)) {
1073 const ampiCommStruct &st = getCart(comm);
1074 return st.getProxy()[thisIndex].ckLocal();
1075 }
1076 if (isGraph(comm)) {
1077 const ampiCommStruct &st = getGraph(comm);
1078 return st.getProxy()[thisIndex].ckLocal();
1079 }
1080 if (isInter(comm)) {
1081 const ampiCommStruct &st=getInter(comm);
1082 return st.getProxy()[thisIndex].ckLocal();
1083 }
1084 if (isIntra(comm)) {
1085 const ampiCommStruct &st=getIntra(comm);
1086 return st.getProxy()[thisIndex].ckLocal();
1087 }
1088 if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
1089 CkAbort("Invalid communicator used!");
1090 return NULL;
1091 }*/
1092
1093 // reduction client data - preparation for checkpointing
1094 class ckptClientStruct {
1095   public:
1096     const char *dname;
1097     ampiParent *ampiPtr;
1098     ckptClientStruct(const char *s, ampiParent *a): dname(s), ampiPtr(a) {}
1099 };
1100
1101 static void checkpointClient(void *param,void *msg)
1102 {
1103   ckptClientStruct *client = (ckptClientStruct*)param;
1104   const char *dname = client->dname;
1105   ampiParent *ampiPtr = client->ampiPtr;
1106   ampiPtr->Checkpoint(strlen(dname), dname);
1107   delete client;
1108 }
1109
1110 void ampiParent::startCheckpoint(const char* dname){
1111   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
1112   if (thisIndex==0) {
1113     ckptClientStruct *clientData = new ckptClientStruct(dname, this);
1114     CkCallback cb(checkpointClient, clientData);
1115     contribute(0, NULL, CkReduction::sum_int, cb);
1116   }
1117   else
1118     contribute(0, NULL, CkReduction::sum_int);
1119
1120 #if 0
1121 #if CMK_BIGSIM_CHARM
1122   void *curLog;         // store current log in timeline
1123   _TRACE_BG_TLINE_END(&curLog);
1124   TRACE_BG_AMPI_SUSPEND();
1125 #endif
1126 #endif
1127
1128   thread->stop();
1129
1130 #if CMK_BIGSIM_CHARM
1131   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
1132   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
1133 #endif
1134 }
1135
1136 void ampiParent::ChkpSync(){
1137         AtChkpSync();
1138         CkPrintf("[%d] ampi parent go to sync\n",CkMyPe());
1139         thread->stop();
1140 }
1141
1142 void ampiParent::Checkpoint(int len, const char* dname){
1143   if (len == 0) {
1144     // memory checkpoint
1145     CmiPrintf("[%d]ampi start checkpoint\n",CmiMyPartition());
1146     fflush(stdout);
1147     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1148     CkStartMemCheckpoint(cb);
1149   }
1150   else {
1151     char dirname[256];
1152     strncpy(dirname,dname,len);
1153     dirname[len]='\0';
1154     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1155     CkStartCheckpoint(dirname,cb);
1156   }
1157 }
1158 void ampiParent::ResumeThread(void){
1159   //thread->resume();
1160   thread->start();
1161   if(CmiMyPartition()==0)
1162   CmiPrintf("[%d][%d] ampi parent resume\n",CmiMyPartition(),CkMyPe());
1163   fflush(stdout);
1164 }
1165
1166 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
1167     int *keyval, void* extra_state){
1168   KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1169   int idx = kvlist.size();
1170   kvlist.resize(idx+1);
1171   kvlist[idx] = newnode;
1172   *keyval = idx;
1173   return 0;
1174 }
1175   int ampiParent::freeKeyval(int *keyval){
1176     if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
1177       return -1;
1178     delete kvlist[*keyval];
1179     kvlist[*keyval] = NULL;
1180     *keyval = MPI_KEYVAL_INVALID;
1181     return 0;
1182   }
1183
1184   int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
1185     if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1186       return -1;
1187     ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1188     // Enlarge the keyval list:
1189     while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
1190     cs.getKeyvals()[keyval]=attribute_val;
1191     return 0;
1192   }
1193
1194 int ampiParent::kv_is_builtin(int keyval) {
1195   switch(keyval) {
1196     case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
1197     case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
1198     case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
1199     case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
1200     case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
1201     case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
1202     case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
1203     case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
1204     default: return 0;
1205   };
1206 }
1207
1208 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
1209   *flag = false;
1210   if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
1211     *flag=true;
1212     *(int **)attribute_val = kv_builtin_storage;  // all default tags are ints
1213     return 0;
1214   }
1215   if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1216     return -1; /* invalid keyval */
1217
1218   ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1219   if (keyval>=cs.getKeyvals().size())  
1220     return 0; /* we don't have a value yet */
1221   if (cs.getKeyvals()[keyval]==0)
1222     return 0; /* we had a value, but now it's zero */
1223   /* Otherwise, we have a good value */
1224   *flag = true;
1225   *(void **)attribute_val = cs.getKeyvals()[keyval];
1226   return 0;
1227 }
1228 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
1229   /* no way to delete an attribute: just overwrite it with 0 */
1230   return putAttr(comm,keyval,0);
1231 }
1232
1233 //----------------------- ampi -------------------------
1234 void ampi::init(void) {
1235   parent=NULL;
1236   thread=NULL;
1237   msgs=NULL;
1238   posted_ireqs=NULL;
1239   resumeOnRecv=false;
1240   AsyncEvacuate(CmiFalse);
1241 }
1242
1243 ampi::ampi()
1244 {
1245   /* this constructor only exists so we can create an empty array during split */
1246   CkAbort("Default ampi constructor should never be called");
1247 }
1248
1249   ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
1250 :parentProxy(parent_)
1251 {
1252   init();
1253
1254   myComm=s; myComm.setArrayID(thisArrayID);
1255   myRank=myComm.getRankForIndex(thisIndex);
1256
1257   findParent(false);
1258
1259   msgs = CmmNew();
1260   posted_ireqs = CmmNew();
1261   nbcasts = 0;
1262
1263 #if AMPI_COMLIB
1264   comlibProxy = thisProxy; // Will later be associated with comlib
1265 #endif
1266
1267   seqEntries=parent->ckGetArraySize();
1268   oorder.init (seqEntries);
1269 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1270   if(thisIndex == 0){
1271     /*      CkAssert(CkMyPe() == 0);
1272      *              CkGroupID _myManagerGID = thisProxy.ckGetArrayID();     
1273      *                      CkAssert(numElements);
1274      *                              printf("ampi::ampi setting numInitial to %d on manager at gid %d \n",numElements,_myManagerGID.idx);
1275      *                                      CkArray *_myManager = thisProxy.ckLocalBranch();
1276      *                                              _myManager->setNumInitial(numElements);*/
1277   }
1278 #endif
1279 }
1280
1281 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1282     ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1283 :parentProxy(parent_)
1284 {
1285 #if AMPI_COMLIB
1286   ciStreaming = ciStreaming_;
1287   ciBcast = ciBcast_;
1288   ciAllgather = ciAllgather_;
1289   ciAlltoall = ciAlltoall_;
1290
1291   init();
1292
1293   myComm=s; myComm.setArrayID(thisArrayID);
1294   myRank=myComm.getRankForIndex(thisIndex);
1295
1296   findParent(false);
1297
1298   msgs = CmmNew();
1299   posted_ireqs = CmmNew();
1300   nbcasts = 0;
1301
1302   comlibProxy = thisProxy;
1303   CmiPrintf("comlibProxy created as a copy of thisProxy, no associate call\n");
1304
1305 #if AMPI_COMLIB
1306   //  ComlibAssociateProxy(ciAlltoall, comlibProxy);
1307 #endif
1308
1309   seqEntries=parent->ckGetArraySize();
1310   oorder.init (seqEntries);
1311 #endif
1312 }
1313
1314 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1315 {
1316   init();
1317
1318   seqEntries=-1;
1319 }
1320
1321 void ampi::ckJustMigrated(void)
1322 {
1323   findParent(true);
1324   ArrayElement1D::ckJustMigrated();
1325 }
1326
1327 void ampi::ckJustRestored(void)
1328 {
1329   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
1330     findParent(true);
1331   ArrayElement1D::ckJustRestored();
1332
1333   //BIGSIM_OOC DEBUGGING
1334   //CkPrintf("In ampi[%d] thread[%p]:   ", thisIndex, thread);
1335   //CthPrintThdMagic(thread->getTid()); 
1336 }
1337
1338 void ampi::findParent(bool forMigration) {
1339   STARTUP_DEBUG("ampi> finding my parent")
1340     parent=parentProxy[thisIndex].ckLocal();
1341   if (parent==NULL) CkAbort("AMPI can't find its parent!");
1342   thread=parent->registerAmpi(this,myComm,forMigration);
1343   if (thread==NULL) CkAbort("AMPI can't find its thread!");
1344   //    printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1345 }
1346
1347 //The following method should be called on the first element of the
1348 //ampi array
1349 void ampi::allInitDone(CkReductionMsg *m){
1350   FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
1351     thisProxy.setInitDoneFlag();
1352   delete m;
1353 }
1354
1355 void ampi::setInitDoneFlag(){
1356   //CkPrintf("ampi[%d]::setInitDone called!\n", thisIndex);
1357   parent->ampiInitCallDone=1;
1358   parent->getTCharmThread()->start();
1359 }
1360
1361 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1362   CkPupMessage(*(PUP::er *)p,msg,1);
1363   if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1364   //    printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1365 }
1366
1367 static void cmm_pup_posted_ireq(pup_er p,void **msg) {
1368
1369   pup_int(p, (int *)msg);
1370
1371   /*    if(pup_isUnpacking(p)){
1372   // *msg = new IReq;
1373   //when unpacking, nothing is needed to do since *msg is an index
1374   //(of type integer) to the ampiParent::ampiReqs (the AmpiRequestList)
1375   }
1376   if(!pup_isUnpacking(p)){
1377   AmpiRequestList *reqL = getReqs();
1378   int retIdx = reqL->findRequestIndex((IReq *)*msg);
1379   if(retIdx==-1){
1380   CmiAbort("An AmpiRequest instance should be found for an instance in posted_ireq!\n");
1381   }
1382   pup_int(p, retIdx)
1383   }
1384    */
1385   //    ((IReq *)*msg)->pup(*(PUP::er *)p);
1386
1387   //    if (pup_isDeleting(p)) delete (IReq *)*msg;
1388   //    printf("[%d] pupping postd irequests %p \n",CkMyPe(),*msg);
1389 }
1390
1391 void ampi::pup(PUP::er &p)
1392 {
1393   if(!p.isUserlevel())
1394     ArrayElement1D::pup(p);//Pack superclass
1395   p|parentProxy;
1396   p|myComm;
1397   p|myRank;
1398   p|nbcasts;
1399   p|tmpVec;
1400   p|remoteProxy;
1401   p|resumeOnRecv;
1402 #if AMPI_COMLIB
1403   p|comlibProxy;
1404   p|ciStreaming;
1405   p|ciBcast;
1406   p|ciAllgather;
1407   p|ciAlltoall;
1408
1409   if(p.isUnpacking()){
1410     //    ciStreaming.setSourcePe();
1411     //    ciBcast.setSourcePe();
1412     //    ciAllgather.setSourcePe();
1413     //    ciAlltoall.setSourcePe();
1414   }
1415 #endif
1416
1417   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1418
1419   //BIGSIM_OOC DEBUGGING
1420   //if(!p.isUnpacking()){
1421   //CkPrintf("ampi[%d]::packing: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1422   //}
1423
1424   posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);
1425
1426   p|seqEntries;
1427   p|oorder;
1428 }
1429
1430 ampi::~ampi()
1431 {
1432   if (CkInRestarting() || _BgOutOfCoreFlag==1) {
1433     // in restarting, we need to flush messages
1434     int tags[3], sts[3];
1435     tags[0] = tags[1] = tags[2] = CmmWildCard;
1436     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1437     while (msg) {
1438       delete msg;
1439       msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1440     }
1441   }
1442
1443   CmmFree(msgs);
1444   CmmFreeAll(posted_ireqs);
1445 }
1446
1447 //------------------------ Communicator Splitting ---------------------
1448 class ampiSplitKey {
1449   public:
1450     int nextSplitComm;
1451     int color; //New class of processes we'll belong to
1452     int key; //To determine rank in new ordering
1453     int rank; //Rank in old ordering
1454     ampiSplitKey() {}
1455     ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1456       :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1457 };
1458
1459 /* "type" may indicate whether call is for a cartesian topology etc. */
1460
1461 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1462 {
1463 #if CMK_BIGSIM_CHARM
1464   void *curLog;         // store current log in timeline
1465   _TRACE_BG_TLINE_END(&curLog);
1466   //  TRACE_BG_AMPI_SUSPEND();
1467 #endif
1468   if (type == CART_TOPOL) {
1469     ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1470     int rootIdx=myComm.getIndexForRank(0);
1471     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1472     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1473
1474     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1475     MPI_Comm newComm=parent->getNextCart()-1;
1476     *dest=newComm;
1477   } else {
1478     ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1479     int rootIdx=myComm.getIndexForRank(0);
1480     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1481     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1482
1483     thread->suspend(); //Resumed by ampiParent::splitChildRegister
1484     MPI_Comm newComm=parent->getNextSplit()-1;
1485     *dest=newComm;
1486   }
1487 #if CMK_BIGSIM_CHARM
1488   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1489   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1490   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1491 #endif
1492 }
1493
1494 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1495   const ampiSplitKey *a=(const ampiSplitKey *)a_;
1496   const ampiSplitKey *b=(const ampiSplitKey *)b_;
1497   if (a->color!=b->color) return a->color-b->color;
1498   if (a->key!=b->key) return a->key-b->key;
1499   return a->rank-b->rank;
1500 }
1501
1502 void ampi::splitPhase1(CkReductionMsg *msg)
1503 {
1504   //Order the keys, which orders the ranks properly:
1505   int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1506   ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1507   if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1508   qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1509
1510   MPI_Comm newComm = -1;
1511   for(int i=0;i<nKeys;i++)
1512     if(keys[i].nextSplitComm>newComm)
1513       newComm = keys[i].nextSplitComm;
1514
1515   //Loop over the sorted keys, which gives us the new arrays:
1516   int lastColor=keys[0].color-1; //The color we're building an array for
1517   CProxy_ampi lastAmpi; //The array for lastColor
1518   int lastRoot=0; //C value for new rank 0 process for latest color
1519   ampiCommStruct lastComm; //Communicator info. for latest color
1520   for (int c=0;c<nKeys;c++) {
1521     if (keys[c].color!=lastColor)
1522     { //Hit a new color-- need to build a new communicator and array
1523       lastColor=keys[c].color;
1524       lastRoot=c;
1525       CkArrayOptions opts;
1526       opts.bindTo(parentProxy);
1527       opts.setNumInitial(0);
1528       CkArrayID unusedAID; ampiCommStruct unusedComm;
1529       lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1530       lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1531
1532       CkVec<int> indices; //Maps rank to array indices for new arrau
1533       for (int i=c;i<nKeys;i++) {
1534         if (keys[i].color!=lastColor) break; //Done with this color
1535         int idx=myComm.getIndexForRank(keys[i].rank);
1536         indices.push_back(idx);
1537       }
1538
1539       //FIXME: create a new communicator for each color, instead of
1540       // (confusingly) re-using the same MPI_Comm number for each.
1541       lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1542     }
1543     int newRank=c-lastRoot;
1544     int newIdx=lastComm.getIndexForRank(newRank);
1545
1546     //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1547     lastAmpi[newIdx].insert(parentProxy,lastComm);
1548   }
1549
1550   delete msg;
1551 }
1552
1553 //...newly created array elements register with the parent, which calls:
1554 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1555   int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1556   if (splitComm.size()<=idx) splitComm.resize(idx+1);
1557   splitComm[idx]=new ampiCommStruct(s);
1558   thread->resume(); //Matches suspend at end of ampi::split
1559 }
1560
1561 //-----------------create communicator from group--------------
1562 // The procedure is like that of comm_split very much,
1563 // so the code is shamelessly copied from above
1564 //   1. reduction to make sure all members have called
1565 //   2. the root in the old communicator create the new array
1566 //   3. ampiParent::register is called to register new array as new comm
1567 class vecStruct {
1568   public:
1569     int nextgroup;
1570     groupStruct vec;
1571     vecStruct():nextgroup(-1){}
1572     vecStruct(int nextgroup_, groupStruct vec_)
1573       : nextgroup(nextgroup_), vec(vec_) { }
1574 };
1575
1576 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1577   int rootIdx=vec[0];
1578   tmpVec = vec;
1579   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1580   MPI_Comm nextgroup = parent->getNextGroup();
1581   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1582
1583   if(getPosOp(thisIndex,vec)>=0){
1584     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1585     MPI_Comm retcomm = parent->getNextGroup()-1;
1586     *newcomm = retcomm;
1587   }else{
1588     *newcomm = MPI_COMM_NULL;
1589   }
1590 }
1591
1592 void ampi::commCreatePhase1(CkReductionMsg *msg){
1593   MPI_Comm *nextGroupComm = (int *)msg->getData();
1594
1595   CkArrayOptions opts;
1596   opts.bindTo(parentProxy);
1597   opts.setNumInitial(0);
1598   CkArrayID unusedAID;
1599   ampiCommStruct unusedComm;
1600   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1601   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1602
1603   groupStruct indices = tmpVec;
1604   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1605   for(int i=0;i<indices.size();i++){
1606     int newIdx=indices[i];
1607     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1608   }
1609   delete msg;
1610 }
1611
1612 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1613   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1614   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1615   groupComm[idx]=new ampiCommStruct(s);
1616   thread->resume(); //Matches suspend at end of ampi::split
1617 }
1618
1619 /* Virtual topology communicator creation */
1620 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1621   int rootIdx=vec[0];
1622   tmpVec = vec;
1623   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1624
1625   MPI_Comm nextcart = parent->getNextCart();
1626   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1627
1628   if(getPosOp(thisIndex,vec)>=0){
1629     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1630     MPI_Comm retcomm = parent->getNextCart()-1;
1631     *newcomm = retcomm;
1632   }else
1633     *newcomm = MPI_COMM_NULL;
1634 }
1635
1636 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1637   MPI_Comm *nextCartComm = (int *)msg->getData();
1638
1639   CkArrayOptions opts;
1640   opts.bindTo(parentProxy);
1641   opts.setNumInitial(0);
1642   CkArrayID unusedAID;
1643   ampiCommStruct unusedComm;
1644   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1645   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1646
1647   groupStruct indices = tmpVec;
1648   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1649       size(),indices);
1650   for(int i=0;i<indices.size();i++){
1651     int newIdx=indices[i];
1652     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1653   }
1654   delete msg;
1655 }
1656
1657 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1658   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1659   if (cartComm.size()<=idx) {
1660     cartComm.resize(idx+1);
1661     cartComm.length()=idx+1;
1662   }
1663   cartComm[idx]=new ampiCommStruct(s);
1664   thread->resume(); //Matches suspend at end of ampi::cartCreate
1665 }
1666
1667 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1668   int rootIdx=vec[0];
1669   tmpVec = vec;
1670   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1671       myComm.getProxy());
1672   MPI_Comm nextgraph = parent->getNextGraph();
1673   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1674
1675   if(getPosOp(thisIndex,vec)>=0){
1676     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1677     MPI_Comm retcomm = parent->getNextGraph()-1;
1678     *newcomm = retcomm;
1679   }else
1680     *newcomm = MPI_COMM_NULL;
1681 }
1682
1683 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1684   MPI_Comm *nextGraphComm = (int *)msg->getData();
1685
1686   CkArrayOptions opts;
1687   opts.bindTo(parentProxy);
1688   opts.setNumInitial(0);
1689   CkArrayID unusedAID;
1690   ampiCommStruct unusedComm;
1691   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1692   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1693
1694   groupStruct indices = tmpVec;
1695   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1696       .size(),indices);
1697   for(int i=0;i<indices.size();i++){
1698     int newIdx=indices[i];
1699     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1700   }
1701   delete msg;
1702 }
1703
1704 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1705   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1706   if (graphComm.size()<=idx) {
1707     graphComm.resize(idx+1);
1708     graphComm.length()=idx+1;
1709   }
1710   graphComm[idx]=new ampiCommStruct(s);
1711   thread->resume(); //Matches suspend at end of ampi::graphCreate
1712 }
1713
1714 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1715   if(thisIndex==root) { // not everybody gets the valid rvec
1716     tmpVec = rvec;
1717   }
1718   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1719   MPI_Comm nextinter = parent->getNextInter();
1720   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1721
1722   thread->suspend(); //Resumed by ampiParent::interChildRegister
1723   MPI_Comm newcomm=parent->getNextInter()-1;
1724   *ncomm=newcomm;
1725 }
1726
1727 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1728   MPI_Comm *nextInterComm = (int *)msg->getData();
1729
1730   groupStruct lgroup = myComm.getIndices();
1731   CkArrayOptions opts;
1732   opts.bindTo(parentProxy);
1733   opts.setNumInitial(0);
1734   CkArrayID unusedAID;
1735   ampiCommStruct unusedComm;
1736   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1737   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1738
1739   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1740   for(int i=0;i<lgroup.size();i++){
1741     int newIdx=lgroup[i];
1742     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1743   }
1744
1745   parentProxy[0].ExchangeProxy(newAmpi);
1746   delete msg;
1747 }
1748
1749 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1750   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1751   if (interComm.size()<=idx) interComm.resize(idx+1);
1752   interComm[idx]=new ampiCommStruct(s);
1753   //thread->resume(); // don't resume it yet, till parent set remote proxy
1754 }
1755
1756 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1757   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1758     groupStruct lvec = myComm.getIndices();
1759     groupStruct rvec = myComm.getRemoteIndices();
1760     int rsize = rvec.size();
1761     tmpVec = lvec;
1762     for(int i=0;i<rsize;i++)
1763       tmpVec.push_back(rvec[i]);
1764     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1765   }else{
1766     tmpVec.resize(0);
1767   }
1768
1769   int rootIdx=myComm.getIndexForRank(0);
1770   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1771   MPI_Comm nextintra = parent->getNextIntra();
1772   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1773
1774   thread->suspend(); //Resumed by ampiParent::interChildRegister
1775   MPI_Comm newcomm=parent->getNextIntra()-1;
1776   *ncomm=newcomm;
1777 }
1778
1779 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1780   if(tmpVec.size()==0) { delete msg; return; }
1781   MPI_Comm *nextIntraComm = (int *)msg->getData();
1782   CkArrayOptions opts;
1783   opts.bindTo(parentProxy);
1784   opts.setNumInitial(0);
1785   CkArrayID unusedAID;
1786   ampiCommStruct unusedComm;
1787   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1788   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1789
1790   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1791   for(int i=0;i<tmpVec.size();i++){
1792     int newIdx=tmpVec[i];
1793     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1794   }
1795   delete msg;
1796 }
1797
1798 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1799   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1800   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1801   intraComm[idx]=new ampiCommStruct(s);
1802   thread->resume(); //Matches suspend at end of ampi::split
1803 }
1804
1805 //------------------------ communication -----------------------
1806 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1807 {
1808   if (universeNo>MPI_COMM_WORLD) {
1809     int worldDex=universeNo-MPI_COMM_WORLD-1;
1810     if (worldDex>=_mpi_nworlds)
1811       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1812     return mpi_worlds[worldDex].comm;
1813   }
1814   CkAbort("Bad communicator passed to universeComm2CommStruct");
1815   return mpi_worlds[0].comm; // meaningless return
1816 }
1817
1818 void ampi::block(void){
1819   thread->suspend();
1820 }
1821
1822 void ampi::yield(void){
1823   thread->schedule();
1824 }
1825
1826 void ampi::unblock(void){
1827   thread->resume();
1828 }
1829
1830   void ampi::ssend_ack(int sreq_idx){
1831     if (sreq_idx == 1)
1832       thread->resume();           // MPI_Ssend
1833     else {
1834       sreq_idx -= 2;              // start from 2
1835       AmpiRequestList *reqs = &(parent->ampiReqs);
1836       SReq *sreq = (SReq *)(*reqs)[sreq_idx];
1837       sreq->statusIreq = true;
1838       if (resumeOnRecv) {
1839         thread->resume();
1840       }
1841     }
1842   }
1843
1844   void
1845 ampi::generic(AmpiMsg* msg)
1846 {
1847   MSG_ORDER_DEBUG(
1848       CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1849         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1850       )
1851 #if CMK_BIGSIM_CHARM
1852     TRACE_BG_ADD_TAG("AMPI_generic");
1853   msg->event = NULL;
1854 #endif
1855
1856   int sync = UsrToEnv(msg)->getRef();
1857   int srcIdx;
1858   if (sync)  srcIdx = msg->srcIdx;
1859
1860   //    AmpiMsg *msgcopy = msg;
1861   if(msg->seq != -1) {
1862     int srcIdx=msg->srcIdx;
1863     int n=oorder.put(srcIdx,msg);
1864     if (n>0) { // This message was in-order
1865       inorder(msg);
1866       if (n>1) { // It enables other, previously out-of-order messages
1867         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1868           inorder(msg);
1869         }
1870       }
1871     }
1872   } else { //Cross-world or system messages are unordered
1873     inorder(msg);
1874   }
1875
1876   // msg may be free'ed from calling inorder()
1877   if (sync>0) {         // send an ack to sender
1878     CProxy_ampi pa(thisArrayID);
1879     pa[srcIdx].ssend_ack(sync);
1880   }
1881
1882   if(resumeOnRecv){
1883     //CkPrintf("Calling TCharm::resume at ampi::generic!\n");
1884     thread->resume();
1885   }
1886 }
1887
1888 inline static AmpiRequestList *getReqs(void); 
1889
1890   void
1891 ampi::inorder(AmpiMsg* msg)
1892 {
1893   MSG_ORDER_DEBUG(
1894       CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1895         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1896       )
1897     // check posted recvs
1898     int tags[3], sts[3];
1899   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1900   IReq *ireq = NULL;
1901   if (CpvAccess(CmiPICMethod) != 2) {
1902 #if 0
1903     //IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1904     ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1905 #else
1906 #if CMK_BIGSIM_CHARM
1907     _TRACE_BG_TLINE_END(&msg->event);    // store current log
1908     msg->eventPe = CmiMyPe();
1909 #endif
1910     //in case ampi has not initialized and posted_ireqs are only inserted 
1911     //at AMPI_Irecv (MPI_Irecv)
1912     AmpiRequestList *reqL = &(parent->ampiReqs);
1913     //When storing the req index, it's 1-based. The reason is stated in the comments
1914     //in AMPI_Irecv function.
1915     int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
1916     if(reqL->size()>0 && ireqIdx>0)
1917       ireq = (IReq *)(*reqL)[ireqIdx-1];
1918     //CkPrintf("[%d] ampi::inorder, ireqIdx=%d\n", thisIndex, ireqIdx);
1919 #endif
1920     //CkPrintf("[%d] ampi::inorder, ireq=%p\n", thisIndex, ireq);
1921     if (ireq) { // receive posted
1922       ireq->receive(this, msg);
1923       // Isaac changed this so that the IReq stores the tag when receiving the message, 
1924       // instead of using this user supplied tag which could be MPI_ANY_TAG
1925       // Formerly the following line was not commented out:
1926       //ireq->tag = sts[0];         
1927       //ireq->src = sts[1];
1928       //ireq->comm = sts[2];
1929     } else {
1930       CmmPut(msgs, 3, tags, msg);
1931     }
1932   }
1933   else
1934     CmmPut(msgs, 3, tags, msg);
1935 }
1936
1937 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1938 {
1939   int tags[3];
1940   tags[0] = t; tags[1] = s; tags[2] = comm;
1941   AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1942   return msg;
1943 }
1944
1945 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1946     int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm, int sync)
1947 {
1948   CkDDT_DataType *ddt = getDDT()->getType(type);
1949   int len = ddt->getSize(count);
1950   int sIdx=thisIndex;
1951   int seq = -1;
1952   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_ATA_SEQ_TAG) //Not cross-module: set seqno
1953     seq = oorder.nextOutgoing(destIdx);
1954   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1955   if (sync) UsrToEnv(msg)->setRef(sync);
1956   TCharm::activateVariable(buf);
1957   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1958   TCharm::deactivateVariable(buf);
1959   return msg;
1960 }
1961
1962 #if AMPI_COMLIB
1963   void
1964 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1965 {
1966   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1967 }
1968 #endif
1969
1970   void
1971 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, int sync)
1972 {
1973 #if CMK_TRACE_IN_CHARM
1974   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
1975 #endif
1976
1977 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1978   MPI_Comm disComm = myComm.getComm();
1979   ampi *dis = getAmpiInstance(disComm);
1980   CpvAccess(_currentObj) = dis;
1981 #endif
1982
1983   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1984   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),sync);
1985
1986 #if CMK_TRACE_IN_CHARM
1987   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
1988 #endif
1989
1990   if (sync == 1) {
1991     // waiting for receiver side
1992     resumeOnRecv = false;            // so no one else awakes it
1993     block();
1994   }
1995 }
1996
1997   void
1998 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1999 {
2000   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
2001   memcpy(msg->data, buf, len);
2002   CProxy_ampi pa(aid);
2003   pa[idx].generic(msg);
2004 }
2005
2006   void
2007 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy, int sync)
2008 {
2009   if(rank==MPI_PROC_NULL) return;
2010   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2011   int destIdx = dest.getIndexForRank(rank);
2012   if(isInter()){
2013     sRank = parent->thisIndex;
2014     destcomm = MPI_COMM_FIRST_INTER;
2015     destIdx = dest.getIndexForRemoteRank(rank);
2016     arrproxy = remoteProxy;
2017   }
2018   MSG_ORDER_DEBUG(
2019       CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
2020       )
2021
2022     arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm,sync));
2023
2024 #if 0
2025 #if CMK_TRACE_ENABLED
2026   int size=0;
2027   MPI_Type_size(type,&size);
2028   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
2029 #endif
2030 #endif
2031 }
2032
2033   int
2034 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
2035 {
2036   CkDDT_DataType *ddt = getDDT()->getType(type);
2037   int len = ddt->getSize(count);
2038
2039   if(msg->length < len){ // only at rare case shall we reset count by using divide
2040     count = msg->length/(ddt->getSize(1));
2041   }
2042
2043   TCharm::activateVariable(buf);
2044   if (t==MPI_REDUCE_TAG) {      // reduction msg
2045     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
2046   } else {
2047     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
2048   }
2049   TCharm::deactivateVariable(buf);
2050   return 0;
2051 }
2052
2053   int
2054 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
2055 {
2056   MPI_Comm disComm = myComm.getComm();
2057   if(s==MPI_PROC_NULL) {
2058     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
2059     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
2060     ((MPI_Status *)sts)->MPI_LENGTH = 0;
2061     return 0;
2062   }
2063 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
2064   _LOG_E_END_AMPI_PROCESSING(thisIndex)
2065 #endif
2066 #if CMK_BIGSIM_CHARM
2067    void *curLog;                // store current log in timeline
2068   _TRACE_BG_TLINE_END(&curLog);
2069   //  TRACE_BG_AMPI_SUSPEND();
2070 #if CMK_TRACE_IN_CHARM
2071   if(CpvAccess(traceOn)) traceSuspend();
2072 #endif
2073 #endif
2074
2075   if(isInter()){
2076     s = myComm.getIndexForRemoteRank(s);
2077     comm = MPI_COMM_FIRST_INTER;
2078   }
2079
2080   int tags[3];
2081   AmpiMsg *msg = 0;
2082
2083   MSG_ORDER_DEBUG(
2084       CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
2085       )
2086
2087   ampi *dis = getAmpiInstance(disComm);
2088 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2089   //  dis->yield();
2090   //  processRemoteMlogMessages();
2091 #endif
2092   int dosuspend = 0;
2093   while(1) {
2094     //This is done to take into account the case in which an ampi 
2095     // thread has migrated while waiting for a message
2096     tags[0] = t; tags[1] = s; tags[2] = comm;
2097     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
2098     if (msg) break;
2099     dis->resumeOnRecv=true;
2100     dis->thread->suspend();
2101     dosuspend = 1;
2102     dis = getAmpiInstance(disComm);
2103   }
2104
2105 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2106   CpvAccess(_currentObj) = dis;
2107   MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
2108 #endif
2109
2110     dis->resumeOnRecv=false;
2111
2112   if(sts)
2113     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2114   int status = dis->processMessage(msg, t, s, buf, count, type);
2115   if (status != 0) return status;
2116
2117 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
2118   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
2119 #endif
2120
2121 #if CMK_BIGSIM_CHARM
2122 #if CMK_TRACE_IN_CHARM
2123     //if(CpvAccess(traceOn)) CthTraceResume(thread->getThread());
2124     //Due to the reason mentioned the in the while loop above, we need to 
2125     //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
2126     if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
2127 #endif
2128   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
2129   //TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0);
2130   //_TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
2131 #if 0
2132 #if 1
2133   if (!dosuspend) {
2134     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 1);
2135     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2136   }
2137   else
2138 #endif
2139     TRACE_BG_ADD_TAG("RECV_RESUME_THREAD");
2140 #else
2141   TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
2142   if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2143 #endif
2144 #endif
2145
2146   delete msg;
2147   return 0;
2148 }
2149
2150   void
2151 ampi::probe(int t, int s, int comm, int *sts)
2152 {
2153   int tags[3];
2154 #if CMK_BIGSIM_CHARM
2155   void *curLog;         // store current log in timeline
2156   _TRACE_BG_TLINE_END(&curLog);
2157   //  TRACE_BG_AMPI_SUSPEND();
2158 #endif
2159
2160   AmpiMsg *msg = 0;
2161   resumeOnRecv=true;
2162   while(1) {
2163     tags[0] = t; tags[1] = s; tags[2] = comm;
2164     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2165     if (msg) break;
2166     thread->suspend();
2167   }
2168   resumeOnRecv=false;
2169   if(sts)
2170     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2171 #if CMK_BIGSIM_CHARM
2172   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
2173   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
2174 #endif
2175 }
2176
2177   int
2178 ampi::iprobe(int t, int s, int comm, int *sts)
2179 {
2180   int tags[3];
2181   AmpiMsg *msg = 0;
2182   tags[0] = t; tags[1] = s; tags[2] = comm;
2183   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2184   if (msg) {
2185     if(sts)
2186       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2187     return 1;
2188   }
2189 #if CMK_BIGSIM_CHARM
2190   void *curLog;         // store current log in timeline
2191   _TRACE_BG_TLINE_END(&curLog);
2192   //  TRACE_BG_AMPI_SUSPEND();
2193 #endif
2194   thread->schedule();
2195 #if CMK_BIGSIM_CHARM
2196   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
2197   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
2198 #endif
2199   return 0;
2200 }
2201
2202
2203 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
2204   void
2205 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
2206 {
2207   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2208   int rootIdx=dest.getIndexForRank(root);
2209   if(rootIdx==thisIndex) {
2210 #if 0//AMPI_COMLIB
2211     ciBcast.beginIteration();
2212     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2213 #else
2214 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2215     CpvAccess(_currentObj) = this;
2216 #endif
2217     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2218 #endif
2219   }
2220   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
2221   nbcasts++;
2222 }
2223
2224   void
2225 ampi::bcastraw(void* buf, int len, CkArrayID aid)
2226 {
2227   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, MPI_COMM_WORLD);
2228   memcpy(msg->data, buf, len);
2229   CProxy_ampi pa(aid);
2230   pa.generic(msg);
2231 }
2232
2233
2234   AmpiMsg* 
2235 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
2236 {
2237   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
2238   int unit;
2239   CkDDT_DataType *ddt = getDDT()->getType(type);
2240   unit = ddt->getSize(1);
2241   int totalsize = unit*cnt;
2242
2243   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
2244   char* addr = (char*)Alltoallbuff+disp*unit;
2245   ddt->serialize((char*)msg->data, addr, cnt, (-1));
2246   return msg;
2247 }
2248
2249 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
2250     void *attr_in, void *attr_out, int *flag){
2251   (*flag) = 0;
2252   return (MPI_SUCCESS);
2253 }
2254 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
2255     void *attr_in, void *attr_out, int *flag){
2256   (*(void **)attr_out) = attr_in;
2257   (*flag) = 1;
2258   return (MPI_SUCCESS);
2259 }
2260 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
2261   return (MPI_SUCCESS);
2262 }
2263
2264
2265 void AmpiSeqQ::init(int numP) 
2266 {
2267   elements.init(numP);
2268 }
2269
2270 AmpiSeqQ::~AmpiSeqQ () {
2271 }
2272
2273 void AmpiSeqQ::pup(PUP::er &p) {
2274   p|out;
2275   p|elements;
2276 }
2277
2278 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
2279 {
2280   AmpiOtherElement &el=elements[srcIdx];
2281 #if CMK_ERROR_CHECKING
2282   if (msg->seq<el.seqIncoming)
2283     CkAbort("AMPI Logic error: received late out-of-order message!\n");
2284 #endif
2285   out.enq(msg);
2286   el.nOut++; // We have another message in the out-of-order queue
2287 }
2288
2289 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
2290 {
2291   AmpiOtherElement &el=elements[srcIdx];
2292   if (el.nOut==0) return 0; // No more out-of-order left.
2293   // Walk through our out-of-order queue, searching for our next message:
2294   for (int i=0;i<out.length();i++) {
2295     AmpiMsg *msg=out.deq();
2296     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
2297       el.seqIncoming++;
2298       el.nOut--; // We have one less message out-of-order
2299       return msg;
2300     }
2301     else
2302       out.enq(msg);
2303   }
2304   // We walked the whole queue-- ours is not there.
2305   return 0;
2306 }
2307
2308 //BIGSIM_OOC DEBUGGING: Output for AmpiRequest and its children classes
2309 void AmpiRequest::print(){
2310   CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
2311 }
2312
2313 void PersReq::print(){
2314   AmpiRequest::print();
2315   CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
2316 }
2317
2318 void IReq::print(){
2319   AmpiRequest::print();
2320   CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
2321 }
2322
2323 void ATAReq::print(){ //not complete for myreqs
2324   AmpiRequest::print();
2325   CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
2326
2327
2328 void SReq::print(){
2329   AmpiRequest::print();
2330   CmiPrintf("In SReq: this=%p, status=%d\n", this, statusIreq);
2331 }
2332
2333 void AmpiRequestList::pup(PUP::er &p) { 
2334   if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
2335     return;
2336   }
2337
2338   p(blklen); //Allocated size of block
2339   p(len); //Number of used elements in block
2340   if(p.isUnpacking()){
2341     makeBlock(blklen,len);
2342   }
2343   int count=0;
2344   for(int i=0;i<len;i++){
2345     char nonnull;
2346     if(!p.isUnpacking()){
2347       if(block[i] == NULL){
2348         nonnull = 0;
2349       }else{
2350         nonnull = block[i]->getType();
2351       }
2352     }   
2353     p(nonnull);
2354     if(nonnull != 0){
2355       if(p.isUnpacking()){
2356         switch(nonnull){
2357           case 1:
2358             block[i] = new PersReq;
2359             break;
2360           case 2:       
2361             block[i] = new IReq;
2362             break;
2363           case 3:       
2364             block[i] = new ATAReq;
2365             break;
2366         }
2367       } 
2368       block[i]->pup(p);
2369       count++;
2370     }else{
2371       block[i] = 0;
2372     }
2373   }
2374   if(p.isDeleting()){
2375     freeBlock();
2376   }
2377 }
2378
2379 //------------------ External Interface -----------------
2380 ampiParent *getAmpiParent(void) {
2381   ampiParent *p = CtvAccess(ampiPtr);
2382 #if CMK_ERROR_CHECKING
2383   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
2384 #endif
2385   return p;
2386 }
2387
2388 ampi *getAmpiInstance(MPI_Comm comm) {
2389   ampi *ptr=getAmpiParent()->comm2ampi(comm);
2390 #if CMK_ERROR_CHECKING
2391   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
2392 #endif
2393   return ptr;
2394 }
2395
2396 inline static AmpiRequestList *getReqs(void) {
2397   return &(getAmpiParent()->ampiReqs);
2398 }
2399
2400 inline void checkComm(MPI_Comm comm){
2401 #if CMK_ERROR_CHECKING
2402   getAmpiParent()->checkComm(comm);
2403 #endif
2404 }
2405
2406 inline void checkRequest(MPI_Request req){
2407 #if CMK_ERROR_CHECKING
2408   getReqs()->checkRequest(req);
2409 #endif
2410 }
2411
2412 inline void checkRequests(int n, MPI_Request* reqs){
2413 #if CMK_ERROR_CHECKING
2414   AmpiRequestList* reqlist = getReqs();
2415   for(int i=0;i<n;i++)
2416     reqlist->checkRequest(reqs[i]);
2417 #endif
2418 }
2419
2420 CDECL void AMPI_Migrate(void)
2421 {
2422   //  AMPIAPI("AMPI_Migrate");
2423 #if 0
2424 #if CMK_BIGSIM_CHARM
2425   TRACE_BG_AMPI_SUSPEND();
2426 #endif
2427 #endif
2428   TCHARM_Migrate();
2429
2430 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2431   ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
2432   CpvAccess(_currentObj) = currentAmpi;
2433 #endif
2434
2435 #if CMK_BIGSIM_CHARM
2436   //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2437   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
2438 #endif
2439 }
2440
2441
2442 CDECL void AMPI_Evacuate(void)
2443 {
2444   TCHARM_Evacuate();
2445 }
2446
2447
2448
2449 CDECL void AMPI_Migrateto(int destPE)
2450 {
2451   AMPIAPI("AMPI_MigrateTo");
2452 #if 0
2453 #if CMK_BIGSIM_CHARM
2454   TRACE_BG_AMPI_SUSPEND();
2455 #endif
2456 #endif
2457   TCHARM_Migrate_to(destPE);
2458 #if CMK_BIGSIM_CHARM
2459   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
2460   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
2461 #endif
2462 }
2463
2464 CDECL void AMPI_MigrateTo(int destPE)
2465 {
2466   AMPI_Migrateto(destPE);
2467 }
2468
2469 CDECL void AMPI_Async_Migrate(void)
2470 {
2471   AMPIAPI("AMPI_Async_Migrate");
2472 #if 0
2473 #if CMK_BIGSIM_CHARM
2474   TRACE_BG_AMPI_SUSPEND();
2475 #endif
2476 #endif
2477   TCHARM_Async_Migrate();
2478 #if CMK_BIGSIM_CHARM
2479   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2480   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2481 #endif
2482 }
2483
2484 CDECL void AMPI_Allow_Migrate(void)
2485 {
2486   AMPIAPI("AMPI_Allow_Migrate");
2487 #if 0
2488 #if CMK_BIGSIM_CHARM
2489   TRACE_BG_AMPI_SUSPEND();
2490 #endif
2491 #endif
2492   TCHARM_Allow_Migrate();
2493 #if CMK_BIGSIM_CHARM
2494   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2495 #endif
2496 }
2497
2498 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2499 #if CMK_LBDB_ON
2500   //AMPIAPI("AMPI_Setmigratable");
2501   ampi *ptr=getAmpiInstance(comm);
2502   ptr->setMigratable(mig);
2503 #else
2504   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2505 #endif
2506 }
2507
2508 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2509 {
2510   //AMPIAPI("AMPI_Init");
2511   if (nodeinit_has_been_called) {
2512     AMPIAPI("AMPI_Init");
2513     char **argv;
2514     if (p_argv) argv=*p_argv;
2515     else argv=CkGetArgv();
2516     ampiInit(argv);
2517     if (p_argc) *p_argc=CmiGetArgc(argv);
2518   }
2519   else
2520   { /* Charm hasn't been started yet! */
2521     CkAbort("AMPI_Init> Charm is not initialized!");
2522   }
2523
2524   return 0;
2525 }
2526
2527 CDECL int AMPI_Initialized(int *isInit)
2528 {
2529   if (nodeinit_has_been_called) {
2530     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2531     *isInit=CtvAccess(ampiInitDone);
2532   }
2533   else /* !nodeinit_has_been_called */ {
2534     *isInit=nodeinit_has_been_called;
2535   }
2536   return 0;
2537 }
2538
2539 CDECL int AMPI_Finalized(int *isFinalized)
2540 {
2541   AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2542   *isFinalized=CtvAccess(ampiFinalized);
2543   return 0;
2544 }
2545
2546 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2547 {
2548   //AMPIAPI("AMPI_Comm_rank");
2549
2550 #if CMK_ERROR_CHECKING 
2551   if(checkCommunicator(comm) != MPI_SUCCESS)
2552     return checkCommunicator(comm);
2553 #endif
2554
2555 #if AMPIMSGLOG
2556   ampiParent* pptr = getAmpiParent();
2557   if(msgLogRead){
2558     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2559     return 0;
2560   }
2561 #endif
2562
2563   *rank = getAmpiInstance(comm)->getRank(comm);
2564
2565 #if AMPIMSGLOG
2566   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2567     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2568   }
2569 #endif
2570   return 0;
2571 }
2572
2573   CDECL
2574 int AMPI_Comm_size(MPI_Comm comm, int *size)
2575 {
2576   //AMPIAPI("AMPI_Comm_size");
2577
2578 #if CMK_ERROR_CHECKING 
2579   if(checkCommunicator(comm) != MPI_SUCCESS)
2580     return checkCommunicator(comm);
2581 #endif
2582
2583 #if AMPIMSGLOG
2584   ampiParent* pptr = getAmpiParent();
2585   if(msgLogRead){
2586     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2587     return 0;
2588   }
2589 #endif
2590
2591   *size = getAmpiInstance(comm)->getSize(comm);
2592
2593 #if AMPIMSGLOG
2594   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2595     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2596   }
2597 #endif
2598
2599   return 0;
2600 }
2601
2602   CDECL
2603 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2604 {
2605
2606 #if CMK_ERROR_CHECKING 
2607   if(checkCommunicator(comm1) != MPI_SUCCESS)
2608     return checkCommunicator(comm1);
2609   if(checkCommunicator(comm2) != MPI_SUCCESS)
2610     return checkCommunicator(comm2);
2611 #endif
2612
2613   AMPIAPI("AMPI_Comm_compare");
2614   if(comm1==comm2) *result=MPI_IDENT;
2615   else{
2616     int equal=1;
2617     CkVec<int> ind1, ind2;
2618     ind1 = getAmpiInstance(comm1)->getIndices();
2619     ind2 = getAmpiInstance(comm2)->getIndices();
2620     if(ind1.size()==ind2.size()){
2621       for(int i=0;i<ind1.size();i++)
2622         if(ind1[i] != ind2[i]) { equal=0; break; }
2623     }
2624     if(equal==1) *result=MPI_CONGRUENT;
2625     else *result=MPI_UNEQUAL;
2626   }
2627   return 0;
2628 }
2629
2630 CDECL void AMPI_Exit(int /*exitCode*/)
2631 {
2632   AMPIAPI("AMPI_Exit");
2633   //finalizeBigSimTrace();
2634   TCHARM_Done();
2635 }
2636 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2637 {
2638   AMPI_Exit(*exitCode);
2639 }
2640
2641   CDECL
2642 int AMPI_Finalize(void)
2643 {
2644   AMPIAPI("AMPI_Finalize");
2645 #if PRINT_IDLE
2646   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2647 #endif
2648 #if AMPI_COUNTER
2649   getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2650 #endif
2651   CtvAccess(ampiFinalized)=1;
2652
2653 #if CMK_BIGSIM_CHARM
2654 #if 0
2655   TRACE_BG_AMPI_SUSPEND();
2656 #endif
2657 #if CMK_TRACE_IN_CHARM
2658   if(CpvAccess(traceOn)) traceSuspend();
2659 #endif
2660 #endif
2661
2662   //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2663   AMPI_Exit(0);
2664   return 0;
2665 }
2666
2667 CDECL
2668 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2669     int tag, MPI_Comm comm) {
2670
2671 #if CMK_ERROR_CHECKING
2672   int ret;
2673   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2674   if(ret != MPI_SUCCESS)
2675     return ret;
2676 #endif
2677
2678   AMPIAPI("AMPI_Send");
2679 #if AMPIMSGLOG
2680   if(msgLogRead){
2681     return 0;
2682   }
2683 #endif
2684
2685   ampi *ptr = getAmpiInstance(comm);
2686 #if AMPI_COMLIB
2687   if(enableStreaming){  
2688     //    ptr->getStreaming().beginIteration();
2689     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2690   } else
2691 #endif
2692     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2693 #if AMPI_COUNTER
2694   getAmpiParent()->counters.send++;
2695 #endif
2696 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2697   //  ptr->yield();
2698   //  //  processRemoteMlogMessages();
2699 #endif
2700   return 0;
2701 }
2702
2703   CDECL
2704 int AMPI_Ssend(void *msg, int count, MPI_Datatype type, int dest,
2705     int tag, MPI_Comm comm)
2706 {
2707 #if CMK_ERROR_CHECKING
2708   int ret;
2709   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2710   if(ret != MPI_SUCCESS)
2711     return ret;
2712 #endif
2713
2714   AMPIAPI("AMPI_Ssend");
2715 #if AMPIMSGLOG
2716   if(msgLogRead){
2717     return 0;
2718   }
2719 #endif
2720
2721   ampi *ptr = getAmpiInstance(comm);
2722 #if AMPI_COMLIB
2723   if(enableStreaming){
2724     ptr->getStreaming().beginIteration();
2725     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2726   } else
2727 #endif
2728     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm, 1);
2729 #if AMPI_COUNTER
2730   getAmpiParent()->counters.send++;
2731 #endif
2732
2733   return 0;
2734 }
2735
2736   CDECL
2737 int AMPI_Issend(void *buf, int count, MPI_Datatype type, int dest,
2738     int tag, MPI_Comm comm, MPI_Request *request)
2739 {
2740   AMPIAPI("AMPI_Issend");
2741
2742 #if CMK_ERROR_CHECKING
2743   int ret;
2744   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
2745   if(ret != MPI_SUCCESS)
2746   {
2747     *request = MPI_REQUEST_NULL;
2748     return ret;
2749   }
2750 #endif
2751
2752 #if AMPIMSGLOG
2753   ampiParent* pptr = getAmpiParent();
2754   if(msgLogRead){
2755     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
2756     return 0;
2757   }
2758 #endif
2759
2760   USER_CALL_DEBUG("AMPI_Issend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
2761   ampi *ptr = getAmpiInstance(comm);
2762   AmpiRequestList* reqs = getReqs();
2763   SReq *newreq = new SReq(comm);
2764   *request = reqs->insert(newreq);
2765   // 1:  blocking now  - used by MPI_Ssend
2766   // >=2:  the index of the requests - used by MPI_Issend
2767   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm, *request+2);
2768 #if AMPI_COUNTER
2769   getAmpiParent()->counters.isend++;
2770 #endif
2771
2772 #if AMPIMSGLOG
2773   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2774     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
2775   }
2776 #endif
2777
2778   return 0;
2779 }
2780
2781   CDECL
2782 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2783     MPI_Comm comm, MPI_Status *status)
2784 {
2785   AMPIAPI("AMPI_Recv");
2786
2787 #if CMK_ERROR_CHECKING
2788   int ret;
2789   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, msg, 1);
2790   if(ret != MPI_SUCCESS)
2791     return ret;
2792 #endif
2793
2794 #if AMPIMSGLOG
2795   ampiParent* pptr = getAmpiParent();
2796   if(msgLogRead){
2797     (*(pptr->fromPUPer))|(pptr->pupBytes);
2798     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2799     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2800     return 0;
2801   }
2802 #endif
2803
2804   ampi *ptr = getAmpiInstance(comm);
2805   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2806
2807 #if AMPI_COUNTER
2808   getAmpiParent()->counters.recv++;
2809 #endif
2810
2811 #if AMPIMSGLOG
2812   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2813     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2814     (*(pptr->toPUPer))|(pptr->pupBytes);
2815     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2816     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2817   }
2818 #endif
2819
2820   return 0;
2821 }
2822
2823   CDECL
2824 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2825 {
2826
2827 #if CMK_ERROR_CHECKING
2828   int ret;
2829   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2830   if(ret != MPI_SUCCESS)
2831     return ret;
2832 #endif
2833
2834   AMPIAPI("AMPI_Probe");
2835   ampi *ptr = getAmpiInstance(comm);
2836   ptr->probe(tag,src, comm, (int*) status);
2837   return 0;
2838 }
2839
2840   CDECL
2841 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2842 {
2843   AMPIAPI("AMPI_Iprobe");
2844
2845 #if CMK_ERROR_CHECKING
2846   int ret;
2847   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2848   if(ret != MPI_SUCCESS)
2849     return ret;
2850 #endif
2851
2852   ampi *ptr = getAmpiInstance(comm);
2853   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2854   return 0;
2855 }
2856
2857   CDECL
2858 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2859     int stag, void *rbuf, int rcount, int rtype,
2860     int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2861 {
2862   AMPIAPI("AMPI_Sendrecv");
2863
2864 #if CMK_ERROR_CHECKING
2865   if (sbuf == MPI_IN_PLACE || rbuf == MPI_IN_PLACE)
2866     CmiAbort("MPI_sendrecv does not accept MPI_IN_PLACE; use MPI_Sendrecv_replace instead");
2867
2868   int ret;
2869   ret = errorCheck(comm, 1, scount, 1, stype, 1, stag, 1, dest, 1, sbuf, 1);
2870   if(ret != MPI_SUCCESS)
2871     return ret;
2872   ret = errorCheck(comm, 1, rcount, 1, rtype, 1, rtag, 1, src, 1, rbuf, 1);
2873   if(ret != MPI_SUCCESS)
2874     return ret;
2875 #endif
2876
2877   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2878   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2879   if (se) return se;
2880   else return re;
2881 }
2882
2883   CDECL
2884 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2885     int dest, int sendtag, int source, int recvtag,
2886     MPI_Comm comm, MPI_Status *status)
2887 {
2888   AMPIAPI("AMPI_Sendrecv_replace");
2889   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2890       buf, count, datatype, source, recvtag, comm, status);
2891 }
2892
2893
2894   CDECL
2895 int AMPI_Barrier(MPI_Comm comm)
2896 {
2897   AMPIAPI("AMPI_Barrier");
2898
2899 #if CMK_ERROR_CHECKING
2900   if(checkCommunicator(comm) != MPI_SUCCESS)
2901     return checkCommunicator(comm);
2902 #endif
2903
2904   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2905
2906   TRACE_BG_AMPI_LOG(MPI_BARRIER, 0);
2907
2908   //HACK: Use collective operation as a barrier.
2909   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2910
2911   //BIGSIM_OOC DEBUGGING
2912   //CkPrintf("%d: in AMPI_Barrier, after AMPI_Allreduce\n", getAmpiParent()->thisIndex);
2913 #if AMPI_COUNTER
2914   getAmpiParent()->counters.barrier++;
2915 #endif
2916   return 0;
2917 }
2918
2919   CDECL
2920 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2921     MPI_Comm comm)
2922 {
2923   AMPIAPI("AMPI_Bcast");
2924
2925 #if CMK_ERROR_CHECKING
2926   int ret;
2927   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, buf, 1);
2928   if(ret != MPI_SUCCESS)
2929     return ret;
2930 #endif
2931
2932   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2933   if(comm==MPI_COMM_SELF) return 0;
2934
2935 #if AMPIMSGLOG
2936   ampiParent* pptr = getAmpiParent();
2937   if(msgLogRead){
2938     (*(pptr->fromPUPer))|(pptr->pupBytes);
2939     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2940     return 0;
2941   }
2942 #endif
2943
2944   ampi* ptr = getAmpiInstance(comm);
2945   ptr->bcast(root, buf, count, type,comm);
2946 #if AMPI_COUNTER
2947   getAmpiParent()->counters.bcast++;
2948 #endif
2949
2950 #if AMPIMSGLOG
2951   if(msgLogWrite && record_msglog(pptr->thisIndex)) {
2952     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2953     (*(pptr->toPUPer))|(pptr->pupBytes);
2954     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2955   }
2956 #endif
2957 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2958   //  ptr->yield();
2959   //  //  processRemoteMlogMessages();
2960 #endif
2961
2962   return 0;
2963 }
2964
2965 /// This routine is called with the results of a Reduce or AllReduce
2966 const int MPI_REDUCE_SOURCE=0;
2967 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2968 void ampi::reduceResult(CkReductionMsg *msg)
2969 {
2970   MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2971   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2972       thisArrayID,thisIndex);
2973   delete msg;
2974 }
2975
2976 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2977 {
2978   int szdata = ddt->getSize(count);
2979   int szhdr = sizeof(AmpiOpHeader);
2980   AmpiOpHeader newhdr(op,type,count,szdata); 
2981   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2982   memcpy(msg->getData(),&newhdr,szhdr);
2983   if (count > 0) {
2984     TCharm::activateVariable(inbuf);
2985     ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2986     TCharm::deactivateVariable(inbuf);
2987   }
2988   return msg;
2989 }
2990
2991 // Copy the MPI datatype "type" from inbuf to outbuf
2992 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2993   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2994   //  deserializing into the output.
2995   ampi *ptr = getAmpiInstance(comm);
2996   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2997   int len=ddt->getSize(count);
2998   char *serialized=new char[len];
2999   TCharm::activateVariable(inbuf);
3000   TCharm::activateVariable(outbuf);
3001   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
3002   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
3003   TCharm::deactivateVariable(outbuf);
3004   TCharm::deactivateVariable(inbuf);
3005   delete [] serialized;         // < memory leak!  // gzheng 
3006
3007   return MPI_SUCCESS;
3008 }
3009
3010 static void handle_MPI_IN_PLACE(void* &inbuf, void* &outbuf)
3011 {
3012   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
3013   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
3014   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
3015 }
3016
3017 #define SYNCHRONOUS_REDUCE                           0
3018
3019   CDECL
3020 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
3021     int root, MPI_Comm comm)
3022 {
3023   AMPIAPI("AMPI_Reduce");
3024
3025   handle_MPI_IN_PLACE(inbuf, outbuf);
3026
3027 #if CMK_ERROR_CHECKING
3028   int ret;
3029   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, inbuf, 1,
3030                    outbuf, getAmpiInstance(comm)->getRank(comm) == root);
3031   if(ret != MPI_SUCCESS)
3032     return ret;
3033 #endif
3034
3035   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3036
3037 #if AMPIMSGLOG
3038   ampiParent* pptr = getAmpiParent();
3039   if(msgLogRead){
3040     (*(pptr->fromPUPer))|(pptr->pupBytes);
3041     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3042     return 0;
3043   }
3044 #endif
3045
3046   ampi *ptr = getAmpiInstance(comm);
3047   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
3048   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
3049   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
3050
3051   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3052
3053   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
3054   msg->setCallback(reduceCB);
3055   MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
3056   ptr->contribute(msg);
3057
3058   if (ptr->thisIndex == rootIdx){
3059     /*HACK: Use recv() to block until reduction data comes back*/
3060     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3061       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
3062
3063 #if SYNCHRONOUS_REDUCE
3064       AmpiMsg *msg = new (0, 0) AmpiMsg(-1, MPI_REDUCE_TAG, -1, rootIdx, 0, MPI_REDUCE_COMM);
3065       CProxy_ampi pa(ptr->getProxy());
3066       pa.generic(msg);
3067 #endif
3068   }
3069 #if SYNCHRONOUS_REDUCE
3070   ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, NULL, 0, type, MPI_REDUCE_COMM);
3071 #endif
3072
3073 #if AMPI_COUNTER
3074   getAmpiParent()->counters.reduce++;
3075 #endif
3076
3077 #if AMPIMSGLOG
3078   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3079     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3080     (*(pptr->toPUPer))|(pptr->pupBytes);
3081     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3082   }
3083 #endif
3084
3085   return 0;
3086 }
3087
3088   CDECL
3089 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
3090     MPI_Op op, MPI_Comm comm)
3091 {
3092   AMPIAPI("AMPI_Allreduce");
3093
3094   handle_MPI_IN_PLACE(inbuf, outbuf);
3095
3096 #if CMK_ERROR_CHECKING
3097   int ret;
3098   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1, outbuf, 1);
3099   if(ret != MPI_SUCCESS)
3100     return ret;
3101 #endif
3102
3103   ampi *ptr = getAmpiInstance(comm);
3104
3105   CkDDT_DataType *ddt_type = ptr->getDDT()->getType(type);
3106
3107 #if CMK_BIGSIM_CHARM
3108   TRACE_BG_AMPI_LOG(MPI_ALLREDUCE, ddt_type->getSize(count));
3109 #endif
3110
3111   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3112
3113 #if AMPIMSGLOG
3114   ampiParent* pptr = getAmpiParent();
3115   if(msgLogRead){
3116     (*(pptr->fromPUPer))|(pptr->pupBytes);
3117     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3118     //    CkExit();
3119     return 0;
3120   }
3121 #endif
3122
3123   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
3124   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
3125
3126   CkReductionMsg *msg=makeRednMsg(ddt_type, inbuf, count, type, op);
3127   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3128   msg->setCallback(allreduceCB);
3129   ptr->contribute(msg);
3130
3131   /*HACK: Use recv() to block until the reduction data comes back*/
3132   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3133     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
3134 #if AMPI_COUNTER
3135   getAmpiParent()->counters.allreduce++;
3136 #endif
3137
3138 #if AMPIMSGLOG
3139   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3140     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3141     (*(pptr->toPUPer))|(pptr->pupBytes);
3142     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3143     //    CkExit();
3144   }
3145 #endif
3146
3147   return 0;
3148 }
3149
3150   CDECL
3151 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
3152     MPI_Op op, MPI_Comm comm, MPI_Request* request)
3153 {
3154   AMPIAPI("AMPI_Iallreduce");
3155
3156   handle_MPI_IN_PLACE(inbuf, outbuf);
3157
3158 #if CMK_ERROR_CHECKING
3159   int ret;
3160   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1, outbuf, 1);
3161   if(ret != MPI_SUCCESS)
3162   {
3163     *request = MPI_REQUEST_NULL;
3164     return ret;
3165   }
3166 #endif
3167
3168   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3169
3170   checkRequest(*request);
3171   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
3172   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
3173   ampi *ptr = getAmpiInstance(comm);
3174
3175   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3176   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3177   msg->setCallback(allreduceCB);
3178   ptr->contribute(msg);
3179
3180   // using irecv instead recv to non-block the call and get request pointer
3181   AmpiRequestList* reqs = getReqs();
3182   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
3183   *request = reqs->insert(newreq);
3184   return 0;
3185 }
3186
3187   CDECL
3188 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
3189     MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
3190 {
3191   AMPIAPI("AMPI_Reduce_scatter");
3192
3193   handle_MPI_IN_PLACE(sendbuf, recvbuf);
3194
3195 #if CMK_ERROR_CHECKING
3196   int ret;
3197   ret = errorCheck(comm, 1, 0, 0, datatype, 1, 0, 0, 0, 0, sendbuf, 1, recvbuf, 1);
3198   if(ret != MPI_SUCCESS)
3199     return ret;
3200 #endif
3201
3202   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
3203   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
3204   ampi *ptr = getAmpiInstance(comm);
3205   int size = ptr->getSize(comm);
3206   int count=0;
3207   int *displs = new int [size];
3208   int len;
3209   void *tmpbuf;
3210
3211   //under construction
3212   for(int i=0;i<size;i++){
3213     displs[i] = count;
3214     count+= recvcounts[i];
3215   }
3216   len = ptr->getDDT()->getType(datatype)->getSize(count);
3217   tmpbuf = malloc(len);
3218   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
3219   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
3220       recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
3221   free(tmpbuf);
3222   delete [] displs;     // < memory leak ! // gzheng
3223   return 0;
3224 }
3225
3226 /***** MPI_Scan algorithm (from MPICH) *******
3227   recvbuf = sendbuf;
3228   partial_scan = sendbuf;
3229   mask = 0x1;
3230   while (mask < size) {
3231   dst = rank^mask;
3232   if (dst < size) {
3233   send partial_scan to dst;
3234   recv from dst into tmp_buf;
3235   if (rank > dst) {
3236   partial_scan = tmp_buf + partial_scan;
3237   recvbuf = tmp_buf + recvbuf;
3238   }
3239   else {
3240   if (op is commutative)
3241   partial_scan = tmp_buf + partial_scan;
3242   else {
3243   tmp_buf = partial_scan + tmp_buf;
3244   partial_scan = tmp_buf;
3245   }
3246   }
3247   }
3248   mask <<= 1;
3249   }
3250  ***** MPI_Scan algorithm (from MPICH) *******/
3251
3252 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
3253   (op)(invec,inoutvec,&count,&datatype);
3254 }
3255 CDECL
3256 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
3257   AMPIAPI("AMPI_Scan");
3258
3259 #if CMK_ERROR_CHECKING
3260   if (sendbuf == MPI_IN_PLACE || recvbuf == MPI_IN_PLACE)
3261     CmiAbort("AMPI_Scan does not implement MPI_IN_PLACE");
3262
3263   int ret;
3264   ret = errorCheck(comm, 1, count, 1, datatype, 1, 0, 0, 0, 0, sendbuf, 1, recvbuf, 1);
3265   if(ret != MPI_SUCCESS)
3266     return ret;
3267 #endif
3268
3269   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
3270   MPI_Status sts;
3271   ampi *ptr = getAmpiInstance(comm);
3272   int size = ptr->getSize(comm);
3273   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
3274   int rank = ptr->getRank(comm);
3275   int mask = 0x1;
3276   int dst;
3277   void* tmp_buf = malloc(blklen);
3278   void* partial_scan = malloc(blklen);
3279
3280   memcpy(recvbuf, sendbuf, blklen);
3281   memcpy(partial_scan, sendbuf, blklen);
3282   while(mask < size){
3283     dst = rank^mask;
3284     if(dst < size){
3285       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
3286           tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
3287       if(rank > dst){
3288         (op)(tmp_buf,partial_scan,&count,&datatype);
3289         (op)(tmp_buf,recvbuf,&count,&datatype);
3290       }else {
3291         (op)(partial_scan,tmp_buf,&count,&datatype);
3292         memcpy(partial_scan,tmp_buf,blklen);
3293       }
3294     }
3295     mask <<= 1;
3296
3297   }
3298
3299   free(tmp_buf);
3300   free(partial_scan);
3301 #if AMPI_COUNTER
3302   getAmpiParent()->counters.scan++;
3303 #endif
3304   return 0;
3305 }
3306
3307 CDECL
3308 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
3309   //AMPIAPI("AMPI_Op_create");
3310   *op = function;
3311   return 0;
3312 }
3313
3314 CDECL
3315 int AMPI_Op_free(MPI_Op *op){
3316   //AMPIAPI("AMPI_Op_free");
3317   *op = MPI_OP_NULL;
3318   return 0;
3319 }
3320
3321
3322   CDECL
3323 double AMPI_Wtime(void)
3324 {
3325   //  AMPIAPI("AMPI_Wtime");
3326
3327 #if AMPIMSGLOG
3328   double ret=TCHARM_Wall_timer();
3329   ampiParent* pptr = getAmpiParent();
3330   if(msgLogRead){
3331     (*(pptr->fromPUPer))|ret;
3332     return ret;
3333   }
3334
3335   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3336     (*(pptr->toPUPer))|ret;
3337   }
3338 #endif
3339
3340 #if CMK_BIGSIM_CHARM
3341   return BgGetTime();
3342 #else
3343   return TCHARM_Wall_timer();
3344 #endif
3345 }
3346
3347 CDECL
3348 double AMPI_Wtick(void){
3349   //AMPIAPI("AMPI_Wtick");
3350   return 1e-6;
3351 }
3352
3353
3354 int PersReq::start(){
3355   if(sndrcv == 1 || sndrcv == 3) { // send or ssend request
3356     ampi *ptr=getAmpiInstance(comm);
3357     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm, sndrcv==3?1:0);
3358   }
3359   return 0;
3360 }
3361
3362   CDECL
3363 int AMPI_Start(MPI_Request *request)
3364 {
3365   AMPIAPI("AMPI_Start");
3366   checkRequest(*request);
3367   AmpiRequestList *reqs = getReqs();
3368   if(-1==(*reqs)[*request]->start()) {
3369     CkAbort("MPI_Start could be used only on persistent communication requests!");
3370   }
3371   return 0;
3372 }
3373
3374 CDECL
3375 int AMPI_Startall(int count, MPI_Request *requests){
3376   AMPIAPI("AMPI_Startall");
3377   checkRequests(count,requests);
3378   AmpiRequestList *reqs = getReqs();
3379   for(int i=0;i<count;i++){
3380     if(-1==(*reqs)[requests[i]]->start())
3381       CkAbort("MPI_Start could be used only on persistent communication requests!");
3382   }
3383   return 0;
3384 }
3385
3386 /* organize the indices of requests into a vector of a vector: 
3387  * level 1 is different msg envelope matches
3388  * level 2 is (posting) ordered requests of with envelope
3389  * each time multiple completion call loop over first elem of level 1
3390  * and move the matched to the NULL request slot.   
3391  * warning: this does not work with I-Alltoall requests */
3392 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
3393   for(int i=0;i<count;i++){
3394     if(reqs[i]!=MPI_REQUEST_NULL)
3395       return 0;
3396   }
3397   return 1;
3398 }
3399 inline int matchReq(MPI_Request ia, MPI_Request ib){
3400   checkRequest(ia);  
3401   checkRequest(ib);
3402   AmpiRequestList* reqs = getReqs();
3403   AmpiRequest *a, *b;
3404   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
3405   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
3406   a=(*reqs)[ia];  b=(*reqs)[ib];
3407   if(a->tag != b->tag) return 0;
3408   if(a->src != b->src) return 0;
3409   if(a->comm != b->comm) return 0;
3410   return 1;
3411 }
3412 inline void swapInt(int& a,int& b){
3413   int tmp;
3414   tmp=a; a=b; b=tmp;
3415 }
3416 inline void sortedIndex(int n, int* arr, int* idx){
3417   int i,j;
3418   for(i=0;i<n;i++) 
3419     idx[i]=i;
3420   for (i=0; i<n-1; i++) 
3421     for (j=0; j<n-1-i; j++)
3422       if (arr[idx[j+1]] < arr[idx[j]]) 
3423         swapInt(idx[j+1],idx[j]);
3424 }
3425 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
3426   CkAssert(count!=0);
3427   int *newidx = new int [count];
3428   int flag;
3429   sortedIndex(count,arr,newidx);
3430   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
3431   CkVec<int> slot;
3432   vec->push_back(slot);
3433   (*vec)[0].push_back(newidx[0]);
3434   for(int i=1;i<count;i++){
3435     flag=0;
3436     for(int j=0;j<vec->size();j++){
3437       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
3438         ((*vec)[j]).push_back(newidx[i]);
3439         flag++;
3440       }
3441     }
3442     if(!flag){
3443       CkVec<int> newslot;
3444       newslot.push_back(newidx[i]);
3445       vec->push_back(newslot);
3446     }else{
3447       CkAssert(flag==1);
3448     }
3449   }
3450   delete [] newidx;
3451   return vec;
3452 }
3453 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
3454   printf("vec content: ");
3455   for(int i=0;i<vec.size();i++){
3456     printf("{");
3457     for(int j=0;j<(vec[i]).size();j++){
3458       printf(" %d ",arr[(vec[i])[j]]);
3459     }
3460     printf("} ");
3461   }
3462   printf("\n");
3463 }
3464
3465 int PersReq::wait(MPI_Status *sts){
3466   if(sndrcv == 2) {
3467     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3468       CkAbort("AMPI> Error in persistent request wait");
3469 #if CMK_BIGSIM_CHARM
3470     _TRACE_BG_TLINE_END(&event);
3471 #endif
3472   }
3473   return 0;
3474 }
3475
3476 int IReq::wait(MPI_Status *sts){
3477   if(CpvAccess(CmiPICMethod) == 2) {
3478     AMPI_DEBUG("In weird clause of IReq::wait\n");
3479     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3480       CkAbort("AMPI> Error in non-blocking request wait");
3481
3482     return 0;
3483   }
3484
3485   //Copy "this" to a local variable in the case that "this" pointer
3486   //is updated during the out-of-core emulation.
3487
3488   // optimization for Irecv
3489   // generic() writes directly to the buffer, so the only thing we
3490   // do here is to wait
3491   ampi *ptr = getAmpiInstance(comm);
3492
3493   //BIGSIM_OOC DEBUGGING
3494   //int ooccnt=0;
3495   //int ampiIndex = ptr->thisIndex;
3496   //CmiPrintf("%d: IReq's status=%d\n", ampiIndex, statusIreq);
3497
3498   while (statusIreq == false) {
3499     //BIGSIM_OOC DEBUGGING
3500     //CmiPrintf("Before blocking: %dth time: %d: in Ireq::wait\n", ++ooccnt, ptr->thisIndex);
3501     //print();
3502
3503     ptr->resumeOnRecv=true;
3504     ptr->block();
3505
3506     //BIGSIM_OOC DEBUGGING
3507     //CmiPrintf("[%d] After blocking: in Ireq::wait\n", ptr->thisIndex);
3508     //CmiPrintf("IReq's this pointer: %p\n", this);
3509     //print();
3510
3511 #if CMK_BIGSIM_CHARM
3512     //Because of the out-of-core emulation, this pointer is changed after in-out
3513     //memory operation. So we need to return from this function and do the while loop
3514     //in the outer function call.       
3515     if(_BgInOutOfCoreMode)
3516       return -1;
3517 #endif  
3518   }   // end of while
3519   ptr->resumeOnRecv=false;
3520
3521   AMPI_DEBUG("IReq::wait has resumed\n");
3522
3523   if(sts) {
3524     AMPI_DEBUG("Setting sts->MPI_TAG to this->tag=%d in IReq::wait  this=%p\n", (int)this->tag, this);
3525     sts->MPI_TAG = tag;
3526     sts->MPI_SOURCE = src;
3527     sts->MPI_COMM = comm;
3528     sts->MPI_LENGTH = length;
3529   }
3530
3531   return 0;
3532 }
3533
3534 int ATAReq::wait(MPI_Status *sts){
3535   int i;
3536   for(i=0;i<count;i++){
3537     if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3538           myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
3539       CkAbort("AMPI> Error in alltoall request wait");
3540 #if CMK_BIGSIM_CHARM
3541     _TRACE_BG_TLINE_END(&myreqs[i].event);
3542 #endif
3543   }
3544 #if CMK_BIGSIM_CHARM
3545   //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
3546   TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0, 1);
3547   for (i=0; i<count; i++)
3548     _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
3549   _TRACE_BG_TLINE_END(&event);
3550 #endif
3551   return 0;
3552 }
3553
3554 int SReq::wait(MPI_Status *sts){
3555   ampi *ptr = getAmpiInstance(comm);
3556   while (statusIreq == false) {
3557     ptr->resumeOnRecv = true;
3558     ptr->block();
3559     ptr = getAmpiInstance(comm);
3560     ptr->resumeOnRecv = false;
3561   }
3562   return 0;
3563 }
3564
3565   CDECL
3566 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
3567 {
3568   AMPIAPI("AMPI_Wait");
3569   if(*request == MPI_REQUEST_NULL){
3570     stsempty(*sts);
3571     return 0;
3572   }
3573   checkRequest(*request);
3574   AmpiRequestList* reqs = getReqs();
3575
3576 #if AMPIMSGLOG
3577   ampiParent* pptr = getAmpiParent();
3578   if(msgLogRead){
3579     (*(pptr->fromPUPer))|(pptr->pupBytes);
3580     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3581     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
3582     return 0;
3583   }
3584 #endif
3585
3586   AMPI_DEBUG("AMPI_Wait request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3587   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3588   //(*reqs)[*request]->wait(sts);
3589   int waitResult = -1;
3590   do{
3591     AmpiRequest *waitReq = (*reqs)[*request];
3592     waitResult = waitReq->wait(sts);
3593     if(_BgInOutOfCoreMode){
3594       reqs = getReqs();
3595     }
3596   }while(waitResult==-1);
3597
3598
3599   AMPI_DEBUG("AMPI_Wait after calling wait, request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3600
3601
3602 #if AMPIMSGLOG
3603   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3604     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
3605     (*(pptr->toPUPer))|(pptr->pupBytes);
3606     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3607     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
3608   }
3609 #endif
3610
3611   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3612     reqs->free(*request);
3613     *request = MPI_REQUEST_NULL;
3614   }
3615
3616   AMPI_DEBUG("End of AMPI_Wait\n");
3617
3618   return 0;
3619 }
3620
3621   CDECL
3622 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
3623 {
3624   AMPIAPI("AMPI_Waitall");
3625   if(count==0) return MPI_SUCCESS;
3626   checkRequests(count,request);
3627   int i,j,oldPe;
3628   AmpiRequestList* reqs = getReqs();
3629   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3630
3631 #if AMPIMSGLOG
3632   ampiParent* pptr = getAmpiParent();
3633   if(msgLogRead){
3634     for(i=0;i<reqvec->size();i++){
3635       for(j=0;j<((*reqvec)[i]).size();j++){
3636         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3637           stsempty(sts[((*reqvec)[i])[j]]);
3638           continue;
3639         }
3640         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3641         (*(pptr->fromPUPer))|(pptr->pupBytes);
3642         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3643         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3644       }
3645     }
3646     return 0;
3647   }
3648 #endif
3649
3650 #if CMK_BIGSIM_CHARM
3651   void *curLog;         // store current log in timeline
3652   _TRACE_BG_TLINE_END(&curLog);
3653 #if 0
3654   TRACE_BG_AMPI_SUSPEND();
3655 #endif
3656 #endif
3657   for(i=0;i<reqvec->size();i++){
3658     for(j=0;j<((*reqvec)[i]).size();j++){
3659       //CkPrintf("[%d] in loop [%d, %d]\n", pptr->thisIndex,i, j);
3660       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3661         stsempty(sts[((*reqvec)[i])[j]]);
3662         continue;
3663       }
3664       oldPe = CkMyPe();
3665
3666       int waitResult = -1;
3667       do{       
3668         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3669         waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
3670         if(_BgInOutOfCoreMode){
3671           reqs = getReqs();
3672           reqvec = vecIndex(count, request);
3673         }
3674
3675 #if AMPIMSGLOG
3676         if(msgLogWrite && record_msglog(pptr->thisIndex)){
3677           (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
3678           (*(pptr->toPUPer))|(pptr->pupBytes);
3679           PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3680           PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3681         }
3682 #endif
3683
3684       }while(waitResult==-1);
3685
3686 #if 1
3687 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3688       //for fault evacuation
3689       if(oldPe != CkMyPe()){
3690 #endif
3691         reqs = getReqs();
3692         reqvec  = vecIndex(count,request);
3693 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3694       }
3695 #endif
3696 #endif
3697     }
3698   }
3699 #if CMK_BIGSIM_CHARM
3700   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
3701 #endif
3702   // free memory of requests
3703   for(i=0;i<count;i++){ 
3704     if(request[i] == MPI_REQUEST_NULL)
3705       continue;
3706     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
3707       reqs->free(request[i]);
3708       request[i] = MPI_REQUEST_NULL;
3709     }
3710   }
3711   delete reqvec;
3712   return 0;
3713 }
3714
3715   CDECL
3716 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
3717 {
3718   AMPIAPI("AMPI_Waitany");
3719
3720   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
3721   if(count == 0) return MPI_SUCCESS;
3722   checkRequests(count,request);
3723   if(areInactiveReqs(count,request)){
3724     *idx=MPI_UNDEFINED;
3725     stsempty(*sts);
3726     return MPI_SUCCESS;
3727   }
3728   int flag=0;
3729   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3730   while(count>0){ /* keep looping until some request finishes: */
3731     for(int i=0;i<reqvec->size();i++){
3732       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
3733       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
3734         *idx = ((*reqvec)[i])[0];
3735         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
3736         return 0;
3737       }
3738     }
3739     /* no requests have finished yet-- schedule and try again */
3740     AMPI_Yield(MPI_COMM_WORLD);
3741   }
3742   *idx = MPI_UNDEFINED;
3743   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
3744   delete reqvec;
3745   return 0;
3746 }
3747
3748   CDECL
3749 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
3750     int *array_of_indices, MPI_Status *array_of_statuses)
3751 {
3752   AMPIAPI("AMPI_Waitsome");
3753   checkRequests(incount,array_of_requests);
3754   if(areInactiveReqs(incount,array_of_requests)){
3755     *outcount=MPI_UNDEFINED;
3756     return MPI_SUCCESS;
3757   }
3758   MPI_Status sts;
3759   int i;
3760   int flag=0, realflag=0;
3761   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3762   *outcount = 0;
3763   while(1){
3764     for(i=0;i<reqvec->size();i++){
3765       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3766       if(flag == 1){ 
3767         array_of_indices[(*outcount)]=((*reqvec)[i])[0];