391ad922bf3757f6e55c8b6ae0c967b58c1bf034
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1
2 #define AMPIMSGLOG    0
3
4 #define exit exit /*Supress definition of exit in ampi.h*/
5 #include "ampiimpl.h"
6 #include "tcharm.h"
7 #include "ampiEvents.h" /*** for trace generation for projector *****/
8 #include "ampiProjections.h"
9
10 #if CMK_BLUEGENE_CHARM
11 #include "bigsim_logs.h"
12 #endif
13
14 #define CART_TOPOL 1
15 #define AMPI_PRINT_IDLE 0
16
17 /* change this define to "x" to trace all send/recv's */
18 #define MSG_ORDER_DEBUG(x)  //x /* empty */
19 /* change this define to "x" to trace user calls */
20 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
21 #define STARTUP_DEBUG(x)  //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
22 #define FUNCCALL_DEBUG(x) //x /* empty */
23
24 static CkDDT *getDDT(void) {
25   return getAmpiParent()->myDDT;
26 }
27
28 inline int errorCheck(MPI_Comm comm, int ifComm, int count, int ifCount, 
29     MPI_Datatype data, int ifData, int tag, int ifTag, 
30     int rank, int ifRank, void *buf, int ifBuf);
31
32   inline int checkCommunicator(MPI_Comm comm) {
33     if(comm == MPI_COMM_NULL)
34       return MPI_ERR_COMM;
35     return MPI_SUCCESS;
36   }
37
38   inline int checkCount(int count) {
39     if(count < 0)
40       return MPI_ERR_COUNT;
41     return MPI_SUCCESS;
42   }
43
44   inline int checkData(MPI_Datatype data) {
45     if(data == MPI_DATATYPE_NULL)
46       return MPI_ERR_TYPE;
47     return MPI_SUCCESS;
48   }
49
50   inline int checkTag(int tag) {
51     if(tag != MPI_ANY_TAG && tag < 0)
52       return MPI_ERR_TAG;
53     return MPI_SUCCESS;
54   }
55
56 inline int checkRank(int rank, MPI_Comm comm) {
57   int size;
58   AMPI_Comm_size(comm, &size);
59   if(((rank >= 0) && (rank < size)) || (rank == MPI_ANY_SOURCE) || (rank ==
60         MPI_PROC_NULL))
61     return MPI_SUCCESS;
62   return MPI_ERR_RANK;
63 }
64
65   inline int checkBuf(void *buf, int count) {
66     if((count != 0 && buf == NULL))
67       return MPI_ERR_BUFFER;
68     return MPI_SUCCESS;
69   }
70
71 inline int errorCheck(MPI_Comm comm, int ifComm, int count, int ifCount,
72     MPI_Datatype data, int ifData, int tag, int ifTag,
73     int rank, int ifRank, void *buf, int ifBuf) {
74   int ret;
75   if(ifComm) { 
76     ret = checkCommunicator(comm);
77     if(ret != MPI_SUCCESS)
78       return ret;
79   }
80   if(ifCount) {
81     ret = checkCount(count);
82     if(ret != MPI_SUCCESS)
83       return ret;
84   }
85   if(ifData) {
86     ret = checkData(data);
87     if(ret != MPI_SUCCESS)
88       return ret;
89   }
90   if(ifTag) {
91     ret = checkTag(tag);
92     if(ret != MPI_SUCCESS)
93       return ret;
94   }
95   if(ifRank) {
96     ret = checkRank(rank,comm);
97     if(ret != MPI_SUCCESS)
98       return ret;
99   }
100   if(ifBuf) {
101     ret = checkBuf(buf,count);
102     if(ret != MPI_SUCCESS)
103       return ret;
104   }
105   return MPI_SUCCESS;
106 }
107
108 //------------- startup -------------
109 static mpi_comm_worlds mpi_worlds;
110
111 int _mpi_nworlds; /*Accessed by ampif*/
112 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
113
114 /* ampiReducer: AMPI's generic reducer type 
115    MPI_Op is function pointer to MPI_User_function
116    so that it can be packed into AmpiOpHeader, shipped 
117    with the reduction message, and then plugged into 
118    the ampiReducer. 
119    One little trick is the ampi::recv which receives
120    the final reduction message will see additional
121    sizeof(AmpiOpHeader) bytes in the buffer before
122    any user data.                             */
123 class AmpiComplex { 
124   public: 
125     double re, im; 
126     void operator+=(const AmpiComplex &a) {
127       re+=a.re;
128       im+=a.im;
129     }
130     void operator*=(const AmpiComplex &a) {
131       double nu_re=re*a.re-im*a.im;
132       im=re*a.im+im*a.re;
133       re=nu_re;
134     }
135     int operator>(const AmpiComplex &a) {
136       CkAbort("Cannot compare complex numbers with MPI_MAX");
137       return 0;
138     }
139     int operator<(const AmpiComplex &a) {
140       CkAbort("Cannot compare complex numbers with MPI_MIN");
141       return 0;
142     }
143 };
144 typedef struct { float val; int idx; } FloatInt;
145 typedef struct { double val; int idx; } DoubleInt;
146 typedef struct { long val; int idx; } LongInt;
147 typedef struct { int val; int idx; } IntInt;
148 typedef struct { short val; int idx; } ShortInt;
149 typedef struct { long double val; int idx; } LongdoubleInt;
150 typedef struct { float val; float idx; } FloatFloat;
151 typedef struct { double val; double idx; } DoubleDouble;
152
153
154 #define MPI_OP_SWITCH(OPNAME) \
155   int i; \
156 switch (*datatype) { \
157   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
158   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
159   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
160   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
161   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
162   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
163   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
164   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
165   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
166   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
167   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
168   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
169   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
170   default: \
171            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
172   CmiAbort("Unsupported MPI datatype for MPI Op"); \
173 };\
174
175 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
176 #define MPI_OP_IMPL(type) \
177   if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
178   MPI_OP_SWITCH(MPI_MAX)
179 #undef MPI_OP_IMPL
180 }
181
182 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
183 #define MPI_OP_IMPL(type) \
184   if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
185   MPI_OP_SWITCH(MPI_MIN)
186 #undef MPI_OP_IMPL
187 }
188
189 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
190 #define MPI_OP_IMPL(type) \
191   ((type *)inoutvec)[i] += ((type *)invec)[i];
192   MPI_OP_SWITCH(MPI_SUM)
193 #undef MPI_OP_IMPL
194 }
195
196 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
197 #define MPI_OP_IMPL(type) \
198   ((type *)inoutvec)[i] *= ((type *)invec)[i];
199   MPI_OP_SWITCH(MPI_PROD)
200 #undef MPI_OP_IMPL
201 }
202
203 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
204   int i;  
205   switch (*datatype) {
206     case MPI_INT:
207     case MPI_LOGICAL:
208       for(i=0;i<(*len);i++)
209         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
210       break;
211     default:
212       ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
213       CmiAbort("exiting");
214   }
215 }
216 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
217   int i; 
218   switch (*datatype) {
219     case MPI_INT:
220       for(i=0;i<(*len);i++)
221         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
222       break;
223     case MPI_BYTE:
224       for(i=0;i<(*len);i++)
225         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
226       break;
227     default:
228       ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
229       CmiAbort("exiting");
230   }
231 }
232 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
233   int i;  
234   switch (*datatype) {
235     case MPI_INT:
236     case MPI_LOGICAL:
237       for(i=0;i<(*len);i++)
238         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
239       break;
240     default:
241       ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
242       CmiAbort("exiting");
243   }
244 }
245 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
246   int i;  
247   switch (*datatype) {
248     case MPI_INT:
249       for(i=0;i<(*len);i++)
250         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
251       break;
252     case MPI_BYTE:
253       for(i=0;i<(*len);i++)
254         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
255       break;
256     default:
257       ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
258       CmiAbort("exiting");
259   }
260 }
261 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
262   int i;  
263   switch (*datatype) {
264     case MPI_INT:
265     case MPI_LOGICAL:
266       for(i=0;i<(*len);i++)
267         ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
268       break;
269     default:
270       ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
271       CmiAbort("exiting");
272   }
273 }
274 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
275   int i;  
276   switch (*datatype) {
277     case MPI_INT:
278       for(i=0;i<(*len);i++)
279         ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
280       break;
281     case MPI_BYTE:
282       for(i=0;i<(*len);i++)
283         ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
284       break;
285     case MPI_UNSIGNED:
286       for(i=0;i<(*len);i++)
287         ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
288       break;
289     default:
290       ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
291       CmiAbort("exiting");
292   }
293 }
294
295 #ifndef MIN
296 #define MIN(a,b) (a < b ? a : b)
297 #endif
298
299 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
300   int i;  
301
302   switch (*datatype) {
303     case MPI_FLOAT_INT:
304       for(i=0;i<(*len);i++)
305         if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
306           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
307         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
308           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
309       break;
310     case MPI_DOUBLE_INT:
311       for(i=0;i<(*len);i++)
312         if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
313           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
314         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
315           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
316
317       break;
318     case MPI_LONG_INT:
319       for(i=0;i<(*len);i++)
320         if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
321           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
322         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
323           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
324       break;
325     case MPI_2INT:
326       for(i=0;i<(*len);i++)
327         if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
328           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
329         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
330           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
331       break;
332     case MPI_SHORT_INT:
333       for(i=0;i<(*len);i++)
334         if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
335           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
336         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
337           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
338       break;
339     case MPI_LONG_DOUBLE_INT:
340       for(i=0;i<(*len);i++)
341         if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
342           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
343         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
344           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
345       break;
346     case MPI_2FLOAT:
347       for(i=0;i<(*len);i++)
348         if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
349           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
350         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
351           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
352       break;
353     case MPI_2DOUBLE:
354       for(i=0;i<(*len);i++)
355         if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
356           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
357         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
358           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
359       break;
360     default:
361       ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
362       CmiAbort("exiting");
363   }
364 }
365 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
366   int i;  
367   switch (*datatype) {
368     case MPI_FLOAT_INT:
369       for(i=0;i<(*len);i++)
370         if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
371           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
372         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
373           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
374       break;
375     case MPI_DOUBLE_INT:
376       for(i=0;i<(*len);i++)
377         if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
378           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
379         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
380           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
381       break;
382     case MPI_LONG_INT:
383       for(i=0;i<(*len);i++)
384         if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
385           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
386         else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
387           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
388       break;
389     case MPI_2INT:
390       for(i=0;i<(*len);i++)
391         if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
392           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
393         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
394           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
395       break;
396     case MPI_SHORT_INT:
397       for(i=0;i<(*len);i++)
398         if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
399           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
400         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
401           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
402       break;
403     case MPI_LONG_DOUBLE_INT:
404       for(i=0;i<(*len);i++)
405         if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
406           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
407         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
408           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
409       break;
410     case MPI_2FLOAT:
411       for(i=0;i<(*len);i++)
412         if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
413           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
414         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
415           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
416       break;
417     case MPI_2DOUBLE:
418       for(i=0;i<(*len);i++)
419         if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
420           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
421         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
422           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
423       break;
424     default:
425       ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
426       CmiAbort("exiting");
427   }
428 }
429
430 // every msg contains a AmpiOpHeader structure before user data
431 // FIXME: non-commutative operations require messages be ordered by rank
432 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
433   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
434   MPI_Datatype dtype;
435   int szhdr, szdata, len;
436   MPI_User_function* func;
437   func = hdr->func;
438   dtype = hdr->dtype;  
439   szdata = hdr->szdata;
440   len = hdr->len;  
441   szhdr = sizeof(AmpiOpHeader);
442
443   //Assuming extent == size
444   void *ret = malloc(szhdr+szdata);
445   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
446   for(int i=1;i<nMsg;i++){
447     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
448   }
449   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
450   free(ret);
451   return retmsg;
452 }
453
454 CkReduction::reducerType AmpiReducer;
455
456 class Builtin_kvs{
457   public:
458     int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
459     Builtin_kvs(){
460       tag_ub = MPI_TAG_UB_VALUE; 
461       host = MPI_PROC_NULL;
462       io = 0;
463       wtime_is_global = 0;
464       keyval_mype = CkMyPe();
465       keyval_numpes = CkNumPes();
466       keyval_mynode = CkMyNode();
467       keyval_numnodes = CkNumNodes();
468     }
469 };
470
471 // ------------ startup support -----------
472 int _ampi_fallback_setup_count;
473 CDECL void AMPI_Setup(void);
474 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
475
476 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
477
478 /*Main routine used when missing MPI_Setup routine*/
479 CDECL void AMPI_Fallback_Main(int argc,char **argv)
480 {
481   AMPI_Main_cpp(argc,argv);
482   AMPI_Main(argc,argv);
483   FTN_NAME(MPI_MAIN,mpi_main)();
484 }
485
486 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
487 /*Startup routine used if user *doesn't* write
488   a TCHARM_User_setup routine.
489  */
490 CDECL void AMPI_Setup_Switch(void) {
491   _ampi_fallback_setup_count=0;
492   FTN_NAME(AMPI_SETUP,ampi_setup)();
493   AMPI_Setup();
494   if (_ampi_fallback_setup_count==2)
495   { //Missing AMPI_Setup in both C and Fortran:
496     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
497   }
498 }
499
500 static int nodeinit_has_been_called=0;
501 CtvDeclare(ampiParent*, ampiPtr);
502 CtvDeclare(int, ampiInitDone);
503 CtvDeclare(void*,stackBottom);
504 CtvDeclare(int, ampiFinalized);
505 CkpvDeclare(Builtin_kvs, bikvs);
506 CkpvDeclare(int,argvExtracted);
507 static int enableStreaming = 0;
508
509 CDECL long ampiCurrentStackUsage(){
510   int localVariable;
511
512   unsigned long p1 =  (unsigned long)((void*)&localVariable);
513   unsigned long p2 =  (unsigned long)(CtvAccess(stackBottom));
514
515
516   if(p1 > p2)
517     return p1 - p2;
518   else
519     return  p2 - p1;
520
521 }
522
523 FDECL void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
524   long usage = ampiCurrentStackUsage();
525   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
526 }
527
528
529 CDECL void AMPI_threadstart(void *data);
530 static int AMPI_threadstart_idx = -1;
531
532 static void ampiNodeInit(void)
533 {
534   _mpi_nworlds=0;
535   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
536   {
537     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
538   }
539   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
540
541   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
542
543   CmiAssert(AMPI_threadstart_idx == -1);    // only initialize once
544   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
545
546   nodeinit_has_been_called=1;
547 }
548
549 #if PRINT_IDLE
550 static double totalidle=0.0, startT=0.0;
551 static int beginHandle, endHandle;
552 static void BeginIdle(void *dummy,double curWallTime)
553 {
554   startT = curWallTime;
555 }
556 static void EndIdle(void *dummy,double curWallTime)
557 {
558   totalidle += curWallTime - startT;
559 }
560 #endif
561
562 /* for fortran reduction operation table to handle mapping */
563 typedef MPI_Op  MPI_Op_Array[128];
564 CtvDeclare(int, mpi_opc);
565 CtvDeclare(MPI_Op_Array, mpi_ops);
566
567 static void ampiProcInit(void){
568   CtvInitialize(ampiParent*, ampiPtr);
569   CtvInitialize(int,ampiInitDone);
570   CtvInitialize(int,ampiFinalized);
571   CtvInitialize(void*,stackBottom);
572
573
574   CtvInitialize(MPI_Op_Array, mpi_ops);
575   CtvInitialize(int, mpi_opc);
576
577   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
578   CkpvAccess(bikvs) = Builtin_kvs();
579
580   CkpvInitialize(int, argvExtracted);
581   CkpvAccess(argvExtracted) = 0;
582
583   REGISTER_AMPI
584     initAmpiProjections();
585   char **argv=CkGetArgv();
586 #if AMPI_COMLIB  
587   if(CkpvAccess(argvExtracted)==0){
588     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
589   }
590 #endif
591
592 #if AMPIMSGLOG
593   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
594   //msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
595   if (CmiGetArgIntDesc(argv,"+msgLogRead", &msgLogRank, "Re-play message processing order for AMPI")) {
596     msgLogRead = 1;
597   }
598   //CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
599   char *procs = NULL;
600   if (CmiGetArgStringDesc(argv, "+msgLogRanks", &procs, "A list of AMPI processors to record , e.g. 0,10,20-30")) {
601     msgLogRanks.set(procs);
602   }
603   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
604   if (CkMyPe() == 0) {
605     if (msgLogWrite) CmiPrintf("Writing AMPI messages of rank %s to log: %s\n", procs?procs:"", msgLogFilename);
606     if (msgLogRead) CmiPrintf("Reading AMPI messages of rank %s from log: %s\n", procs?procs:"", msgLogFilename);
607   }
608 #endif
609
610   // initBigSimTrace(1,outtiming);
611 }
612
613 #if AMPIMSGLOG
614 static inline int record_msglog(int rank){
615   return msgLogRanks.includes(rank);
616 }
617 #endif
618
619 void AMPI_Install_Idle_Timer(){
620 #if AMPI_PRINT_IDLE
621   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
622   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
623 #endif
624 }
625
626 void AMPI_Uninstall_Idle_Timer(){
627 #if AMPI_PRINT_IDLE
628   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
629   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
630 #endif
631 }
632
633 PUPfunctionpointer(MPI_MainFn)
634
635   class MPI_threadstart_t {
636     public:
637       MPI_MainFn fn;
638       MPI_threadstart_t() {}
639       MPI_threadstart_t(MPI_MainFn fn_)
640         :fn(fn_) {}
641       void start(void) {
642         char **argv=CmiCopyArgs(CkGetArgv());
643         int argc=CkGetArgc();
644
645         // Set a pointer to somewhere close to the bottom of the stack.
646         // This is used for roughly estimating the stack usage later.
647         CtvAccess(stackBottom) = &argv;
648
649 #if CMK_AMPI_FNPTR_HACK
650         AMPI_Fallback_Main(argc,argv);
651 #else
652         (fn)(argc,argv);
653 #endif
654       }
655       void pup(PUP::er &p) {
656         p|fn;
657       }
658   };
659 PUPmarshall(MPI_threadstart_t)
660
661 CDECL void AMPI_threadstart(void *data)
662 {
663   STARTUP_DEBUG("MPI_threadstart")
664     MPI_threadstart_t t;
665   pupFromBuf(data,t);
666 #if CMK_TRACE_IN_CHARM
667   if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
668 #endif
669   t.start();
670 }
671
672 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
673 {
674   STARTUP_DEBUG("ampiCreateMain")
675     int _nchunks=TCHARM_Get_num_chunks();
676   //Make a new threads array:
677   MPI_threadstart_t s(mainFn);
678   memBuf b; pupIntoBuf(b,s);
679   TCHARM_Create_data( _nchunks,AMPI_threadstart_idx,
680       b.getData(), b.getSize());
681 }
682
683 /* TCharm Semaphore ID's for AMPI startup */
684 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
685 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
686
687 static CProxy_ampiWorlds ampiWorldsGroup;
688
689 static void init_operations()
690 {
691   CtvInitialize(MPI_Op_Array, mpi_ops);
692   int i = 0;
693   MPI_Op *tab = CtvAccess(mpi_ops);
694   tab[i++] = MPI_MAX;
695   tab[i++] = MPI_MIN;
696   tab[i++] = MPI_SUM;
697   tab[i++] = MPI_PROD;
698   tab[i++] = MPI_LAND;
699   tab[i++] = MPI_BAND;
700   tab[i++] = MPI_LOR;
701   tab[i++] = MPI_BOR;
702   tab[i++] = MPI_LXOR;
703   tab[i++] = MPI_BXOR;
704   tab[i++] = MPI_MAXLOC;
705   tab[i++] = MPI_MINLOC;
706
707   CtvInitialize(int, mpi_opc);
708   CtvAccess(mpi_opc) = i;
709 }
710
711 /*
712    Called from MPI_Init, a collective initialization call:
713    creates a new AMPI array and attaches it to the current
714    set of TCHARM threads.
715  */
716 static ampi *ampiInit(char **argv)
717 {
718   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
719     if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
720   STARTUP_DEBUG("ampiInit> begin")
721
722     MPI_Comm new_world;
723   int _nchunks;
724   CkArrayOptions opts;
725   CProxy_ampiParent parent;
726   if (TCHARM_Element()==0) //the rank of a tcharm object
727   { /* I'm responsible for building the arrays: */
728     STARTUP_DEBUG("ampiInit> creating arrays")
729
730       // FIXME: Need to serialize global communicator allocation in one place.
731       //Allocate the next communicator
732       if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
733       {
734         CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
735       }
736     int new_idx=_mpi_nworlds;
737     new_world=MPI_COMM_WORLD+new_idx; // Isaac guessed there shouldn't be a +1 here
738
739     //Create and attach the ampiParent array
740     CkArrayID threads;
741     opts=TCHARM_Attach_start(&threads,&_nchunks);
742     parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
743     STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
744   }
745   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
746
747   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
748
749     if (TCHARM_Element()==0)
750     {
751       //Make a new ampi array
752       CkArrayID empty;
753
754       ampiCommStruct worldComm(new_world,empty,_nchunks);
755       CProxy_ampi arr;
756
757
758 #if AMPI_COMLIB
759
760       ComlibInstanceHandle ciStreaming = 1;
761       ComlibInstanceHandle ciBcast = 2;
762       ComlibInstanceHandle ciAllgather = 3;
763       ComlibInstanceHandle ciAlltoall = 4;
764
765       arr=CProxy_ampi::ckNew(parent, worldComm, ciStreaming, ciBcast, ciAllgather, ciAlltoall, opts);
766
767
768       CkPrintf("Using untested comlib code in ampi.C\n");
769
770       Strategy *sStreaming = new StreamingStrategy(1,10);
771       CkAssert(ciStreaming == ComlibRegister(sStreaming));
772
773       Strategy *sBcast = new BroadcastStrategy(USE_HYPERCUBE);
774       CkAssert(ciBcast = ComlibRegister(sBcast));
775
776       Strategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
777       CkAssert(ciAllgather = ComlibRegister(sAllgather));
778
779       Strategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
780       CkAssert(ciAlltoall = ComlibRegister(sAlltoall));
781
782       CmiPrintf("Created AMPI comlib strategies in new manner\n");
783
784       // FIXME: Propogate the comlib table here
785       CkpvAccess(conv_com_object).doneCreating();
786 #else
787       arr=CProxy_ampi::ckNew(parent,worldComm,opts);
788
789 #endif
790
791       //Broadcast info. to the mpi_worlds array
792       // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
793       ampiCommStruct newComm(new_world,arr,_nchunks);
794       //CkPrintf("In ampiInit: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
795       if (ampiWorldsGroup.ckGetGroupID().isZero())
796         ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
797       else
798         ampiWorldsGroup.add(newComm);
799       STARTUP_DEBUG("ampiInit> arrays created")
800
801     }
802
803   // Find our ampi object:
804   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
805   CtvAccess(ampiInitDone)=1;
806   CtvAccess(ampiFinalized)=0;
807   STARTUP_DEBUG("ampiInit> complete")
808 #if CMK_BLUEGENE_CHARM
809     //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
810     TRACE_BG_ADD_TAG("AMPI_START");
811 #endif
812
813   init_operations();     // initialize fortran reduction operation table
814
815   getAmpiParent()->ampiInitCallDone = 0;
816
817   CProxy_ampi cbproxy = ptr->getProxy();
818   CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
819   ptr->contribute(0, NULL, CkReduction::sum_int, cb);
820
821   ampiParent *thisParent = getAmpiParent(); 
822   while(thisParent->ampiInitCallDone!=1){
823     //CkPrintf("In checking ampiInitCallDone(%d) loop at parent %d!\n", thisParent->ampiInitCallDone, thisParent->thisIndex);
824     thisParent->getTCharmThread()->stop();
825     /* 
826      * thisParent needs to be updated in case of the parent is being pupped.
827      * In such case, thisParent got changed
828      */
829     thisParent = getAmpiParent();
830   }
831
832 #ifdef CMK_BLUEGENE_CHARM
833   BgSetStartOutOfCore();
834 #endif
835
836   return ptr;
837 }
838
839 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
840 class ampiWorlds : public CBase_ampiWorlds {
841   public:
842     ampiWorlds(const ampiCommStruct &nextWorld) {
843       ampiWorldsGroup=thisgroup;
844       //CkPrintf("In constructor: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
845       add(nextWorld);
846     }
847     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
848     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
849     void add(const ampiCommStruct &nextWorld) {
850       int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD); // Isaac guessed there shouldn't be a +1 after the MPI_COMM_WORLD
851       mpi_worlds[new_idx].comm=nextWorld;
852       if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
853       STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
854     }
855 };
856
857 //-------------------- ampiParent -------------------------
858   ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
859 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
860 {
861   int barrier = 0x1234;
862   STARTUP_DEBUG("ampiParent> starting up")
863     thread=NULL;
864   worldPtr=NULL;
865   myDDT=&myDDTsto;
866   prepareCtv();
867
868   init();
869
870   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
871   AsyncEvacuate(CmiFalse);
872 }
873
874 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
875   thread=NULL;
876   worldPtr=NULL;
877   myDDT=&myDDTsto;
878
879   init();
880
881   AsyncEvacuate(CmiFalse);
882 }
883
884 void ampiParent::pup(PUP::er &p) {
885   ArrayElement1D::pup(p);
886   p|threads;
887   p|worldNo;           // why it was missing from here before??
888   p|worldStruct;
889   myDDT->pup(p);
890   p|splitComm;
891   p|groupComm;
892   p|groups;
893
894   //BIGSIM_OOC DEBUGGING
895   //if(!p.isUnpacking()){
896   //    CmiPrintf("ampiParent[%d] packing ampiRequestList: \n", thisIndex);
897   //    ampiReqs.print();
898   //}
899
900   p|ampiReqs;
901
902   //BIGSIM_OOC DEBUGGING
903   //if(p.isUnpacking()){
904   //    CmiPrintf("ampiParent[%d] unpacking ampiRequestList: \n", thisIndex);
905   //    ampiReqs.print();
906   //}
907
908   p|RProxyCnt;
909   p|tmpRProxy;
910   p|winStructList;
911   p|infos;
912
913   p|ampiInitCallDone;
914 }
915 void ampiParent::prepareCtv(void) {
916   thread=threads[thisIndex].ckLocal();
917   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
918   CtvAccessOther(thread->getThread(),ampiPtr) = this;
919   STARTUP_DEBUG("ampiParent> found TCharm")
920 }
921
922 void ampiParent::init(){
923   CkAssert(groups.size() == 0);
924   groups.push_back(new groupStruct);
925
926 #if AMPIMSGLOG
927   if(msgLogWrite && record_msglog(thisIndex)){
928     char fname[128];
929     sprintf(fname, "%s.%d", msgLogFilename,thisIndex);
930 #if CMK_PROJECTIONS_USE_ZLIB && 0
931     fMsgLog = gzopen(fname,"wb");
932     toPUPer = new PUP::tozDisk(fMsgLog);
933 #else
934     fMsgLog = fopen(fname,"wb");
935     CmiAssert(fMsgLog != NULL);
936     toPUPer = new PUP::toDisk(fMsgLog);
937 #endif
938   }else if(msgLogRead){
939     char fname[128];
940     sprintf(fname, "%s.%d", msgLogFilename,msgLogRank);
941 #if CMK_PROJECTIONS_USE_ZLIB && 0
942     fMsgLog = gzopen(fname,"rb");
943     fromPUPer = new PUP::fromzDisk(fMsgLog);
944 #else
945     fMsgLog = fopen(fname,"rb");
946     CmiAssert(fMsgLog != NULL);
947     fromPUPer = new PUP::fromDisk(fMsgLog);
948 #endif
949     CkPrintf("AMPI> opened message log file: %s for replay\n", fname);
950   }
951 #endif
952 }
953
954 void ampiParent::finalize(){
955 #if AMPIMSGLOG
956   if(msgLogWrite && record_msglog(thisIndex)){
957     delete toPUPer;
958 #if CMK_PROJECTIONS_USE_ZLIB && 0
959     gzclose(fMsgLog);
960 #else
961     fclose(fMsgLog);
962 #endif
963   }else if(msgLogRead){
964     delete fromPUPer;
965 #if CMK_PROJECTIONS_USE_ZLIB && 0
966     gzclose(fMsgLog);
967 #else
968     fclose(fMsgLog);
969 #endif
970   }
971 #endif
972 }
973
974 void ampiParent::ckJustMigrated(void) {
975   ArrayElement1D::ckJustMigrated();
976   prepareCtv();
977 }
978
979 void ampiParent::ckJustRestored(void) {
980   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
981     ArrayElement1D::ckJustRestored();
982   prepareCtv();
983
984   //BIGSIM_OOC DEBUGGING
985   //CkPrintf("In ampiParent[%d] with TCharm thread=%p:   ",thisIndex, thread);
986   //CthPrintThdMagic(thread->getTid()); 
987 }
988
989 ampiParent::~ampiParent() {
990   STARTUP_DEBUG("ampiParent> destructor called");
991   finalize();
992 }
993
994 //Children call this when they are first created or just migrated
995 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
996 {
997   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
998
999   if (s.getComm()>=MPI_COMM_WORLD)
1000   { //We now have our COMM_WORLD-- register it
1001     //Note that split communicators don't keep a raw pointer, so
1002     //they don't need to re-register on migration.
1003     if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
1004     worldPtr=ptr;
1005     worldStruct=s;
1006
1007     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
1008     CkVec<int> _indices;
1009     _indices.push_back(thisIndex);
1010     selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
1011   }
1012
1013   if (!forMigration)
1014   { //Register the new communicator:
1015     MPI_Comm comm = s.getComm();
1016     STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
1017       if (comm>=MPI_COMM_WORLD) { 
1018         // Pass the new ampi to the waiting ampiInit
1019         thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
1020       } else if (isSplit(comm)) {
1021         splitChildRegister(s);
1022       } else if (isGroup(comm)) {
1023         groupChildRegister(s);
1024       } else if (isCart(comm)) {
1025         cartChildRegister(s);
1026       } else if (isGraph(comm)) {
1027         graphChildRegister(s);
1028       } else if (isInter(comm)) {
1029         interChildRegister(s);
1030       } else if (isIntra(comm)) {
1031         intraChildRegister(s);
1032       }else
1033         CkAbort("ampiParent recieved child with bad communicator");
1034   }
1035
1036   return thread;
1037 }
1038
1039 //BIGSIM_OOC DEBUGGING
1040 //Move the comm2ampi from inline to normal function for the sake of debugging
1041 /*ampi *ampiParent::comm2ampi(MPI_Comm comm){
1042 //BIGSIM_OOC DEBUGGING
1043 //CmiPrintf("%d, in ampiParent::comm2ampi, comm=%d\n", thisIndex, comm);
1044 if (comm==MPI_COMM_WORLD) return worldPtr;
1045 if (comm==MPI_COMM_SELF) return worldPtr;
1046 if (comm==worldNo) return worldPtr;
1047 if (isSplit(comm)) {
1048 const ampiCommStruct &st=getSplit(comm);
1049 return st.getProxy()[thisIndex].ckLocal();
1050 }
1051 if (isGroup(comm)) {
1052 const ampiCommStruct &st=getGroup(comm);
1053 return st.getProxy()[thisIndex].ckLocal();
1054 }
1055 if (isCart(comm)) {
1056 const ampiCommStruct &st = getCart(comm);
1057 return st.getProxy()[thisIndex].ckLocal();
1058 }
1059 if (isGraph(comm)) {
1060 const ampiCommStruct &st = getGraph(comm);
1061 return st.getProxy()[thisIndex].ckLocal();
1062 }
1063 if (isInter(comm)) {
1064 const ampiCommStruct &st=getInter(comm);
1065 return st.getProxy()[thisIndex].ckLocal();
1066 }
1067 if (isIntra(comm)) {
1068 const ampiCommStruct &st=getIntra(comm);
1069 return st.getProxy()[thisIndex].ckLocal();
1070 }
1071 if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
1072 CkAbort("Invalid communicator used!");
1073 return NULL;
1074 }*/
1075
1076 // reduction client data - preparation for checkpointing
1077 class ckptClientStruct {
1078   public:
1079     const char *dname;
1080     ampiParent *ampiPtr;
1081     ckptClientStruct(const char *s, ampiParent *a): dname(s), ampiPtr(a) {}
1082 };
1083
1084 static void checkpointClient(void *param,void *msg)
1085 {
1086   ckptClientStruct *client = (ckptClientStruct*)param;
1087   const char *dname = client->dname;
1088   ampiParent *ampiPtr = client->ampiPtr;
1089   ampiPtr->Checkpoint(strlen(dname), dname);
1090   delete client;
1091 }
1092
1093 void ampiParent::startCheckpoint(const char* dname){
1094   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
1095   if (thisIndex==0) {
1096     ckptClientStruct *clientData = new ckptClientStruct(dname, this);
1097     CkCallback cb(checkpointClient, clientData);
1098     contribute(0, NULL, CkReduction::sum_int, cb);
1099   }
1100   else
1101     contribute(0, NULL, CkReduction::sum_int);
1102
1103 #if 0
1104 #if CMK_BLUEGENE_CHARM
1105   void *curLog;         // store current log in timeline
1106   _TRACE_BG_TLINE_END(&curLog);
1107   TRACE_BG_AMPI_SUSPEND();
1108 #endif
1109 #endif
1110
1111   thread->stop();
1112
1113 #if CMK_BLUEGENE_CHARM
1114   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
1115   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
1116 #endif
1117 }
1118
1119 void ampiParent::Checkpoint(int len, const char* dname){
1120   if (len == 0) {
1121     // memory checkpoint
1122     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1123     CkStartMemCheckpoint(cb);
1124   }
1125   else {
1126     char dirname[256];
1127     strncpy(dirname,dname,len);
1128     dirname[len]='\0';
1129     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1130     CkStartCheckpoint(dirname,cb);
1131   }
1132 }
1133 void ampiParent::ResumeThread(void){
1134   thread->resume();
1135 }
1136
1137 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
1138     int *keyval, void* extra_state){
1139   KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1140   int idx = kvlist.size();
1141   kvlist.resize(idx+1);
1142   kvlist[idx] = newnode;
1143   *keyval = idx;
1144   return 0;
1145 }
1146   int ampiParent::freeKeyval(int *keyval){
1147     if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
1148       return -1;
1149     delete kvlist[*keyval];
1150     kvlist[*keyval] = NULL;
1151     *keyval = MPI_KEYVAL_INVALID;
1152     return 0;
1153   }
1154
1155   int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
1156     if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1157       return -1;
1158     ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1159     // Enlarge the keyval list:
1160     while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
1161     cs.getKeyvals()[keyval]=attribute_val;
1162     return 0;
1163   }
1164
1165 int ampiParent::kv_is_builtin(int keyval) {
1166   switch(keyval) {
1167     case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
1168     case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
1169     case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
1170     case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
1171     case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
1172     case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
1173     case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
1174     case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
1175     default: return 0;
1176   };
1177 }
1178
1179 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
1180   *flag = false;
1181   if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
1182     *flag=true;
1183     *(int **)attribute_val = kv_builtin_storage;  // all default tags are ints
1184     return 0;
1185   }
1186   if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
1187     return -1; /* invalid keyval */
1188
1189   ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
1190   if (keyval>=cs.getKeyvals().size())  
1191     return 0; /* we don't have a value yet */
1192   if (cs.getKeyvals()[keyval]==0)
1193     return 0; /* we had a value, but now it's zero */
1194   /* Otherwise, we have a good value */
1195   *flag = true;
1196   *(void **)attribute_val = cs.getKeyvals()[keyval];
1197   return 0;
1198 }
1199 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
1200   /* no way to delete an attribute: just overwrite it with 0 */
1201   return putAttr(comm,keyval,0);
1202 }
1203
1204 //----------------------- ampi -------------------------
1205 void ampi::init(void) {
1206   parent=NULL;
1207   thread=NULL;
1208   msgs=NULL;
1209   posted_ireqs=NULL;
1210   resumeOnRecv=false;
1211   AsyncEvacuate(CmiFalse);
1212 }
1213
1214 ampi::ampi()
1215 {
1216   /* this constructor only exists so we can create an empty array during split */
1217   CkAbort("Default ampi constructor should never be called");
1218 }
1219
1220   ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
1221 :parentProxy(parent_)
1222 {
1223   init();
1224
1225   myComm=s; myComm.setArrayID(thisArrayID);
1226   myRank=myComm.getRankForIndex(thisIndex);
1227
1228   findParent(false);
1229
1230   msgs = CmmNew();
1231   posted_ireqs = CmmNew();
1232   nbcasts = 0;
1233
1234 #if AMPI_COMLIB
1235   comlibProxy = thisProxy; // Will later be associated with comlib
1236 #endif
1237
1238   seqEntries=parent->ckGetArraySize();
1239   oorder.init (seqEntries);
1240 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1241   if(thisIndex == 0){
1242     /*      CkAssert(CkMyPe() == 0);
1243      *              CkGroupID _myManagerGID = thisProxy.ckGetArrayID();     
1244      *                      CkAssert(numElements);
1245      *                              printf("ampi::ampi setting numInitial to %d on manager at gid %d \n",numElements,_myManagerGID.idx);
1246      *                                      CkArray *_myManager = thisProxy.ckLocalBranch();
1247      *                                              _myManager->setNumInitial(numElements);*/
1248   }
1249 #endif
1250 }
1251
1252 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1253     ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1254 :parentProxy(parent_)
1255 {
1256 #if AMPI_COMLIB
1257   ciStreaming = ciStreaming_;
1258   ciBcast = ciBcast_;
1259   ciAllgather = ciAllgather_;
1260   ciAlltoall = ciAlltoall_;
1261
1262   init();
1263
1264   myComm=s; myComm.setArrayID(thisArrayID);
1265   myRank=myComm.getRankForIndex(thisIndex);
1266
1267   findParent(false);
1268
1269   msgs = CmmNew();
1270   posted_ireqs = CmmNew();
1271   nbcasts = 0;
1272
1273   comlibProxy = thisProxy;
1274   CmiPrintf("comlibProxy created as a copy of thisProxy, no associate call\n");
1275
1276 #if AMPI_COMLIB
1277   //  ComlibAssociateProxy(ciAlltoall, comlibProxy);
1278 #endif
1279
1280   seqEntries=parent->ckGetArraySize();
1281   oorder.init (seqEntries);
1282 #endif
1283 }
1284
1285 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1286 {
1287   init();
1288
1289   seqEntries=-1;
1290 }
1291
1292 void ampi::ckJustMigrated(void)
1293 {
1294   findParent(true);
1295   ArrayElement1D::ckJustMigrated();
1296 }
1297
1298 void ampi::ckJustRestored(void)
1299 {
1300   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
1301     findParent(true);
1302   ArrayElement1D::ckJustRestored();
1303
1304   //BIGSIM_OOC DEBUGGING
1305   //CkPrintf("In ampi[%d] thread[%p]:   ", thisIndex, thread);
1306   //CthPrintThdMagic(thread->getTid()); 
1307 }
1308
1309 void ampi::findParent(bool forMigration) {
1310   STARTUP_DEBUG("ampi> finding my parent")
1311     parent=parentProxy[thisIndex].ckLocal();
1312   if (parent==NULL) CkAbort("AMPI can't find its parent!");
1313   thread=parent->registerAmpi(this,myComm,forMigration);
1314   if (thread==NULL) CkAbort("AMPI can't find its thread!");
1315   //    printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1316 }
1317
1318 //The following method should be called on the first element of the
1319 //ampi array
1320 void ampi::allInitDone(CkReductionMsg *m){
1321   FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
1322     thisProxy.setInitDoneFlag();
1323   delete m;
1324 }
1325
1326 void ampi::setInitDoneFlag(){
1327   //CkPrintf("ampi[%d]::setInitDone called!\n", thisIndex);
1328   parent->ampiInitCallDone=1;
1329   parent->getTCharmThread()->start();
1330 }
1331
1332 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1333   CkPupMessage(*(PUP::er *)p,msg,1);
1334   if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1335   //    printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1336 }
1337
1338 static void cmm_pup_posted_ireq(pup_er p,void **msg) {
1339
1340   pup_int(p, (int *)msg);
1341
1342   /*    if(pup_isUnpacking(p)){
1343   // *msg = new IReq;
1344   //when unpacking, nothing is needed to do since *msg is an index
1345   //(of type integer) to the ampiParent::ampiReqs (the AmpiRequestList)
1346   }
1347   if(!pup_isUnpacking(p)){
1348   AmpiRequestList *reqL = getReqs();
1349   int retIdx = reqL->findRequestIndex((IReq *)*msg);
1350   if(retIdx==-1){
1351   CmiAbort("An AmpiRequest instance should be found for an instance in posted_ireq!\n");
1352   }
1353   pup_int(p, retIdx)
1354   }
1355    */
1356   //    ((IReq *)*msg)->pup(*(PUP::er *)p);
1357
1358   //    if (pup_isDeleting(p)) delete (IReq *)*msg;
1359   //    printf("[%d] pupping postd irequests %p \n",CkMyPe(),*msg);
1360 }
1361
1362 void ampi::pup(PUP::er &p)
1363 {
1364   if(!p.isUserlevel())
1365     ArrayElement1D::pup(p);//Pack superclass
1366   p|parentProxy;
1367   p|myComm;
1368   p|myRank;
1369   p|nbcasts;
1370   p|tmpVec;
1371   p|remoteProxy;
1372   p|resumeOnRecv;
1373 #if AMPI_COMLIB
1374   p|comlibProxy;
1375   p|ciStreaming;
1376   p|ciBcast;
1377   p|ciAllgather;
1378   p|ciAlltoall;
1379
1380   if(p.isUnpacking()){
1381     //    ciStreaming.setSourcePe();
1382     //    ciBcast.setSourcePe();
1383     //    ciAllgather.setSourcePe();
1384     //    ciAlltoall.setSourcePe();
1385   }
1386 #endif
1387
1388   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1389
1390   //BIGSIM_OOC DEBUGGING
1391   //if(!p.isUnpacking()){
1392   //CkPrintf("ampi[%d]::packing: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1393   //}
1394
1395   posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);
1396
1397   //if(p.isUnpacking()){
1398   //CkPrintf("ampi[%d]::unpacking: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
1399   //}
1400
1401   p|seqEntries;
1402   p|oorder;
1403 }
1404
1405 ampi::~ampi()
1406 {
1407   if (CkInRestarting() || _BgOutOfCoreFlag==1) {
1408     // in restarting, we need to flush messages
1409     int tags[3], sts[3];
1410     tags[0] = tags[1] = tags[2] = CmmWildCard;
1411     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1412     while (msg) {
1413       delete msg;
1414       msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1415     }
1416   }
1417
1418   CmmFree(msgs);
1419   CmmFreeAll(posted_ireqs);
1420 }
1421
1422 //------------------------ Communicator Splitting ---------------------
1423 class ampiSplitKey {
1424   public:
1425     int nextSplitComm;
1426     int color; //New class of processes we'll belong to
1427     int key; //To determine rank in new ordering
1428     int rank; //Rank in old ordering
1429     ampiSplitKey() {}
1430     ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1431       :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1432 };
1433
1434 /* "type" may indicate whether call is for a cartesian topology etc. */
1435
1436 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1437 {
1438 #if CMK_BLUEGENE_CHARM
1439   void *curLog;         // store current log in timeline
1440   _TRACE_BG_TLINE_END(&curLog);
1441   //  TRACE_BG_AMPI_SUSPEND();
1442 #endif
1443   if (type == CART_TOPOL) {
1444     ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1445     int rootIdx=myComm.getIndexForRank(0);
1446     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1447     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1448
1449     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1450     MPI_Comm newComm=parent->getNextCart()-1;
1451     *dest=newComm;
1452   } else {
1453     ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1454     int rootIdx=myComm.getIndexForRank(0);
1455     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1456     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1457
1458     thread->suspend(); //Resumed by ampiParent::splitChildRegister
1459     MPI_Comm newComm=parent->getNextSplit()-1;
1460     *dest=newComm;
1461   }
1462 #if CMK_BLUEGENE_CHARM
1463   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1464   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1465   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1466 #endif
1467 }
1468
1469 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1470   const ampiSplitKey *a=(const ampiSplitKey *)a_;
1471   const ampiSplitKey *b=(const ampiSplitKey *)b_;
1472   if (a->color!=b->color) return a->color-b->color;
1473   if (a->key!=b->key) return a->key-b->key;
1474   return a->rank-b->rank;
1475 }
1476
1477 void ampi::splitPhase1(CkReductionMsg *msg)
1478 {
1479   //Order the keys, which orders the ranks properly:
1480   int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1481   ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1482   if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1483   qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1484
1485   MPI_Comm newComm = -1;
1486   for(int i=0;i<nKeys;i++)
1487     if(keys[i].nextSplitComm>newComm)
1488       newComm = keys[i].nextSplitComm;
1489
1490   //Loop over the sorted keys, which gives us the new arrays:
1491   int lastColor=keys[0].color-1; //The color we're building an array for
1492   CProxy_ampi lastAmpi; //The array for lastColor
1493   int lastRoot=0; //C value for new rank 0 process for latest color
1494   ampiCommStruct lastComm; //Communicator info. for latest color
1495   for (int c=0;c<nKeys;c++) {
1496     if (keys[c].color!=lastColor)
1497     { //Hit a new color-- need to build a new communicator and array
1498       lastColor=keys[c].color;
1499       lastRoot=c;
1500       CkArrayOptions opts;
1501       opts.bindTo(parentProxy);
1502       opts.setNumInitial(0);
1503       CkArrayID unusedAID; ampiCommStruct unusedComm;
1504       lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1505       lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1506
1507       CkVec<int> indices; //Maps rank to array indices for new arrau
1508       for (int i=c;i<nKeys;i++) {
1509         if (keys[i].color!=lastColor) break; //Done with this color
1510         int idx=myComm.getIndexForRank(keys[i].rank);
1511         indices.push_back(idx);
1512       }
1513
1514       //FIXME: create a new communicator for each color, instead of
1515       // (confusingly) re-using the same MPI_Comm number for each.
1516       lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1517     }
1518     int newRank=c-lastRoot;
1519     int newIdx=lastComm.getIndexForRank(newRank);
1520
1521     //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1522     lastAmpi[newIdx].insert(parentProxy,lastComm);
1523   }
1524
1525   delete msg;
1526 }
1527
1528 //...newly created array elements register with the parent, which calls:
1529 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1530   int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1531   if (splitComm.size()<=idx) splitComm.resize(idx+1);
1532   splitComm[idx]=new ampiCommStruct(s);
1533   thread->resume(); //Matches suspend at end of ampi::split
1534 }
1535
1536 //-----------------create communicator from group--------------
1537 // The procedure is like that of comm_split very much,
1538 // so the code is shamelessly copied from above
1539 //   1. reduction to make sure all members have called
1540 //   2. the root in the old communicator create the new array
1541 //   3. ampiParent::register is called to register new array as new comm
1542 class vecStruct {
1543   public:
1544     int nextgroup;
1545     groupStruct vec;
1546     vecStruct():nextgroup(-1){}
1547     vecStruct(int nextgroup_, groupStruct vec_)
1548       : nextgroup(nextgroup_), vec(vec_) { }
1549 };
1550
1551 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1552   int rootIdx=vec[0];
1553   tmpVec = vec;
1554   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1555   MPI_Comm nextgroup = parent->getNextGroup();
1556   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1557
1558   if(getPosOp(thisIndex,vec)>=0){
1559     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1560     MPI_Comm retcomm = parent->getNextGroup()-1;
1561     *newcomm = retcomm;
1562   }else{
1563     *newcomm = MPI_COMM_NULL;
1564   }
1565 }
1566
1567 void ampi::commCreatePhase1(CkReductionMsg *msg){
1568   MPI_Comm *nextGroupComm = (int *)msg->getData();
1569
1570   CkArrayOptions opts;
1571   opts.bindTo(parentProxy);
1572   opts.setNumInitial(0);
1573   CkArrayID unusedAID;
1574   ampiCommStruct unusedComm;
1575   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1576   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1577
1578   groupStruct indices = tmpVec;
1579   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1580   for(int i=0;i<indices.size();i++){
1581     int newIdx=indices[i];
1582     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1583   }
1584   delete msg;
1585 }
1586
1587 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1588   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1589   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1590   groupComm[idx]=new ampiCommStruct(s);
1591   thread->resume(); //Matches suspend at end of ampi::split
1592 }
1593
1594 /* Virtual topology communicator creation */
1595 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1596   int rootIdx=vec[0];
1597   tmpVec = vec;
1598   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1599
1600   MPI_Comm nextcart = parent->getNextCart();
1601   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1602
1603   if(getPosOp(thisIndex,vec)>=0){
1604     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1605     MPI_Comm retcomm = parent->getNextCart()-1;
1606     *newcomm = retcomm;
1607   }else
1608     *newcomm = MPI_COMM_NULL;
1609 }
1610
1611 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1612   MPI_Comm *nextCartComm = (int *)msg->getData();
1613
1614   CkArrayOptions opts;
1615   opts.bindTo(parentProxy);
1616   opts.setNumInitial(0);
1617   CkArrayID unusedAID;
1618   ampiCommStruct unusedComm;
1619   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1620   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1621
1622   groupStruct indices = tmpVec;
1623   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1624       size(),indices);
1625   for(int i=0;i<indices.size();i++){
1626     int newIdx=indices[i];
1627     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1628   }
1629   delete msg;
1630 }
1631
1632 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1633   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1634   if (cartComm.size()<=idx) {
1635     cartComm.resize(idx+1);
1636     cartComm.length()=idx+1;
1637   }
1638   cartComm[idx]=new ampiCommStruct(s);
1639   thread->resume(); //Matches suspend at end of ampi::cartCreate
1640 }
1641
1642 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1643   int rootIdx=vec[0];
1644   tmpVec = vec;
1645   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1646       myComm.getProxy());
1647   MPI_Comm nextgraph = parent->getNextGraph();
1648   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1649
1650   if(getPosOp(thisIndex,vec)>=0){
1651     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1652     MPI_Comm retcomm = parent->getNextGraph()-1;
1653     *newcomm = retcomm;
1654   }else
1655     *newcomm = MPI_COMM_NULL;
1656 }
1657
1658 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1659   MPI_Comm *nextGraphComm = (int *)msg->getData();
1660
1661   CkArrayOptions opts;
1662   opts.bindTo(parentProxy);
1663   opts.setNumInitial(0);
1664   CkArrayID unusedAID;
1665   ampiCommStruct unusedComm;
1666   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1667   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1668
1669   groupStruct indices = tmpVec;
1670   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1671       .size(),indices);
1672   for(int i=0;i<indices.size();i++){
1673     int newIdx=indices[i];
1674     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1675   }
1676   delete msg;
1677 }
1678
1679 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1680   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1681   if (graphComm.size()<=idx) {
1682     graphComm.resize(idx+1);
1683     graphComm.length()=idx+1;
1684   }
1685   graphComm[idx]=new ampiCommStruct(s);
1686   thread->resume(); //Matches suspend at end of ampi::graphCreate
1687 }
1688
1689 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1690   if(thisIndex==root) { // not everybody gets the valid rvec
1691     tmpVec = rvec;
1692   }
1693   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1694   MPI_Comm nextinter = parent->getNextInter();
1695   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1696
1697   thread->suspend(); //Resumed by ampiParent::interChildRegister
1698   MPI_Comm newcomm=parent->getNextInter()-1;
1699   *ncomm=newcomm;
1700 }
1701
1702 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1703   MPI_Comm *nextInterComm = (int *)msg->getData();
1704
1705   groupStruct lgroup = myComm.getIndices();
1706   CkArrayOptions opts;
1707   opts.bindTo(parentProxy);
1708   opts.setNumInitial(0);
1709   CkArrayID unusedAID;
1710   ampiCommStruct unusedComm;
1711   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1712   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1713
1714   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1715   for(int i=0;i<lgroup.size();i++){
1716     int newIdx=lgroup[i];
1717     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1718   }
1719
1720   parentProxy[0].ExchangeProxy(newAmpi);
1721   delete msg;
1722 }
1723
1724 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1725   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1726   if (interComm.size()<=idx) interComm.resize(idx+1);
1727   interComm[idx]=new ampiCommStruct(s);
1728   //thread->resume(); // don't resume it yet, till parent set remote proxy
1729 }
1730
1731 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1732   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1733     groupStruct lvec = myComm.getIndices();
1734     groupStruct rvec = myComm.getRemoteIndices();
1735     int rsize = rvec.size();
1736     tmpVec = lvec;
1737     for(int i=0;i<rsize;i++)
1738       tmpVec.push_back(rvec[i]);
1739     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1740   }else{
1741     tmpVec.resize(0);
1742   }
1743
1744   int rootIdx=myComm.getIndexForRank(0);
1745   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1746   MPI_Comm nextintra = parent->getNextIntra();
1747   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1748
1749   thread->suspend(); //Resumed by ampiParent::interChildRegister
1750   MPI_Comm newcomm=parent->getNextIntra()-1;
1751   *ncomm=newcomm;
1752 }
1753
1754 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1755   if(tmpVec.size()==0) { delete msg; return; }
1756   MPI_Comm *nextIntraComm = (int *)msg->getData();
1757   CkArrayOptions opts;
1758   opts.bindTo(parentProxy);
1759   opts.setNumInitial(0);
1760   CkArrayID unusedAID;
1761   ampiCommStruct unusedComm;
1762   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1763   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1764
1765   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1766   for(int i=0;i<tmpVec.size();i++){
1767     int newIdx=tmpVec[i];
1768     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1769   }
1770   delete msg;
1771 }
1772
1773 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1774   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1775   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1776   intraComm[idx]=new ampiCommStruct(s);
1777   thread->resume(); //Matches suspend at end of ampi::split
1778 }
1779
1780 //------------------------ communication -----------------------
1781 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1782 {
1783   if (universeNo>MPI_COMM_WORLD) {
1784     int worldDex=universeNo-MPI_COMM_WORLD-1;
1785     if (worldDex>=_mpi_nworlds)
1786       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1787     return mpi_worlds[worldDex].comm;
1788   }
1789   CkAbort("Bad communicator passed to universeComm2CommStruct");
1790   return mpi_worlds[0].comm; // meaningless return
1791 }
1792
1793 void ampi::block(void){
1794   thread->suspend();
1795 }
1796
1797 void ampi::yield(void){
1798   thread->schedule();
1799 }
1800
1801 void ampi::unblock(void){
1802   thread->resume();
1803 }
1804
1805   void ampi::ssend_ack(int sreq_idx){
1806     if (sreq_idx == 1)
1807       thread->resume();           // MPI_Ssend
1808     else {
1809       sreq_idx -= 2;              // start from 2
1810       AmpiRequestList *reqs = &(parent->ampiReqs);
1811       SReq *sreq = (SReq *)(*reqs)[sreq_idx];
1812       sreq->statusIreq = true;
1813       if (resumeOnRecv) {
1814         thread->resume();
1815       }
1816     }
1817   }
1818
1819   void
1820 ampi::generic(AmpiMsg* msg)
1821 {
1822   MSG_ORDER_DEBUG(
1823       CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1824         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1825       )
1826 #if CMK_BLUEGENE_CHARM
1827     TRACE_BG_ADD_TAG("AMPI_generic");
1828   msg->event = NULL;
1829 #endif
1830
1831   int sync = UsrToEnv(msg)->getRef();
1832   int srcIdx;
1833   if (sync)  srcIdx = msg->srcIdx;
1834
1835   //    AmpiMsg *msgcopy = msg;
1836   if(msg->seq != -1) {
1837     int srcIdx=msg->srcIdx;
1838     int n=oorder.put(srcIdx,msg);
1839     if (n>0) { // This message was in-order
1840       inorder(msg);
1841       if (n>1) { // It enables other, previously out-of-order messages
1842         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1843           inorder(msg);
1844         }
1845       }
1846     }
1847   } else { //Cross-world or system messages are unordered
1848     inorder(msg);
1849   }
1850
1851   // msg may be free'ed from calling inorder()
1852   if (sync>0) {         // send an ack to sender
1853     CProxy_ampi pa(thisArrayID);
1854     pa[srcIdx].ssend_ack(sync);
1855   }
1856
1857   if(resumeOnRecv){
1858     //CkPrintf("Calling TCharm::resume at ampi::generic!\n");
1859     thread->resume();
1860   }
1861 }
1862
1863 inline static AmpiRequestList *getReqs(void); 
1864
1865   void
1866 ampi::inorder(AmpiMsg* msg)
1867 {
1868   MSG_ORDER_DEBUG(
1869       CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1870         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1871       )
1872     // check posted recvs
1873     int tags[3], sts[3];
1874   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1875   IReq *ireq = NULL;
1876   if (CpvAccess(CmiPICMethod) != 2) {
1877 #if 0
1878     //IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1879     ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1880 #else
1881 #if CMK_BLUEGENE_CHARM
1882     _TRACE_BG_TLINE_END(&msg->event);    // store current log
1883     msg->eventPe = CmiMyPe();
1884 #endif
1885     //in case ampi has not initialized and posted_ireqs are only inserted 
1886     //at AMPI_Irecv (MPI_Irecv)
1887     AmpiRequestList *reqL = &(parent->ampiReqs);
1888     //When storing the req index, it's 1-based. The reason is stated in the comments
1889     //in AMPI_Irecv function.
1890     int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
1891     if(reqL->size()>0 && ireqIdx>0)
1892       ireq = (IReq *)(*reqL)[ireqIdx-1];
1893     //CkPrintf("[%d] ampi::inorder, ireqIdx=%d\n", thisIndex, ireqIdx);
1894 #endif
1895     //CkPrintf("[%d] ampi::inorder, ireq=%p\n", thisIndex, ireq);
1896     if (ireq) { // receive posted
1897       ireq->receive(this, msg);
1898       // Isaac changed this so that the IReq stores the tag when receiving the message, 
1899       // instead of using this user supplied tag which could be MPI_ANY_TAG
1900       // Formerly the following line was not commented out:
1901       //ireq->tag = sts[0];         
1902       //ireq->src = sts[1];
1903       //ireq->comm = sts[2];
1904     } else {
1905       CmmPut(msgs, 3, tags, msg);
1906     }
1907   }
1908   else
1909     CmmPut(msgs, 3, tags, msg);
1910 }
1911
1912 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1913 {
1914   int tags[3];
1915   tags[0] = t; tags[1] = s; tags[2] = comm;
1916   AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1917   return msg;
1918 }
1919
1920 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1921     int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm, int sync)
1922 {
1923   CkDDT_DataType *ddt = getDDT()->getType(type);
1924   int len = ddt->getSize(count);
1925   int sIdx=thisIndex;
1926   int seq = -1;
1927   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1928     seq = oorder.nextOutgoing(destIdx);
1929   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1930   if (sync) UsrToEnv(msg)->setRef(sync);
1931   TCharm::activateVariable(buf);
1932   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1933   TCharm::deactivateVariable(buf);
1934   return msg;
1935 }
1936
1937 #if AMPI_COMLIB
1938   void
1939 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1940 {
1941   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1942 }
1943 #endif
1944
1945   void
1946 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, int sync)
1947 {
1948 #if CMK_TRACE_IN_CHARM
1949   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
1950 #endif
1951
1952 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1953   MPI_Comm disComm = myComm.getComm();
1954   ampi *dis = getAmpiInstance(disComm);
1955   CpvAccess(_currentObj) = dis;
1956 #endif
1957
1958   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1959   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),sync);
1960
1961 #if CMK_TRACE_IN_CHARM
1962   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
1963 #endif
1964
1965   if (sync == 1) {
1966     // waiting for receiver side
1967     resumeOnRecv = false;            // so no one else awakes it
1968     block();
1969   }
1970 }
1971
1972   void
1973 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1974 {
1975   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1976   memcpy(msg->data, buf, len);
1977   CProxy_ampi pa(aid);
1978   pa[idx].generic(msg);
1979 }
1980
1981   void
1982 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy, int sync)
1983 {
1984   if(rank==MPI_PROC_NULL) return;
1985   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1986   int destIdx = dest.getIndexForRank(rank);
1987   if(isInter()){
1988     sRank = parent->thisIndex;
1989     destcomm = MPI_COMM_FIRST_INTER;
1990     destIdx = dest.getIndexForRemoteRank(rank);
1991     arrproxy = remoteProxy;
1992   }
1993   MSG_ORDER_DEBUG(
1994       CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
1995       )
1996
1997     arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm,sync));
1998
1999 #if 0
2000 #if CMK_TRACE_ENABLED
2001   int size=0;
2002   MPI_Type_size(type,&size);
2003   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
2004 #endif
2005 #endif
2006 }
2007
2008   int
2009 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
2010 {
2011   CkDDT_DataType *ddt = getDDT()->getType(type);
2012   int len = ddt->getSize(count);
2013
2014   if(msg->length < len){ // only at rare case shall we reset count by using divide
2015     count = msg->length/(ddt->getSize(1));
2016   }
2017
2018   TCharm::activateVariable(buf);
2019   if (t==MPI_REDUCE_TAG) {      // reduction msg
2020     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
2021   } else {
2022     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
2023   }
2024   TCharm::deactivateVariable(buf);
2025   return 0;
2026 }
2027
2028   int
2029 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
2030 {
2031   MPI_Comm disComm = myComm.getComm();
2032   if(s==MPI_PROC_NULL) {
2033     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
2034     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
2035     ((MPI_Status *)sts)->MPI_LENGTH = 0;
2036     return 0;
2037   }
2038   _LOG_E_END_AMPI_PROCESSING(thisIndex)
2039 #if CMK_BLUEGENE_CHARM
2040     void *curLog;               // store current log in timeline
2041   _TRACE_BG_TLINE_END(&curLog);
2042   //  TRACE_BG_AMPI_SUSPEND();
2043 #if CMK_TRACE_IN_CHARM
2044   if(CpvAccess(traceOn)) traceSuspend();
2045 #endif
2046 #endif
2047
2048   if(isInter()){
2049     s = myComm.getIndexForRemoteRank(s);
2050     comm = MPI_COMM_FIRST_INTER;
2051   }
2052
2053   int tags[3];
2054   AmpiMsg *msg = 0;
2055
2056   MSG_ORDER_DEBUG(
2057       CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
2058       )
2059
2060     resumeOnRecv=true;
2061   ampi *dis = getAmpiInstance(disComm);
2062 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2063   //  dis->yield();
2064   //  processRemoteMlogMessages();
2065 #endif
2066   int dosuspend = 0;
2067   while(1) {
2068     //This is done to take into account the case in which an ampi 
2069     // thread has migrated while waiting for a message
2070     tags[0] = t; tags[1] = s; tags[2] = comm;
2071     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
2072     if (msg) break;
2073     dis->thread->suspend();
2074     dosuspend = 1;
2075     dis = getAmpiInstance(disComm);
2076   }
2077
2078 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2079   CpvAccess(_currentObj) = dis;
2080   MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
2081 #endif
2082
2083     dis->resumeOnRecv=false;
2084
2085   if(sts)
2086     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2087   int status = dis->processMessage(msg, t, s, buf, count, type);
2088   if (status != 0) return status;
2089
2090   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
2091
2092 #if CMK_BLUEGENE_CHARM
2093 #if CMK_TRACE_IN_CHARM
2094     //if(CpvAccess(traceOn)) CthTraceResume(thread->getThread());
2095     //Due to the reason mentioned the in the while loop above, we need to 
2096     //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
2097     if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
2098 #endif
2099   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
2100   //TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0);
2101   //_TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
2102 #if 0
2103 #if 1
2104   if (!dosuspend) {
2105     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 1);
2106     if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2107   }
2108   else
2109 #endif
2110     TRACE_BG_ADD_TAG("RECV_RESUME_THREAD");
2111 #else
2112   TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
2113   if (msg->eventPe == CmiMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
2114 #endif
2115 #endif
2116
2117   delete msg;
2118   return 0;
2119 }
2120
2121   void
2122 ampi::probe(int t, int s, int comm, int *sts)
2123 {
2124   int tags[3];
2125 #if CMK_BLUEGENE_CHARM
2126   void *curLog;         // store current log in timeline
2127   _TRACE_BG_TLINE_END(&curLog);
2128   //  TRACE_BG_AMPI_SUSPEND();
2129 #endif
2130
2131   AmpiMsg *msg = 0;
2132   resumeOnRecv=true;
2133   while(1) {
2134     tags[0] = t; tags[1] = s; tags[2] = comm;
2135     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2136     if (msg) break;
2137     thread->suspend();
2138   }
2139   resumeOnRecv=false;
2140   if(sts)
2141     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2142 #if CMK_BLUEGENE_CHARM
2143   //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
2144   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
2145 #endif
2146 }
2147
2148   int
2149 ampi::iprobe(int t, int s, int comm, int *sts)
2150 {
2151   int tags[3];
2152   AmpiMsg *msg = 0;
2153   tags[0] = t; tags[1] = s; tags[2] = comm;
2154   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
2155   if (msg) {
2156     if(sts)
2157       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
2158     return 1;
2159   }
2160 #if CMK_BLUEGENE_CHARM
2161   void *curLog;         // store current log in timeline
2162   _TRACE_BG_TLINE_END(&curLog);
2163   //  TRACE_BG_AMPI_SUSPEND();
2164 #endif
2165   thread->schedule();
2166 #if CMK_BLUEGENE_CHARM
2167   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
2168   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
2169 #endif
2170   return 0;
2171 }
2172
2173
2174 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
2175   void
2176 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
2177 {
2178   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2179   int rootIdx=dest.getIndexForRank(root);
2180   if(rootIdx==thisIndex) {
2181 #if 0//AMPI_COMLIB
2182     ciBcast.beginIteration();
2183     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2184 #else
2185 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2186     CpvAccess(_currentObj) = this;
2187 #endif
2188     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
2189 #endif
2190   }
2191   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
2192   nbcasts++;
2193 }
2194
2195   void
2196 ampi::bcastraw(void* buf, int len, CkArrayID aid)
2197 {
2198   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, MPI_COMM_WORLD);
2199   memcpy(msg->data, buf, len);
2200   CProxy_ampi pa(aid);
2201   pa.generic(msg);
2202 }
2203
2204
2205   AmpiMsg* 
2206 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
2207 {
2208   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
2209   int unit;
2210   CkDDT_DataType *ddt = getDDT()->getType(type);
2211   unit = ddt->getSize(1);
2212   int totalsize = unit*cnt;
2213
2214   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
2215   char* addr = (char*)Alltoallbuff+disp*unit;
2216   ddt->serialize((char*)msg->data, addr, cnt, (-1));
2217   return msg;
2218 }
2219
2220 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
2221     void *attr_in, void *attr_out, int *flag){
2222   (*flag) = 0;
2223   return (MPI_SUCCESS);
2224 }
2225 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
2226     void *attr_in, void *attr_out, int *flag){
2227   (*(void **)attr_out) = attr_in;
2228   (*flag) = 1;
2229   return (MPI_SUCCESS);
2230 }
2231 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
2232   return (MPI_SUCCESS);
2233 }
2234
2235
2236 void AmpiSeqQ::init(int numP) 
2237 {
2238   elements.init(numP);
2239 }
2240
2241 AmpiSeqQ::~AmpiSeqQ () {
2242 }
2243
2244 void AmpiSeqQ::pup(PUP::er &p) {
2245   p|out;
2246   p|elements;
2247 }
2248
2249 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
2250 {
2251   AmpiOtherElement &el=elements[srcIdx];
2252 #ifndef CMK_OPTIMIZE
2253   if (msg->seq<el.seqIncoming)
2254     CkAbort("AMPI Logic error: received late out-of-order message!\n");
2255 #endif
2256   out.enq(msg);
2257   el.nOut++; // We have another message in the out-of-order queue
2258 }
2259
2260 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
2261 {
2262   AmpiOtherElement &el=elements[srcIdx];
2263   if (el.nOut==0) return 0; // No more out-of-order left.
2264   // Walk through our out-of-order queue, searching for our next message:
2265   for (int i=0;i<out.length();i++) {
2266     AmpiMsg *msg=out.deq();
2267     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
2268       el.seqIncoming++;
2269       el.nOut--; // We have one less message out-of-order
2270       return msg;
2271     }
2272     else
2273       out.enq(msg);
2274   }
2275   // We walked the whole queue-- ours is not there.
2276   return 0;
2277 }
2278
2279 //BIGSIM_OOC DEBUGGING: Output for AmpiRequest and its children classes
2280 void AmpiRequest::print(){
2281   CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
2282 }
2283
2284 void PersReq::print(){
2285   AmpiRequest::print();
2286   CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
2287 }
2288
2289 void IReq::print(){
2290   AmpiRequest::print();
2291   CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
2292 }
2293
2294 void ATAReq::print(){ //not complete for myreqs
2295   AmpiRequest::print();
2296   CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
2297
2298
2299 void SReq::print(){
2300   AmpiRequest::print();
2301   CmiPrintf("In SReq: this=%p, status=%d\n", this, statusIreq);
2302 }
2303
2304 void AmpiRequestList::pup(PUP::er &p) { 
2305   if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
2306     return;
2307   }
2308
2309   p(blklen); //Allocated size of block
2310   p(len); //Number of used elements in block
2311   if(p.isUnpacking()){
2312     makeBlock(blklen,len);
2313   }
2314   int count=0;
2315   for(int i=0;i<len;i++){
2316     char nonnull;
2317     if(!p.isUnpacking()){
2318       if(block[i] == NULL){
2319         nonnull = 0;
2320       }else{
2321         nonnull = block[i]->getType();
2322       }
2323     }   
2324     p(nonnull);
2325     if(nonnull != 0){
2326       if(p.isUnpacking()){
2327         switch(nonnull){
2328           case 1:
2329             block[i] = new PersReq;
2330             break;
2331           case 2:       
2332             block[i] = new IReq;
2333             break;
2334           case 3:       
2335             block[i] = new ATAReq;
2336             break;
2337         }
2338       } 
2339       block[i]->pup(p);
2340       count++;
2341     }else{
2342       block[i] = 0;
2343     }
2344   }
2345   if(p.isDeleting()){
2346     freeBlock();
2347   }
2348 }
2349
2350 //------------------ External Interface -----------------
2351 ampiParent *getAmpiParent(void) {
2352   ampiParent *p = CtvAccess(ampiPtr);
2353 #ifndef CMK_OPTIMIZE
2354   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
2355 #endif
2356   return p;
2357 }
2358
2359 ampi *getAmpiInstance(MPI_Comm comm) {
2360   ampi *ptr=getAmpiParent()->comm2ampi(comm);
2361 #ifndef CMK_OPTIMIZE
2362   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
2363 #endif
2364   return ptr;
2365 }
2366
2367 inline static AmpiRequestList *getReqs(void) {
2368   return &(getAmpiParent()->ampiReqs);
2369 }
2370
2371 inline void checkComm(MPI_Comm comm){
2372 #ifndef CMK_OPTIMIZE
2373   getAmpiParent()->checkComm(comm);
2374 #endif
2375 }
2376
2377 inline void checkRequest(MPI_Request req){
2378 #ifndef CMK_OPTIMIZE
2379   getReqs()->checkRequest(req);
2380 #endif
2381 }
2382
2383 inline void checkRequests(int n, MPI_Request* reqs){
2384 #ifndef CMK_OPTIMIZE
2385   AmpiRequestList* reqlist = getReqs();
2386   for(int i=0;i<n;i++)
2387     reqlist->checkRequest(reqs[i]);
2388 #endif
2389 }
2390
2391 CDECL void AMPI_Migrate(void)
2392 {
2393   //  AMPIAPI("AMPI_Migrate");
2394 #if 0
2395 #if CMK_BLUEGENE_CHARM
2396   TRACE_BG_AMPI_SUSPEND();
2397 #endif
2398 #endif
2399   TCHARM_Migrate();
2400
2401 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2402   ampi *currentAmpi = getAmpiInstance(MPI_COMM_WORLD);
2403   CpvAccess(_currentObj) = currentAmpi;
2404 #endif
2405
2406 #if CMK_BLUEGENE_CHARM
2407   //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2408   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
2409 #endif
2410 }
2411
2412
2413 CDECL void AMPI_Evacuate(void)
2414 {
2415   TCHARM_Evacuate();
2416 }
2417
2418
2419
2420 CDECL void AMPI_Migrateto(int destPE)
2421 {
2422   AMPIAPI("AMPI_MigrateTo");
2423 #if 0
2424 #if CMK_BLUEGENE_CHARM
2425   TRACE_BG_AMPI_SUSPEND();
2426 #endif
2427 #endif
2428   TCHARM_Migrate_to(destPE);
2429 #if CMK_BLUEGENE_CHARM
2430   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
2431   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
2432 #endif
2433 }
2434
2435 CDECL void AMPI_MigrateTo(int destPE)
2436 {
2437   AMPI_Migrateto(destPE);
2438 }
2439
2440 CDECL void AMPI_Async_Migrate(void)
2441 {
2442   AMPIAPI("AMPI_Async_Migrate");
2443 #if 0
2444 #if CMK_BLUEGENE_CHARM
2445   TRACE_BG_AMPI_SUSPEND();
2446 #endif
2447 #endif
2448   TCHARM_Async_Migrate();
2449 #if CMK_BLUEGENE_CHARM
2450   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2451   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2452 #endif
2453 }
2454
2455 CDECL void AMPI_Allow_Migrate(void)
2456 {
2457   AMPIAPI("AMPI_Allow_Migrate");
2458 #if 0
2459 #if CMK_BLUEGENE_CHARM
2460   TRACE_BG_AMPI_SUSPEND();
2461 #endif
2462 #endif
2463   TCHARM_Allow_Migrate();
2464 #if CMK_BLUEGENE_CHARM
2465   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2466 #endif
2467 }
2468
2469 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2470 #if CMK_LBDB_ON
2471   //AMPIAPI("AMPI_Setmigratable");
2472   ampi *ptr=getAmpiInstance(comm);
2473   ptr->setMigratable(mig);
2474 #else
2475   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2476 #endif
2477 }
2478
2479 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2480 {
2481   //AMPIAPI("AMPI_Init");
2482   if (nodeinit_has_been_called) {
2483     AMPIAPI("AMPI_Init");
2484     char **argv;
2485     if (p_argv) argv=*p_argv;
2486     else argv=CkGetArgv();
2487     ampiInit(argv);
2488     if (p_argc) *p_argc=CmiGetArgc(argv);
2489   }
2490   else
2491   { /* Charm hasn't been started yet! */
2492     CkAbort("AMPI_Init> Charm is not initialized!");
2493   }
2494
2495   return 0;
2496 }
2497
2498 CDECL int AMPI_Initialized(int *isInit)
2499 {
2500   if (nodeinit_has_been_called) {
2501     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2502     *isInit=CtvAccess(ampiInitDone);
2503   }
2504   else /* !nodeinit_has_been_called */ {
2505     *isInit=nodeinit_has_been_called;
2506   }
2507   return 0;
2508 }
2509
2510 CDECL int AMPI_Finalized(int *isFinalized)
2511 {
2512   AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2513   *isFinalized=CtvAccess(ampiFinalized);
2514   return 0;
2515 }
2516
2517 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2518 {
2519   //AMPIAPI("AMPI_Comm_rank");
2520
2521 #ifndef CMK_OPTIMIZE 
2522   if(checkCommunicator(comm) != MPI_SUCCESS)
2523     return checkCommunicator(comm);
2524 #endif
2525
2526 #if AMPIMSGLOG
2527   ampiParent* pptr = getAmpiParent();
2528   if(msgLogRead){
2529     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2530     return 0;
2531   }
2532 #endif
2533
2534   *rank = getAmpiInstance(comm)->getRank(comm);
2535
2536 #if AMPIMSGLOG
2537   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2538     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2539   }
2540 #endif
2541   return 0;
2542 }
2543
2544   CDECL
2545 int AMPI_Comm_size(MPI_Comm comm, int *size)
2546 {
2547   //AMPIAPI("AMPI_Comm_size");
2548
2549 #ifndef CMK_OPTIMIZE 
2550   if(checkCommunicator(comm) != MPI_SUCCESS)
2551     return checkCommunicator(comm);
2552 #endif
2553
2554 #if AMPIMSGLOG
2555   ampiParent* pptr = getAmpiParent();
2556   if(msgLogRead){
2557     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2558     return 0;
2559   }
2560 #endif
2561
2562   *size = getAmpiInstance(comm)->getSize(comm);
2563
2564 #if AMPIMSGLOG
2565   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2566     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2567   }
2568 #endif
2569
2570   return 0;
2571 }
2572
2573   CDECL
2574 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2575 {
2576
2577 #ifndef CMK_OPTIMIZE 
2578   if(checkCommunicator(comm1) != MPI_SUCCESS)
2579     return checkCommunicator(comm1);
2580   if(checkCommunicator(comm2) != MPI_SUCCESS)
2581     return checkCommunicator(comm2);
2582 #endif
2583
2584   AMPIAPI("AMPI_Comm_compare");
2585   if(comm1==comm2) *result=MPI_IDENT;
2586   else{
2587     int equal=1;
2588     CkVec<int> ind1, ind2;
2589     ind1 = getAmpiInstance(comm1)->getIndices();
2590     ind2 = getAmpiInstance(comm2)->getIndices();
2591     if(ind1.size()==ind2.size()){
2592       for(int i=0;i<ind1.size();i++)
2593         if(ind1[i] != ind2[i]) { equal=0; break; }
2594     }
2595     if(equal==1) *result=MPI_CONGRUENT;
2596     else *result=MPI_UNEQUAL;
2597   }
2598   return 0;
2599 }
2600
2601 CDECL void AMPI_Exit(int /*exitCode*/)
2602 {
2603   AMPIAPI("AMPI_Exit");
2604   //finalizeBigSimTrace();
2605   TCHARM_Done();
2606 }
2607 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2608 {
2609   AMPI_Exit(*exitCode);
2610 }
2611
2612   CDECL
2613 int AMPI_Finalize(void)
2614 {
2615   AMPIAPI("AMPI_Finalize");
2616 #if PRINT_IDLE
2617   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2618 #endif
2619 #if AMPI_COUNTER
2620   getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2621 #endif
2622   CtvAccess(ampiFinalized)=1;
2623
2624 #if CMK_BLUEGENE_CHARM
2625 #if 0
2626   TRACE_BG_AMPI_SUSPEND();
2627 #endif
2628 #if CMK_TRACE_IN_CHARM
2629   if(CpvAccess(traceOn)) traceSuspend();
2630 #endif
2631 #endif
2632
2633   //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2634   AMPI_Exit(0);
2635   return 0;
2636 }
2637
2638 CDECL
2639 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2640     int tag, MPI_Comm comm) {
2641
2642 #ifndef CMK_OPTIMIZE
2643   int ret;
2644   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2645   if(ret != MPI_SUCCESS)
2646     return ret;
2647 #endif
2648
2649   AMPIAPI("AMPI_Send");
2650 #if AMPIMSGLOG
2651   if(msgLogRead){
2652     return 0;
2653   }
2654 #endif
2655
2656   ampi *ptr = getAmpiInstance(comm);
2657 #if AMPI_COMLIB
2658   if(enableStreaming){  
2659     //    ptr->getStreaming().beginIteration();
2660     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2661   } else
2662 #endif
2663     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2664 #if AMPI_COUNTER
2665   getAmpiParent()->counters.send++;
2666 #endif
2667 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2668   //  ptr->yield();
2669   //  //  processRemoteMlogMessages();
2670 #endif
2671   return 0;
2672 }
2673
2674   CDECL
2675 int AMPI_Ssend(void *msg, int count, MPI_Datatype type, int dest,
2676     int tag, MPI_Comm comm)
2677 {
2678 #ifndef CMK_OPTIMIZE
2679   int ret;
2680   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, msg, 1);
2681   if(ret != MPI_SUCCESS)
2682     return ret;
2683 #endif
2684
2685   AMPIAPI("AMPI_Ssend");
2686 #if AMPIMSGLOG
2687   if(msgLogRead){
2688     return 0;
2689   }
2690 #endif
2691
2692   ampi *ptr = getAmpiInstance(comm);
2693 #if AMPI_COMLIB
2694   if(enableStreaming){
2695     ptr->getStreaming().beginIteration();
2696     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2697   } else
2698 #endif
2699     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm, 1);
2700 #if AMPI_COUNTER
2701   getAmpiParent()->counters.send++;
2702 #endif
2703
2704   return 0;
2705 }
2706
2707   CDECL
2708 int AMPI_Issend(void *buf, int count, MPI_Datatype type, int dest,
2709     int tag, MPI_Comm comm, MPI_Request *request)
2710 {
2711   AMPIAPI("AMPI_Issend");
2712
2713 #ifndef CMK_OPTIMIZE
2714   int ret;
2715   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
2716   if(ret != MPI_SUCCESS)
2717   {
2718     *request = MPI_REQUEST_NULL;
2719     return ret;
2720   }
2721 #endif
2722
2723 #if AMPIMSGLOG
2724   ampiParent* pptr = getAmpiParent();
2725   if(msgLogRead){
2726     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
2727     return 0;
2728   }
2729 #endif
2730
2731   USER_CALL_DEBUG("AMPI_Issend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
2732   ampi *ptr = getAmpiInstance(comm);
2733   AmpiRequestList* reqs = getReqs();
2734   SReq *newreq = new SReq(comm);
2735   *request = reqs->insert(newreq);
2736   // 1:  blocking now  - used by MPI_Ssend
2737   // >=2:  the index of the requests - used by MPI_Issend
2738   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm, *request+2);
2739 #if AMPI_COUNTER
2740   getAmpiParent()->counters.isend++;
2741 #endif
2742
2743 #if AMPIMSGLOG
2744   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2745     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
2746   }
2747 #endif
2748
2749   return 0;
2750 }
2751
2752   CDECL
2753 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2754     MPI_Comm comm, MPI_Status *status)
2755 {
2756   AMPIAPI("AMPI_Recv");
2757
2758 #ifndef CMK_OPTIMIZE
2759   int ret;
2760   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, msg, 1);
2761   if(ret != MPI_SUCCESS)
2762     return ret;
2763 #endif
2764
2765 #if AMPIMSGLOG
2766   ampiParent* pptr = getAmpiParent();
2767   if(msgLogRead){
2768     (*(pptr->fromPUPer))|(pptr->pupBytes);
2769     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2770     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2771     return 0;
2772   }
2773 #endif
2774
2775   ampi *ptr = getAmpiInstance(comm);
2776   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2777
2778 #if AMPI_COUNTER
2779   getAmpiParent()->counters.recv++;
2780 #endif
2781
2782 #if AMPIMSGLOG
2783   if(msgLogWrite && record_msglog(pptr->thisIndex)){
2784     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2785     (*(pptr->toPUPer))|(pptr->pupBytes);
2786     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2787     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2788   }
2789 #endif
2790
2791   return 0;
2792 }
2793
2794   CDECL
2795 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2796 {
2797
2798 #ifndef CMK_OPTIMIZE
2799   int ret;
2800   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2801   if(ret != MPI_SUCCESS)
2802     return ret;
2803 #endif
2804
2805   AMPIAPI("AMPI_Probe");
2806   ampi *ptr = getAmpiInstance(comm);
2807   ptr->probe(tag,src, comm, (int*) status);
2808   return 0;
2809 }
2810
2811   CDECL
2812 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2813 {
2814   AMPIAPI("AMPI_Iprobe");
2815
2816 #ifndef CMK_OPTIMIZE
2817   int ret;
2818   ret = errorCheck(comm, 1, 0, 0, 0, 0, tag, 1, src, 1, 0, 0);
2819   if(ret != MPI_SUCCESS)
2820     return ret;
2821 #endif
2822
2823   ampi *ptr = getAmpiInstance(comm);
2824   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2825   return 0;
2826 }
2827
2828   CDECL
2829 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2830     int stag, void *rbuf, int rcount, int rtype,
2831     int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2832 {
2833   AMPIAPI("AMPI_Sendrecv");
2834
2835 #ifndef CMK_OPTIMIZE
2836   int ret;
2837   ret = errorCheck(comm, 1, scount, 1, stype, 1, stag, 1, dest, 1, sbuf, 1);
2838   if(ret != MPI_SUCCESS)
2839     return ret;
2840   ret = errorCheck(comm, 1, rcount, 1, rtype, 1, rtag, 1, src, 1, rbuf, 1);
2841   if(ret != MPI_SUCCESS)
2842     return ret;
2843 #endif
2844
2845   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2846   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2847   if (se) return se;
2848   else return re;
2849 }
2850
2851   CDECL
2852 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2853     int dest, int sendtag, int source, int recvtag,
2854     MPI_Comm comm, MPI_Status *status)
2855 {
2856   AMPIAPI("AMPI_Sendrecv_replace");
2857   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2858       buf, count, datatype, source, recvtag, comm, status);
2859 }
2860
2861
2862   CDECL
2863 int AMPI_Barrier(MPI_Comm comm)
2864 {
2865   AMPIAPI("AMPI_Barrier");
2866
2867 #ifndef CMK_OPTIMIZE
2868   if(checkCommunicator(comm) != MPI_SUCCESS)
2869     return checkCommunicator(comm);
2870 #endif
2871
2872   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2873
2874   TRACE_BG_AMPI_LOG(MPI_BARRIER, 0);
2875
2876   //HACK: Use collective operation as a barrier.
2877   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2878
2879   //BIGSIM_OOC DEBUGGING
2880   //CkPrintf("%d: in AMPI_Barrier, after AMPI_Allreduce\n", getAmpiParent()->thisIndex);
2881 #if AMPI_COUNTER
2882   getAmpiParent()->counters.barrier++;
2883 #endif
2884   return 0;
2885 }
2886
2887   CDECL
2888 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2889     MPI_Comm comm)
2890 {
2891   AMPIAPI("AMPI_Bcast");
2892
2893 #ifndef CMK_OPTIMIZE
2894   int ret;
2895   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, buf, 1);
2896   if(ret != MPI_SUCCESS)
2897     return ret;
2898 #endif
2899
2900   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2901   if(comm==MPI_COMM_SELF) return 0;
2902
2903 #if AMPIMSGLOG
2904   ampiParent* pptr = getAmpiParent();
2905   if(msgLogRead){
2906     (*(pptr->fromPUPer))|(pptr->pupBytes);
2907     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2908     return 0;
2909   }
2910 #endif
2911
2912   ampi* ptr = getAmpiInstance(comm);
2913   ptr->bcast(root, buf, count, type,comm);
2914 #if AMPI_COUNTER
2915   getAmpiParent()->counters.bcast++;
2916 #endif
2917
2918 #if AMPIMSGLOG
2919   if(msgLogWrite && record_msglog(pptr->thisIndex)) {
2920     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2921     (*(pptr->toPUPer))|(pptr->pupBytes);
2922     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2923   }
2924 #endif
2925 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2926   //  ptr->yield();
2927   //  //  processRemoteMlogMessages();
2928 #endif
2929
2930   return 0;
2931 }
2932
2933 /// This routine is called with the results of a Reduce or AllReduce
2934 const int MPI_REDUCE_SOURCE=0;
2935 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2936 void ampi::reduceResult(CkReductionMsg *msg)
2937 {
2938   MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2939   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2940       thisArrayID,thisIndex);
2941   delete msg;
2942 }
2943
2944 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2945 {
2946   int szdata = ddt->getSize(count);
2947   int szhdr = sizeof(AmpiOpHeader);
2948   AmpiOpHeader newhdr(op,type,count,szdata); 
2949   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2950   memcpy(msg->getData(),&newhdr,szhdr);
2951   TCharm::activateVariable(inbuf);
2952   ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2953   TCharm::deactivateVariable(inbuf);
2954   return msg;
2955 }
2956
2957 // Copy the MPI datatype "type" from inbuf to outbuf
2958 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2959   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2960   //  deserializing into the output.
2961   ampi *ptr = getAmpiInstance(comm);
2962   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2963   int len=ddt->getSize(count);
2964   char *serialized=new char[len];
2965   TCharm::activateVariable(inbuf);
2966   TCharm::activateVariable(outbuf);
2967   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2968   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2969   TCharm::deactivateVariable(outbuf);
2970   TCharm::deactivateVariable(inbuf);
2971   delete [] serialized;         // < memory leak!  // gzheng 
2972
2973   return MPI_SUCCESS;
2974 }
2975
2976   CDECL
2977 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
2978     int root, MPI_Comm comm)
2979 {
2980   AMPIAPI("AMPI_Reduce");
2981
2982 #ifndef CMK_OPTIMIZE
2983   int ret;
2984   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, 0, 0);
2985   if(ret != MPI_SUCCESS)
2986     return ret;
2987 #endif
2988
2989   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2990
2991 #if AMPIMSGLOG
2992   ampiParent* pptr = getAmpiParent();
2993   if(msgLogRead){
2994     (*(pptr->fromPUPer))|(pptr->pupBytes);
2995     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2996     return 0;
2997   }
2998 #endif
2999
3000   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
3001   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
3002   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
3003
3004   ampi *ptr = getAmpiInstance(comm);
3005   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
3006   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
3007   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
3008
3009   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3010
3011   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
3012   msg->setCallback(reduceCB);
3013   MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
3014   ptr->contribute(msg);
3015   if (ptr->thisIndex == rootIdx){
3016     /*HACK: Use recv() to block until reduction data comes back*/
3017     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3018       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
3019   }
3020 #if AMPI_COUNTER
3021   getAmpiParent()->counters.reduce++;
3022 #endif
3023
3024 #if AMPIMSGLOG
3025   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3026     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3027     (*(pptr->toPUPer))|(pptr->pupBytes);
3028     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3029   }
3030 #endif
3031
3032   return 0;
3033 }
3034
3035   CDECL
3036 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
3037     MPI_Op op, MPI_Comm comm)
3038 {
3039   AMPIAPI("AMPI_Allreduce");
3040
3041 #ifndef CMK_OPTIMIZE
3042   int ret;
3043   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, 0, 0);
3044   if(ret != MPI_SUCCESS)
3045     return ret;
3046 #endif
3047
3048   ampi *ptr = getAmpiInstance(comm);
3049
3050   if (inbuf == MPI_IN_PLACE) inbuf = outbuf;
3051   if (outbuf == MPI_IN_PLACE) outbuf = inbuf;
3052   CmiAssert(inbuf != MPI_IN_PLACE && outbuf != MPI_IN_PLACE);
3053
3054   CkDDT_DataType *ddt_type = ptr->getDDT()->getType(type);
3055
3056 #if CMK_BLUEGENE_CHARM
3057   TRACE_BG_AMPI_LOG(MPI_ALLREDUCE, ddt_type->getSize(count));
3058 #endif
3059
3060   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3061
3062 #if AMPIMSGLOG
3063   ampiParent* pptr = getAmpiParent();
3064   if(msgLogRead){
3065     (*(pptr->fromPUPer))|(pptr->pupBytes);
3066     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
3067     //    CkExit();
3068     return 0;
3069   }
3070 #endif
3071
3072   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
3073   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
3074
3075   CkReductionMsg *msg=makeRednMsg(ddt_type, inbuf, count, type, op);
3076   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3077   msg->setCallback(allreduceCB);
3078   ptr->contribute(msg);
3079
3080   /*HACK: Use recv() to block until the reduction data comes back*/
3081   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
3082     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
3083 #if AMPI_COUNTER
3084   getAmpiParent()->counters.allreduce++;
3085 #endif
3086
3087 #if AMPIMSGLOG
3088   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3089     (pptr->pupBytes) = getDDT()->getSize(type) * count;
3090     (*(pptr->toPUPer))|(pptr->pupBytes);
3091     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
3092     //    CkExit();
3093   }
3094 #endif
3095
3096   return 0;
3097 }
3098
3099   CDECL
3100 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
3101     MPI_Op op, MPI_Comm comm, MPI_Request* request)
3102 {
3103   AMPIAPI("AMPI_Iallreduce");
3104
3105 #ifndef CMK_OPTIMIZE
3106   int ret;
3107   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, 0, 0, inbuf, 1);
3108   if(ret != MPI_SUCCESS)
3109   {
3110     *request = MPI_REQUEST_NULL;
3111     return ret;
3112   }
3113 #endif
3114
3115   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
3116
3117   checkRequest(*request);
3118   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
3119   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
3120   ampi *ptr = getAmpiInstance(comm);
3121
3122   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
3123   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
3124   msg->setCallback(allreduceCB);
3125   ptr->contribute(msg);
3126
3127   // using irecv instead recv to non-block the call and get request pointer
3128   AmpiRequestList* reqs = getReqs();
3129   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
3130   *request = reqs->insert(newreq);
3131   return 0;
3132 }
3133
3134   CDECL
3135 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
3136     MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
3137 {
3138   AMPIAPI("AMPI_Reduce_scatter");
3139
3140 #ifndef CMK_OPTIMIZE
3141   int ret;
3142   ret = errorCheck(comm, 1, 0, 0, datatype, 1, 0, 0, 0, 0, sendbuf, 1);
3143   if(ret != MPI_SUCCESS)
3144     return ret;
3145 #endif
3146
3147   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
3148   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
3149   ampi *ptr = getAmpiInstance(comm);
3150   int size = ptr->getSize(comm);
3151   int count=0;
3152   int *displs = new int [size];
3153   int len;
3154   void *tmpbuf;
3155
3156   //under construction
3157   for(int i=0;i<size;i++){
3158     displs[i] = count;
3159     count+= recvcounts[i];
3160   }
3161   len = ptr->getDDT()->getType(datatype)->getSize(count);
3162   tmpbuf = malloc(len);
3163   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
3164   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
3165       recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
3166   free(tmpbuf);
3167   delete [] displs;     // < memory leak ! // gzheng
3168   return 0;
3169 }
3170
3171 /***** MPI_Scan algorithm (from MPICH) *******
3172   recvbuf = sendbuf;
3173   partial_scan = sendbuf;
3174   mask = 0x1;
3175   while (mask < size) {
3176   dst = rank^mask;
3177   if (dst < size) {
3178   send partial_scan to dst;
3179   recv from dst into tmp_buf;
3180   if (rank > dst) {
3181   partial_scan = tmp_buf + partial_scan;
3182   recvbuf = tmp_buf + recvbuf;
3183   }
3184   else {
3185   if (op is commutative)
3186   partial_scan = tmp_buf + partial_scan;
3187   else {
3188   tmp_buf = partial_scan + tmp_buf;
3189   partial_scan = tmp_buf;
3190   }
3191   }
3192   }
3193   mask <<= 1;
3194   }
3195  ***** MPI_Scan algorithm (from MPICH) *******/
3196
3197 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
3198   (op)(invec,inoutvec,&count,&datatype);
3199 }
3200 CDECL
3201 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
3202   AMPIAPI("AMPI_Scan");
3203
3204 #ifndef CMK_OPTIMIZE
3205   int ret;
3206   ret = errorCheck(comm, 1, count, 1, datatype, 1, 0, 0, 0, 0, sendbuf, 1);
3207   if(ret != MPI_SUCCESS)
3208     return ret;
3209 #endif
3210
3211   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
3212   MPI_Status sts;
3213   ampi *ptr = getAmpiInstance(comm);
3214   int size = ptr->getSize(comm);
3215   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
3216   int rank = ptr->getRank(comm);
3217   int mask = 0x1;
3218   int dst;
3219   void* tmp_buf = malloc(blklen);
3220   void* partial_scan = malloc(blklen);
3221
3222   memcpy(recvbuf, sendbuf, blklen);
3223   memcpy(partial_scan, sendbuf, blklen);
3224   while(mask < size){
3225     dst = rank^mask;
3226     if(dst < size){
3227       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
3228           tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
3229       if(rank > dst){
3230         (op)(tmp_buf,partial_scan,&count,&datatype);
3231         (op)(tmp_buf,recvbuf,&count,&datatype);
3232       }else {
3233         (op)(partial_scan,tmp_buf,&count,&datatype);
3234         memcpy(partial_scan,tmp_buf,blklen);
3235       }
3236     }
3237     mask <<= 1;
3238
3239   }
3240
3241   free(tmp_buf);
3242   free(partial_scan);
3243 #if AMPI_COUNTER
3244   getAmpiParent()->counters.scan++;
3245 #endif
3246   return 0;
3247 }
3248
3249 CDECL
3250 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
3251   //AMPIAPI("AMPI_Op_create");
3252   *op = function;
3253   return 0;
3254 }
3255
3256 CDECL
3257 int AMPI_Op_free(MPI_Op *op){
3258   //AMPIAPI("AMPI_Op_free");
3259   *op = MPI_OP_NULL;
3260   return 0;
3261 }
3262
3263
3264   CDECL
3265 double AMPI_Wtime(void)
3266 {
3267   //  AMPIAPI("AMPI_Wtime");
3268
3269 #if AMPIMSGLOG
3270   double ret=TCHARM_Wall_timer();
3271   ampiParent* pptr = getAmpiParent();
3272   if(msgLogRead){
3273     (*(pptr->fromPUPer))|ret;
3274     return ret;
3275   }
3276
3277   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3278     (*(pptr->toPUPer))|ret;
3279   }
3280 #endif
3281
3282 #if CMK_BLUEGENE_CHARM
3283   return BgGetTime();
3284 #else
3285   return TCHARM_Wall_timer();
3286 #endif
3287 }
3288
3289 CDECL
3290 double AMPI_Wtick(void){
3291   //AMPIAPI("AMPI_Wtick");
3292   return 1e-6;
3293 }
3294
3295
3296 int PersReq::start(){
3297   if(sndrcv == 1 || sndrcv == 3) { // send or ssend request
3298     ampi *ptr=getAmpiInstance(comm);
3299     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm, sndrcv==3?1:0);
3300   }
3301   return 0;
3302 }
3303
3304   CDECL
3305 int AMPI_Start(MPI_Request *request)
3306 {
3307   AMPIAPI("AMPI_Start");
3308   checkRequest(*request);
3309   AmpiRequestList *reqs = getReqs();
3310   if(-1==(*reqs)[*request]->start()) {
3311     CkAbort("MPI_Start could be used only on persistent communication requests!");
3312   }
3313   return 0;
3314 }
3315
3316 CDECL
3317 int AMPI_Startall(int count, MPI_Request *requests){
3318   AMPIAPI("AMPI_Startall");
3319   checkRequests(count,requests);
3320   AmpiRequestList *reqs = getReqs();
3321   for(int i=0;i<count;i++){
3322     if(-1==(*reqs)[requests[i]]->start())
3323       CkAbort("MPI_Start could be used only on persistent communication requests!");
3324   }
3325   return 0;
3326 }
3327
3328 /* organize the indices of requests into a vector of a vector: 
3329  * level 1 is different msg envelope matches
3330  * level 2 is (posting) ordered requests of with envelope
3331  * each time multiple completion call loop over first elem of level 1
3332  * and move the matched to the NULL request slot.   
3333  * warning: this does not work with I-Alltoall requests */
3334 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
3335   for(int i=0;i<count;i++){
3336     if(reqs[i]!=MPI_REQUEST_NULL)
3337       return 0;
3338   }
3339   return 1;
3340 }
3341 inline int matchReq(MPI_Request ia, MPI_Request ib){
3342   checkRequest(ia);  
3343   checkRequest(ib);
3344   AmpiRequestList* reqs = getReqs();
3345   AmpiRequest *a, *b;
3346   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
3347   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
3348   a=(*reqs)[ia];  b=(*reqs)[ib];
3349   if(a->tag != b->tag) return 0;
3350   if(a->src != b->src) return 0;
3351   if(a->comm != b->comm) return 0;
3352   return 1;
3353 }
3354 inline void swapInt(int& a,int& b){
3355   int tmp;
3356   tmp=a; a=b; b=tmp;
3357 }
3358 inline void sortedIndex(int n, int* arr, int* idx){
3359   int i,j;
3360   for(i=0;i<n;i++) 
3361     idx[i]=i;
3362   for (i=0; i<n-1; i++) 
3363     for (j=0; j<n-1-i; j++)
3364       if (arr[idx[j+1]] < arr[idx[j]]) 
3365         swapInt(idx[j+1],idx[j]);
3366 }
3367 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
3368   CkAssert(count!=0);
3369   int *newidx = new int [count];
3370   int flag;
3371   sortedIndex(count,arr,newidx);
3372   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
3373   CkVec<int> slot;
3374   vec->push_back(slot);
3375   (*vec)[0].push_back(newidx[0]);
3376   for(int i=1;i<count;i++){
3377     flag=0;
3378     for(int j=0;j<vec->size();j++){
3379       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
3380         ((*vec)[j]).push_back(newidx[i]);
3381         flag++;
3382       }
3383     }
3384     if(!flag){
3385       CkVec<int> newslot;
3386       newslot.push_back(newidx[i]);
3387       vec->push_back(newslot);
3388     }else{
3389       CkAssert(flag==1);
3390     }
3391   }
3392   delete [] newidx;
3393   return vec;
3394 }
3395 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
3396   printf("vec content: ");
3397   for(int i=0;i<vec.size();i++){
3398     printf("{");
3399     for(int j=0;j<(vec[i]).size();j++){
3400       printf(" %d ",arr[(vec[i])[j]]);
3401     }
3402     printf("} ");
3403   }
3404   printf("\n");
3405 }
3406
3407 int PersReq::wait(MPI_Status *sts){
3408   if(sndrcv == 2) {
3409     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3410       CkAbort("AMPI> Error in persistent request wait");
3411 #if CMK_BLUEGENE_CHARM
3412     _TRACE_BG_TLINE_END(&event);
3413 #endif
3414   }
3415   return 0;
3416 }
3417
3418 int IReq::wait(MPI_Status *sts){
3419   if(CpvAccess(CmiPICMethod) == 2) {
3420     AMPI_DEBUG("In weird clause of IReq::wait\n");
3421     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3422       CkAbort("AMPI> Error in non-blocking request wait");
3423
3424     return 0;
3425   }
3426
3427   //Copy "this" to a local variable in the case that "this" pointer
3428   //is updated during the out-of-core emulation.
3429
3430   // optimization for Irecv
3431   // generic() writes directly to the buffer, so the only thing we
3432   // do here is to wait
3433   ampi *ptr = getAmpiInstance(comm);
3434
3435   //BIGSIM_OOC DEBUGGING
3436   //int ooccnt=0;
3437   //int ampiIndex = ptr->thisIndex;
3438   //CmiPrintf("%d: IReq's status=%d\n", ampiIndex, statusIreq);
3439
3440   while (statusIreq == false) {
3441     //BIGSIM_OOC DEBUGGING
3442     //CmiPrintf("Before blocking: %dth time: %d: in Ireq::wait\n", ++ooccnt, ptr->thisIndex);
3443     //print();
3444
3445     ptr->resumeOnRecv=true;
3446     ptr->block();
3447
3448     //BIGSIM_OOC DEBUGGING
3449     //CmiPrintf("[%d] After blocking: in Ireq::wait\n", ptr->thisIndex);
3450     //CmiPrintf("IReq's this pointer: %p\n", this);
3451     //print();
3452
3453 #if CMK_BLUEGENE_CHARM
3454     //Because of the out-of-core emulation, this pointer is changed after in-out
3455     //memory operation. So we need to return from this function and do the while loop
3456     //in the outer function call.       
3457     if(_BgInOutOfCoreMode)
3458       return -1;
3459 #endif  
3460   }   // end of while
3461   ptr->resumeOnRecv=false;
3462
3463   AMPI_DEBUG("IReq::wait has resumed\n");
3464
3465   if(sts) {
3466     AMPI_DEBUG("Setting sts->MPI_TAG to this->tag=%d in IReq::wait  this=%p\n", (int)this->tag, this);
3467     sts->MPI_TAG = tag;
3468     sts->MPI_SOURCE = src;
3469     sts->MPI_COMM = comm;
3470     sts->MPI_LENGTH = length;
3471   }
3472
3473   return 0;
3474 }
3475
3476 int ATAReq::wait(MPI_Status *sts){
3477   int i;
3478   for(i=0;i<count;i++){
3479     if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3480           myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
3481       CkAbort("AMPI> Error in alltoall request wait");
3482 #if CMK_BLUEGENE_CHARM
3483     _TRACE_BG_TLINE_END(&myreqs[i].event);
3484 #endif
3485   }
3486 #if CMK_BLUEGENE_CHARM
3487   //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
3488   TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0, 1);
3489   for (i=0; i<count; i++)
3490     _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
3491   _TRACE_BG_TLINE_END(&event);
3492 #endif
3493   return 0;
3494 }
3495
3496 int SReq::wait(MPI_Status *sts){
3497   ampi *ptr = getAmpiInstance(comm);
3498   while (statusIreq == false) {
3499     ptr->resumeOnRecv = true;
3500     ptr->block();
3501     ptr = getAmpiInstance(comm);
3502     ptr->resumeOnRecv = false;
3503   }
3504   return 0;
3505 }
3506
3507   CDECL
3508 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
3509 {
3510   AMPIAPI("AMPI_Wait");
3511   if(*request == MPI_REQUEST_NULL){
3512     stsempty(*sts);
3513     return 0;
3514   }
3515   checkRequest(*request);
3516   AmpiRequestList* reqs = getReqs();
3517
3518 #if AMPIMSGLOG
3519   ampiParent* pptr = getAmpiParent();
3520   if(msgLogRead){
3521     (*(pptr->fromPUPer))|(pptr->pupBytes);
3522     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3523     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
3524     return 0;
3525   }
3526 #endif
3527
3528   AMPI_DEBUG("AMPI_Wait request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3529   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3530   //(*reqs)[*request]->wait(sts);
3531   int waitResult = -1;
3532   do{
3533     AmpiRequest *waitReq = (*reqs)[*request];
3534     waitResult = waitReq->wait(sts);
3535     if(_BgInOutOfCoreMode){
3536       reqs = getReqs();
3537     }
3538   }while(waitResult==-1);
3539
3540
3541   AMPI_DEBUG("AMPI_Wait after calling wait, request=%d (*reqs)[*request]=%p (*reqs)[*request]->tag=%d\n", *request, (*reqs)[*request], (int)((*reqs)[*request]->tag) );
3542
3543
3544 #if AMPIMSGLOG
3545   if(msgLogWrite && record_msglog(pptr->thisIndex)){
3546     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
3547     (*(pptr->toPUPer))|(pptr->pupBytes);
3548     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
3549     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
3550   }
3551 #endif
3552
3553   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3554     reqs->free(*request);
3555     *request = MPI_REQUEST_NULL;
3556   }
3557
3558   AMPI_DEBUG("End of AMPI_Wait\n");
3559
3560   return 0;
3561 }
3562
3563   CDECL
3564 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
3565 {
3566   AMPIAPI("AMPI_Waitall");
3567   if(count==0) return MPI_SUCCESS;
3568   checkRequests(count,request);
3569   int i,j,oldPe;
3570   AmpiRequestList* reqs = getReqs();
3571   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3572
3573 #if AMPIMSGLOG
3574   ampiParent* pptr = getAmpiParent();
3575   if(msgLogRead){
3576     for(i=0;i<reqvec->size();i++){
3577       for(j=0;j<((*reqvec)[i]).size();j++){
3578         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3579           stsempty(sts[((*reqvec)[i])[j]]);
3580           continue;
3581         }
3582         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3583         (*(pptr->fromPUPer))|(pptr->pupBytes);
3584         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3585         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3586       }
3587     }
3588     return 0;
3589   }
3590 #endif
3591
3592 #if CMK_BLUEGENE_CHARM
3593   void *curLog;         // store current log in timeline
3594   _TRACE_BG_TLINE_END(&curLog);
3595 #if 0
3596   TRACE_BG_AMPI_SUSPEND();
3597 #endif
3598 #endif
3599   for(i=0;i<reqvec->size();i++){
3600     for(j=0;j<((*reqvec)[i]).size();j++){
3601       //CkPrintf("[%d] in loop [%d, %d]\n", pptr->thisIndex,i, j);
3602       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
3603         stsempty(sts[((*reqvec)[i])[j]]);
3604         continue;
3605       }
3606       oldPe = CkMyPe();
3607
3608       int waitResult = -1;
3609       do{       
3610         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
3611         waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
3612         if(_BgInOutOfCoreMode){
3613           reqs = getReqs();
3614           reqvec = vecIndex(count, request);
3615         }
3616
3617 #if AMPIMSGLOG
3618         if(msgLogWrite && record_msglog(pptr->thisIndex)){
3619           (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
3620           (*(pptr->toPUPer))|(pptr->pupBytes);
3621           PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
3622           PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
3623         }
3624 #endif
3625
3626       }while(waitResult==-1);
3627
3628 #if 1
3629 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3630       //for fault evacuation
3631       if(oldPe != CkMyPe()){
3632 #endif
3633         reqs = getReqs();
3634         reqvec  = vecIndex(count,request);
3635 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
3636       }
3637 #endif
3638 #endif
3639     }
3640   }
3641 #if CMK_BLUEGENE_CHARM
3642   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
3643 #endif
3644   // free memory of requests
3645   for(i=0;i<count;i++){ 
3646     if(request[i] == MPI_REQUEST_NULL)
3647       continue;
3648     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
3649       reqs->free(request[i]);
3650       request[i] = MPI_REQUEST_NULL;
3651     }
3652   }
3653   delete reqvec;
3654   return 0;
3655 }
3656
3657   CDECL
3658 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
3659 {
3660   AMPIAPI("AMPI_Waitany");
3661
3662   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
3663   if(count == 0) return MPI_SUCCESS;
3664   checkRequests(count,request);
3665   if(areInactiveReqs(count,request)){
3666     *idx=MPI_UNDEFINED;
3667     stsempty(*sts);
3668     return MPI_SUCCESS;
3669   }
3670   int flag=0;
3671   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3672   while(count>0){ /* keep looping until some request finishes: */
3673     for(int i=0;i<reqvec->size();i++){
3674       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
3675       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
3676         *idx = ((*reqvec)[i])[0];
3677         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
3678         return 0;
3679       }
3680     }
3681     /* no requests have finished yet-- schedule and try again */
3682     AMPI_Yield(MPI_COMM_WORLD);
3683   }
3684   *idx = MPI_UNDEFINED;
3685   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
3686   delete reqvec;
3687   return 0;
3688 }
3689
3690   CDECL
3691 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
3692     int *array_of_indices, MPI_Status *array_of_statuses)
3693 {
3694   AMPIAPI("AMPI_Waitsome");
3695   checkRequests(incount,array_of_requests);
3696   if(areInactiveReqs(incount,array_of_requests)){
3697     *outcount=MPI_UNDEFINED;
3698     return MPI_SUCCESS;
3699   }
3700   MPI_Status sts;
3701   int i;
3702   int flag=0, realflag=0;
3703   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3704   *outcount = 0;
3705   while(1){
3706     for(i=0;i<reqvec->size();i++){
3707       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3708       if(flag == 1){ 
3709         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3710         array_of_statuses[(*outcount)++]=sts;
3711         if(sts.MPI_COMM != 0)
3712           realflag=1; // there is real(non null) request
3713       }
3714     }
3715     if(realflag && outcount>0) break;
3716   }
3717   delete reqvec;
3718   return 0;
3719 }
3720
3721   CmiBool PersReq::test(MPI_Status *sts){
3722     if(sndrcv == 2)     // recv request
3723       return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3724     else                        // send request
3725       return 1;
3726
3727   }
3728   void PersReq::complete(MPI_Status *sts){
3729     if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3730       CkAbort("AMPI> Error in persistent request complete");
3731   }
3732
3733 CmiBool IReq::test(MPI_Status *sts){
3734   if (statusIreq == true) {           
3735     if(sts)
3736       sts->MPI_LENGTH = length;           
3737     return true;
3738   }
3739   else {
3740     getAmpiInstance(comm)->yield();
3741     return false;
3742   }
3743   /*
3744      return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3745    */
3746 }
3747
3748 CmiBool SReq::test(MPI_Status *sts){
3749   if (statusIreq == true) {
3750     return true;
3751   }
3752   else {
3753     getAmpiInstance(comm)->yield();
3754     return false;
3755   }
3756 }
3757
3758 void IReq::complete(MPI_Status *sts){
3759   wait(sts);
3760   /*
3761      if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3762      CkAbort("AMPI> Error in non-blocking request complete");
3763    */
3764 }
3765
3766 void SReq::complete(MPI_Status *sts){
3767   wait(sts);
3768 }
3769
3770 void IReq::receive(ampi *ptr, AmpiMsg *msg)
3771 {
3772   int sts = ptr->processMessage(msg, tag, src, buf, count, type);
3773   statusIreq = (sts == 0);
3774   length = msg->length;
3775   this->tag = msg->tag; // Although not required, we also extract tag from msg
3776   src = msg->srcRank; // Although not required, we also extract src from msg
3777   comm = msg->comm;
3778   AMPI_DEBUG("Setting this->tag to %d in IReq::receive this=%p\n", (int)this->tag, this);
3779 #if CMK_BLUEGENE_CHARM
3780   event = msg->event; 
3781 #endif
3782   delete msg;
3783
3784   //BIGSIM_OOC DEBUGGING
3785   //CmiPrintf("In IReq::receive, this=%p ", this);
3786   //print();
3787 }
3788
3789 CmiBool ATAReq::test(MPI_Status *sts){
3790   int i, flag=1;
3791   for(i=0;i<count;i++){
3792     flag *= getAmpiInstance(myreqs[i].comm)->iprobe(myreqs[i].tag, myreqs[i].src,
3793         myreqs[i].comm, (int*) sts);
3794   }
3795   return flag;
3796 }
3797 void ATAReq::complete(MPI_Status *sts){
3798   int i;
3799   for(i=0;i<count;i++){
3800     if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3801           myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int*)sts))
3802       CkAbort("AMPI> Error in alltoall request complete");
3803   }
3804 }
3805
3806   CDECL
3807 int AMPI_Test(MPI_Request *request, int *flag, MPI_Status *sts)
3808 {
3809   AMPIAPI("AMPI_Test");
3810   if(*request==MPI_REQUEST_NULL) {
3811     *flag = 1;
3812     stsempty(*sts);
3813     return 0;
3814   }
3815   checkRequest(*request);
3816   AmpiRequestList* reqs = getReqs();
3817   if(1 == (*flag = (*reqs)[*request]->test(sts))){
3818     (*reqs)[*request]->complete(sts);
3819     if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3820       reqs->free(*request);
3821       *request = MPI_REQUEST_NULL;
3822     }
3823   }
3824   return 0;
3825 }
3826
3827 CDECL
3828 int AMPI_Testany(int count, MPI_Request *request, int *index, int *flag, MPI_Status *sts){
3829   AMPIAPI("AMPI_Testany");
3830   checkRequests(count,request);
3831   if(areInactiveReqs(count,request)){
3832     *flag=1;
3833     *index=MPI_UNDEFINED;
3834     stsempty(*sts);
3835     return MPI_SUCCESS;
3836   }
3837   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3838   *flag=0;
3839   for(int i=0;i<reqvec->size();i++){
3840     AMPI_Test(&request[((*reqvec)[i])[0]], flag, sts);
3841     if(*flag==1 && sts->MPI_COMM!=0){ // skip MPI_REQUEST_NULL
3842       *index = ((*reqvec)[i])[0];
3843       return 0;
3844     }
3845   }
3846   *index = MPI_UNDEFINED;
3847   delete reqvec;
3848   return 0;
3849 }
3850
3851   CDECL
3852 int AMPI_Testall(int count, MPI_Request *request, int *flag, MPI_Status *sts)
3853 {
3854   AMPIAPI("AMPI_Testall");
3855   if(count==0) return MPI_SUCCESS;
3856   checkRequests(count,request);
3857   int tmpflag;
3858   int i,j;
3859   AmpiRequestList* reqs = getReqs();
3860   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3861   *flag = 1;  
3862   for(i=0;i<reqvec->size();i++){
3863     for(j=0;j<((*reqvec)[i]).size();j++){
3864       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL)
3865         continue;
3866       tmpflag = (*reqs)[request[((*reqvec)[i])[j]]]->test(&sts[((*reqvec)[i])[j]]);
3867       *flag *= tmpflag;
3868     }
3869   }
3870   if(flag) 
3871     MPI_Waitall(count,request,sts);
3872   delete reqvec;        
3873   return 0;
3874 }
3875
3876   CDECL
3877 int AMPI_Testsome(int incount, MPI_Request *array_of_requests, int *outcount,
3878     int *array_of_indices, MPI_Status *array_of_statuses)
3879 {
3880   AMPIAPI("AMPI_Testsome");
3881   checkRequests(incount,array_of_requests);
3882   if(areInactiveReqs(incount,array_of_requests)){
3883     *outcount=MPI_UNDEFINED;
3884     return MPI_SUCCESS;
3885   }
3886   MPI_Status sts;
3887   int flag;
3888   int i;
3889   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3890   *outcount = 0;
3891   for(i=0;i<reqvec->size();i++){
3892     AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3893     if(flag == 1){
3894       array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3895       array_of_statuses[(*outcount)++]=sts;
3896     }
3897   }
3898   delete reqvec;
3899   return 0;
3900 }
3901
3902 CDECL
3903 int AMPI_Request_free(MPI_Request *request){
3904   AMPIAPI("AMPI_Request_free");
3905   if(*request==MPI_REQUEST_NULL) return 0;
3906   checkRequest(*request);
3907   AmpiRequestList* reqs = getReqs();
3908   reqs->free(*request);
3909   *request = MPI_REQUEST_NULL;
3910   return 0;
3911 }
3912
3913 CDECL
3914 int AMPI_Cancel(MPI_Request *request){
3915   AMPIAPI("AMPI_Cancel");
3916   return AMPI_Request_free(request);
3917 }
3918
3919 CDECL
3920 int AMPI_Test_cancelled(MPI_Status* status, int* flag) {
3921   /* FIXME: always returns success */
3922   *flag = 1;
3923   return 0;
3924 }
3925
3926   CDECL
3927 int AMPI_Recv_init(void *buf, int count, int type, int src, int tag,
3928     MPI_Comm comm, MPI_Request *req)
3929 {
3930   AMPIAPI("AMPI_Recv_init");
3931
3932 #ifndef CMK_OPTIMIZE
3933   int ret;
3934   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, buf, 1);
3935   if(ret != MPI_SUCCESS)
3936   {
3937     *req = MPI_REQUEST_NULL;
3938     return ret;
3939   }
3940 #endif
3941
3942   AmpiRequestList* reqs = getReqs();
3943   PersReq *newreq = new PersReq(buf,count,type,src,tag,comm,2);
3944   *req = reqs->insert(newreq);
3945   return 0;
3946 }
3947
3948   CDECL
3949 int AMPI_Send_init(void *buf, int count, int type, int dest, int tag,
3950     MPI_Comm comm, MPI_Request *req)
3951 {
3952   AMPIAPI("AMPI_Send_init");
3953
3954 #ifndef CMK_OPTIMIZE
3955   int ret;
3956   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
3957   if(ret != MPI_SUCCESS)
3958   {
3959     *req = MPI_REQUEST_NULL;
3960     return ret;
3961   }
3962 #endif
3963
3964   AmpiRequestList* reqs = getReqs();
3965   PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,1);
3966   *req = reqs->insert(newreq);
3967   return 0;
3968 }
3969
3970   CDECL
3971 int AMPI_Ssend_init(void *buf, int count, int type, int dest, int tag,
3972     MPI_Comm comm, MPI_Request *req)
3973 {
3974   AMPIAPI("AMPI_Ssend_init");
3975
3976 #ifndef CMK_OPTIMIZE
3977   int ret;
3978   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
3979   if(ret != MPI_SUCCESS)
3980   {
3981     *req = MPI_REQUEST_NULL;  
3982     return ret;
3983   }
3984 #endif
3985
3986   AmpiRequestList* reqs = getReqs();
3987   PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,3);
3988   *req = reqs->insert(newreq);
3989   return 0;
3990 }
3991
3992   CDECL
3993 int AMPI_Type_contiguous(int count, MPI_Datatype oldtype,
3994     MPI_Datatype *newtype)
3995 {
3996   AMPIAPI("AMPI_Type_contiguous");
3997   getDDT()->newContiguous(count, oldtype, newtype);
3998   return 0;
3999 }
4000
4001   CDECL
4002 int AMPI_Type_vector(int count, int blocklength, int stride,
4003     MPI_Datatype oldtype, MPI_Datatype*  newtype)
4004 {
4005   AMPIAPI("AMPI_Type_vector");
4006   getDDT()->newVector(count, blocklength, stride, oldtype, newtype);
4007   return 0 ;
4008 }
4009
4010   CDECL
4011 int AMPI_Type_hvector(int count, int blocklength, MPI_Aint stride, 
4012     MPI_Datatype oldtype, MPI_Datatype*  newtype)
4013 {
4014   AMPIAPI("AMPI_Type_hvector");
4015   getDDT()->newHVector(count, blocklength, stride, oldtype, newtype);
4016   return 0 ;
4017 }
4018
4019   CDECL
4020 int AMPI_Type_indexed(int count, int* arrBlength, int* arrDisp, 
4021     MPI_Datatype oldtype, MPI_Datatype*  newtype)
4022 {
4023   AMPIAPI("AMPI_Type_indexed");
4024   getDDT()->newIndexed(count, arrBlength, arrDisp, oldtype, newtype);
4025   return 0 ;
4026 }
4027
4028   CDECL
4029 int AMPI_Type_hindexed(int count, int* arrBlength, MPI_Aint* arrDisp,
4030     MPI_Datatype oldtype, MPI_Datatype*  newtype)
4031 {
4032   AMPIAPI("AMPI_Type_hindexed");
4033   getDDT()->newHIndexed(count, arrBlength, arrDisp, oldtype, newtype);
4034   return 0 ;
4035 }
4036
4037   CDECL
4038 int AMPI_Type_struct(int count, int* arrBlength, int* arrDisp, 
4039     MPI_Datatype* oldtype, MPI_Datatype*  newtype)
4040 {
4041   AMPIAPI("AMPI_Type_struct");
4042   getDDT()->newStruct(count, arrBlength, arrDisp, oldtype, newtype);
4043   return 0 ;
4044 }
4045
4046   CDECL
4047 int AMPI_Type_commit(MPI_Datatype *datatype)
4048 {
4049   AMPIAPI("AMPI_Type_commit");
4050   return 0;
4051 }
4052
4053   CDECL
4054 int AMPI_Type_free(MPI_Datatype *datatype)
4055 {
4056   AMPIAPI("AMPI_Type_free");
4057   getDDT()->freeType(datatype);
4058   return 0;
4059 }
4060
4061
4062   CDECL
4063 int AMPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent)
4064 {
4065   AMPIAPI("AMPI_Type_extent");
4066   *extent = getDDT()->getExtent(datatype);
4067   return 0;
4068 }
4069
4070   CDECL
4071 int AMPI_Type_size(MPI_Datatype datatype, int *size)
4072 {
4073   AMPIAPI("AMPI_Type_size");
4074   *size=getDDT()->getSize(datatype);
4075   return 0;
4076 }
4077
4078   CDECL
4079 int AMPI_Isend(void *buf, int count, MPI_Datatype type, int dest,
4080     int tag, MPI_Comm comm, MPI_Request *request)
4081 {
4082   AMPIAPI("AMPI_Isend");
4083
4084 #ifndef CMK_OPTIMIZE
4085   int ret;
4086   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, dest, 1, buf, 1);
4087   if(ret != MPI_SUCCESS)
4088   {
4089     *request = MPI_REQUEST_NULL;
4090     return ret;
4091   }
4092 #endif
4093
4094 #if AMPIMSGLOG
4095   ampiParent* pptr = getAmpiParent();
4096   if(msgLogRead){
4097     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
4098     return 0;
4099   }
4100 #endif
4101
4102   USER_CALL_DEBUG("AMPI_Isend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
4103   ampi *ptr = getAmpiInstance(comm);
4104 #if AMPI_COMLIB
4105   if(enableStreaming){
4106     //    ptr->getStreaming().beginIteration();
4107     ptr->comlibsend(tag,ptr->getRank(comm),buf,count,type,dest,comm);
4108   } else
4109 #endif
4110     ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm);
4111   *request = MPI_REQUEST_NULL;
4112 #if AMPI_COUNTER
4113   getAmpiParent()->counters.isend++;
4114 #endif
4115
4116 #if AMPIMSGLOG
4117   if(msgLogWrite && record_msglog(pptr->thisIndex)){
4118     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
4119   }
4120 #endif
4121
4122   return 0;
4123 }
4124
4125   CDECL
4126 int AMPI_Irecv(void *buf, int count, MPI_Datatype type, int src,
4127     int tag, MPI_Comm comm, MPI_Request *request)
4128 {
4129   AMPIAPI("AMPI_Irecv");
4130
4131 #ifndef CMK_OPTIMIZE
4132   int ret;
4133   ret = errorCheck(comm, 1, count, 1, type, 1, tag, 1, src, 1, buf, 1);
4134   if(ret != MPI_SUCCESS)
4135     return ret;
4136 #endif
4137
4138   if(src==MPI_PROC_NULL) { *request = MPI_REQUEST_NULL; return 0; }
4139   USER_CALL_DEBUG("AMPI_Irecv("<<type<<","<<src<<","<<tag<<","<<comm<<")");
4140   AmpiRequestList* reqs = getReqs();
4141   IReq *newreq = new IReq(buf,count,type,src,tag,comm);
4142   *request = reqs->insert(newreq);
4143
4144 #if AMPIMSGLOG
4145   ampiParent* pptr = getAmpiParent();
4146   if(msgLogRead){
4147     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
4148     return 0;
4149   }
4150 #endif
4151
4152   // if msg is here, shall we do receive right away ??
4153   // posted irecv
4154   // message already arraved?
4155   ampi *ptr = getAmpiInstance(comm);
4156   AmpiMsg *msg = NULL;
4157   if (CpvAccess(CmiPICMethod) != 2)       // not copyglobals 
4158   {
4159     msg = ptr->getMessage(tag, src, comm, &newreq->tag);
4160   }
4161   if (msg) {
4162     newreq->receive(ptr, msg);
4163   } else {
4164     // post receive
4165     int tags[3];
4166     tags[0] = tag; tags[1] = src; tags[2] = comm;
4167 #if 0
4168     CmmPut(ptr->posted_ireqs, 3, tags, newreq);
4169 #else    
4170     //just insert the index of the newreq in the ampiParent::ampiReqs
4171     //to posted_ireqs. Such change is due to the need for Out-of-core Emulation
4172     //in BigSim. Before this change, posted_ireqs and ampiReqs both hold pointers to
4173     //AmpiRequest instances. After going through the Pupping routines, both will have
4174     //pointers to different AmpiRequest instances and no longer refer to the same AmpiRequest
4175     //instance. Therefore, to keep both always accessing the same AmpiRequest instance,
4176     //posted_ireqs stores the index (an integer) to ampiReqs. 
4177     //The index is 1-based rather 0-based because when pulling entries from posted_ireqs,
4178     //if not found, a "0"(i.e. NULL) is returned, this confuses the indexing of ampiReqs. 
4179     CmmPut(ptr->posted_ireqs, 3, tags, (void *)((*request)+1));
4180 #endif
4181   }
4182
4183 #if AMPI_COUNTER
4184   getAmpiParent()->counters.irecv++;
4185 #endif
4186
4187 #if AMPIMSGLOG
4188   if(msgLogWrite && record_msglog(pptr->thisIndex)){
4189     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
4190   }
4191 #endif
4192
4193   return 0;
4194 }
4195
4196   CDECL
4197 int AMPI_Ireduce(void *sendbuf, void *recvbuf, int count, int type, MPI_Op op,
4198     int root, MPI_Comm comm, MPI_Request *request)
4199 {
4200   AMPIAPI("AMPI_Ireduce");
4201
4202 #ifndef CMK_OPTIMIZE
4203   int ret;
4204   ret = errorCheck(comm, 1, count, 1, type, 1, 0, 0, root, 1, sendbuf, 1);
4205   if(ret != MPI_SUCCESS)
4206   {
4207     *request = MPI_REQUEST_NULL;
4208     return ret;
4209   }
4210 #endif
4211
4212   if(op == MPI_OP_NULL) CkAbort("MPI_Ireduce called with MPI_OP_NULL!!!");
4213   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,sendbuf,recvbuf);
4214   ampi *ptr = getAmpiInstance(comm);
4215   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),sendbuf,count,type,op);
4216   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
4217   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
4218   msg->setCallback(reduceCB);
4219   ptr->contribute(msg);
4220
4221   if (ptr->thisIndex == rootIdx){
4222     // using irecv instead recv to non-block the call and get request pointer
4223     AmpiRequestList* reqs = getReqs();
4224     IReq *newreq = new IReq(recvbuf,count,type,0,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
4225     *request = reqs->insert(newreq);
4226   }
4227   return 0;
4228 }
4229
4230   CDECL
4231 int AMPI_Allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
4232     void *recvbuf, int recvcount, MPI_Datatype recvtype,
4233     MPI_Comm comm)
4234 {
4235   AMPIAPI("AMPI_Allgather");
4236
4237 #ifndef CMK_OPTIMIZE
4238   int ret;
4239   ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
4240   if(ret != MPI_SUCCESS)
4241     return ret;
4242   ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
4243   if(ret != MPI_SUCCESS)
4244     return ret;
4245 #endif
4246
4247   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgather not allowed for Inter-communicator!");
4248   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
4249
4250   ampi *ptr = getAmpiInstance(comm);
4251   int size = ptr->getSize(comm);
4252   int i;
4253
4254 #if AMPI_COMLIB  
4255
4256   if(comm == MPI_COMM_WORLD) {
4257     // commlib support
4258
4259     for(i=0;i<size;i++) {
4260       ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4261           sendtype, i, comm, ptr->getComlibProxy());
4262     }
4263     ptr->getAllgatherStrategy()->doneInserting();
4264
4265   } else 
4266 #endif
4267
4268     for(i=0;i<size;i++) {
4269       ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4270           sendtype, i, comm);
4271     }
4272
4273
4274   MPI_Status status;
4275   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
4276   int itemsize = dttype->getSize(recvcount) ;
4277
4278   for(i=0;i<size;i++) {
4279     AMPI_Recv(((char*)recvbuf)+(itemsize*i), recvcount, recvtype,
4280         i, MPI_GATHER_TAG, comm, &status);
4281   }
4282 #if AMPI_COUNTER
4283   getAmpiParent()->counters.allgather++;
4284 #endif
4285   return 0;
4286 }
4287
4288   CDECL
4289 int AMPI_Iallgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
4290     void *recvbuf, int recvcount, MPI_Datatype recvtype,
4291     MPI_Comm comm, MPI_Request* request)
4292 {
4293   AMPIAPI("AMPI_Iallgather");
4294
4295 #ifndef CMK_OPTIMIZE
4296   int ret;
4297   ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
4298   if(ret != MPI_SUCCESS)
4299   {
4300     *request = MPI_REQUEST_NULL;
4301     return ret;
4302   }
4303   ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
4304   if(ret != MPI_SUCCESS)
4305   {
4306     *request = MPI_REQUEST_NULL;
4307     return ret;
4308   }
4309 #endif
4310
4311   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallgather not allowed for Inter-communicator!");
4312   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
4313
4314   ampi *ptr = getAmpiInstance(comm);
4315   int size = ptr->getSize(comm);
4316   int i;
4317 #if AMPI_COMLIB
4318   if(comm == MPI_COMM_WORLD) {
4319     // commlib support
4320     //      ptr->getAllgather().beginIteration();
4321     for(i=0;i<size;i++) {
4322       ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4323           sendtype, i, comm, ptr->getComlibProxy());
4324     }
4325     ptr->getAllgatherStrategy()->doneInserting();
4326   } else 
4327 #endif
4328     for(i=0;i<size;i++) {
4329       ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4330           sendtype, i, comm);
4331     }
4332
4333   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
4334   int itemsize = dttype->getSize(recvcount) ;
4335
4336   // copy+paste from MPI_Irecv
4337   AmpiRequestList* reqs = getReqs();
4338   ATAReq *newreq = new ATAReq(size);
4339   for(i=0;i<size;i++){
4340     if(newreq->addReq(((char*)recvbuf)+(itemsize*i),recvcount,recvtype,i,MPI_GATHER_TAG,comm)!=(i+1))
4341       CkAbort("MPI_Iallgather: Error adding requests into ATAReq!");
4342   }
4343   *request = reqs->insert(newreq);
4344   AMPI_DEBUG("MPI_Iallgather: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
4345
4346   return 0;
4347 }
4348
4349   CDECL
4350 int AMPI_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
4351     void *recvbuf, int *recvcounts, int *displs,
4352     MPI_Datatype recvtype, MPI_Comm comm)
4353 {
4354   AMPIAPI("AMPI_Allgatherv");
4355
4356 #ifndef CMK_OPTIMIZE
4357   int ret;
4358   ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
4359   if(ret != MPI_SUCCESS)
4360     return ret;
4361 #endif
4362
4363   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgatherv not allowed for Inter-communicator!");
4364   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
4365
4366   ampi *ptr = getAmpiInstance(comm);
4367   int size = ptr->getSize(comm);
4368   int i;
4369 #if AMPI_COMLIB
4370   if(comm == MPI_COMM_WORLD) {
4371     // commlib support
4372     //      ptr->getAllgather().beginIteration();
4373     for(i=0;i<size;i++) {
4374       ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4375           sendtype, i, comm, ptr->getComlibProxy());
4376     }
4377     ptr->getAllgatherStrategy()->doneInserting();
4378   } else
4379 #endif 
4380     for(i=0;i<size;i++) {
4381       ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
4382           sendtype, i, comm);
4383     }
4384
4385   MPI_Status status;
4386   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
4387   int itemsize = dttype->getSize() ;
4388
4389   for(i=0;i<size;i++) {
4390     AMPI_Recv(((char*)recvbuf)+(itemsize*displs[i]), recvcounts[i], recvtype,
4391         i, MPI_GATHER_TAG, comm, &status);
4392   }
4393 #if AMPI_COUNTER
4394   getAmpiParent()->counters.allgather++;
4395 #endif
4396   return 0;
4397 }
4398
4399   CDECL
4400 int AMPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
4401     void *recvbuf, int recvcount, MPI_Datatype recvtype,
4402     int root, MPI_Comm comm)
4403 {
4404   AMPIAPI("AMPI_Gather");
4405
4406 #ifndef CMK_OPTIMIZE
4407   int ret;
4408   ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
4409   if(ret != MPI_SUCCESS)
4410     return ret;
4411   ret = errorCheck(comm, 1, recvcount, 1, recvtype, 1, 0, 0, 0, 0, recvbuf, 1);
4412   if(ret != MPI_SUCCESS)
4413     return ret;
4414 #endif
4415
4416   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
4417
4418 #if AMPIMSGLOG
4419   ampiParent* pptr = getAmpiParent();
4420   if(msgLogRead){
4421     (*(pptr->fromPUPer))|(pptr->pupBytes);
4422     PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
4423     return 0;
4424   }
4425 #endif
4426
4427   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gather not allowed for Inter-communicator!");
4428
4429   ampi *ptr = getAmpiInstance(comm);
4430   int size = ptr->getSize(comm);
4431   int i;
4432   AMPI_Send(sendbuf, sendcount, sendtype, root, MPI_GATHER_TAG, comm);
4433
4434   if(ptr->getRank(comm)==root) {
4435     MPI_Status status;
4436     CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
4437     int itemsize = dttype->getSize(recvcount) ;
4438
4439     for(i=0;i<size;i++) {
4440       if(-1==ptr->recv(MPI_GATHER_TAG, i, (void*)(((char*)recvbuf)+(itemsize*i)), recvcount, recvtype, comm, (int*)(&status)))
4441         CkAbort("AMPI> Error in MPI_Gather recv");
4442     }
4443   }
4444 #if AMPI_COUNTER
4445   getAmpiParent()->counters.gather++;
4446 #endif
4447
4448 #if AMPIMSGLOG
4449   if(msgLogWrite && record_msglog(pptr->thisIndex)){
4450     (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcount * size;
4451     (*(pptr->toPUPer))|(pptr->pupBytes);
4452     PUParray(*(pptr->toPUPer), (char *)recvbuf, (pptr->pupBytes));
4453   }
4454 #endif
4455
4456   return 0;
4457 }
4458
4459   CDECL
4460 int AMPI_Gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
4461     void *recvbuf, int *recvcounts, int *displs,
4462     MPI_Datatype recvtype, int root, MPI_Comm comm)
4463 {
4464   AMPIAPI("AMPI_Gatherv");
4465
4466 #ifndef CMK_OPTIMIZE
4467   int ret;
4468   ret = errorCheck(comm, 1, sendcount, 1, sendtype, 1, 0, 0, 0, 0, sendbuf, 1);
4469   if(ret != MPI_SUCCESS)
4470     return ret;
4471 #endif
4472
4473   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
4474
4475   int itemsize = getDDT()->getSize(recvtype);
4476
4477 #if AMPIMSGLOG
4478   ampiParent* pptr = getAmpiParent();
4479   if(msgLogRead){
4480     int commsize;
4481     (*(pptr->fromPUPer))|commsize;
4482     for(int i=0;i<commsize;i++){
4483       (*(pptr->fromPUPer))|(pptr->pupBytes);
4484       PUParray(*(pptr->fromPUPer), (char *)(((char*)recvbuf)+(itemsize*displs[i])), (pptr->pupBytes));
4485     }
4486     return 0;
4487   }
4488 #endif
4489
4490   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gatherv not allowed for Inter-communicator!");
4491
4492   AMPI_Send(sendbuf, sendcount, sendtype, root, MPI_GATHER_TAG, comm);