fixed a bug in making assignment of a builtin attr.
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #define exit exit /*Supress definition of exit in ampi.h*/
9 #include "ampiimpl.h"
10 #include "tcharm.h"
11 #include "ampiEvents.h" /*** for trace generation for projector *****/
12 #include "ampiProjections.h"
13
14 #define CART_TOPOL 1
15 #define AMPI_PRINT_IDLE 0
16
17 /* change this define to "x" to trace all send/recv's */
18 #define MSG_ORDER_DEBUG(x) // x /* empty */
19 /* change this define to "x" to trace user calls */
20 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
21 #define STARTUP_DEBUG(x)  // ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
22
23 static CkDDT *getDDT(void) {
24   return getAmpiParent()->myDDT;
25 }
26
27 //------------- startup -------------
28 static mpi_comm_worlds mpi_worlds;
29
30 int mpi_nworlds; /*Accessed by ampif*/
31 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
32
33 /* ampiReducer: AMPI's generic reducer type 
34    MPI_Op is function pointer to MPI_User_function
35    so that it can be packed into AmpiOpHeader, shipped 
36    with the reduction message, and then plugged into 
37    the ampiReducer. 
38    One little trick is the ampi::recv which receives
39    the final reduction message will see additional
40    sizeof(AmpiOpHeader) bytes in the buffer before
41    any user data.                             */
42 class AmpiComplex { 
43 public: 
44         double re, im; 
45         void operator+=(const AmpiComplex &a) {
46                 re+=a.re;
47                 im+=a.im;
48         }
49         void operator*=(const AmpiComplex &a) {
50                 double nu_re=re*a.re-im*a.im;
51                 im=re*a.im+im*a.re;
52                 re=nu_re;
53         }
54         int operator>(const AmpiComplex &a) {
55                 CkAbort("Cannot compare complex numbers with MPI_MAX");
56                 return 0;
57         }
58         int operator<(const AmpiComplex &a) {
59                 CkAbort("Cannot compare complex numbers with MPI_MIN");
60                 return 0;
61         }
62 };
63 typedef struct { float val; int idx; } FloatInt;
64 typedef struct { double val; int idx; } DoubleInt;
65 typedef struct { long val; int idx; } LongInt;
66 typedef struct { int val; int idx; } IntInt;
67 typedef struct { short val; int idx; } ShortInt;
68 typedef struct { long double val; int idx; } LongdoubleInt;
69 typedef struct { float val; float idx; } FloatFloat;
70 typedef struct { double val; double idx; } DoubleDouble;
71
72
73 #define MPI_OP_SWITCH(OPNAME) \
74   int i; \
75   switch (*datatype) { \
76   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
77   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
78   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
79   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
80   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
81   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
82   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
83   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
84   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
85   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
86   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
87   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
88   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
89   default: \
90     ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
91     CmiAbort("Unsupported MPI datatype for MPI Op"); \
92   };\
93
94 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
95 #define MPI_OP_IMPL(type) \
96         if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
97   MPI_OP_SWITCH(MPI_MAX)
98 #undef MPI_OP_IMPL
99 }
100
101 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
102 #define MPI_OP_IMPL(type) \
103         if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
104   MPI_OP_SWITCH(MPI_MIN)
105 #undef MPI_OP_IMPL
106 }
107
108 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
109 #define MPI_OP_IMPL(type) \
110         ((type *)inoutvec)[i] += ((type *)invec)[i];
111   MPI_OP_SWITCH(MPI_SUM)
112 #undef MPI_OP_IMPL
113 }
114
115 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
116 #define MPI_OP_IMPL(type) \
117         ((type *)inoutvec)[i] *= ((type *)invec)[i];
118   MPI_OP_SWITCH(MPI_PROD)
119 #undef MPI_OP_IMPL
120 }
121
122 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
123   int i;  
124   switch (*datatype) {
125   case MPI_INT:
126   case MPI_LOGICAL:
127     for(i=0;i<(*len);i++)
128       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
129     break;
130   default:
131     ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
132     CmiAbort("exiting");
133   }
134 }
135 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
136   int i; 
137   switch (*datatype) {
138   case MPI_INT:
139     for(i=0;i<(*len);i++)
140       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
141     break;
142   case MPI_BYTE:
143     for(i=0;i<(*len);i++)
144       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
145     break;
146   default:
147     ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
148     CmiAbort("exiting");
149   }
150 }
151 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
152   int i;  
153   switch (*datatype) {
154   case MPI_INT:
155   case MPI_LOGICAL:
156     for(i=0;i<(*len);i++)
157       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
158     break;
159   default:
160     ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
161     CmiAbort("exiting");
162   }
163 }
164 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
165   int i;  
166   switch (*datatype) {
167   case MPI_INT:
168     for(i=0;i<(*len);i++)
169       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
170     break;
171   case MPI_BYTE:
172     for(i=0;i<(*len);i++)
173       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
174     break;
175   default:
176     ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
177     CmiAbort("exiting");
178   }
179 }
180 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
181   int i;  
182   switch (*datatype) {
183   case MPI_INT:
184   case MPI_LOGICAL:
185     for(i=0;i<(*len);i++)
186       ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
187     break;
188   default:
189     ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
190     CmiAbort("exiting");
191   }
192 }
193 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
194   int i;  
195   switch (*datatype) {
196   case MPI_INT:
197     for(i=0;i<(*len);i++)
198       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
199     break;
200   case MPI_BYTE:
201     for(i=0;i<(*len);i++)
202       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
203     break;
204         case MPI_UNSIGNED:
205     for(i=0;i<(*len);i++)
206       ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
207     break;
208   default:
209     ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
210     CmiAbort("exiting");
211   }
212 }
213
214 #ifndef MIN
215 #define MIN(a,b) (a < b ? a : b)
216 #endif
217
218 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
219   int i;  
220
221   switch (*datatype) {
222   case MPI_FLOAT_INT:
223     for(i=0;i<(*len);i++)
224       if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
225         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
226       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
227         ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
228     break;
229   case MPI_DOUBLE_INT:
230     for(i=0;i<(*len);i++)
231       if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
232         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
233       else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
234         ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
235
236     break;
237   case MPI_LONG_INT:
238     for(i=0;i<(*len);i++)
239       if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
240         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
241       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
242         ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
243     break;
244   case MPI_2INT:
245     for(i=0;i<(*len);i++)
246       if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
247         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
248       else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
249         ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
250     break;
251   case MPI_SHORT_INT:
252     for(i=0;i<(*len);i++)
253       if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
254         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
255       else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
256         ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
257     break;
258   case MPI_LONG_DOUBLE_INT:
259     for(i=0;i<(*len);i++)
260       if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
261         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
262       else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
263         ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
264     break;
265   case MPI_2FLOAT:
266     for(i=0;i<(*len);i++)
267       if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
268         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
269       else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
270         ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
271     break;
272   case MPI_2DOUBLE:
273     for(i=0;i<(*len);i++)
274       if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
275         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
276       else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
277         ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
278     break;
279   default:
280     ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
281     CmiAbort("exiting");
282   }
283 }
284 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
285   int i;  
286   switch (*datatype) {
287   case MPI_FLOAT_INT:
288     for(i=0;i<(*len);i++)
289       if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
290         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
291       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
292         ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
293     break;
294   case MPI_DOUBLE_INT:
295     for(i=0;i<(*len);i++)
296       if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
297         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
298       else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
299         ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
300     break;
301   case MPI_LONG_INT:
302     for(i=0;i<(*len);i++)
303       if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
304         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
305       else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
306         ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
307     break;
308   case MPI_2INT:
309     for(i=0;i<(*len);i++)
310       if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
311         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
312       else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
313         ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
314     break;
315   case MPI_SHORT_INT:
316     for(i=0;i<(*len);i++)
317       if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
318         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
319       else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
320         ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
321     break;
322   case MPI_LONG_DOUBLE_INT:
323     for(i=0;i<(*len);i++)
324       if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
325         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
326       else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
327         ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
328     break;
329   case MPI_2FLOAT:
330     for(i=0;i<(*len);i++)
331       if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
332         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
333       else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
334         ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
335     break;
336   case MPI_2DOUBLE:
337     for(i=0;i<(*len);i++)
338       if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
339         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
340       else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
341         ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
342     break;
343   default:
344     ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
345     CmiAbort("exiting");
346   }
347 }
348
349 // every msg contains a AmpiOpHeader structure before user data
350 // FIXME: non-commutative operations require messages be ordered by rank
351 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
352   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
353   MPI_Datatype dtype;
354   int szhdr, szdata, len;
355   MPI_User_function* func;
356   func = hdr->func;
357   dtype = hdr->dtype;  
358   szdata = hdr->szdata;
359   len = hdr->len;  
360   szhdr = sizeof(AmpiOpHeader);
361
362   //Assuming extent == size
363   void *ret = malloc(szhdr+szdata);
364   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
365   for(int i=1;i<nMsg;i++){
366     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
367   }
368   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
369   free(ret);
370   return retmsg;
371 }
372
373 CkReduction::reducerType AmpiReducer;
374
375 class Builtin_kvs{
376  public:
377   int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
378   Builtin_kvs(){
379     tag_ub = MPI_TAG_UB_VALUE; 
380     host = MPI_PROC_NULL;
381     io = 0;
382     wtime_is_global = 0;
383     keyval_mype = CkMyPe();
384     keyval_numpes = CkNumPes();
385     keyval_mynode = CkMyNode();
386     keyval_numnodes = CkNumNodes();
387   }
388 };
389
390 // ------------ startup support -----------
391 int _ampi_fallback_setup_count;
392 CDECL void AMPI_Setup(void);
393 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
394
395 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
396
397 /*Main routine used when missing MPI_Setup routine*/
398 CDECL void AMPI_Fallback_Main(int argc,char **argv)
399 {
400   AMPI_Main_cpp(argc,argv);
401   AMPI_Main(argc,argv);
402   FTN_NAME(MPI_MAIN,mpi_main)();
403 }
404
405 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
406 /*Startup routine used if user *doesn't* write
407   a TCHARM_User_setup routine.
408  */
409 CDECL void AMPI_Setup_Switch(void) {
410   _ampi_fallback_setup_count=0;
411   FTN_NAME(AMPI_SETUP,ampi_setup)();
412   AMPI_Setup();
413   if (_ampi_fallback_setup_count==2)
414   { //Missing AMPI_Setup in both C and Fortran:
415     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
416   }
417 }
418
419 static int nodeinit_has_been_called=0;
420 CtvDeclare(ampiParent*, ampiPtr);
421 CtvDeclare(int, ampiInitDone);
422 CtvDeclare(int, ampiFinalized);
423 CkpvDeclare(Builtin_kvs, bikvs);
424 CkpvDeclare(int,argvExtracted);
425 static int enableStreaming = 0;
426
427 static void ampiNodeInit(void)
428 {
429   mpi_nworlds=0;
430   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
431   {
432     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
433   }
434   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
435
436   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
437   
438   nodeinit_has_been_called=1;
439 }
440
441 #if PRINT_IDLE
442 static double totalidle=0.0, startT=0.0;
443 static int beginHandle, endHandle;
444 static void BeginIdle(void *dummy,double curWallTime)
445 {
446   startT = curWallTime;
447 }
448 static void EndIdle(void *dummy,double curWallTime)
449 {
450   totalidle += curWallTime - startT;
451 }
452 #endif
453
454 /* for fortran reduction operation table to handle mapping */
455 typedef MPI_Op  MPI_Op_Array[128];
456 CtvDeclare(int, mpi_opc);
457 CtvDeclare(MPI_Op_Array, mpi_ops);
458
459 static void ampiProcInit(void){
460   CtvInitialize(ampiParent*, ampiPtr);
461   CtvInitialize(int,ampiInitDone);
462   CtvInitialize(int,ampiFinalized);
463
464   CtvInitialize(MPI_Op_Array, mpi_ops);
465   CtvInitialize(int, mpi_opc);
466
467   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
468   CkpvAccess(bikvs) = Builtin_kvs();
469
470   CkpvInitialize(int, argvExtracted);
471   CkpvAccess(argvExtracted) = 0;
472   REGISTER_AMPI
473   initAmpiProjections();
474   char **argv=CkGetArgv();
475 #if AMPI_COMLIB  
476   if(CkpvAccess(argvExtracted)==0){
477     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
478   }
479 #endif
480
481 #ifdef AMPIMSGLOG
482   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
483   msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
484   CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
485   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
486 #endif
487 }
488
489 void AMPI_Install_Idle_Timer(){
490 #if AMPI_PRINT_IDLE
491   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
492   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
493 #endif
494 }
495
496 void AMPI_Uninstall_Idle_Timer(){
497 #if AMPI_PRINT_IDLE
498   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
499   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
500 #endif
501 }
502
503 PUPfunctionpointer(MPI_MainFn);
504
505 class MPI_threadstart_t {
506 public:
507         MPI_MainFn fn;
508         MPI_threadstart_t() {}
509         MPI_threadstart_t(MPI_MainFn fn_)
510                 :fn(fn_) {}
511         void start(void) {
512                 char **argv=CmiCopyArgs(CkGetArgv());
513                 int argc=CkGetArgc();
514 #if CMK_AMPI_FNPTR_HACK
515                 AMPI_Fallback_Main(argc,argv);
516 #else
517                 (fn)(argc,argv);
518 #endif
519         }
520         void pup(PUP::er &p) {
521                 p|fn;
522         }
523 };
524 PUPmarshall(MPI_threadstart_t);
525
526 CDECL void AMPI_threadstart(void *data)
527 {
528         STARTUP_DEBUG("MPI_threadstart")
529         MPI_threadstart_t t;
530         pupFromBuf(data,t);
531         t.start();
532 }
533
534 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
535 {
536         STARTUP_DEBUG("ampiCreateMain")
537         int _nchunks=TCHARM_Get_num_chunks();
538         //Make a new threads array:
539         MPI_threadstart_t s(mainFn);
540         memBuf b; pupIntoBuf(b,s);
541         TCHARM_Create_data( _nchunks,AMPI_threadstart,
542                           b.getData(), b.getSize());
543 }
544
545 /* TCharm Semaphore ID's for AMPI startup */
546 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
547 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
548
549 static CProxy_ampiWorlds ampiWorldsGroup;
550
551 static void init_operations()
552 {
553   CtvInitialize(MPI_Op_Array, mpi_ops);
554   int i = 0;
555   MPI_Op *tab = CtvAccess(mpi_ops);
556   tab[i++] = MPI_MAX;
557   tab[i++] = MPI_MIN;
558   tab[i++] = MPI_SUM;
559   tab[i++] = MPI_PROD;
560   tab[i++] = MPI_LAND;
561   tab[i++] = MPI_BAND;
562   tab[i++] = MPI_LOR;
563   tab[i++] = MPI_BOR;
564   tab[i++] = MPI_LXOR;
565   tab[i++] = MPI_BXOR;
566   tab[i++] = MPI_MAXLOC;
567   tab[i++] = MPI_MINLOC;
568
569   CtvInitialize(int, mpi_opc);
570   CtvAccess(mpi_opc) = i;
571 }
572
573 /*
574 Called from MPI_Init, a collective initialization call:
575  creates a new AMPI array and attaches it to the current
576  set of TCHARM threads.
577 */
578 static ampi *ampiInit(char **argv)
579 {
580   if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
581   STARTUP_DEBUG("ampiInit> begin")
582
583   MPI_Comm new_world;
584   int _nchunks;
585   CkArrayOptions opts;
586   CProxy_ampiParent parent;
587   if (TCHARM_Element()==0)
588   { /* I'm responsible for building the arrays: */
589         STARTUP_DEBUG("ampiInit> creating arrays")
590
591 // FIXME: Need to serialize global communicator allocation in one place.
592         //Allocate the next communicator
593         if(mpi_nworlds == MPI_MAX_COMM_WORLDS)
594         {
595                 CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
596         }
597         int new_idx=mpi_nworlds;
598         new_world=MPI_COMM_WORLD+1+new_idx;
599
600         //Create and attach the ampiParent array
601         CkArrayID threads;
602         opts=TCHARM_Attach_start(&threads,&_nchunks);
603         parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
604         STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
605   }
606   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
607   if (TCHARM_Element()==0)
608   {
609         //Make a new ampi array
610         CkArrayID empty;
611         
612 #if AMPI_COMLIB
613         ComlibInstanceHandle ciStreaming;
614         ComlibInstanceHandle ciBcast;
615         ComlibInstanceHandle ciAllgather;
616         ComlibInstanceHandle ciAlltoall;
617         ciStreaming=CkGetComlibInstance();
618         ciBcast=CkGetComlibInstance();
619         ciAllgather=CkGetComlibInstance();
620         ciAlltoall=CkGetComlibInstance();
621 #endif
622
623         CkVec<int> _indices;
624         for(int i=0;i<_nchunks;i++) _indices.push_back(i);
625         ampiCommStruct worldComm(new_world,empty,_nchunks,_indices);
626         CProxy_ampi arr;
627 #if AMPI_COMLIB
628         arr=CProxy_ampi::ckNew(parent,worldComm,ciStreaming,ciBcast,ciAllgather,ciAlltoall,opts);
629 #else
630         //opts.setAnytimeMigration(0);
631         arr=CProxy_ampi::ckNew(parent,worldComm,opts);
632 #endif
633
634         //Broadcast info. to the mpi_worlds array
635         // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
636         ampiCommStruct newComm(new_world,arr,_nchunks);
637         if (ampiWorldsGroup.ckGetGroupID().isZero())
638                 ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
639         else
640                 ampiWorldsGroup.add(newComm);
641         STARTUP_DEBUG("ampiInit> arrays created")
642
643 #if AMPI_COMLIB
644         StreamingStrategy *sStreaming = new StreamingStrategy(1,10);
645         ciStreaming.setStrategy(sStreaming);
646         
647         BroadcastStrategy *sBcast = new BroadcastStrategy(arr.ckGetArrayID(),USE_HYPERCUBE);
648         ciBcast.setStrategy(sBcast);
649         
650         EachToManyMulticastStrategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
651         ciAllgather.setStrategy(sAllgather);
652         
653         EachToManyMulticastStrategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
654         ciAlltoall.setStrategy(sAlltoall);
655 #endif        
656   }
657
658   // Find our ampi object:
659   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
660   CtvAccess(ampiInitDone)=1;
661   CtvAccess(ampiFinalized)=0;
662   STARTUP_DEBUG("ampiInit> complete")
663 #if CMK_BLUEGENE_CHARM
664 //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
665   TRACE_BG_ADD_TAG("AMPI_START");
666 #endif
667
668   init_operations();     // initialize fortran reduction operation table
669
670   return ptr;
671 }
672
673 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
674 class ampiWorlds : public CBase_ampiWorlds {
675 public:
676     ampiWorlds(const ampiCommStruct &nextWorld) {
677         ampiWorldsGroup=thisgroup;
678         add(nextWorld);
679     }
680     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
681     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
682     void add(const ampiCommStruct &nextWorld) {
683         int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD+1);
684         mpi_worlds[new_idx].comm=nextWorld;
685         if (mpi_nworlds<=new_idx) mpi_nworlds=new_idx+1;
686         STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
687     }
688 };
689
690 //-------------------- ampiParent -------------------------
691 ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
692                 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
693 {
694   int barrier = 0x1234;
695   STARTUP_DEBUG("ampiParent> starting up")
696   thread=NULL;
697   worldPtr=NULL;
698   myDDT=&myDDTsto;
699   prepareCtv();
700
701   init();
702
703   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
704         AsyncEvacuate(CmiFalse);
705 }
706
707 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
708   thread=NULL;
709   worldPtr=NULL;
710   myDDT=&myDDTsto;
711
712   init();
713
714   AsyncEvacuate(CmiFalse);
715 }
716
717 void ampiParent::pup(PUP::er &p) {
718
719   ArrayElement1D::pup(p);
720 //      printf("[%d] Ampiparent being pupped \n",thisIndex);
721   p|threads;
722
723   p|worldStruct;
724   myDDT->pup(p);
725   p|splitComm;
726   p|groupComm;
727   p|groups;
728   p|ampiReqs;
729   p|RProxyCnt;
730   p|tmpRProxy;
731   p|winStructList;
732   p|infos;
733 }
734 void ampiParent::prepareCtv(void) {
735   thread=threads[thisIndex].ckLocal();
736   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
737   CtvAccessOther(thread->getThread(),ampiPtr) = this;
738   STARTUP_DEBUG("ampiParent> found TCharm")
739 }
740
741 void ampiParent::init(){
742 #ifdef AMPIMSGLOG
743   if(msgLogWrite && msgLogRank == thisIndex){
744 #if CMK_PROJECTIONS_USE_ZLIB
745     fMsgLog = gzopen(msgLogFilename,"wb");
746     toPUPer = new PUP::tozDisk(fMsgLog);
747 #else
748     fMsgLog = fopen(msgLogFilename,"wb");
749     toPUPer = new PUP::toDisk(fMsgLog);
750 #endif
751   }else if(msgLogRead){
752 #if CMK_PROJECTIONS_USE_ZLIB
753     fMsgLog = gzopen(msgLogFilename,"rb");
754     fromPUPer = new PUP::fromzDisk(fMsgLog);
755 #else
756     fMsgLog = fopen(msgLogFilename,"rb");
757     fromPUPer = new PUP::fromDisk(fMsgLog);
758 #endif
759   }
760 #endif
761 }
762
763 void ampiParent::finalize(){
764 #ifdef AMPIMSGLOG
765   if(msgLogWrite && msgLogRank == thisIndex){
766     delete toPUPer;
767 #if CMK_PROJECTIONS_USE_ZLIB
768     gzclose(fMsgLog);
769 #else
770     fclose(fMsgLog);
771 #endif
772   }else if(msgLogRead){
773     delete fromPUPer;
774 #if CMK_PROJECTIONS_USE_ZLIB
775     gzclose(fMsgLog);
776 #else
777     fclose(fMsgLog);
778 #endif
779   }
780 #endif
781 }
782
783 void ampiParent::ckJustMigrated(void) {
784   ArrayElement1D::ckJustMigrated();
785   prepareCtv();
786 }
787
788 ampiParent::~ampiParent() {
789   STARTUP_DEBUG("ampiParent> destructor called");
790   finalize();
791 }
792
793 //Children call this when they are first created or just migrated
794 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
795 {
796   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
797
798   if (s.getComm()>=MPI_COMM_WORLD)
799   { //We now have our COMM_WORLD-- register it
800     //Note that split communicators don't keep a raw pointer, so
801     //they don't need to re-register on migration.
802      if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
803      worldPtr=ptr;
804      worldStruct=s;
805
806     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
807      CkVec<int> _indices;
808      _indices.push_back(thisIndex);
809      selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
810   }
811   
812   if (!forMigration)
813   { //Register the new communicator:
814      MPI_Comm comm = s.getComm();
815      STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
816      if (comm>=MPI_COMM_WORLD) { 
817        // Pass the new ampi to the waiting ampiInit
818        thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
819      } else if (isSplit(comm)) {
820        splitChildRegister(s);
821      } else if (isGroup(comm)) {
822        groupChildRegister(s);
823      } else if (isCart(comm)) {
824        cartChildRegister(s);
825      } else if (isGraph(comm)) {
826        graphChildRegister(s);
827      } else if (isInter(comm)) {
828        interChildRegister(s);
829      } else if (isIntra(comm)) {
830        intraChildRegister(s);
831      }else
832        CkAbort("ampiParent recieved child with bad communicator");
833   }
834
835   return thread;
836 }
837
838 // reduction client data - preparation for checkpointing
839 class ckptClientStruct {
840 public:
841   char *dname;
842   ampiParent *ampiPtr;
843   ckptClientStruct(char *s, ampiParent *a): dname(s), ampiPtr(a) {}
844 };
845
846 static void checkpointClient(void *param,void *msg)
847 {
848   ckptClientStruct *client = (ckptClientStruct*)param;
849   char *dname = client->dname;
850   ampiParent *ampiPtr = client->ampiPtr;
851   ampiPtr->Checkpoint(strlen(dname), dname);
852   delete client;
853 }
854
855 void ampiParent::startCheckpoint(char* dname){
856   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
857   if (thisIndex==0) {
858         ckptClientStruct *clientData = new ckptClientStruct(dname, this);
859         CkCallback cb(checkpointClient, clientData);
860         contribute(0, NULL, CkReduction::sum_int, cb);
861   }
862   else
863         contribute(0, NULL, CkReduction::sum_int);
864
865 #if 0
866 #if CMK_BLUEGENE_CHARM
867   void *curLog;         // store current log in timeline
868   _TRACE_BG_TLINE_END(&curLog);
869   TRACE_BG_AMPI_SUSPEND();
870 #endif
871 #endif
872
873   thread->stop();
874
875 #if CMK_BLUEGENE_CHARM
876   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
877   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
878 #endif
879 }
880 void ampiParent::Checkpoint(int len, char* dname){
881   if (len == 0) {
882     // memory checkpoint
883     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
884     CkStartMemCheckpoint(cb);
885   }
886   else {
887     char dirname[256];
888     strncpy(dirname,dname,len);
889     dirname[len]='\0';
890     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
891     CkStartCheckpoint(dirname,cb);
892   }
893 }
894 void ampiParent::ResumeThread(void){
895   thread->resume();
896 }
897
898 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
899                              int *keyval, void* extra_state){
900         KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
901         int idx = kvlist.size();
902         kvlist.resize(idx+1);
903         kvlist[idx] = newnode;
904         *keyval = idx;
905         return 0;
906 }
907 int ampiParent::freeKeyval(int *keyval){
908         if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
909                 return -1;
910         delete kvlist[*keyval];
911         kvlist[*keyval] = NULL;
912         *keyval = MPI_KEYVAL_INVALID;
913         return 0;
914 }
915
916 int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
917         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
918                 return -1;
919         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
920         // Enlarge the keyval list:
921         while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
922         cs.getKeyvals()[keyval]=attribute_val;
923         return 0;
924 }
925
926 int ampiParent::kv_is_builtin(int keyval) {
927         switch(keyval) {
928         case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
929         case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
930         case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
931         case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
932         case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
933         case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
934         case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
935         case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
936         default: return 0;
937         };
938 }
939
940 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
941         *flag = false;
942         if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
943           *flag=true;
944           *(int *)attribute_val = *kv_builtin_storage;  // looks like all kv_builtin_storage are of integer type
945           return 0;
946         }
947         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
948                 return -1; /* invalid keyval */
949         
950         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
951         if (keyval>=cs.getKeyvals().size())  
952                 return 0; /* we don't have a value yet */
953         if (cs.getKeyvals()[keyval]==0)
954                 return 0; /* we had a value, but now it's zero */
955         /* Otherwise, we have a good value */
956         *flag = true;
957         *(void **)attribute_val = cs.getKeyvals()[keyval];
958         return 0;
959 }
960 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
961         /* no way to delete an attribute: just overwrite it with 0 */
962         return putAttr(comm,keyval,0);
963 }
964
965 //----------------------- ampi -------------------------
966 void ampi::init(void) {
967   parent=NULL;
968   thread=NULL;
969   msgs=NULL;
970   posted_ireqs=NULL;
971   resumeOnRecv=false;
972   AsyncEvacuate(CmiFalse);
973 }
974
975 ampi::ampi()
976 {
977   /* this constructor only exists so we can create an empty array during split */
978   CkAbort("Default ampi constructor should never be called");
979 }
980
981 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
982    :parentProxy(parent_)
983 {
984   init();
985
986   myComm=s; myComm.setArrayID(thisArrayID);
987   myRank=myComm.getRankForIndex(thisIndex);
988
989   findParent(false);
990
991   msgs = CmmNew();
992   posted_ireqs = CmmNew();
993   nbcasts = 0;
994
995   comlibProxy = thisProxy;
996   ComlibDelegateProxy(&comlibProxy);
997   
998   seqEntries=parent->numElements;
999   oorder.init (seqEntries);
1000 }
1001
1002 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1003                 ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1004    :parentProxy(parent_),ciStreaming(ciStreaming_),ciBcast(ciBcast_),ciAllgather(ciAllgather_),ciAlltoall(ciAlltoall_)
1005 {
1006   init();
1007
1008   myComm=s; myComm.setArrayID(thisArrayID);
1009   myRank=myComm.getRankForIndex(thisIndex);
1010
1011   findParent(false);
1012
1013   msgs = CmmNew();
1014   posted_ireqs = CmmNew();
1015   nbcasts = 0;
1016
1017   comlibProxy = thisProxy;
1018   ComlibDelegateProxy(&comlibProxy);
1019   ciStreaming.setSourcePe();
1020   ciBcast.setSourcePe();
1021   ciAllgather.setSourcePe();
1022   ciAlltoall.setSourcePe();
1023
1024   seqEntries=parent->numElements;
1025   oorder.init (seqEntries);
1026 }
1027
1028 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1029 {
1030   init();
1031   
1032   seqEntries=-1;
1033 }
1034
1035 void ampi::ckJustMigrated(void)
1036 {
1037         findParent(true);
1038         ArrayElement1D::ckJustMigrated();
1039 }
1040
1041 void ampi::findParent(bool forMigration) {
1042         STARTUP_DEBUG("ampi> finding my parent")
1043         parent=parentProxy[thisIndex].ckLocal();
1044         if (parent==NULL) CkAbort("AMPI can't find its parent!");
1045         thread=parent->registerAmpi(this,myComm,forMigration);
1046         if (thread==NULL) CkAbort("AMPI can't find its thread!");
1047 //      printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1048 }
1049
1050 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1051         CkPupMessage(*(PUP::er *)p,msg,1);
1052         if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1053 //      printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1054 }
1055
1056 void ampi::pup(PUP::er &p)
1057 {
1058   if(!p.isUserlevel())
1059     ArrayElement1D::pup(p);//Pack superclass
1060   p|parentProxy;
1061   p|myComm;
1062   p|myRank;
1063   p|nbcasts;
1064   p|tmpVec;
1065   p|remoteProxy;
1066   p|resumeOnRecv;
1067   p|comlibProxy;
1068   p|ciStreaming;
1069   p|ciBcast;
1070   p|ciAllgather;
1071   p|ciAlltoall;
1072
1073   if(p.isUnpacking()){
1074     ciStreaming.setSourcePe();
1075     ciBcast.setSourcePe();
1076     ciAllgather.setSourcePe();
1077     ciAlltoall.setSourcePe();
1078   }
1079
1080   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1081   posted_ireqs = CmmNew();              // FIXME 
1082 //      printf("[%d] ampi index %d msgs table pointer %p\n",CkMyPe(),thisIndex,msgs);
1083
1084   p|seqEntries;
1085   p|oorder;
1086 }
1087
1088 ampi::~ampi()
1089 {
1090   if (CkInRestarting()) {
1091     // in restarting, we need to flush messages
1092   int tags[3], sts[3];
1093   tags[0] = tags[1] = tags[2] = CmmWildCard;
1094   AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1095   while (msg) {
1096     delete msg;
1097     msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1098   }
1099   }
1100   CmmFree(msgs);
1101   CmmFree(posted_ireqs);         // FIXME
1102 }
1103
1104 //------------------------ Communicator Splitting ---------------------
1105 class ampiSplitKey {
1106 public:
1107         int nextSplitComm;
1108         int color; //New class of processes we'll belong to
1109         int key; //To determine rank in new ordering
1110         int rank; //Rank in old ordering
1111         ampiSplitKey() {}
1112         ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1113                 :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1114 };
1115
1116 /* "type" may indicate whether call is for a cartesian topology etc. */
1117
1118 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1119 {
1120 #if CMK_BLUEGENE_CHARM
1121   void *curLog;         // store current log in timeline
1122   _TRACE_BG_TLINE_END(&curLog);
1123 //  TRACE_BG_AMPI_SUSPEND();
1124 #endif
1125   if (type == CART_TOPOL) {
1126         ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1127         int rootIdx=myComm.getIndexForRank(0);
1128         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1129         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1130
1131         thread->suspend(); //Resumed by ampiParent::cartChildRegister
1132         MPI_Comm newComm=parent->getNextCart()-1;
1133         *dest=newComm;
1134   } else {
1135         ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1136         int rootIdx=myComm.getIndexForRank(0);
1137         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1138         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1139
1140         thread->suspend(); //Resumed by ampiParent::splitChildRegister
1141         MPI_Comm newComm=parent->getNextSplit()-1;
1142         *dest=newComm;
1143   }
1144 #if CMK_BLUEGENE_CHARM
1145 //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1146   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1147   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1148 #endif
1149 }
1150
1151 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1152         const ampiSplitKey *a=(const ampiSplitKey *)a_;
1153         const ampiSplitKey *b=(const ampiSplitKey *)b_;
1154         if (a->color!=b->color) return a->color-b->color;
1155         if (a->key!=b->key) return a->key-b->key;
1156         return a->rank-b->rank;
1157 }
1158
1159 void ampi::splitPhase1(CkReductionMsg *msg)
1160 {
1161         //Order the keys, which orders the ranks properly:
1162         int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1163         ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1164         if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1165         qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1166
1167         MPI_Comm newComm = -1;
1168         for(int i=0;i<nKeys;i++)
1169                 if(keys[i].nextSplitComm>newComm)
1170                         newComm = keys[i].nextSplitComm;
1171
1172         //Loop over the sorted keys, which gives us the new arrays:
1173         int lastColor=keys[0].color-1; //The color we're building an array for
1174         CProxy_ampi lastAmpi; //The array for lastColor
1175         int lastRoot=0; //C value for new rank 0 process for latest color
1176         ampiCommStruct lastComm; //Communicator info. for latest color
1177         for (int c=0;c<nKeys;c++) {
1178                 if (keys[c].color!=lastColor)
1179                 { //Hit a new color-- need to build a new communicator and array
1180                         lastColor=keys[c].color;
1181                         lastRoot=c;
1182                         CkArrayOptions opts;
1183                         opts.bindTo(parentProxy);
1184                         opts.setNumInitial(0);
1185                         CkArrayID unusedAID; ampiCommStruct unusedComm;
1186                         lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1187                         lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1188
1189                         CkVec<int> indices; //Maps rank to array indices for new arrau
1190                         for (int i=c;i<nKeys;i++) {
1191                                 if (keys[i].color!=lastColor) break; //Done with this color
1192                                 int idx=myComm.getIndexForRank(keys[i].rank);
1193                                 indices.push_back(idx);
1194                         }
1195
1196                         //FIXME: create a new communicator for each color, instead of
1197                         // (confusingly) re-using the same MPI_Comm number for each.
1198                         lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1199                 }
1200                 int newRank=c-lastRoot;
1201                 int newIdx=lastComm.getIndexForRank(newRank);
1202
1203                 //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1204                 lastAmpi[newIdx].insert(parentProxy,lastComm);
1205         }
1206
1207         delete msg;
1208 }
1209
1210 //...newly created array elements register with the parent, which calls:
1211 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1212         int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1213         if (splitComm.size()<=idx) splitComm.resize(idx+1);
1214         splitComm[idx]=new ampiCommStruct(s);
1215         thread->resume(); //Matches suspend at end of ampi::split
1216 }
1217
1218 //-----------------create communicator from group--------------
1219 // The procedure is like that of comm_split very much,
1220 // so the code is shamelessly copied from above
1221 //   1. reduction to make sure all members have called
1222 //   2. the root in the old communicator create the new array
1223 //   3. ampiParent::register is called to register new array as new comm
1224 class vecStruct {
1225 public:
1226   int nextgroup;
1227   groupStruct vec;
1228   vecStruct():nextgroup(-1){}
1229   vecStruct(int nextgroup_, groupStruct vec_)
1230     : nextgroup(nextgroup_), vec(vec_) { }
1231 };
1232
1233 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1234   int rootIdx=vec[0];
1235   tmpVec = vec;
1236   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1237   MPI_Comm nextgroup = parent->getNextGroup();
1238   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1239
1240   if(getPosOp(thisIndex,vec)>=0){
1241     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1242     MPI_Comm retcomm = parent->getNextGroup()-1;
1243     *newcomm = retcomm;
1244   }else{
1245     *newcomm = MPI_COMM_NULL;
1246   }
1247 }
1248
1249 void ampi::commCreatePhase1(CkReductionMsg *msg){
1250   MPI_Comm *nextGroupComm = (int *)msg->getData();
1251
1252   CkArrayOptions opts;
1253   opts.bindTo(parentProxy);
1254   opts.setNumInitial(0);
1255   CkArrayID unusedAID;
1256   ampiCommStruct unusedComm;
1257   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1258   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1259
1260   groupStruct indices = tmpVec;
1261   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1262   for(int i=0;i<indices.size();i++){
1263     int newIdx=indices[i];
1264     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1265   }
1266   delete msg;
1267 }
1268
1269 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1270   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1271   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1272   groupComm[idx]=new ampiCommStruct(s);
1273   thread->resume(); //Matches suspend at end of ampi::split
1274 }
1275
1276 /* Virtual topology communicator creation */
1277 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1278   int rootIdx=vec[0];
1279   tmpVec = vec;
1280   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1281
1282   MPI_Comm nextcart = parent->getNextCart();
1283   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1284
1285   if(getPosOp(thisIndex,vec)>=0){
1286     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1287      MPI_Comm retcomm = parent->getNextCart()-1;
1288      *newcomm = retcomm;
1289   }else
1290     *newcomm = MPI_COMM_NULL;
1291 }
1292
1293 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1294   MPI_Comm *nextCartComm = (int *)msg->getData();
1295
1296   CkArrayOptions opts;
1297   opts.bindTo(parentProxy);
1298   opts.setNumInitial(0);
1299   CkArrayID unusedAID;
1300   ampiCommStruct unusedComm;
1301   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1302   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1303
1304   groupStruct indices = tmpVec;
1305   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1306                                                 size(),indices);
1307   for(int i=0;i<indices.size();i++){
1308     int newIdx=indices[i];
1309     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1310   }
1311   delete msg;
1312 }
1313
1314 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1315   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1316   if (cartComm.size()<=idx) {
1317     cartComm.resize(idx+1);
1318     cartComm.length()=idx+1;
1319   }
1320   cartComm[idx]=new ampiCommStruct(s);
1321   thread->resume(); //Matches suspend at end of ampi::cartCreate
1322 }
1323
1324 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1325   int rootIdx=vec[0];
1326   tmpVec = vec;
1327   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1328                 myComm.getProxy());
1329   MPI_Comm nextgraph = parent->getNextGraph();
1330   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1331
1332   if(getPosOp(thisIndex,vec)>=0){
1333     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1334     MPI_Comm retcomm = parent->getNextGraph()-1;
1335     *newcomm = retcomm;
1336   }else
1337     *newcomm = MPI_COMM_NULL;
1338 }
1339
1340 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1341   MPI_Comm *nextGraphComm = (int *)msg->getData();
1342
1343   CkArrayOptions opts;
1344   opts.bindTo(parentProxy);
1345   opts.setNumInitial(0);
1346   CkArrayID unusedAID;
1347   ampiCommStruct unusedComm;
1348   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1349   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1350
1351   groupStruct indices = tmpVec;
1352   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1353                                                 .size(),indices);
1354   for(int i=0;i<indices.size();i++){
1355     int newIdx=indices[i];
1356     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1357   }
1358   delete msg;
1359 }
1360
1361 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1362   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1363   if (graphComm.size()<=idx) {
1364     graphComm.resize(idx+1);
1365     graphComm.length()=idx+1;
1366   }
1367   graphComm[idx]=new ampiCommStruct(s);
1368   thread->resume(); //Matches suspend at end of ampi::graphCreate
1369 }
1370
1371 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1372   if(thisIndex==root) { // not everybody gets the valid rvec
1373     tmpVec = rvec;
1374   }
1375   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1376   MPI_Comm nextinter = parent->getNextInter();
1377   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1378
1379   thread->suspend(); //Resumed by ampiParent::interChildRegister
1380   MPI_Comm newcomm=parent->getNextInter()-1;
1381   *ncomm=newcomm;
1382 }
1383
1384 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1385   MPI_Comm *nextInterComm = (int *)msg->getData();
1386
1387   groupStruct lgroup = myComm.getIndices();
1388   CkArrayOptions opts;
1389   opts.bindTo(parentProxy);
1390   opts.setNumInitial(0);
1391   CkArrayID unusedAID;
1392   ampiCommStruct unusedComm;
1393   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1394   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1395
1396   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1397   for(int i=0;i<lgroup.size();i++){
1398     int newIdx=lgroup[i];
1399     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1400   }
1401
1402   parentProxy[0].ExchangeProxy(newAmpi);
1403   delete msg;
1404 }
1405
1406 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1407   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1408   if (interComm.size()<=idx) interComm.resize(idx+1);
1409   interComm[idx]=new ampiCommStruct(s);
1410   //thread->resume(); // don't resume it yet, till parent set remote proxy
1411 }
1412
1413 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1414   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1415     groupStruct lvec = myComm.getIndices();
1416     groupStruct rvec = myComm.getRemoteIndices();
1417     int rsize = rvec.size();
1418     tmpVec = lvec;
1419     for(int i=0;i<rsize;i++)
1420       tmpVec.push_back(rvec[i]);
1421     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1422   }else{
1423     tmpVec.resize(0);
1424   }
1425
1426   int rootIdx=myComm.getIndexForRank(0);
1427   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1428   MPI_Comm nextintra = parent->getNextIntra();
1429   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1430
1431   thread->suspend(); //Resumed by ampiParent::interChildRegister
1432   MPI_Comm newcomm=parent->getNextIntra()-1;
1433   *ncomm=newcomm;
1434 }
1435
1436 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1437   if(tmpVec.size()==0) { delete msg; return; }
1438   MPI_Comm *nextIntraComm = (int *)msg->getData();
1439   CkArrayOptions opts;
1440   opts.bindTo(parentProxy);
1441   opts.setNumInitial(0);
1442   CkArrayID unusedAID;
1443   ampiCommStruct unusedComm;
1444   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1445   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1446
1447   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1448   for(int i=0;i<tmpVec.size();i++){
1449     int newIdx=tmpVec[i];
1450     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1451   }
1452   delete msg;
1453 }
1454
1455 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1456   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1457   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1458   intraComm[idx]=new ampiCommStruct(s);
1459   thread->resume(); //Matches suspend at end of ampi::split
1460 }
1461
1462 //------------------------ communication -----------------------
1463 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1464 {
1465   if (universeNo>MPI_COMM_WORLD) {
1466     int worldDex=universeNo-MPI_COMM_WORLD-1;
1467     if (worldDex>=mpi_nworlds)
1468       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1469     return mpi_worlds[worldDex].comm;
1470   }
1471   CkAbort("Bad communicator passed to universeComm2CommStruct");
1472   return mpi_worlds[0].comm; // meaningless return
1473 }
1474
1475 void ampi::block(void){
1476         thread->suspend();
1477 }
1478
1479 void ampi::yield(void){
1480         thread->schedule();
1481 }
1482
1483 void ampi::unblock(void){
1484         thread->resume();
1485 }
1486
1487 void
1488 ampi::generic(AmpiMsg* msg)
1489 {
1490 MSG_ORDER_DEBUG(
1491   CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1492         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1493 )
1494
1495 //      AmpiMsg *msgcopy = msg;
1496   if(msg->seq != -1) {
1497     int srcIdx=msg->srcIdx;
1498     int n=oorder.put(srcIdx,msg);
1499     if (n>0) { // This message was in-order
1500       inorder(msg);
1501       if (n>1) { // It enables other, previously out-of-order messages
1502         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1503           inorder(msg);
1504         }
1505       }
1506     }
1507   } else { //Cross-world or system messages are unordered
1508     inorder(msg);
1509   }
1510   
1511   if(resumeOnRecv){
1512     thread->resume();
1513   }
1514 }
1515
1516 void
1517 ampi::inorder(AmpiMsg* msg)
1518 {
1519 MSG_ORDER_DEBUG(
1520   CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1521         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1522 )
1523   // check posted recvs
1524   int tags[3], sts[3];
1525   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1526   IReq *ireq = NULL;
1527   if (CpvAccess(CmiPICMethod) != 2) {
1528     IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1529     if (ireq) { // receive posted
1530       ireq->receive(this, msg);
1531       ireq->tag = sts[0];
1532       ireq->src = sts[1];
1533       ireq->comm = sts[2];
1534     } else {
1535       CmmPut(msgs, 3, tags, msg);
1536     }
1537   }
1538   else
1539       CmmPut(msgs, 3, tags, msg);
1540 }
1541
1542 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1543 {
1544     int tags[3];
1545     tags[0] = t; tags[1] = s; tags[2] = comm;
1546     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1547     return msg;
1548 }
1549
1550 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1551         int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm)
1552 {
1553   CkDDT_DataType *ddt = getDDT()->getType(type);
1554   int len = ddt->getSize(count);
1555   int sIdx=thisIndex;
1556   int seq = -1;
1557   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1558   seq = oorder.nextOutgoing(destIdx);
1559   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1560   TCharm::activateVariable(buf);
1561   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1562   TCharm::deactivateVariable(buf);
1563   return msg;
1564 }
1565
1566 void
1567 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1568 {
1569   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1570 }
1571
1572 void
1573 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1574 {
1575   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1576   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy());
1577 }
1578
1579 void
1580 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1581 {
1582   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1583   memcpy(msg->data, buf, len);
1584   CProxy_ampi pa(aid);
1585   pa[idx].generic(msg);
1586 }
1587
1588 void
1589 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy)
1590 {
1591   if(rank==MPI_PROC_NULL) return;
1592   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1593   int destIdx = dest.getIndexForRank(rank);
1594   if(isInter()){
1595     sRank = parent->thisIndex;
1596     destcomm = MPI_COMM_FIRST_INTER;
1597     destIdx = dest.getIndexForRemoteRank(rank);
1598     arrproxy = remoteProxy;
1599   }
1600  MSG_ORDER_DEBUG(
1601   CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
1602  )
1603
1604   arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm));
1605
1606 #ifndef CMK_OPTIMIZE
1607   int size=0;
1608   MPI_Type_size(type,&size);
1609   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
1610 #endif
1611 }
1612
1613 int
1614 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
1615 {
1616   CkDDT_DataType *ddt = getDDT()->getType(type);
1617   int len = ddt->getSize(count);
1618   
1619   if (msg->length > len && msg->length-len!=sizeof(AmpiOpHeader))
1620   { /* Received more data than we were expecting */
1621     char einfo[1024];
1622     sprintf(einfo, "FATAL ERROR in rank %d MPI_Recv (tag=%d, source=%d)\n"
1623         "  Expecting only %d bytes (%d items of type %d), \n"
1624         "  but received %d bytes from rank %d\nAMPI> MPI_Send was longer than matching MPI_Recv.",
1625             thisIndex,t,s,
1626             len, count, type,
1627             msg->length, msg->srcRank);
1628     CkAbort(einfo);
1629     return -1;
1630   }else if(msg->length < len){ // only at rare case shall we reset count by using divide
1631     count = msg->length/(ddt->getSize(1));
1632   }
1633   //
1634   TCharm::activateVariable(buf);
1635   if (msg->length-len==sizeof(AmpiOpHeader)) {  // reduction msg
1636     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
1637   } else {
1638     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
1639   }
1640   TCharm::deactivateVariable(buf);
1641   return 0;
1642 }
1643
1644 int
1645 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
1646 {
1647   MPI_Comm disComm = myComm.getComm();
1648   if(s==MPI_PROC_NULL) {
1649     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
1650     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
1651     ((MPI_Status *)sts)->MPI_LENGTH = 0;
1652     return 0;
1653   }
1654   _LOG_E_END_AMPI_PROCESSING(thisIndex)
1655 #if CMK_BLUEGENE_CHARM
1656   void *curLog;         // store current log in timeline
1657   _TRACE_BG_TLINE_END(&curLog);
1658 //  TRACE_BG_AMPI_SUSPEND();
1659 #endif
1660
1661   if(isInter()){
1662     s = myComm.getIndexForRemoteRank(s);
1663     comm = MPI_COMM_FIRST_INTER;
1664   }
1665
1666   int tags[3];
1667   AmpiMsg *msg = 0;
1668   
1669  MSG_ORDER_DEBUG(
1670   CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
1671  )
1672
1673   resumeOnRecv=true;
1674   ampi *dis = getAmpiInstance(disComm);
1675   while(1) {
1676       //This is done to take into account the case in which an ampi 
1677       // thread has migrated while waiting for a message
1678     tags[0] = t; tags[1] = s; tags[2] = comm;
1679     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
1680     if (msg) break;
1681     dis->thread->suspend();
1682     dis = getAmpiInstance(disComm);
1683   }
1684   dis->resumeOnRecv=false;
1685
1686   if(sts)
1687     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1688   int status = dis->processMessage(msg, t, s, buf, count, type);
1689   if (status != 0) return status;
1690
1691   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
1692 #if CMK_BLUEGENE_CHARM
1693   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
1694   TRACE_BG_AMPI_BREAK(NULL, "RECV_RESUME", NULL, 0);
1695   _TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
1696 #endif
1697
1698   delete msg;
1699   return 0;
1700 }
1701
1702 void
1703 ampi::probe(int t, int s, int comm, int *sts)
1704 {
1705   int tags[3];
1706 #if CMK_BLUEGENE_CHARM
1707   void *curLog;         // store current log in timeline
1708   _TRACE_BG_TLINE_END(&curLog);
1709 //  TRACE_BG_AMPI_SUSPEND();
1710 #endif
1711
1712   AmpiMsg *msg = 0;
1713   resumeOnRecv=true;
1714   while(1) {
1715     tags[0] = t; tags[1] = s; tags[2] = comm;
1716     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
1717     if (msg) break;
1718     thread->suspend();
1719   }
1720   resumeOnRecv=false;
1721   if(sts)
1722     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1723 #if CMK_BLUEGENE_CHARM
1724 //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
1725   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
1726 #endif
1727 }
1728
1729 int
1730 ampi::iprobe(int t, int s, int comm, int *sts)
1731 {
1732   int tags[3];
1733   AmpiMsg *msg = 0;
1734   tags[0] = t; tags[1] = s; tags[2] = comm;
1735   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
1736   if (msg) {
1737     if(sts)
1738       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1739     return 1;
1740   }
1741 #if CMK_BLUEGENE_CHARM
1742   void *curLog;         // store current log in timeline
1743   _TRACE_BG_TLINE_END(&curLog);
1744 //  TRACE_BG_AMPI_SUSPEND();
1745 #endif
1746   thread->schedule();
1747 #if CMK_BLUEGENE_CHARM
1748   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
1749   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
1750 #endif
1751   return 0;
1752 }
1753
1754
1755 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
1756 void
1757 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
1758 {
1759   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1760   int rootIdx=dest.getIndexForRank(root);
1761   if(rootIdx==thisIndex) {
1762 #if 0//AMPI_COMLIB
1763     ciBcast.beginIteration();
1764     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
1765 #else
1766     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
1767 #endif
1768   }
1769   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
1770   nbcasts++;
1771 }
1772
1773 void
1774 ampi::bcastraw(void* buf, int len, CkArrayID aid)
1775 {
1776   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, 0);
1777   memcpy(msg->data, buf, len);
1778   CProxy_ampi pa(aid);
1779   pa.generic(msg);
1780 }
1781
1782
1783 AmpiMsg* 
1784 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
1785 {
1786   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
1787   int unit;
1788   CkDDT_DataType *ddt = getDDT()->getType(type);
1789   unit = ddt->getSize(1);
1790   int totalsize = unit*cnt;
1791
1792   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
1793   char* addr = (char*)Alltoallbuff+disp*unit;
1794   ddt->serialize((char*)msg->data, addr, cnt, (-1));
1795   return msg;
1796 }
1797
1798 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
1799                         void *attr_in, void *attr_out, int *flag){
1800   (*flag) = 0;
1801   return (MPI_SUCCESS);
1802 }
1803 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
1804                         void *attr_in, void *attr_out, int *flag){
1805   (*(void **)attr_out) = attr_in;
1806   (*flag) = 1;
1807   return (MPI_SUCCESS);
1808 }
1809 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
1810   return (MPI_SUCCESS);
1811 }
1812
1813
1814 void AmpiSeqQ::init(int numP) 
1815 {
1816   elements.init(numP);
1817 }
1818
1819 AmpiSeqQ::~AmpiSeqQ () {
1820 }
1821   
1822 void AmpiSeqQ::pup(PUP::er &p) {
1823   p|out;
1824   p|elements;
1825 }
1826
1827 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
1828 {
1829   AmpiOtherElement &el=elements[srcIdx];
1830 #ifndef CMK_OPTIMIZE
1831   if (msg->seq<el.seqIncoming)
1832     CkAbort("AMPI Logic error: received late out-of-order message!\n");
1833 #endif
1834   out.enq(msg);
1835   el.nOut++; // We have another message in the out-of-order queue
1836 }
1837
1838 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
1839 {
1840   AmpiOtherElement &el=elements[srcIdx];
1841   if (el.nOut==0) return 0; // No more out-of-order left.
1842   // Walk through our out-of-order queue, searching for our next message:
1843   for (int i=0;i<out.length();i++) {
1844     AmpiMsg *msg=out.deq();
1845     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
1846       el.seqIncoming++;
1847       el.nOut--; // We have one less message out-of-order
1848       return msg;
1849     }
1850     else
1851       out.enq(msg);
1852   }
1853   // We walked the whole queue-- ours is not there.
1854   return 0;
1855 }
1856
1857
1858 void AmpiRequestList::pup(PUP::er &p) { 
1859         CmiMemoryCheck();
1860         if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
1861                 return;
1862         }
1863
1864         p(blklen); //Allocated size of block
1865         p(len); //Number of used elements in block
1866         if(p.isUnpacking()){
1867                 makeBlock(blklen,len);
1868         }
1869         int count=0;
1870         for(int i=0;i<len;i++){
1871                 char nonnull;
1872                 if(!p.isUnpacking()){
1873                         if(block[i] == NULL){
1874                                 nonnull = 0;
1875                         }else{
1876                                 nonnull = block[i]->getType();
1877                         }
1878                 }       
1879                 p(nonnull);
1880                 if(nonnull != 0){
1881                         if(p.isUnpacking()){
1882                                 switch(nonnull){
1883                                         case 1:
1884                                                 block[i] = new PersReq;
1885                                                 break;
1886                                         case 2: 
1887                                                 block[i] = new IReq;
1888                                                 break;
1889                                         case 3: 
1890                                                 block[i] = new ATAReq;
1891                                                 break;
1892                                 }
1893                         }       
1894                         block[i]->pup(p);
1895                         count++;
1896                 }else{
1897                         block[i] = 0;
1898                 }
1899         }
1900         if(p.isDeleting()){
1901                 freeBlock();
1902         }
1903         CmiMemoryCheck();
1904 }
1905
1906 //------------------ External Interface -----------------
1907 ampiParent *getAmpiParent(void) {
1908   ampiParent *p = CtvAccess(ampiPtr);
1909 #ifndef CMK_OPTIMIZE
1910   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
1911 #endif
1912   return p;
1913 }
1914
1915 ampi *getAmpiInstance(MPI_Comm comm) {
1916   ampi *ptr=getAmpiParent()->comm2ampi(comm);
1917 #ifndef CMK_OPTIMIZE
1918   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
1919 #endif
1920   return ptr;
1921 }
1922
1923 inline static AmpiRequestList *getReqs(void) {
1924   return &(getAmpiParent()->ampiReqs);
1925 }
1926
1927 inline void checkComm(MPI_Comm comm){
1928 #ifndef CMK_OPTIMIZE
1929   getAmpiParent()->checkComm(comm);
1930 #endif
1931 }
1932
1933 inline void checkRequest(MPI_Request req){
1934 #ifndef CMK_OPTIMIZE
1935   getReqs()->checkRequest(req);
1936 #endif
1937 }
1938
1939 inline void checkRequests(int n, MPI_Request* reqs){
1940 #ifndef CMK_OPTIMIZE
1941   AmpiRequestList* reqlist = getReqs();
1942   for(int i=0;i<n;i++)
1943     reqlist->checkRequest(reqs[i]);
1944 #endif
1945 }
1946
1947 CDECL void AMPI_Migrate(void)
1948 {
1949   AMPIAPI("AMPI_Migrate");
1950 #if 0
1951 #if CMK_BLUEGENE_CHARM
1952   TRACE_BG_AMPI_SUSPEND();
1953 #endif
1954 #endif
1955   TCHARM_Migrate();
1956 #if CMK_BLUEGENE_CHARM
1957 //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
1958   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
1959 #endif
1960 }
1961
1962
1963 CDECL void AMPI_Evacuate(void)
1964 {
1965   TCHARM_Evacuate();
1966 }
1967
1968
1969
1970 CDECL void AMPI_Migrateto(int destPE)
1971 {
1972   AMPIAPI("AMPI_MigrateTo");
1973 #if 0
1974 #if CMK_BLUEGENE_CHARM
1975   TRACE_BG_AMPI_SUSPEND();
1976 #endif
1977 #endif
1978   TCHARM_Migrate_to(destPE);
1979 #if CMK_BLUEGENE_CHARM
1980   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
1981   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
1982 #endif
1983 }
1984
1985 CDECL void AMPI_MigrateTo(int destPE)
1986 {
1987         AMPI_Migrateto(destPE);
1988 }
1989
1990 CDECL void AMPI_Async_Migrate(void)
1991 {
1992   AMPIAPI("AMPI_Async_Migrate");
1993 #if 0
1994 #if CMK_BLUEGENE_CHARM
1995   TRACE_BG_AMPI_SUSPEND();
1996 #endif
1997 #endif
1998   TCHARM_Async_Migrate();
1999 #if CMK_BLUEGENE_CHARM
2000   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2001   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2002 #endif
2003 }
2004
2005 CDECL void AMPI_Allow_Migrate(void)
2006 {
2007   AMPIAPI("AMPI_Allow_Migrate");
2008 #if 0
2009 #if CMK_BLUEGENE_CHARM
2010   TRACE_BG_AMPI_SUSPEND();
2011 #endif
2012 #endif
2013   TCHARM_Allow_Migrate();
2014 #if CMK_BLUEGENE_CHARM
2015   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2016 #endif
2017 }
2018
2019 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2020 #if CMK_LBDB_ON
2021   //AMPIAPI("AMPI_Setmigratable");
2022   ampi *ptr=getAmpiInstance(comm);
2023   ptr->setMigratable(mig);
2024 #else
2025   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2026 #endif
2027 }
2028
2029 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2030 {
2031   if (nodeinit_has_been_called) {
2032     AMPIAPI("AMPI_Init");
2033     char **argv;
2034     if (p_argv) argv=*p_argv;
2035     else argv=CkGetArgv();
2036     ampiInit(argv);
2037     if (p_argc) *p_argc=CmiGetArgc(argv);
2038   }
2039   else
2040   { /* Charm hasn't been started yet! */
2041     CkAbort("AMPI_Init> Charm is not initialized!");
2042   }
2043   return 0;
2044 }
2045
2046 CDECL int AMPI_Initialized(int *isInit)
2047 {
2048   if (nodeinit_has_been_called) {
2049         AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2050         *isInit=CtvAccess(ampiInitDone);
2051   }
2052   else /* !nodeinit_has_been_called */ {
2053         *isInit=nodeinit_has_been_called;
2054   }
2055   return 0;
2056 }
2057
2058 CDECL int AMPI_Finalized(int *isFinalized)
2059 {
2060     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2061     *isFinalized=CtvAccess(ampiFinalized);
2062     return 0;
2063 }
2064
2065 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2066 {
2067   //AMPIAPI("AMPI_Comm_rank");
2068
2069 #ifdef AMPIMSGLOG
2070   ampiParent* pptr = getAmpiParent();
2071   if(msgLogRead){
2072     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2073     return 0;
2074   }
2075 #endif
2076
2077   *rank = getAmpiInstance(comm)->getRank(comm);
2078
2079 #ifdef AMPIMSGLOG
2080   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2081     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2082   }
2083 #endif
2084   return 0;
2085 }
2086
2087 CDECL
2088 int AMPI_Comm_size(MPI_Comm comm, int *size)
2089 {
2090   //AMPIAPI("AMPI_Comm_size");
2091 #ifdef AMPIMSGLOG
2092   ampiParent* pptr = getAmpiParent();
2093   if(msgLogRead){
2094     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2095     return 0;
2096   }
2097 #endif
2098
2099   *size = getAmpiInstance(comm)->getSize(comm);
2100
2101 #ifdef AMPIMSGLOG
2102   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2103     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2104   }
2105 #endif
2106
2107   return 0;
2108 }
2109
2110 CDECL
2111 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2112 {
2113   AMPIAPI("AMPI_Comm_compare");
2114   if(comm1==comm2) *result=MPI_IDENT;
2115   else{
2116     int equal=1;
2117     CkVec<int> ind1, ind2;
2118     ind1 = getAmpiInstance(comm1)->getIndices();
2119     ind2 = getAmpiInstance(comm2)->getIndices();
2120     if(ind1.size()==ind2.size()){
2121       for(int i=0;i<ind1.size();i++)
2122         if(ind1[i] != ind2[i]) { equal=0; break; }
2123     }
2124     if(equal==1) *result=MPI_CONGRUENT;
2125     else *result=MPI_UNEQUAL;
2126   }
2127   return 0;
2128 }
2129
2130 CDECL void AMPI_Exit(int /*exitCode*/)
2131 {
2132   AMPIAPI("AMPI_Exit");
2133   TCHARM_Done();
2134 }
2135 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2136 {
2137   AMPI_Exit(*exitCode);
2138 }
2139
2140 CDECL
2141 int AMPI_Finalize(void)
2142 {
2143   AMPIAPI("AMPI_Finalize");
2144 #if PRINT_IDLE
2145   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2146 #endif
2147 #if AMPI_COUNTER
2148     getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2149 #endif
2150   CtvAccess(ampiFinalized)=1;
2151 #if 0
2152 #if CMK_BLUEGENE_CHARM
2153   TRACE_BG_AMPI_SUSPEND();
2154 #endif
2155 #endif
2156 //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2157   AMPI_Exit(0);
2158   return 0;
2159 }
2160
2161 CDECL
2162 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2163                         int tag, MPI_Comm comm)
2164 {
2165   AMPIAPI("AMPI_Send");
2166 #ifdef AMPIMSGLOG
2167   if(msgLogRead){
2168     return 0;
2169   }
2170 #endif
2171
2172   ampi *ptr = getAmpiInstance(comm);
2173 #if AMPI_COMLIB
2174   if(enableStreaming){  
2175     ptr->getStreaming().beginIteration();
2176     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2177   } else
2178 #endif
2179     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2180 #if AMPI_COUNTER
2181   getAmpiParent()->counters.send++;
2182 #endif
2183   return 0;
2184 }
2185
2186 CDECL
2187 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2188               MPI_Comm comm, MPI_Status *status)
2189 {
2190   AMPIAPI("AMPI_Recv");
2191
2192 #ifdef AMPIMSGLOG
2193   ampiParent* pptr = getAmpiParent();
2194   if(msgLogRead){
2195     (*(pptr->fromPUPer))|(pptr->pupBytes);
2196     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2197     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2198     return 0;
2199   }
2200 #endif
2201
2202   ampi *ptr = getAmpiInstance(comm);
2203   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2204   
2205 #if AMPI_COUNTER
2206   getAmpiParent()->counters.recv++;
2207 #endif
2208  
2209 #ifdef AMPIMSGLOG
2210   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2211     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2212     (*(pptr->toPUPer))|(pptr->pupBytes);
2213     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2214     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2215   }
2216 #endif
2217   
2218   return 0;
2219 }
2220
2221 CDECL
2222 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2223 {
2224   AMPIAPI("AMPI_Probe");
2225   ampi *ptr = getAmpiInstance(comm);
2226   ptr->probe(tag,src, comm, (int*) status);
2227   return 0;
2228 }
2229
2230 CDECL
2231 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2232 {
2233   AMPIAPI("AMPI_Iprobe");
2234   ampi *ptr = getAmpiInstance(comm);
2235   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2236   return 0;
2237 }
2238
2239 CDECL
2240 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2241                   int stag, void *rbuf, int rcount, int rtype,
2242                   int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2243 {
2244   AMPIAPI("AMPI_Sendrecv");
2245   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2246   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2247   if (se) return se;
2248   else return re;
2249 }
2250
2251 CDECL
2252 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2253                          int dest, int sendtag, int source, int recvtag,
2254                          MPI_Comm comm, MPI_Status *status)
2255 {
2256   AMPIAPI("AMPI_Sendrecv_replace");
2257   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2258                       buf, count, datatype, source, recvtag, comm, status);
2259 }
2260
2261
2262 CDECL
2263 int AMPI_Barrier(MPI_Comm comm)
2264 {
2265   AMPIAPI("AMPI_Barrier");
2266
2267   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2268 #if CMK_BLUEGENE_CHARM
2269   void *barrierLog;             // store current log in timeline
2270   _TRACE_BG_TLINE_END(&barrierLog); 
2271   //TRACE_BG_AMPI_BARRIER_START(barrierLog);
2272   TRACE_BG_AMPI_BREAK(NULL, "AMPI_Barrier", NULL, 0);
2273 #endif
2274   //HACK: Use collective operation as a barrier.
2275   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2276 #if CMK_BLUEGENE_CHARM
2277   //TRACE_BG_AMPI_BARRIER_END(barrierLog);
2278   _TRACE_BG_SET_INFO(NULL, "AMPI_Barrier_END",  &barrierLog, 1);
2279 #endif
2280 #if AMPI_COUNTER
2281   getAmpiParent()->counters.barrier++;
2282 #endif
2283   return 0;
2284 }
2285
2286 CDECL
2287 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2288                          MPI_Comm comm)
2289 {
2290   AMPIAPI("AMPI_Bcast");
2291   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2292   if(comm==MPI_COMM_SELF) return 0;
2293
2294 #ifdef AMPIMSGLOG
2295   ampiParent* pptr = getAmpiParent();
2296   if(msgLogRead){
2297     (*(pptr->fromPUPer))|(pptr->pupBytes);
2298     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2299     return 0;
2300   }
2301 #endif
2302
2303   ampi* ptr = getAmpiInstance(comm);
2304   ptr->bcast(root, buf, count, type,comm);
2305 #if AMPI_COUNTER
2306   getAmpiParent()->counters.bcast++;
2307 #endif
2308
2309 #ifdef AMPIMSGLOG
2310   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2311     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2312     (*(pptr->toPUPer))|(pptr->pupBytes);
2313     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2314   }
2315 #endif
2316
2317   return 0;
2318 }
2319
2320 /// This routine is called with the results of a Reduce or AllReduce
2321 const int MPI_REDUCE_SOURCE=0;
2322 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2323 void ampi::reduceResult(CkReductionMsg *msg)
2324 {
2325         MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2326   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2327              thisArrayID,thisIndex);
2328   delete msg;
2329 }
2330
2331 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2332 {
2333   int szdata = ddt->getSize(count);
2334   int szhdr = sizeof(AmpiOpHeader);
2335   AmpiOpHeader newhdr(op,type,count,szdata); 
2336   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2337   memcpy(msg->getData(),&newhdr,szhdr);
2338   TCharm::activateVariable(inbuf);
2339   ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2340   TCharm::deactivateVariable(inbuf);
2341   return msg;
2342 }
2343
2344 // Copy the MPI datatype "type" from inbuf to outbuf
2345 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2346   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2347   //  deserializing into the output.
2348   ampi *ptr = getAmpiInstance(comm);
2349   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2350   int len=ddt->getSize(count);
2351   char *serialized=new char[len];
2352   TCharm::activateVariable(inbuf);
2353   TCharm::activateVariable(outbuf);
2354   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2355   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2356   TCharm::deactivateVariable(outbuf);
2357   TCharm::deactivateVariable(inbuf);
2358   delete [] serialized;         // < memory leak!  // gzheng 
2359   
2360   return MPI_SUCCESS;
2361 }
2362
2363 CDECL
2364 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
2365                int root, MPI_Comm comm)
2366 {
2367   AMPIAPI("AMPI_Reduce");
2368   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2369
2370 #ifdef AMPIMSGLOG
2371   ampiParent* pptr = getAmpiParent();
2372   if(msgLogRead){
2373     (*(pptr->fromPUPer))|(pptr->pupBytes);
2374     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2375     return 0;
2376   }
2377 #endif
2378   
2379   ampi *ptr = getAmpiInstance(comm);
2380   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
2381   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
2382   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
2383
2384   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2385
2386   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
2387   msg->setCallback(reduceCB);
2388         MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
2389   ptr->contribute(msg);
2390   if (ptr->thisIndex == rootIdx){
2391     /*HACK: Use recv() to block until reduction data comes back*/
2392     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2393       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
2394   }
2395 #if AMPI_COUNTER
2396   getAmpiParent()->counters.reduce++;
2397 #endif
2398
2399 #ifdef AMPIMSGLOG
2400   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2401     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2402     (*(pptr->toPUPer))|(pptr->pupBytes);
2403     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
2404   }
2405 #endif
2406   
2407   return 0;
2408 }
2409
2410 CDECL
2411 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
2412                   MPI_Op op, MPI_Comm comm)
2413 {
2414   AMPIAPI("AMPI_Allreduce");
2415   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2416
2417 #ifdef AMPIMSGLOG
2418   ampiParent* pptr = getAmpiParent();
2419   if(msgLogRead){
2420     (*(pptr->fromPUPer))|(pptr->pupBytes);
2421     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2422     //    CkExit();
2423     return 0;
2424   }
2425 #endif
2426
2427   ampi *ptr = getAmpiInstance(comm);
2428   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
2429   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
2430
2431   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2432   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2433   msg->setCallback(allreduceCB);
2434   ptr->contribute(msg);
2435
2436   /*HACK: Use recv() to block until the reduction data comes back*/
2437   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2438     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
2439 #if AMPI_COUNTER
2440   getAmpiParent()->counters.allreduce++;
2441 #endif
2442
2443 #ifdef AMPIMSGLOG
2444   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2445     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2446     (*(pptr->toPUPer))|(pptr->pupBytes);
2447     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
2448     //    CkExit();
2449   }
2450 #endif
2451
2452   return 0;
2453 }
2454
2455 CDECL
2456 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
2457                    MPI_Op op, MPI_Comm comm, MPI_Request* request)
2458 {
2459   AMPIAPI("AMPI_Allreduce");
2460   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2461
2462   checkRequest(*request);
2463   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
2464   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
2465   ampi *ptr = getAmpiInstance(comm);
2466
2467   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2468   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2469   msg->setCallback(allreduceCB);
2470   ptr->contribute(msg);
2471
2472   // using irecv instead recv to non-block the call and get request pointer
2473   AmpiRequestList* reqs = getReqs();
2474   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
2475   *request = reqs->insert(newreq);
2476   return 0;
2477 }
2478
2479 CDECL
2480 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
2481                        MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
2482 {
2483   AMPIAPI("AMPI_Reduce_scatter");
2484   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
2485   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
2486   ampi *ptr = getAmpiInstance(comm);
2487   int size = ptr->getSize(comm);
2488   int count=0;
2489   int *displs = new int [size];
2490   int len;
2491   void *tmpbuf;
2492
2493   //under construction
2494   for(int i=0;i<size;i++){
2495     displs[i] = count;
2496     count+= recvcounts[i];
2497   }
2498   len = ptr->getDDT()->getType(datatype)->getSize(count);
2499   tmpbuf = malloc(len);
2500   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
2501   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
2502                recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
2503   free(tmpbuf);
2504   delete [] displs;     // < memory leak ! // gzheng
2505   return 0;
2506 }
2507
2508 /***** MPI_Scan algorithm (from MPICH) *******
2509    recvbuf = sendbuf;
2510    partial_scan = sendbuf;
2511    mask = 0x1;
2512    while (mask < size) {
2513       dst = rank^mask;
2514       if (dst < size) {
2515          send partial_scan to dst;
2516          recv from dst into tmp_buf;
2517          if (rank > dst) {
2518             partial_scan = tmp_buf + partial_scan;
2519             recvbuf = tmp_buf + recvbuf;
2520          }
2521          else {
2522             if (op is commutative)
2523                partial_scan = tmp_buf + partial_scan;
2524             else {
2525                tmp_buf = partial_scan + tmp_buf;
2526                partial_scan = tmp_buf;
2527             }
2528          }
2529       }
2530       mask <<= 1;
2531    }
2532  ***** MPI_Scan algorithm (from MPICH) *******/
2533
2534 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
2535   (op)(invec,inoutvec,&count,&datatype);
2536 }
2537 CDECL
2538 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
2539   AMPIAPI("AMPI_Scan");
2540   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
2541   MPI_Status sts;
2542   ampi *ptr = getAmpiInstance(comm);
2543   int size = ptr->getSize(comm);
2544   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
2545   int rank = ptr->getRank(comm);
2546   int mask = 0x1;
2547   int dst;
2548   void* tmp_buf = malloc(blklen);
2549   void* partial_scan = malloc(blklen);
2550   
2551   memcpy(recvbuf, sendbuf, blklen);
2552   memcpy(partial_scan, sendbuf, blklen);
2553   while(mask < size){
2554     dst = rank^mask;
2555     if(dst < size){
2556       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
2557                    tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
2558       if(rank > dst){
2559         (op)(tmp_buf,partial_scan,&count,&datatype);
2560         (op)(tmp_buf,recvbuf,&count,&datatype);
2561       }else {
2562         (op)(partial_scan,tmp_buf,&count,&datatype);
2563         memcpy(partial_scan,tmp_buf,blklen);
2564       }
2565     }
2566     mask <<= 1;
2567
2568   }
2569
2570   free(tmp_buf);
2571   free(partial_scan);
2572 #if AMPI_COUNTER
2573   getAmpiParent()->counters.scan++;
2574 #endif
2575   return 0;
2576 }
2577
2578 CDECL
2579 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
2580   //AMPIAPI("AMPI_Op_create");
2581   *op = function;
2582   return 0;
2583 }
2584
2585 CDECL
2586 int AMPI_Op_free(MPI_Op *op){
2587   //AMPIAPI("AMPI_Op_free");
2588   *op = MPI_OP_NULL;
2589   return 0;
2590 }
2591
2592
2593 CDECL
2594 double AMPI_Wtime(void)
2595 {
2596 //  AMPIAPI("AMPI_Wtime");
2597
2598 #ifdef AMPIMSGLOG
2599   double ret=TCHARM_Wall_timer();
2600   ampiParent* pptr = getAmpiParent();
2601   if(msgLogRead){
2602     (*(pptr->fromPUPer))|ret;
2603     return ret;
2604   }
2605
2606   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2607     (*(pptr->toPUPer))|ret;
2608   }
2609 #endif
2610
2611 #if CMK_BLUEGENE_CHARM
2612   return BgGetTime();
2613 #else
2614   return TCHARM_Wall_timer();
2615 #endif
2616 }
2617
2618 CDECL
2619 double AMPI_Wtick(void){
2620   //AMPIAPI("AMPI_Wtick");
2621   return 1e-6;
2622 }
2623
2624 int PersReq::start(){
2625   if(sndrcv == 1) { // send request
2626     ampi *ptr=getAmpiInstance(comm);
2627     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm);
2628   }
2629   return 0;
2630 }
2631
2632 CDECL
2633 int AMPI_Start(MPI_Request *request)
2634 {
2635   AMPIAPI("AMPI_Start");
2636   checkRequest(*request);
2637   AmpiRequestList *reqs = getReqs();
2638   if(-1==(*reqs)[*request]->start()){
2639     CkAbort("MPI_Start could be used only on persistent communication requests!");
2640   }
2641   return 0;
2642 }
2643
2644 CDECL
2645 int AMPI_Startall(int count, MPI_Request *requests){
2646   AMPIAPI("AMPI_Startall");
2647   checkRequests(count,requests);
2648   AmpiRequestList *reqs = getReqs();
2649   for(int i=0;i<count;i++){
2650     if(-1==(*reqs)[requests[i]]->start())
2651       CkAbort("MPI_Start could be used only on persistent communication requests!");
2652   }
2653   return 0;
2654 }
2655
2656 /* organize the indices of requests into a vector of a vector: 
2657  * level 1 is different msg envelope matches
2658  * level 2 is (posting) ordered requests of with envelope
2659  * each time multiple completion call loop over first elem of level 1
2660  * and move the matched to the NULL request slot.   
2661  * warning: this does not work with I-Alltoall requests */
2662 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
2663   for(int i=0;i<count;i++){
2664     if(reqs[i]!=MPI_REQUEST_NULL)
2665       return 0;
2666   }
2667   return 1;
2668 }
2669 inline int matchReq(MPI_Request ia, MPI_Request ib){
2670   checkRequest(ia);  
2671   checkRequest(ib);
2672   AmpiRequestList* reqs = getReqs();
2673   AmpiRequest *a, *b;
2674   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
2675   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
2676   a=(*reqs)[ia];  b=(*reqs)[ib];
2677   if(a->tag != b->tag) return 0;
2678   if(a->src != b->src) return 0;
2679   if(a->comm != b->comm) return 0;
2680   return 1;
2681 }
2682 inline void swapInt(int& a,int& b){
2683   int tmp;
2684   tmp=a; a=b; b=tmp;
2685 }
2686 inline void sortedIndex(int n, int* arr, int* idx){
2687   int i,j;
2688   for(i=0;i<n;i++) 
2689     idx[i]=i;
2690   for (i=0; i<n-1; i++) 
2691     for (j=0; j<n-1-i; j++)
2692       if (arr[idx[j+1]] < arr[idx[j]]) 
2693         swapInt(idx[j+1],idx[j]);
2694 }
2695 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
2696   CkAssert(count!=0);
2697   int *newidx = new int [count];
2698   int flag;
2699   sortedIndex(count,arr,newidx);
2700   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
2701   CkVec<int> slot;
2702   vec->push_back(slot);
2703   (*vec)[0].push_back(newidx[0]);
2704   for(int i=1;i<count;i++){
2705     flag=0;
2706     for(int j=0;j<vec->size();j++){
2707       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
2708         ((*vec)[j]).push_back(newidx[i]);
2709         flag++;
2710       }
2711     }
2712     if(!flag){
2713       CkVec<int> newslot;
2714       newslot.push_back(newidx[i]);
2715       vec->push_back(newslot);
2716     }else{
2717       CkAssert(flag==1);
2718     }
2719   }
2720   delete [] newidx;
2721   return vec;
2722 }
2723 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
2724   printf("vec content: ");
2725   for(int i=0;i<vec.size();i++){
2726     printf("{");
2727     for(int j=0;j<(vec[i]).size();j++){
2728       printf(" %d ",arr[(vec[i])[j]]);
2729     }
2730     printf("} ");
2731   }
2732   printf("\n");
2733 }
2734
2735 int PersReq::wait(MPI_Status *sts){
2736         if(sndrcv == 2) {
2737                 if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
2738                         CkAbort("AMPI> Error in persistent request wait");
2739 #if CMK_BLUEGENE_CHARM
2740                 _TRACE_BG_TLINE_END(&event);
2741 #endif
2742         }
2743         return 0;
2744 }
2745
2746 int IReq::wait(MPI_Status *sts){
2747   if (CpvAccess(CmiPICMethod) != 2) 
2748   {
2749           // optimization for Irecv
2750           // generic() writes directly to the buffer, so the only thing we
2751           // do here is to wait
2752         ampi *ptr = getAmpiInstance(comm);
2753         while (status == false) {
2754                 ptr->resumeOnRecv=true;
2755                 ptr->block();
2756         }
2757         ptr->resumeOnRecv=false;
2758         if(sts) {
2759           sts->MPI_TAG = tag;
2760           sts->MPI_SOURCE = src;
2761           sts->MPI_COMM = comm;
2762           sts->MPI_LENGTH = length;
2763         }
2764   }
2765   else {
2766         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
2767                 CkAbort("AMPI> Error in non-blocking request wait");
2768   }
2769 #if CMK_BLUEGENE_CHARM
2770         _TRACE_BG_TLINE_END(&event);
2771 #endif
2772         return 0;
2773 }
2774
2775 int ATAReq::wait(MPI_Status *sts){
2776         int i;
2777         for(i=0;i<count;i++){
2778                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
2779                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
2780                         CkAbort("AMPI> Error in alltoall request wait");
2781 #if CMK_BLUEGENE_CHARM
2782                 _TRACE_BG_TLINE_END(&myreqs[i].event);
2783 #endif
2784         }
2785 #if CMK_BLUEGENE_CHARM
2786         //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
2787         TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0);
2788         for (i=0; i<count; i++)
2789           _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
2790         _TRACE_BG_TLINE_END(&event);
2791 #endif
2792         return 0;
2793 }
2794
2795 CDECL
2796 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
2797 {
2798   AMPIAPI("AMPI_Wait");
2799   if(*request == MPI_REQUEST_NULL){
2800     stsempty(*sts);
2801     return 0;
2802   }
2803   checkRequest(*request);
2804   AmpiRequestList* reqs = getReqs();
2805
2806 #ifdef AMPIMSGLOG
2807   ampiParent* pptr = getAmpiParent();
2808   if(msgLogRead){
2809     (*(pptr->fromPUPer))|(pptr->pupBytes);
2810     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
2811     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
2812     return 0;
2813   }
2814 #endif
2815   
2816   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
2817   (*reqs)[*request]->wait(sts);
2818
2819 #ifdef AMPIMSGLOG
2820   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2821     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
2822     (*(pptr->toPUPer))|(pptr->pupBytes);
2823     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
2824     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
2825   }
2826 #endif
2827   
2828   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
2829     reqs->free(*request);
2830     *request = MPI_REQUEST_NULL;
2831   }
2832
2833   return 0;
2834 }
2835
2836 CDECL
2837 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
2838 {
2839   AMPIAPI("AMPI_Waitall");
2840   if(count==0) return MPI_SUCCESS;
2841   checkRequests(count,request);
2842   int i,j,oldPe;
2843   AmpiRequestList* reqs = getReqs();
2844   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);\
2845
2846 #ifdef AMPIMSGLOG
2847   ampiParent* pptr = getAmpiParent();
2848   if(msgLogRead){
2849     for(i=0;i<reqvec->size();i++){
2850       for(j=0;j<((*reqvec)[i]).size();j++){
2851         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
2852           stsempty(sts[((*reqvec)[i])[j]]);
2853           continue;
2854         }
2855         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
2856         (*(pptr->fromPUPer))|(pptr->pupBytes);
2857         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
2858         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
2859       }
2860     }
2861     return 0;
2862   }
2863 #endif
2864
2865 #if CMK_BLUEGENE_CHARM
2866   void *curLog;         // store current log in timeline
2867   _TRACE_BG_TLINE_END(&curLog);
2868 #if 0
2869   TRACE_BG_AMPI_SUSPEND();
2870 #endif
2871 #endif
2872   for(i=0;i<reqvec->size();i++){
2873     for(j=0;j<((*reqvec)[i]).size();j++){
2874       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
2875         stsempty(sts[((*reqvec)[i])[j]]);
2876         continue;
2877       }
2878       oldPe = CkMyPe();
2879       AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
2880       waitReq->wait(&sts[((*reqvec)[i])[j]]);
2881
2882 #ifdef AMPIMSGLOG
2883       if(msgLogWrite && pptr->thisIndex == msgLogRank){
2884         (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
2885         (*(pptr->toPUPer))|(pptr->pupBytes);
2886         PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
2887         PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
2888       }
2889 #endif
2890
2891 #if 1
2892       if(oldPe != CkMyPe()){
2893                         reqs = getReqs();
2894                         reqvec  = vecIndex(count,request);
2895       }
2896 #endif
2897     }
2898   }
2899 #if CMK_BLUEGENE_CHARM
2900   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
2901 #endif
2902   // free memory of requests
2903   for(i=0;i<count;i++){ 
2904     if(request[i] == MPI_REQUEST_NULL)
2905       continue;
2906     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
2907       reqs->free(request[i]);
2908       request[i] = MPI_REQUEST_NULL;
2909     }
2910   }
2911   delete reqvec;
2912   return 0;
2913 }
2914
2915 CDECL
2916 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
2917 {
2918   AMPIAPI("AMPI_Waitany");
2919   
2920   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
2921   checkRequests(count,request);
2922   if(areInactiveReqs(count,request)){
2923     *idx=MPI_UNDEFINED;
2924     stsempty(*sts);
2925     return MPI_SUCCESS;
2926   }
2927   int flag=0;
2928   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
2929   while(count>0){ /* keep looping until some request finishes: */
2930     for(int i=0;i<reqvec->size();i++){
2931       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
2932       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
2933         *idx = ((*reqvec)[i])[0];
2934         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
2935         return 0;
2936       }
2937     }
2938     /* no requests have finished yet-- schedule and try again */
2939     AMPI_Yield(MPI_COMM_WORLD);
2940   }
2941   *idx = MPI_UNDEFINED;
2942   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
2943         delete reqvec;
2944   return 0;
2945 }
2946
2947 CDECL
2948 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
2949                  int *array_of_indices, MPI_Status *array_of_statuses)
2950 {
2951   AMPIAPI("AMPI_Waitsome");
2952   checkRequests(incount,array_of_requests);
2953   if(areInactiveReqs(incount,array_of_requests)){
2954     *outcount=MPI_UNDEFINED;
2955     return MPI_SUCCESS;
2956   }
2957   MPI_Status sts;
2958   int i;
2959   int flag=0, realflag=0;
2960   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
2961   *outcount = 0;
2962   while(1){
2963     for(i=0;i<reqvec->size();i++){
2964       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
2965       if(flag == 1){ 
2966         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
2967         array_of_statuses[(*outcount)++]=sts;
2968         if(sts.MPI_COMM != 0)
2969           realflag=1; // there is real(non null) request
2970       }
2971     }
2972     if(realflag && outcount>0) break;
2973   }
2974         delete reqvec;
2975   return 0;
2976 }
2977
2978 CmiBool PersReq::test(MPI_Status *sts){
2979         if(sndrcv == 2)         // recv request
2980                 return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
2981         else                    // send request
2982                 return 1;
2983
2984 }
2985 void PersReq::complete(MPI_Status *sts){
2986         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
2987                 CkAbort("AMPI> Error in persistent request complete");
2988 }
2989
2990 CmiBool IReq::test(MPI_Status *sts){
2991         if (status == true) {           
2992           if(sts)
2993             sts->MPI_LENGTH = length;           
2994           return true;
2995         }
2996         else {
2997           getAmpiInstance(comm)->yield();
2998           return false;
2999         }
3000 /*
3001         return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3002 */
3003 }
3004 void IReq::complete(MPI_Status *sts){
3005         wait(sts);
3006 /*
3007         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3008                 CkAbort("AMPI> Error in non-blocking request complete");
3009 */
3010 }
3011
3012 void IReq::receive(ampi *ptr, AmpiMsg *msg)
3013 {
3014     int sts = ptr->processMessage(msg, tag, src, buf, count, type);
3015     status = (sts == 0);
3016     length = msg->length;
3017     delete msg;
3018 }
3019
3020 CmiBool ATAReq::test(MPI_Status *sts){
3021         int i, flag=1;
3022         for(i=0;i<count;i++){
3023                 flag *= getAmpiInstance(myreqs[i].comm)->iprobe(myreqs[i].tag, myreqs[i].src,
3024                                         myreqs[i].comm, (int*) sts);
3025         }
3026         return flag;
3027 }
3028 void ATAReq::complete(MPI_Status *sts){
3029         int i;
3030         for(i=0;i<count;i++){
3031                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3032                                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int*)sts))
3033                         CkAbort("AMPI> Error in alltoall request complete");
3034         }
3035 }
3036
3037 CDECL
3038 int AMPI_Test(MPI_Request *request, int *flag, MPI_Status *sts)
3039 {
3040   AMPIAPI("AMPI_Test");
3041   checkRequest(*request);
3042   if(*request==MPI_REQUEST_NULL) {
3043     *flag = 1;
3044     stsempty(*sts);
3045     return 0;
3046   }
3047   AmpiRequestList* reqs = getReqs();
3048   if(1 == (*flag = (*reqs)[*request]->test(sts))){
3049     if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3050       (*reqs)[*request]->complete(sts);
3051       reqs->free(*request);
3052       *request = MPI_REQUEST_NULL;
3053     }
3054   }
3055   return 0;
3056 }
3057
3058 CDECL
3059 int AMPI_Testany(int count, MPI_Request *request, int *index, int *flag, MPI_Status *sts){
3060   AMPIAPI("AMPI_Testany");
3061   checkRequests(count,request);
3062   if(areInactiveReqs(count,request)){
3063     *flag=1;
3064     *index=MPI_UNDEFINED;
3065     stsempty(*sts);
3066     return MPI_SUCCESS;
3067   }
3068   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3069   *flag=0;
3070   for(int i=0;i<reqvec->size();i++){
3071     AMPI_Test(&request[((*reqvec)[i])[0]], flag, sts);
3072     if(*flag==1 && sts->MPI_COMM!=0){ // skip MPI_REQUEST_NULL
3073       *index = ((*reqvec)[i])[0];
3074       return 0;
3075     }
3076   }
3077   *index = MPI_UNDEFINED;
3078         delete reqvec;
3079   return 0;
3080 }
3081
3082 CDECL
3083 int AMPI_Testall(int count, MPI_Request *request, int *flag, MPI_Status *sts)
3084 {
3085   AMPIAPI("AMPI_Testall");
3086   if(count==0) return MPI_SUCCESS;
3087   checkRequests(count,request);
3088   int tmpflag;
3089   int i,j;
3090   AmpiRequestList* reqs = getReqs();
3091   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3092   *flag = 1;  
3093   for(i=0;i<reqvec->size();i++){
3094     for(j=0;j<((*reqvec)[i]).size();j++){
3095       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL)
3096         continue;
3097       tmpflag = (*reqs)[request[((*reqvec)[i])[j]]]->test(&sts[((*reqvec)[i])[j]]);
3098       *flag *= tmpflag;
3099     }
3100   }
3101   if(flag) 
3102     MPI_Waitall(count,request,sts);
3103         delete reqvec;  
3104   return 0;
3105 }
3106
3107 CDECL
3108 int AMPI_Testsome(int incount, MPI_Request *array_of_requests, int *outcount,
3109                  int *array_of_indices, MPI_Status *array_of_statuses)
3110 {
3111   AMPIAPI("AMPI_Testsome");
3112   checkRequests(incount,array_of_requests);
3113   if(areInactiveReqs(incount,array_of_requests)){
3114     *outcount=MPI_UNDEFINED;
3115     return MPI_SUCCESS;
3116   }
3117   MPI_Status sts;
3118   int flag;
3119   int i;
3120   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3121   *outcount = 0;
3122   for(i=0;i<reqvec->size();i++){
3123     AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3124     if(flag == 1){
3125       array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3126       array_of_statuses[(*outcount)++]=sts;
3127     }
3128   }
3129         delete reqvec;
3130   return 0;
3131 }
3132
3133 CDECL
3134 int AMPI_Request_free(MPI_Request *request){
3135   AMPIAPI("AMPI_Request_free");
3136   checkRequest(*request);
3137   if(*request==MPI_REQUEST_NULL) return 0;
3138   AmpiRequestList* reqs = getReqs();
3139   reqs->free(*request);
3140   return 0;
3141 }
3142
3143 CDECL
3144 int AMPI_Cancel(MPI_Request *request){
3145   AMPIAPI("AMPI_Request_free");
3146   return AMPI_Request_free(request);
3147 }
3148
3149 CDECL
3150 int AMPI_Recv_init(void *buf, int count, int type, int src, int tag,
3151                    MPI_Comm comm, MPI_Request *req)
3152 {
3153   AMPIAPI("AMPI_Recv_init");
3154   AmpiRequestList* reqs = getReqs();
3155   PersReq *newreq = new PersReq(buf,count,type,src,tag,comm,2);
3156   *req = reqs->insert(newreq);
3157   return 0;
3158 }
3159
3160 CDECL
3161 int AMPI_Send_init(void *buf, int count, int type, int dest, int tag,
3162                    MPI_Comm comm, MPI_Request *req)
3163 {
3164   AMPIAPI("AMPI_Send_init");
3165   AmpiRequestList* reqs = getReqs();
3166   PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,1);
3167   *req = reqs->insert(newreq);
3168   return 0;
3169 }
3170
3171 CDECL
3172 int AMPI_Type_contiguous(int count, MPI_Datatype oldtype,
3173                          MPI_Datatype *newtype)
3174 {
3175   AMPIAPI("AMPI_Type_contiguous");
3176   getDDT()->newContiguous(count, oldtype, newtype);
3177   return 0;
3178 }
3179
3180 CDECL
3181 int AMPI_Type_vector(int count, int blocklength, int stride,
3182                      MPI_Datatype oldtype, MPI_Datatype*  newtype)
3183 {
3184   AMPIAPI("AMPI_Type_vector");
3185   getDDT()->newVector(count, blocklength, stride, oldtype, newtype);
3186   return 0 ;
3187 }
3188
3189 CDECL
3190 int AMPI_Type_hvector(int count, int blocklength, MPI_Aint stride, 
3191                       MPI_Datatype oldtype, MPI_Datatype*  newtype)
3192 {
3193   AMPIAPI("AMPI_Type_hvector");
3194   getDDT()->newHVector(count, blocklength, stride, oldtype, newtype);
3195   return 0 ;
3196 }
3197
3198 CDECL
3199 int AMPI_Type_indexed(int count, int* arrBlength, int* arrDisp, 
3200                       MPI_Datatype oldtype, MPI_Datatype*  newtype)
3201 {
3202   AMPIAPI("AMPI_Type_indexed");
3203   getDDT()->newIndexed(count, arrBlength, arrDisp, oldtype, newtype);
3204   return 0 ;
3205 }
3206
3207 CDECL
3208 int AMPI_Type_hindexed(int count, int* arrBlength, MPI_Aint* arrDisp,
3209                        MPI_Datatype oldtype, MPI_Datatype*  newtype)
3210 {
3211   AMPIAPI("AMPI_Type_hindexed");
3212   getDDT()->newHIndexed(count, arrBlength, arrDisp, oldtype, newtype);
3213   return 0 ;
3214 }
3215
3216 CDECL
3217 int AMPI_Type_struct(int count, int* arrBlength, int* arrDisp, 
3218                      MPI_Datatype* oldtype, MPI_Datatype*  newtype)
3219 {
3220   AMPIAPI("AMPI_Type_struct");
3221   getDDT()->newStruct(count, arrBlength, arrDisp, oldtype, newtype);
3222   return 0 ;
3223 }
3224
3225 CDECL
3226 int AMPI_Type_commit(MPI_Datatype *datatype)
3227 {
3228   AMPIAPI("AMPI_Type_commit");
3229   return 0;
3230 }
3231
3232 CDECL
3233 int AMPI_Type_free(MPI_Datatype *datatype)
3234 {
3235   AMPIAPI("AMPI_Type_free");
3236   getDDT()->freeType(datatype);
3237   return 0;
3238 }
3239
3240
3241 CDECL
3242 int AMPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent)
3243 {
3244   AMPIAPI("AMPI_Type_extent");
3245   *extent = getDDT()->getExtent(datatype);
3246   return 0;
3247 }
3248
3249 CDECL
3250 int AMPI_Type_size(MPI_Datatype datatype, int *size)
3251 {
3252   AMPIAPI("AMPI_Type_size");
3253   *size=getDDT()->getSize(datatype);
3254   return 0;
3255 }
3256
3257 CDECL
3258 int AMPI_Isend(void *buf, int count, MPI_Datatype type, int dest,
3259               int tag, MPI_Comm comm, MPI_Request *request)
3260 {
3261   AMPIAPI("AMPI_Isend");
3262
3263 #ifdef AMPIMSGLOG
3264   ampiParent* pptr = getAmpiParent();
3265   if(msgLogRead){
3266     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
3267     return 0;
3268   }
3269 #endif
3270
3271   USER_CALL_DEBUG("AMPI_Isend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
3272   ampi *ptr = getAmpiInstance(comm);
3273 #if AMPI_COMLIB
3274   if(enableStreaming){
3275     ptr->getStreaming().beginIteration();
3276     ptr->comlibsend(tag,ptr->getRank(comm),buf,count,type,dest,comm);
3277   } else
3278 #endif
3279   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm);
3280   *request = MPI_REQUEST_NULL;
3281 #if AMPI_COUNTER
3282   getAmpiParent()->counters.isend++;
3283 #endif
3284
3285 #ifdef AMPIMSGLOG
3286   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3287     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
3288   }
3289 #endif
3290
3291   return 0;
3292 }
3293
3294 CDECL
3295 int AMPI_Irecv(void *buf, int count, MPI_Datatype type, int src,
3296               int tag, MPI_Comm comm, MPI_Request *request)
3297 {
3298   AMPIAPI("AMPI_Irecv");
3299
3300   if(src==MPI_PROC_NULL) { *request = MPI_REQUEST_NULL; return 0; }
3301   USER_CALL_DEBUG("AMPI_Irecv("<<type<<","<<src<<","<<tag<<","<<comm<<")");
3302   AmpiRequestList* reqs = getReqs();
3303   IReq *newreq = new IReq(buf,count,type,src,tag,comm);
3304   *request = reqs->insert(newreq);
3305
3306 #ifdef AMPIMSGLOG
3307   ampiParent* pptr = getAmpiParent();
3308   if(msgLogRead){
3309     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
3310     return 0;
3311   }
3312 #endif
3313  
3314   // if msg is here, shall we do receive right away ??
3315   // posted irecv
3316   // message already arraved?
3317   ampi *ptr = getAmpiInstance(comm);
3318   AmpiMsg *msg = NULL;
3319   if (CpvAccess(CmiPICMethod) != 2)       // not copyglobals 
3320   {
3321     msg = ptr->getMessage(tag, src, comm, &newreq->tag);
3322   }
3323   if (msg) {
3324     newreq->receive(ptr, msg);
3325   } else {
3326       // post receive
3327     int tags[3];
3328     tags[0] = tag; tags[1] = src; tags[2] = comm;
3329     CmmPut(ptr->posted_ireqs, 3, tags, newreq);
3330   }
3331   
3332 #if AMPI_COUNTER
3333   getAmpiParent()->counters.irecv++;
3334 #endif
3335
3336 #ifdef AMPIMSGLOG
3337   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3338     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
3339   }
3340 #endif
3341   
3342   return 0;
3343 }
3344
3345 CDECL
3346 int AMPI_Ireduce(void *sendbuf, void *recvbuf, int count, int type, MPI_Op op,
3347                  int root, MPI_Comm comm, MPI_Request *request)
3348 {
3349   AMPIAPI("AMPI_Ireduce");
3350   if(op == MPI_OP_NULL) CkAbort("MPI_Ireduce called with MPI_OP_NULL!!!");
3351   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,sendbuf,recvbuf);
3352   ampi *ptr = getAmpiInstance(comm);
3353   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),sendbuf,count,type,op);
3354   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
3355   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
3356   msg->setCallback(reduceCB);
3357   ptr->contribute(msg);
3358
3359   if (ptr->thisIndex == rootIdx){
3360     // using irecv instead recv to non-block the call and get request pointer
3361     AmpiRequestList* reqs = getReqs();
3362     IReq *newreq = new IReq(recvbuf,count,type,0,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
3363     *request = reqs->insert(newreq);
3364   }
3365   return 0;
3366 }
3367
3368 CDECL
3369 int AMPI_Allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3370                   void *recvbuf, int recvcount, MPI_Datatype recvtype,
3371                   MPI_Comm comm)
3372 {
3373   AMPIAPI("AMPI_Allgather");
3374   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgather not allowed for Inter-communicator!");
3375   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3376
3377   ampi *ptr = getAmpiInstance(comm);
3378   int size = ptr->getSize(comm);
3379   int i;
3380 #if AMPI_COMLIB
3381   if(comm == MPI_COMM_WORLD) {
3382       // commlib support
3383       ptr->getAllgather().beginIteration();
3384       for(i=0;i<size;i++) {
3385           ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3386                         sendtype, i, comm, ptr->getComlibProxy());
3387       }
3388       ptr->getAllgather().endIteration();
3389   } else 
3390 #endif
3391       for(i=0;i<size;i++) {
3392           ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3393                     sendtype, i, comm);
3394       }
3395
3396   MPI_Status status;
3397   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
3398   int itemsize = dttype->getSize(recvcount) ;
3399
3400   for(i=0;i<size;i++) {
3401     AMPI_Recv(((char*)recvbuf)+(itemsize*i), recvcount, recvtype,
3402              i, MPI_GATHER_TAG, comm, &status);
3403   }
3404 #if AMPI_COUNTER
3405   getAmpiParent()->counters.allgather++;
3406 #endif
3407   return 0;
3408 }
3409
3410 CDECL
3411 int AMPI_Iallgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3412                     void *recvbuf, int recvcount, MPI_Datatype recvtype,
3413                     MPI_Comm comm, MPI_Request* request)
3414 {
3415   AMPIAPI("AMPI_Iallgather");
3416   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallgather not allowed for Inter-communicator!");
3417   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3418
3419   ampi *ptr = getAmpiInstance(comm);
3420   int size = ptr->getSize(comm);
3421   int i;
3422 #if AMPI_COMLIB
3423   if(comm == MPI_COMM_WORLD) {
3424       // commlib support
3425       ptr->getAllgather().beginIteration();
3426       for(i=0;i<size;i++) {
3427           ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3428                         sendtype, i, comm, ptr->getComlibProxy());
3429       }
3430       ptr->getAllgather().endIteration();
3431   } else 
3432 #endif
3433       for(i=0;i<size;i++) {
3434           ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3435                     sendtype, i, comm);
3436       }
3437
3438   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
3439   int itemsize = dttype->getSize(recvcount) ;
3440
3441   // copy+paste from MPI_Irecv
3442   AmpiRequestList* reqs = getReqs();
3443   ATAReq *newreq = new ATAReq(size);
3444   for(i=0;i<size;i++){
3445     if(newreq->addReq(((char*)recvbuf)+(itemsize*i),recvcount,recvtype,i,MPI_GATHER_TAG,comm)!=(i+1))
3446       CkAbort("MPI_Iallgather: Error adding requests into ATAReq!");
3447   }
3448   *request = reqs->insert(newreq);
3449   AMPI_DEBUG("MPI_Iallgather: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3450
3451   return 0;
3452 }
3453
3454 CDECL
3455 int AMPI_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3456                    void *recvbuf, int *recvcounts, int *displs,
3457                    MPI_Datatype recvtype, MPI_Comm comm)
3458 {
3459   AMPIAPI("AMPI_Allgatherv");
3460   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgatherv not allowed for Inter-communicator!");
3461   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3462
3463   ampi *ptr = getAmpiInstance(comm);
3464   int size = ptr->getSize(comm);
3465   int i;
3466 #if AMPI_COMLIB
3467   if(comm == MPI_COMM_WORLD) {
3468       // commlib support
3469       ptr->getAllgather().beginIteration();
3470       for(i=0;i<size;i++) {
3471           ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3472                         sendtype, i, comm, ptr->getComlibProxy());
3473       }
3474       ptr->getAllgather().endIteration();
3475   } else
3476 #endif 
3477       for(i=0;i<size;i++) {
3478           ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3479                     sendtype, i, comm);
3480       }
3481
3482   MPI_Status status;
3483   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
3484   int itemsize = dttype->getSize() ;
3485
3486   for(i=0;i<size;i++) {
3487     AMPI_Recv(((char*)recvbuf)+(itemsize*displs[i]), recvcounts[i], recvtype,
3488              i, MPI_GATHER_TAG, comm, &status);
3489   }
3490 #if AMPI_COUNTER
3491   getAmpiParent()->counters.allgather++;
3492 #endif
3493   return 0;
3494 }
3495
3496 CDECL
3497 int AMPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3498                void *recvbuf, int recvcount, MPI_Datatype recvtype,
3499                int root, MPI_Comm comm)
3500 {
3501   AMPIAPI("AMPI_Gather");
3502   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3503
3504 #ifdef AMPIMSGLOG
3505   ampiParent* pptr = getAmpiParent();
3506   if(msgLogRead){
3507     (*(pptr->fromPUPer))|(pptr->pupBytes);
3508     PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
3509     return 0;
3510   }
3511 #endif
3512
3513   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gather not allowed for Inter-communicator!");
3514
3515   ampi *ptr = getAmpiInstance(comm);
3516   int size = ptr->getSize(comm);
3517   int i;
3518   AMPI_Send(sendbuf, sendcount, sendtype, root, MPI_GATHER_TAG, comm);
3519
3520   if(ptr->getRank(comm)==root) {
3521     MPI_Status status;
3522     CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
3523     int itemsize = dttype->getSize(recvcount) ;
3524
3525     for(i=0;i<size;i++) {
3526       if(-1==ptr->recv(MPI_GATHER_TAG, i, (void*)(((char*)recvbuf)+(itemsize*i)), recvcount, recvtype, comm, (int*)(&status)))
3527         CkAbort("AMPI> Error in MPI_Gather recv");
3528     }
3529   }
3530 #if AMPI_COUNTER
3531   getAmpiParent()->counters.gather++;
3532 #endif
3533
3534 #ifdef AMPIMSGLOG
3535   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3536     (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcount * size;
3537     (*(pptr->toPUPer))|(pptr->pupBytes);
3538     PUParray(*(pptr->toPUPer), (char *)recvbuf, (pptr->pupBytes));
3539   }
3540 #endif
3541   
3542   return 0;
3543 }
3544
3545 CDECL
3546 int AMPI_Gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3547                 void *recvbuf, int *recvcounts, int *displs,
3548                 MPI_Datatype recvtype, int root, MPI_Comm comm)
3549 {
3550   AMPIAPI("AMPI_Gatherv");
3551   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3552
3553   int itemsize = getDDT()->getSize(recvtype);
3554
3555 #ifdef AMPIMSGLOG
3556   ampiParent* pptr = getAmpiParent();
3557   if(msgLogRead){
3558     int commsize;
3559     (*(pptr->fromPUPer))|commsize;
3560     for(int i=0;i<commsize;i++){
3561       (*(pptr->fromPUPer))|(pptr->pupBytes);
3562       PUParray(*(pptr->fromPUPer), (char *)(((char*)recvbuf)+(itemsize*displs[i])), (pptr->pupBytes));
3563     }
3564     return 0;
3565   }
3566 #endif
3567
3568   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gatherv not allowed for Inter-communicator!");
3569
3570   AMPI_Send(sendbuf, sendcount, sendtype, root, MPI_GATHER_TAG, comm);
3571
3572   ampi *ptr = getAmpiInstance(comm);
3573   int size = ptr->getSize(comm);
3574   if(ptr->getRank(comm) == root) {
3575     MPI_Status status;
3576     for(int i=0;i<size;i++) {
3577       if(-1==ptr->recv(MPI_GATHER_TAG, i, (void*)(((char*)recvbuf)+(itemsize*displs[i])), recvcounts[i], recvtype, comm, (int*)(&status)))
3578         CkAbort("AMPI> Error in MPI_Gatherv recv");
3579     }
3580   }
3581 #if AMPI_COUNTER
3582   getAmpiParent()->counters.gather++;
3583 #endif
3584
3585 #ifdef AMPIMSGLOG
3586   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3587     for(int i=0;i<size;i++){
3588       (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcounts[i];
3589       (*(pptr->toPUPer))|(pptr->pupBytes);
3590       PUParray(*(pptr->toPUPer), (char *)(((char*)recvbuf)+(itemsize*displs[i])), (pptr->pupBytes));
3591     }
3592   }
3593 #endif
3594     
3595   return 0;
3596 }
3597
3598 CDECL
3599 int AMPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3600                 void *recvbuf, int recvcount, MPI_Datatype recvtype,
3601                 int root, MPI_Comm comm)
3602 {
3603   AMPIAPI("AMPI_Scatter");
3604   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scatter not allowed for Inter-communicator!");
3605   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3606
3607 #ifdef AMPIMSGLOG
3608   ampiParent* pptr = getAmpiParent();
3609   if(msgLogRead){
3610     (*(pptr->fromPUPer))|(pptr->pupBytes);
3611     PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
3612     return 0;
3613   }
3614 #endif
3615
3616   ampi *ptr = getAmpiInstance(comm);
3617   int size = ptr->getSize(comm);
3618   int i;
3619
3620   if(ptr->getRank(comm)==root) {
3621       CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
3622       int itemsize = dttype->getSize(sendcount) ;
3623       for(i=0;i<size;i++) {
3624           ptr->send(MPI_SCATTER_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i),
3625                     sendcount, sendtype, i, comm);
3626       }
3627       //ptr->mcomlibEnd();
3628   }
3629   
3630   MPI_Status status;
3631   if(-1==ptr->recv(MPI_SCATTER_TAG, root, recvbuf, recvcount, recvtype, comm, (int*)(&status)))
3632     CkAbort("AMPI> Error in MPI_Scatter recv");
3633
3634 #if AMPI_COUNTER
3635   getAmpiParent()->counters.scatter++;
3636 #endif
3637
3638 #ifdef AMPIMSGLOG
3639   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3640     (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcount;
3641     (*(pptr->toPUPer))|(pptr->pupBytes);
3642     PUParray(*(pptr->toPUPer), (char *)recvbuf, (pptr->pupBytes));
3643   }
3644 #endif
3645
3646   return 0;
3647 }
3648
3649 CDECL
3650 int AMPI_Scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype,
3651                  void *recvbuf, int recvcount, MPI_Datatype recvtype,
3652                  int root, MPI_Comm comm)
3653 {
3654   AMPIAPI("AMPI_Scatterv");
3655   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scatterv not allowed for Inter-communicator!");
3656   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcounts[0],sendbuf,recvbuf);
3657
3658 #ifdef AMPIMSGLOG
3659   ampiParent* pptr = getAmpiParent();
3660   if(msgLogRead){
3661     (*(pptr->fromPUPer))|(pptr->pupBytes);
3662     PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
3663     return 0;
3664   }
3665 #endif
3666
3667   ampi *ptr = getAmpiInstance(comm);
3668   int size = ptr->getSize(comm);
3669   int i;
3670
3671   if(ptr->getRank(comm) == root) {
3672       CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
3673       int itemsize = dttype->getSize() ;
3674       for(i=0;i<size;i++) {
3675           ptr->send(MPI_SCATTER_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*displs[i]),
3676                     sendcounts[i], sendtype, i, comm);
3677       }
3678       //ptr->mcomlibEnd();
3679   }
3680   
3681   MPI_Status status;
3682   if(-1==ptr->recv(MPI_SCATTER_TAG, root, recvbuf, recvcount, recvtype, comm, (int*)(&status)))
3683     CkAbort("AMPI> Error in MPI_Scatterv recv");
3684   
3685 #if AMPI_COUNTER
3686   getAmpiParent()->counters.scatter++;
3687 #endif
3688
3689 #ifdef AMPIMSGLOG
3690   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3691     (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcount;
3