44f74e6956743c312e9d6438d47fb6ae0ddb7149
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #define exit exit /*Supress definition of exit in ampi.h*/
9 #include "ampiimpl.h"
10 #include "tcharm.h"
11 #include "ampiEvents.h" /*** for trace generation for projector *****/
12 #include "ampiProjections.h"
13
14 #define CART_TOPOL 1
15 #define AMPI_PRINT_IDLE 0
16
17 /* change this define to "x" to trace all send/recv's */
18 #define MSG_ORDER_DEBUG(x) // x /* empty */
19 /* change this define to "x" to trace user calls */
20 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
21 #define STARTUP_DEBUG(x)  // ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
22
23 static CkDDT *getDDT(void) {
24   return getAmpiParent()->myDDT;
25 }
26
27 //------------- startup -------------
28 static mpi_comm_worlds mpi_worlds;
29
30 int mpi_nworlds; /*Accessed by ampif*/
31 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
32
33 /* ampiReducer: AMPI's generic reducer type 
34    MPI_Op is function pointer to MPI_User_function
35    so that it can be packed into AmpiOpHeader, shipped 
36    with the reduction message, and then plugged into 
37    the ampiReducer. 
38    One little trick is the ampi::recv which receives
39    the final reduction message will see additional
40    sizeof(AmpiOpHeader) bytes in the buffer before
41    any user data.                             */
42 class AmpiComplex { 
43 public: 
44         double re, im; 
45         void operator+=(const AmpiComplex &a) {
46                 re+=a.re;
47                 im+=a.im;
48         }
49         void operator*=(const AmpiComplex &a) {
50                 double nu_re=re*a.re-im*a.im;
51                 im=re*a.im+im*a.re;
52                 re=nu_re;
53         }
54         int operator>(const AmpiComplex &a) {
55                 CkAbort("Cannot compare complex numbers with MPI_MAX");
56                 return 0;
57         }
58         int operator<(const AmpiComplex &a) {
59                 CkAbort("Cannot compare complex numbers with MPI_MIN");
60                 return 0;
61         }
62 };
63 typedef struct { float val; int idx; } FloatInt;
64 typedef struct { double val; int idx; } DoubleInt;
65 typedef struct { long val; int idx; } LongInt;
66 typedef struct { int val; int idx; } IntInt;
67 typedef struct { short val; int idx; } ShortInt;
68 typedef struct { long double val; int idx; } LongdoubleInt;
69 typedef struct { float val; float idx; } FloatFloat;
70 typedef struct { double val; double idx; } DoubleDouble;
71
72
73 #define MPI_OP_SWITCH(OPNAME) \
74   int i; \
75   switch (*datatype) { \
76   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
77   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(short); } break; \
78   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
79   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(long); } break; \
80   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
81   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
82   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
83   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiUInt8); } break; \
84   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
85   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
86   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
87   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
88   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(CmiInt8); } break; \
89   default: \
90     ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
91     CmiAbort("Unsupported MPI datatype for MPI Op"); \
92   };\
93
94 void MPI_MAX( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
95 #define MPI_OP_IMPL(type) \
96         if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
97   MPI_OP_SWITCH(MPI_MAX)
98 #undef MPI_OP_IMPL
99 }
100
101 void MPI_MIN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
102 #define MPI_OP_IMPL(type) \
103         if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
104   MPI_OP_SWITCH(MPI_MIN)
105 #undef MPI_OP_IMPL
106 }
107
108 void MPI_SUM( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
109 #define MPI_OP_IMPL(type) \
110         ((type *)inoutvec)[i] += ((type *)invec)[i];
111   MPI_OP_SWITCH(MPI_SUM)
112 #undef MPI_OP_IMPL
113 }
114
115 void MPI_PROD( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
116 #define MPI_OP_IMPL(type) \
117         ((type *)inoutvec)[i] *= ((type *)invec)[i];
118   MPI_OP_SWITCH(MPI_PROD)
119 #undef MPI_OP_IMPL
120 }
121
122 void MPI_LAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
123   int i;  
124   switch (*datatype) {
125   case MPI_INT:
126   case MPI_LOGICAL:
127     for(i=0;i<(*len);i++)
128       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] && ((int *)invec)[i];
129     break;
130   default:
131     ckerr << "Type " << *datatype << " with Op MPI_LAND not supported." << endl;
132     CmiAbort("exiting");
133   }
134 }
135 void MPI_BAND( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
136   int i; 
137   switch (*datatype) {
138   case MPI_INT:
139     for(i=0;i<(*len);i++)
140       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] & ((int *)invec)[i];
141     break;
142   case MPI_BYTE:
143     for(i=0;i<(*len);i++)
144       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] & ((char *)invec)[i];
145     break;
146   default:
147     ckerr << "Type " << *datatype << " with Op MPI_BAND not supported." << endl;
148     CmiAbort("exiting");
149   }
150 }
151 void MPI_LOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
152   int i;  
153   switch (*datatype) {
154   case MPI_INT:
155   case MPI_LOGICAL:
156     for(i=0;i<(*len);i++)
157       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] || ((int *)invec)[i];
158     break;
159   default:
160     ckerr << "Type " << *datatype << " with Op MPI_LOR not supported." << endl;
161     CmiAbort("exiting");
162   }
163 }
164 void MPI_BOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
165   int i;  
166   switch (*datatype) {
167   case MPI_INT:
168     for(i=0;i<(*len);i++)
169       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] | ((int *)invec)[i];
170     break;
171   case MPI_BYTE:
172     for(i=0;i<(*len);i++)
173       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] | ((char *)invec)[i];
174     break;
175   default:
176     ckerr << "Type " << *datatype << " with Op MPI_BOR not supported." << endl;
177     CmiAbort("exiting");
178   }
179 }
180 void MPI_LXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
181   int i;  
182   switch (*datatype) {
183   case MPI_INT:
184   case MPI_LOGICAL:
185     for(i=0;i<(*len);i++)
186       ((int *)inoutvec)[i] = (((int *)inoutvec)[i]&&(!((int *)invec)[i]))||(!(((int *)inoutvec)[i])&&((int *)invec)[i]); //emulate ^^
187     break;
188   default:
189     ckerr << "Type " << *datatype << " with Op MPI_LXOR not supported." << endl;
190     CmiAbort("exiting");
191   }
192 }
193 void MPI_BXOR( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
194   int i;  
195   switch (*datatype) {
196   case MPI_INT:
197     for(i=0;i<(*len);i++)
198       ((int *)inoutvec)[i] = ((int *)inoutvec)[i] ^ ((int *)invec)[i];
199     break;
200   case MPI_BYTE:
201     for(i=0;i<(*len);i++)
202       ((char *)inoutvec)[i] = ((char *)inoutvec)[i] ^ ((char *)invec)[i];
203     break;
204         case MPI_UNSIGNED:
205     for(i=0;i<(*len);i++)
206       ((unsigned int *)inoutvec)[i] = ((unsigned int *)inoutvec)[i] ^ ((unsigned int *)invec)[i];
207     break;
208   default:
209     ckerr << "Type " << *datatype << " with Op MPI_BXOR not supported." << endl;
210     CmiAbort("exiting");
211   }
212 }
213
214 #ifndef MIN
215 #define MIN(a,b) (a < b ? a : b)
216 #endif
217
218 void MPI_MAXLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
219   int i;  
220
221   switch (*datatype) {
222   case MPI_FLOAT_INT:
223     for(i=0;i<(*len);i++)
224       if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
225         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
226       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
227         ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
228     break;
229   case MPI_DOUBLE_INT:
230     for(i=0;i<(*len);i++)
231       if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
232         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
233       else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
234         ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
235
236     break;
237   case MPI_LONG_INT:
238     for(i=0;i<(*len);i++)
239       if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
240         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
241       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
242         ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
243     break;
244   case MPI_2INT:
245     for(i=0;i<(*len);i++)
246       if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
247         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
248       else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
249         ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
250     break;
251   case MPI_SHORT_INT:
252     for(i=0;i<(*len);i++)
253       if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
254         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
255       else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
256         ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
257     break;
258   case MPI_LONG_DOUBLE_INT:
259     for(i=0;i<(*len);i++)
260       if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
261         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
262       else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
263         ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
264     break;
265   case MPI_2FLOAT:
266     for(i=0;i<(*len);i++)
267       if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
268         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
269       else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
270         ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
271     break;
272   case MPI_2DOUBLE:
273     for(i=0;i<(*len);i++)
274       if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
275         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
276       else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
277         ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
278     break;
279   default:
280     ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
281     CmiAbort("exiting");
282   }
283 }
284 void MPI_MINLOC( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
285   int i;  
286   switch (*datatype) {
287   case MPI_FLOAT_INT:
288     for(i=0;i<(*len);i++)
289       if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
290         ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
291       else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
292         ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
293     break;
294   case MPI_DOUBLE_INT:
295     for(i=0;i<(*len);i++)
296       if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
297         ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
298       else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
299         ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
300     break;
301   case MPI_LONG_INT:
302     for(i=0;i<(*len);i++)
303       if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
304         ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
305       else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
306         ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
307     break;
308   case MPI_2INT:
309     for(i=0;i<(*len);i++)
310       if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
311         ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
312       else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
313         ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
314     break;
315   case MPI_SHORT_INT:
316     for(i=0;i<(*len);i++)
317       if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
318         ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
319       else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
320         ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
321     break;
322   case MPI_LONG_DOUBLE_INT:
323     for(i=0;i<(*len);i++)
324       if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
325         ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
326       else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
327         ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
328     break;
329   case MPI_2FLOAT:
330     for(i=0;i<(*len);i++)
331       if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
332         ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
333       else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
334         ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
335     break;
336   case MPI_2DOUBLE:
337     for(i=0;i<(*len);i++)
338       if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
339         ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
340       else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
341         ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
342     break;
343   default:
344     ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
345     CmiAbort("exiting");
346   }
347 }
348
349 // every msg contains a AmpiOpHeader structure before user data
350 // FIXME: non-commutative operations require messages be ordered by rank
351 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs){
352   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
353   MPI_Datatype dtype;
354   int szhdr, szdata, len;
355   MPI_User_function* func;
356   func = hdr->func;
357   dtype = hdr->dtype;  
358   szdata = hdr->szdata;
359   len = hdr->len;  
360   szhdr = sizeof(AmpiOpHeader);
361
362   //Assuming extent == size
363   void *ret = malloc(szhdr+szdata);
364   memcpy(ret,msgs[0]->getData(),szhdr+szdata);
365   for(int i=1;i<nMsg;i++){
366     (*func)((void *)((char *)msgs[i]->getData()+szhdr),(void *)((char *)ret+szhdr),&len,&dtype);
367   }
368   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,ret);
369   free(ret);
370   return retmsg;
371 }
372
373 CkReduction::reducerType AmpiReducer;
374
375 class Builtin_kvs{
376  public:
377   int tag_ub,host,io,wtime_is_global,keyval_mype,keyval_numpes,keyval_mynode,keyval_numnodes;
378   Builtin_kvs(){
379     tag_ub = MPI_TAG_UB_VALUE; 
380     host = MPI_PROC_NULL;
381     io = 0;
382     wtime_is_global = 0;
383     keyval_mype = CkMyPe();
384     keyval_numpes = CkNumPes();
385     keyval_mynode = CkMyNode();
386     keyval_numnodes = CkNumNodes();
387   }
388 };
389
390 // ------------ startup support -----------
391 int _ampi_fallback_setup_count;
392 CDECL void AMPI_Setup(void);
393 FDECL void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
394
395 FDECL void FTN_NAME(MPI_MAIN,mpi_main)(void);
396
397 /*Main routine used when missing MPI_Setup routine*/
398 CDECL void AMPI_Fallback_Main(int argc,char **argv)
399 {
400   AMPI_Main_cpp(argc,argv);
401   AMPI_Main(argc,argv);
402   FTN_NAME(MPI_MAIN,mpi_main)();
403 }
404
405 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
406 /*Startup routine used if user *doesn't* write
407   a TCHARM_User_setup routine.
408  */
409 CDECL void AMPI_Setup_Switch(void) {
410   _ampi_fallback_setup_count=0;
411   FTN_NAME(AMPI_SETUP,ampi_setup)();
412   AMPI_Setup();
413   if (_ampi_fallback_setup_count==2)
414   { //Missing AMPI_Setup in both C and Fortran:
415     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
416   }
417 }
418
419 static int nodeinit_has_been_called=0;
420 CtvDeclare(ampiParent*, ampiPtr);
421 CtvDeclare(int, ampiInitDone);
422 CtvDeclare(int, ampiFinalized);
423 CkpvDeclare(Builtin_kvs, bikvs);
424 CkpvDeclare(int,argvExtracted);
425 static int enableStreaming = 0;
426
427 static void ampiNodeInit(void)
428 {
429   mpi_nworlds=0;
430   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
431   {
432     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
433   }
434   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
435
436   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc);
437   
438   nodeinit_has_been_called=1;
439 }
440
441 #if PRINT_IDLE
442 static double totalidle=0.0, startT=0.0;
443 static int beginHandle, endHandle;
444 static void BeginIdle(void *dummy,double curWallTime)
445 {
446   startT = curWallTime;
447 }
448 static void EndIdle(void *dummy,double curWallTime)
449 {
450   totalidle += curWallTime - startT;
451 }
452 #endif
453
454 /* for fortran reduction operation table to handle mapping */
455 typedef MPI_Op  MPI_Op_Array[128];
456 CtvDeclare(int, mpi_opc);
457 CtvDeclare(MPI_Op_Array, mpi_ops);
458
459 static void ampiProcInit(void){
460   CtvInitialize(ampiParent*, ampiPtr);
461   CtvInitialize(int,ampiInitDone);
462   CtvInitialize(int,ampiFinalized);
463
464   CtvInitialize(MPI_Op_Array, mpi_ops);
465   CtvInitialize(int, mpi_opc);
466
467   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
468
469   CkpvInitialize(int, argvExtracted);
470   CkpvAccess(argvExtracted) = 0;
471   REGISTER_AMPI
472   initAmpiProjections();
473   char **argv=CkGetArgv();
474 #if AMPI_COMLIB  
475   if(CkpvAccess(argvExtracted)==0){
476     enableStreaming=CmiGetArgFlagDesc(argv,"+ampi_streaming","Enable streaming comlib for ampi send/recv.");
477   }
478 #endif
479
480 #ifdef AMPIMSGLOG
481   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
482   msgLogRead = CmiGetArgFlag(argv, "+msgLogRead");
483   CmiGetArgInt(argv, "+msgLogRank", &msgLogRank);
484   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
485 #endif
486 }
487
488 void AMPI_Install_Idle_Timer(){
489 #if AMPI_PRINT_IDLE
490   beginHandle = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)BeginIdle,NULL);
491   endHandle = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,(CcdVoidFn)EndIdle,NULL);
492 #endif
493 }
494
495 void AMPI_Uninstall_Idle_Timer(){
496 #if AMPI_PRINT_IDLE
497   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,beginHandle);
498   CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,endHandle);
499 #endif
500 }
501
502 PUPfunctionpointer(MPI_MainFn);
503
504 class MPI_threadstart_t {
505 public:
506         MPI_MainFn fn;
507         MPI_threadstart_t() {}
508         MPI_threadstart_t(MPI_MainFn fn_)
509                 :fn(fn_) {}
510         void start(void) {
511                 char **argv=CmiCopyArgs(CkGetArgv());
512                 int argc=CkGetArgc();
513 #if CMK_AMPI_FNPTR_HACK
514                 AMPI_Fallback_Main(argc,argv);
515 #else
516                 (fn)(argc,argv);
517 #endif
518         }
519         void pup(PUP::er &p) {
520                 p|fn;
521         }
522 };
523 PUPmarshall(MPI_threadstart_t);
524
525 CDECL void AMPI_threadstart(void *data)
526 {
527         STARTUP_DEBUG("MPI_threadstart")
528         MPI_threadstart_t t;
529         pupFromBuf(data,t);
530         t.start();
531 }
532
533 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
534 {
535         STARTUP_DEBUG("ampiCreateMain")
536         int _nchunks=TCHARM_Get_num_chunks();
537         //Make a new threads array:
538         MPI_threadstart_t s(mainFn);
539         memBuf b; pupIntoBuf(b,s);
540         TCHARM_Create_data( _nchunks,AMPI_threadstart,
541                           b.getData(), b.getSize());
542 }
543
544 /* TCharm Semaphore ID's for AMPI startup */
545 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
546 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
547
548 static CProxy_ampiWorlds ampiWorldsGroup;
549
550 static void init_operations()
551 {
552   CtvInitialize(MPI_Op_Array, mpi_ops);
553   int i = 0;
554   MPI_Op *tab = CtvAccess(mpi_ops);
555   tab[i++] = MPI_MAX;
556   tab[i++] = MPI_MIN;
557   tab[i++] = MPI_SUM;
558   tab[i++] = MPI_PROD;
559   tab[i++] = MPI_LAND;
560   tab[i++] = MPI_BAND;
561   tab[i++] = MPI_LOR;
562   tab[i++] = MPI_BOR;
563   tab[i++] = MPI_LXOR;
564   tab[i++] = MPI_BXOR;
565   tab[i++] = MPI_MAXLOC;
566   tab[i++] = MPI_MINLOC;
567
568   CtvInitialize(int, mpi_opc);
569   CtvAccess(mpi_opc) = i;
570 }
571
572 /*
573 Called from MPI_Init, a collective initialization call:
574  creates a new AMPI array and attaches it to the current
575  set of TCHARM threads.
576 */
577 static ampi *ampiInit(char **argv)
578 {
579   if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
580   STARTUP_DEBUG("ampiInit> begin")
581
582   MPI_Comm new_world;
583   int _nchunks;
584   CkArrayOptions opts;
585   CProxy_ampiParent parent;
586   if (TCHARM_Element()==0)
587   { /* I'm responsible for building the arrays: */
588         STARTUP_DEBUG("ampiInit> creating arrays")
589
590 // FIXME: Need to serialize global communicator allocation in one place.
591         //Allocate the next communicator
592         if(mpi_nworlds == MPI_MAX_COMM_WORLDS)
593         {
594                 CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
595         }
596         int new_idx=mpi_nworlds;
597         new_world=MPI_COMM_WORLD+1+new_idx;
598
599         //Create and attach the ampiParent array
600         CkArrayID threads;
601         opts=TCHARM_Attach_start(&threads,&_nchunks);
602         parent=CProxy_ampiParent::ckNew(new_world,threads,opts);
603         STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
604   }
605   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
606   if (TCHARM_Element()==0)
607   {
608         //Make a new ampi array
609         CkArrayID empty;
610         
611 #if AMPI_COMLIB
612         ComlibInstanceHandle ciStreaming;
613         ComlibInstanceHandle ciBcast;
614         ComlibInstanceHandle ciAllgather;
615         ComlibInstanceHandle ciAlltoall;
616         ciStreaming=CkGetComlibInstance();
617         ciBcast=CkGetComlibInstance();
618         ciAllgather=CkGetComlibInstance();
619         ciAlltoall=CkGetComlibInstance();
620 #endif
621
622         CkVec<int> _indices;
623         for(int i=0;i<_nchunks;i++) _indices.push_back(i);
624         ampiCommStruct worldComm(new_world,empty,_nchunks,_indices);
625         CProxy_ampi arr;
626 #if AMPI_COMLIB
627         arr=CProxy_ampi::ckNew(parent,worldComm,ciStreaming,ciBcast,ciAllgather,ciAlltoall,opts);
628 #else
629         //opts.setAnytimeMigration(0);
630         arr=CProxy_ampi::ckNew(parent,worldComm,opts);
631 #endif
632
633         //Broadcast info. to the mpi_worlds array
634         // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
635         ampiCommStruct newComm(new_world,arr,_nchunks);
636         if (ampiWorldsGroup.ckGetGroupID().isZero())
637                 ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
638         else
639                 ampiWorldsGroup.add(newComm);
640         STARTUP_DEBUG("ampiInit> arrays created")
641
642 #if AMPI_COMLIB
643         StreamingStrategy *sStreaming = new StreamingStrategy(1,10);
644         ciStreaming.setStrategy(sStreaming);
645         
646         BroadcastStrategy *sBcast = new BroadcastStrategy(arr.ckGetArrayID(),USE_HYPERCUBE);
647         ciBcast.setStrategy(sBcast);
648         
649         EachToManyMulticastStrategy *sAllgather = new EachToManyMulticastStrategy(USE_HYPERCUBE,arr.ckGetArrayID(),arr.ckGetArrayID());
650         ciAllgather.setStrategy(sAllgather);
651         
652         EachToManyMulticastStrategy *sAlltoall = new EachToManyMulticastStrategy(USE_PREFIX, arr.ckGetArrayID(),arr.ckGetArrayID());
653         ciAlltoall.setStrategy(sAlltoall);
654 #endif        
655   }
656
657   // Find our ampi object:
658   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
659   CtvAccess(ampiInitDone)=1;
660   CtvAccess(ampiFinalized)=0;
661   STARTUP_DEBUG("ampiInit> complete")
662 #if CMK_BLUEGENE_CHARM
663 //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
664   TRACE_BG_ADD_TAG("AMPI_START");
665 #endif
666
667   init_operations();     // initialize fortran reduction operation table
668
669   return ptr;
670 }
671
672 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
673 class ampiWorlds : public CBase_ampiWorlds {
674 public:
675     ampiWorlds(const ampiCommStruct &nextWorld) {
676         ampiWorldsGroup=thisgroup;
677         add(nextWorld);
678     }
679     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
680     void pup(PUP::er &p)  { CBase_ampiWorlds::pup(p); }
681     void add(const ampiCommStruct &nextWorld) {
682         int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD+1);
683         mpi_worlds[new_idx].comm=nextWorld;
684         if (mpi_nworlds<=new_idx) mpi_nworlds=new_idx+1;
685         STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
686     }
687 };
688
689 //-------------------- ampiParent -------------------------
690 ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_)
691                 :threads(threads_), worldNo(worldNo_), RProxyCnt(0)
692 {
693   int barrier = 0x1234;
694   STARTUP_DEBUG("ampiParent> starting up")
695   thread=NULL;
696   worldPtr=NULL;
697   myDDT=&myDDTsto;
698   prepareCtv();
699
700   init();
701
702   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
703         AsyncEvacuate(CmiFalse);
704 }
705
706 ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
707   thread=NULL;
708   worldPtr=NULL;
709   myDDT=&myDDTsto;
710
711   init();
712
713   AsyncEvacuate(CmiFalse);
714 }
715
716 void ampiParent::pup(PUP::er &p) {
717
718   ArrayElement1D::pup(p);
719 //      printf("[%d] Ampiparent being pupped \n",thisIndex);
720   p|threads;
721
722   p|worldStruct;
723   myDDT->pup(p);
724   p|splitComm;
725   p|groupComm;
726   p|groups;
727   p|ampiReqs;
728   p|RProxyCnt;
729   p|tmpRProxy;
730   p|winStructList;
731   p|infos;
732 }
733 void ampiParent::prepareCtv(void) {
734   thread=threads[thisIndex].ckLocal();
735   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
736   CtvAccessOther(thread->getThread(),ampiPtr) = this;
737   STARTUP_DEBUG("ampiParent> found TCharm")
738 }
739
740 void ampiParent::init(){
741 #ifdef AMPIMSGLOG
742   if(msgLogWrite && msgLogRank == thisIndex){
743 #if CMK_PROJECTIONS_USE_ZLIB
744     fMsgLog = gzopen(msgLogFilename,"wb");
745     toPUPer = new PUP::tozDisk(fMsgLog);
746 #else
747     fMsgLog = fopen(msgLogFilename,"wb");
748     toPUPer = new PUP::toDisk(fMsgLog);
749 #endif
750   }else if(msgLogRead){
751 #if CMK_PROJECTIONS_USE_ZLIB
752     fMsgLog = gzopen(msgLogFilename,"rb");
753     fromPUPer = new PUP::fromzDisk(fMsgLog);
754 #else
755     fMsgLog = fopen(msgLogFilename,"rb");
756     fromPUPer = new PUP::fromDisk(fMsgLog);
757 #endif
758   }
759 #endif
760 }
761
762 void ampiParent::finalize(){
763 #ifdef AMPIMSGLOG
764   if(msgLogWrite && msgLogRank == thisIndex){
765     delete toPUPer;
766 #if CMK_PROJECTIONS_USE_ZLIB
767     gzclose(fMsgLog);
768 #else
769     fclose(fMsgLog);
770 #endif
771   }else if(msgLogRead){
772     delete fromPUPer;
773 #if CMK_PROJECTIONS_USE_ZLIB
774     gzclose(fMsgLog);
775 #else
776     fclose(fMsgLog);
777 #endif
778   }
779 #endif
780 }
781
782 void ampiParent::ckJustMigrated(void) {
783   ArrayElement1D::ckJustMigrated();
784   prepareCtv();
785 }
786
787 ampiParent::~ampiParent() {
788   STARTUP_DEBUG("ampiParent> destructor called");
789   finalize();
790 }
791
792 //Children call this when they are first created or just migrated
793 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
794 {
795   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
796
797   if (s.getComm()>=MPI_COMM_WORLD)
798   { //We now have our COMM_WORLD-- register it
799     //Note that split communicators don't keep a raw pointer, so
800     //they don't need to re-register on migration.
801      if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
802      worldPtr=ptr;
803      worldStruct=s;
804
805     //MPI_COMM_SELF has the same member as MPI_COMM_WORLD, but it's alone:
806      CkVec<int> _indices;
807      _indices.push_back(thisIndex);
808      selfStruct = ampiCommStruct(MPI_COMM_SELF,s.getProxy(),1,_indices);
809   }
810   
811   if (!forMigration)
812   { //Register the new communicator:
813      MPI_Comm comm = s.getComm();
814      STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
815      if (comm>=MPI_COMM_WORLD) { 
816        // Pass the new ampi to the waiting ampiInit
817        thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
818      } else if (isSplit(comm)) {
819        splitChildRegister(s);
820      } else if (isGroup(comm)) {
821        groupChildRegister(s);
822      } else if (isCart(comm)) {
823        cartChildRegister(s);
824      } else if (isGraph(comm)) {
825        graphChildRegister(s);
826      } else if (isInter(comm)) {
827        interChildRegister(s);
828      } else if (isIntra(comm)) {
829        intraChildRegister(s);
830      }else
831        CkAbort("ampiParent recieved child with bad communicator");
832   }
833
834   return thread;
835 }
836
837 // reduction client data - preparation for checkpointing
838 class ckptClientStruct {
839 public:
840   char *dname;
841   ampiParent *ampiPtr;
842   ckptClientStruct(char *s, ampiParent *a): dname(s), ampiPtr(a) {}
843 };
844
845 static void checkpointClient(void *param,void *msg)
846 {
847   ckptClientStruct *client = (ckptClientStruct*)param;
848   char *dname = client->dname;
849   ampiParent *ampiPtr = client->ampiPtr;
850   ampiPtr->Checkpoint(strlen(dname), dname);
851   delete client;
852 }
853
854 void ampiParent::startCheckpoint(char* dname){
855   //if(thisIndex==0) thisProxy[thisIndex].Checkpoint(strlen(dname),dname);
856   if (thisIndex==0) {
857         ckptClientStruct *clientData = new ckptClientStruct(dname, this);
858         CkCallback cb(checkpointClient, clientData);
859         contribute(0, NULL, CkReduction::sum_int, cb);
860   }
861   else
862         contribute(0, NULL, CkReduction::sum_int);
863
864 #if 0
865 #if CMK_BLUEGENE_CHARM
866   void *curLog;         // store current log in timeline
867   _TRACE_BG_TLINE_END(&curLog);
868   TRACE_BG_AMPI_SUSPEND();
869 #endif
870 #endif
871
872   thread->stop();
873
874 #if CMK_BLUEGENE_CHARM
875   // _TRACE_BG_BEGIN_EXECUTE_NOMSG("CHECKPOINT_RESUME", &curLog);
876   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
877 #endif
878 }
879 void ampiParent::Checkpoint(int len, char* dname){
880   if (len == 0) {
881     // memory checkpoint
882     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
883     CkStartMemCheckpoint(cb);
884   }
885   else {
886     char dirname[256];
887     strncpy(dirname,dname,len);
888     dirname[len]='\0';
889     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
890     CkStartCheckpoint(dirname,cb);
891   }
892 }
893 void ampiParent::ResumeThread(void){
894   thread->resume();
895 }
896
897 int ampiParent::createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
898                              int *keyval, void* extra_state){
899         KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
900         int idx = kvlist.size();
901         kvlist.resize(idx+1);
902         kvlist[idx] = newnode;
903         *keyval = idx;
904         return 0;
905 }
906 int ampiParent::freeKeyval(int *keyval){
907         if(*keyval <0 || *keyval >= kvlist.size() || !kvlist[*keyval])
908                 return -1;
909         delete kvlist[*keyval];
910         kvlist[*keyval] = NULL;
911         *keyval = MPI_KEYVAL_INVALID;
912         return 0;
913 }
914
915 int ampiParent::putAttr(MPI_Comm comm, int keyval, void* attribute_val){
916         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
917                 return -1;
918         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
919         // Enlarge the keyval list:
920         while (cs.getKeyvals().size()<=keyval) cs.getKeyvals().push_back(0);
921         cs.getKeyvals()[keyval]=attribute_val;
922         return 0;
923 }
924
925 int ampiParent::kv_is_builtin(int keyval) {
926         switch(keyval) {
927         case MPI_TAG_UB: kv_builtin_storage=&(CkpvAccess(bikvs).tag_ub); return 1;
928         case MPI_HOST: kv_builtin_storage=&(CkpvAccess(bikvs).host); return 1;
929         case MPI_IO: kv_builtin_storage=&(CkpvAccess(bikvs).io); return 1;
930         case MPI_WTIME_IS_GLOBAL: kv_builtin_storage=&(CkpvAccess(bikvs).wtime_is_global); return 1;
931         case AMPI_KEYVAL_MYPE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mype); return 1;
932         case AMPI_KEYVAL_NUMPES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numpes); return 1;
933         case AMPI_KEYVAL_MYNODE: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_mynode); return 1;
934         case AMPI_KEYVAL_NUMNODES: kv_builtin_storage=&(CkpvAccess(bikvs).keyval_numnodes); return 1;
935         default: return 0;
936         };
937 }
938
939 int ampiParent::getAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag){
940         *flag = false;
941         if (kv_is_builtin(keyval)) { /* Allow access to special builtin flags */
942           *flag=true;
943           *(void **)attribute_val = kv_builtin_storage;
944           return 0;
945         }
946         if(keyval<0 || keyval >= kvlist.size() || (kvlist[keyval]==NULL))
947                 return -1; /* invalid keyval */
948         
949         ampiCommStruct &cs=*(ampiCommStruct *)&comm2CommStruct(comm);
950         if (keyval>=cs.getKeyvals().size())  
951                 return 0; /* we don't have a value yet */
952         if (cs.getKeyvals()[keyval]==0)
953                 return 0; /* we had a value, but now it's zero */
954         /* Otherwise, we have a good value */
955         *flag = true;
956         *(void **)attribute_val = cs.getKeyvals()[keyval];
957         return 0;
958 }
959 int ampiParent::deleteAttr(MPI_Comm comm, int keyval){
960         /* no way to delete an attribute: just overwrite it with 0 */
961         return putAttr(comm,keyval,0);
962 }
963
964 //----------------------- ampi -------------------------
965 void ampi::init(void) {
966   parent=NULL;
967   thread=NULL;
968   msgs=NULL;
969   posted_ireqs=NULL;
970   resumeOnRecv=false;
971   AsyncEvacuate(CmiFalse);
972 }
973
974 ampi::ampi()
975 {
976   /* this constructor only exists so we can create an empty array during split */
977   CkAbort("Default ampi constructor should never be called");
978 }
979
980 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s)
981    :parentProxy(parent_)
982 {
983   init();
984
985   myComm=s; myComm.setArrayID(thisArrayID);
986   myRank=myComm.getRankForIndex(thisIndex);
987
988   findParent(false);
989
990   msgs = CmmNew();
991   posted_ireqs = CmmNew();
992   nbcasts = 0;
993
994   comlibProxy = thisProxy;
995   ComlibDelegateProxy(&comlibProxy);
996   
997   seqEntries=parent->numElements;
998   oorder.init (seqEntries);
999 }
1000
1001 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s, ComlibInstanceHandle ciStreaming_,
1002                 ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_)
1003    :parentProxy(parent_),ciStreaming(ciStreaming_),ciBcast(ciBcast_),ciAllgather(ciAllgather_),ciAlltoall(ciAlltoall_)
1004 {
1005   init();
1006
1007   myComm=s; myComm.setArrayID(thisArrayID);
1008   myRank=myComm.getRankForIndex(thisIndex);
1009
1010   findParent(false);
1011
1012   msgs = CmmNew();
1013   posted_ireqs = CmmNew();
1014   nbcasts = 0;
1015
1016   comlibProxy = thisProxy;
1017   ComlibDelegateProxy(&comlibProxy);
1018   ciStreaming.setSourcePe();
1019   ciBcast.setSourcePe();
1020   ciAllgather.setSourcePe();
1021   ciAlltoall.setSourcePe();
1022
1023   seqEntries=parent->numElements;
1024   oorder.init (seqEntries);
1025 }
1026
1027 ampi::ampi(CkMigrateMessage *msg):CBase_ampi(msg)
1028 {
1029   init();
1030   
1031   seqEntries=-1;
1032 }
1033
1034 void ampi::ckJustMigrated(void)
1035 {
1036         findParent(true);
1037         ArrayElement1D::ckJustMigrated();
1038 }
1039
1040 void ampi::findParent(bool forMigration) {
1041         STARTUP_DEBUG("ampi> finding my parent")
1042         parent=parentProxy[thisIndex].ckLocal();
1043         if (parent==NULL) CkAbort("AMPI can't find its parent!");
1044         thread=parent->registerAmpi(this,myComm,forMigration);
1045         if (thread==NULL) CkAbort("AMPI can't find its thread!");
1046 //      printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
1047 }
1048
1049 static void cmm_pup_ampi_message(pup_er p,void **msg) {
1050         CkPupMessage(*(PUP::er *)p,msg,1);
1051         if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
1052 //      printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
1053 }
1054
1055 void ampi::pup(PUP::er &p)
1056 {
1057   if(!p.isUserlevel())
1058     ArrayElement1D::pup(p);//Pack superclass
1059   p|parentProxy;
1060   p|myComm;
1061   p|myRank;
1062   p|nbcasts;
1063   p|tmpVec;
1064   p|remoteProxy;
1065   p|resumeOnRecv;
1066   p|comlibProxy;
1067   p|ciStreaming;
1068   p|ciBcast;
1069   p|ciAllgather;
1070   p|ciAlltoall;
1071
1072   if(p.isUnpacking()){
1073     ciStreaming.setSourcePe();
1074     ciBcast.setSourcePe();
1075     ciAllgather.setSourcePe();
1076     ciAlltoall.setSourcePe();
1077   }
1078
1079   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
1080   posted_ireqs = CmmNew();              // FIXME 
1081 //      printf("[%d] ampi index %d msgs table pointer %p\n",CkMyPe(),thisIndex,msgs);
1082
1083   p|seqEntries;
1084   p|oorder;
1085 }
1086
1087 ampi::~ampi()
1088 {
1089   if (CkInRestarting()) {
1090     // in restarting, we need to flush messages
1091   int tags[3], sts[3];
1092   tags[0] = tags[1] = tags[2] = CmmWildCard;
1093   AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1094   while (msg) {
1095     delete msg;
1096     msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1097   }
1098   }
1099   CmmFree(msgs);
1100   CmmFree(posted_ireqs);         // FIXME
1101 }
1102
1103 //------------------------ Communicator Splitting ---------------------
1104 class ampiSplitKey {
1105 public:
1106         int nextSplitComm;
1107         int color; //New class of processes we'll belong to
1108         int key; //To determine rank in new ordering
1109         int rank; //Rank in old ordering
1110         ampiSplitKey() {}
1111         ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_)
1112                 :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
1113 };
1114
1115 /* "type" may indicate whether call is for a cartesian topology etc. */
1116
1117 void ampi::split(int color,int key,MPI_Comm *dest, int type)
1118 {
1119 #if CMK_BLUEGENE_CHARM
1120   void *curLog;         // store current log in timeline
1121   _TRACE_BG_TLINE_END(&curLog);
1122 //  TRACE_BG_AMPI_SUSPEND();
1123 #endif
1124   if (type == CART_TOPOL) {
1125         ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
1126         int rootIdx=myComm.getIndexForRank(0);
1127         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1128         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1129
1130         thread->suspend(); //Resumed by ampiParent::cartChildRegister
1131         MPI_Comm newComm=parent->getNextCart()-1;
1132         *dest=newComm;
1133   } else {
1134         ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
1135         int rootIdx=myComm.getIndexForRank(0);
1136         CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
1137         contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
1138
1139         thread->suspend(); //Resumed by ampiParent::splitChildRegister
1140         MPI_Comm newComm=parent->getNextSplit()-1;
1141         *dest=newComm;
1142   }
1143 #if CMK_BLUEGENE_CHARM
1144 //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "SPLIT_RESUME", curLog);
1145   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("SPLIT_RESUME", &curLog);
1146   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
1147 #endif
1148 }
1149
1150 CDECL int compareAmpiSplitKey(const void *a_, const void *b_) {
1151         const ampiSplitKey *a=(const ampiSplitKey *)a_;
1152         const ampiSplitKey *b=(const ampiSplitKey *)b_;
1153         if (a->color!=b->color) return a->color-b->color;
1154         if (a->key!=b->key) return a->key-b->key;
1155         return a->rank-b->rank;
1156 }
1157
1158 void ampi::splitPhase1(CkReductionMsg *msg)
1159 {
1160         //Order the keys, which orders the ranks properly:
1161         int nKeys=msg->getSize()/sizeof(ampiSplitKey);
1162         ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
1163         if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
1164         qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
1165
1166         MPI_Comm newComm = -1;
1167         for(int i=0;i<nKeys;i++)
1168                 if(keys[i].nextSplitComm>newComm)
1169                         newComm = keys[i].nextSplitComm;
1170
1171         //Loop over the sorted keys, which gives us the new arrays:
1172         int lastColor=keys[0].color-1; //The color we're building an array for
1173         CProxy_ampi lastAmpi; //The array for lastColor
1174         int lastRoot=0; //C value for new rank 0 process for latest color
1175         ampiCommStruct lastComm; //Communicator info. for latest color
1176         for (int c=0;c<nKeys;c++) {
1177                 if (keys[c].color!=lastColor)
1178                 { //Hit a new color-- need to build a new communicator and array
1179                         lastColor=keys[c].color;
1180                         lastRoot=c;
1181                         CkArrayOptions opts;
1182                         opts.bindTo(parentProxy);
1183                         opts.setNumInitial(0);
1184                         CkArrayID unusedAID; ampiCommStruct unusedComm;
1185                         lastAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1186                         lastAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1187
1188                         CkVec<int> indices; //Maps rank to array indices for new arrau
1189                         for (int i=c;i<nKeys;i++) {
1190                                 if (keys[i].color!=lastColor) break; //Done with this color
1191                                 int idx=myComm.getIndexForRank(keys[i].rank);
1192                                 indices.push_back(idx);
1193                         }
1194
1195                         //FIXME: create a new communicator for each color, instead of
1196                         // (confusingly) re-using the same MPI_Comm number for each.
1197                         lastComm=ampiCommStruct(newComm,lastAmpi,indices.size(),indices);
1198                 }
1199                 int newRank=c-lastRoot;
1200                 int newIdx=lastComm.getIndexForRank(newRank);
1201
1202                 //CkPrintf("[%d (%d)] Split (%d,%d) %d insert\n",newIdx,newRank,keys[c].color,keys[c].key,newComm);
1203                 lastAmpi[newIdx].insert(parentProxy,lastComm);
1204         }
1205
1206         delete msg;
1207 }
1208
1209 //...newly created array elements register with the parent, which calls:
1210 void ampiParent::splitChildRegister(const ampiCommStruct &s) {
1211         int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
1212         if (splitComm.size()<=idx) splitComm.resize(idx+1);
1213         splitComm[idx]=new ampiCommStruct(s);
1214         thread->resume(); //Matches suspend at end of ampi::split
1215 }
1216
1217 //-----------------create communicator from group--------------
1218 // The procedure is like that of comm_split very much,
1219 // so the code is shamelessly copied from above
1220 //   1. reduction to make sure all members have called
1221 //   2. the root in the old communicator create the new array
1222 //   3. ampiParent::register is called to register new array as new comm
1223 class vecStruct {
1224 public:
1225   int nextgroup;
1226   groupStruct vec;
1227   vecStruct():nextgroup(-1){}
1228   vecStruct(int nextgroup_, groupStruct vec_)
1229     : nextgroup(nextgroup_), vec(vec_) { }
1230 };
1231
1232 void ampi::commCreate(const groupStruct vec,MPI_Comm* newcomm){
1233   int rootIdx=vec[0];
1234   tmpVec = vec;
1235   CkCallback cb(CkIndex_ampi::commCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1236   MPI_Comm nextgroup = parent->getNextGroup();
1237   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
1238
1239   if(getPosOp(thisIndex,vec)>=0){
1240     thread->suspend(); //Resumed by ampiParent::groupChildRegister
1241     MPI_Comm retcomm = parent->getNextGroup()-1;
1242     *newcomm = retcomm;
1243   }else{
1244     *newcomm = MPI_COMM_NULL;
1245   }
1246 }
1247
1248 void ampi::commCreatePhase1(CkReductionMsg *msg){
1249   MPI_Comm *nextGroupComm = (int *)msg->getData();
1250
1251   CkArrayOptions opts;
1252   opts.bindTo(parentProxy);
1253   opts.setNumInitial(0);
1254   CkArrayID unusedAID;
1255   ampiCommStruct unusedComm;
1256   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1257   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1258
1259   groupStruct indices = tmpVec;
1260   ampiCommStruct newCommstruct = ampiCommStruct(*nextGroupComm,newAmpi,indices.size(),indices);
1261   for(int i=0;i<indices.size();i++){
1262     int newIdx=indices[i];
1263     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1264   }
1265   delete msg;
1266 }
1267
1268 void ampiParent::groupChildRegister(const ampiCommStruct &s) {
1269   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
1270   if (groupComm.size()<=idx) groupComm.resize(idx+1);
1271   groupComm[idx]=new ampiCommStruct(s);
1272   thread->resume(); //Matches suspend at end of ampi::split
1273 }
1274
1275 /* Virtual topology communicator creation */
1276 void ampi::cartCreate(const groupStruct vec,MPI_Comm* newcomm){
1277   int rootIdx=vec[0];
1278   tmpVec = vec;
1279   CkCallback cb(CkIndex_ampi::cartCreatePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1280
1281   MPI_Comm nextcart = parent->getNextCart();
1282   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
1283
1284   if(getPosOp(thisIndex,vec)>=0){
1285     thread->suspend(); //Resumed by ampiParent::cartChildRegister
1286      MPI_Comm retcomm = parent->getNextCart()-1;
1287      *newcomm = retcomm;
1288   }else
1289     *newcomm = MPI_COMM_NULL;
1290 }
1291
1292 void ampi::cartCreatePhase1(CkReductionMsg *msg){
1293   MPI_Comm *nextCartComm = (int *)msg->getData();
1294
1295   CkArrayOptions opts;
1296   opts.bindTo(parentProxy);
1297   opts.setNumInitial(0);
1298   CkArrayID unusedAID;
1299   ampiCommStruct unusedComm;
1300   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1301   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1302
1303   groupStruct indices = tmpVec;
1304   ampiCommStruct newCommstruct = ampiCommStruct(*nextCartComm,newAmpi,indices.
1305                                                 size(),indices);
1306   for(int i=0;i<indices.size();i++){
1307     int newIdx=indices[i];
1308     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1309   }
1310   delete msg;
1311 }
1312
1313 void ampiParent::cartChildRegister(const ampiCommStruct &s) {
1314   int idx=s.getComm()-MPI_COMM_FIRST_CART;
1315   if (cartComm.size()<=idx) {
1316     cartComm.resize(idx+1);
1317     cartComm.length()=idx+1;
1318   }
1319   cartComm[idx]=new ampiCommStruct(s);
1320   thread->resume(); //Matches suspend at end of ampi::cartCreate
1321 }
1322
1323 void ampi::graphCreate(const groupStruct vec,MPI_Comm* newcomm){
1324   int rootIdx=vec[0];
1325   tmpVec = vec;
1326   CkCallback cb(CkIndex_ampi::graphCreatePhase1(NULL),CkArrayIndex1D(rootIdx),
1327                 myComm.getProxy());
1328   MPI_Comm nextgraph = parent->getNextGraph();
1329   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
1330
1331   if(getPosOp(thisIndex,vec)>=0){
1332     thread->suspend(); //Resumed by ampiParent::graphChildRegister
1333     MPI_Comm retcomm = parent->getNextGraph()-1;
1334     *newcomm = retcomm;
1335   }else
1336     *newcomm = MPI_COMM_NULL;
1337 }
1338
1339 void ampi::graphCreatePhase1(CkReductionMsg *msg){
1340   MPI_Comm *nextGraphComm = (int *)msg->getData();
1341
1342   CkArrayOptions opts;
1343   opts.bindTo(parentProxy);
1344   opts.setNumInitial(0);
1345   CkArrayID unusedAID;
1346   ampiCommStruct unusedComm;
1347   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1348   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1349
1350   groupStruct indices = tmpVec;
1351   ampiCommStruct newCommstruct = ampiCommStruct(*nextGraphComm,newAmpi,indices
1352                                                 .size(),indices);
1353   for(int i=0;i<indices.size();i++){
1354     int newIdx=indices[i];
1355     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1356   }
1357   delete msg;
1358 }
1359
1360 void ampiParent::graphChildRegister(const ampiCommStruct &s) {
1361   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
1362   if (graphComm.size()<=idx) {
1363     graphComm.resize(idx+1);
1364     graphComm.length()=idx+1;
1365   }
1366   graphComm[idx]=new ampiCommStruct(s);
1367   thread->resume(); //Matches suspend at end of ampi::graphCreate
1368 }
1369
1370 void ampi::intercommCreate(const groupStruct rvec, const int root, MPI_Comm *ncomm){
1371   if(thisIndex==root) { // not everybody gets the valid rvec
1372     tmpVec = rvec;
1373   }
1374   CkCallback cb(CkIndex_ampi::intercommCreatePhase1(NULL),CkArrayIndex1D(root),myComm.getProxy());
1375   MPI_Comm nextinter = parent->getNextInter();
1376   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
1377
1378   thread->suspend(); //Resumed by ampiParent::interChildRegister
1379   MPI_Comm newcomm=parent->getNextInter()-1;
1380   *ncomm=newcomm;
1381 }
1382
1383 void ampi::intercommCreatePhase1(CkReductionMsg *msg){
1384   MPI_Comm *nextInterComm = (int *)msg->getData();
1385
1386   groupStruct lgroup = myComm.getIndices();
1387   CkArrayOptions opts;
1388   opts.bindTo(parentProxy);
1389   opts.setNumInitial(0);
1390   CkArrayID unusedAID;
1391   ampiCommStruct unusedComm;
1392   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1393   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1394
1395   ampiCommStruct newCommstruct = ampiCommStruct(*nextInterComm,newAmpi,lgroup.size(),lgroup,tmpVec);
1396   for(int i=0;i<lgroup.size();i++){
1397     int newIdx=lgroup[i];
1398     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1399   }
1400
1401   parentProxy[0].ExchangeProxy(newAmpi);
1402   delete msg;
1403 }
1404
1405 void ampiParent::interChildRegister(const ampiCommStruct &s) {
1406   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
1407   if (interComm.size()<=idx) interComm.resize(idx+1);
1408   interComm[idx]=new ampiCommStruct(s);
1409   //thread->resume(); // don't resume it yet, till parent set remote proxy
1410 }
1411
1412 void ampi::intercommMerge(int first, MPI_Comm *ncomm){ // first valid only at local root
1413   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
1414     groupStruct lvec = myComm.getIndices();
1415     groupStruct rvec = myComm.getRemoteIndices();
1416     int rsize = rvec.size();
1417     tmpVec = lvec;
1418     for(int i=0;i<rsize;i++)
1419       tmpVec.push_back(rvec[i]);
1420     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
1421   }else{
1422     tmpVec.resize(0);
1423   }
1424
1425   int rootIdx=myComm.getIndexForRank(0);
1426   CkCallback cb(CkIndex_ampi::intercommMergePhase1(NULL),CkArrayIndex1D(rootIdx),myComm.getProxy());
1427   MPI_Comm nextintra = parent->getNextIntra();
1428   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
1429
1430   thread->suspend(); //Resumed by ampiParent::interChildRegister
1431   MPI_Comm newcomm=parent->getNextIntra()-1;
1432   *ncomm=newcomm;
1433 }
1434
1435 void ampi::intercommMergePhase1(CkReductionMsg *msg){  // gets called on two roots, first root creates the comm
1436   if(tmpVec.size()==0) { delete msg; return; }
1437   MPI_Comm *nextIntraComm = (int *)msg->getData();
1438   CkArrayOptions opts;
1439   opts.bindTo(parentProxy);
1440   opts.setNumInitial(0);
1441   CkArrayID unusedAID;
1442   ampiCommStruct unusedComm;
1443   CProxy_ampi newAmpi=CProxy_ampi::ckNew(unusedAID,unusedComm,opts);
1444   newAmpi.doneInserting(); //<- Meaning, I need to do my own creation race resolution
1445
1446   ampiCommStruct newCommstruct = ampiCommStruct(*nextIntraComm,newAmpi,tmpVec.size(),tmpVec);
1447   for(int i=0;i<tmpVec.size();i++){
1448     int newIdx=tmpVec[i];
1449     newAmpi[newIdx].insert(parentProxy,newCommstruct);
1450   }
1451   delete msg;
1452 }
1453
1454 void ampiParent::intraChildRegister(const ampiCommStruct &s) {
1455   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
1456   if (intraComm.size()<=idx) intraComm.resize(idx+1);
1457   intraComm[idx]=new ampiCommStruct(s);
1458   thread->resume(); //Matches suspend at end of ampi::split
1459 }
1460
1461 //------------------------ communication -----------------------
1462 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo)
1463 {
1464   if (universeNo>MPI_COMM_WORLD) {
1465     int worldDex=universeNo-MPI_COMM_WORLD-1;
1466     if (worldDex>=mpi_nworlds)
1467       CkAbort("Bad world communicator passed to universeComm2CommStruct");
1468     return mpi_worlds[worldDex].comm;
1469   }
1470   CkAbort("Bad communicator passed to universeComm2CommStruct");
1471   return mpi_worlds[0].comm; // meaningless return
1472 }
1473
1474 void ampi::block(void){
1475         thread->suspend();
1476 }
1477
1478 void ampi::yield(void){
1479         thread->schedule();
1480 }
1481
1482 void ampi::unblock(void){
1483         thread->resume();
1484 }
1485
1486 void
1487 ampi::generic(AmpiMsg* msg)
1488 {
1489 MSG_ORDER_DEBUG(
1490   CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
1491         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
1492 )
1493
1494 //      AmpiMsg *msgcopy = msg;
1495   if(msg->seq != -1) {
1496     int srcIdx=msg->srcIdx;
1497     int n=oorder.put(srcIdx,msg);
1498     if (n>0) { // This message was in-order
1499       inorder(msg);
1500       if (n>1) { // It enables other, previously out-of-order messages
1501         while((msg=oorder.getOutOfOrder(srcIdx))!=0) {
1502           inorder(msg);
1503         }
1504       }
1505     }
1506   } else { //Cross-world or system messages are unordered
1507     inorder(msg);
1508   }
1509   
1510   if(resumeOnRecv){
1511     thread->resume();
1512   }
1513 }
1514
1515 void
1516 ampi::inorder(AmpiMsg* msg)
1517 {
1518 MSG_ORDER_DEBUG(
1519   CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d  (from %d, seq %d)\n",
1520         thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq);
1521 )
1522   // check posted recvs
1523   int tags[3], sts[3];
1524   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
1525   IReq *ireq = NULL;
1526   if (CpvAccess(CmiPICMethod) != 2) {
1527     IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
1528     if (ireq) { // receive posted
1529       ireq->receive(this, msg);
1530       ireq->tag = sts[0];
1531       ireq->src = sts[1];
1532       ireq->comm = sts[2];
1533     } else {
1534       CmmPut(msgs, 3, tags, msg);
1535     }
1536   }
1537   else
1538       CmmPut(msgs, 3, tags, msg);
1539 }
1540
1541 AmpiMsg *ampi::getMessage(int t, int s, int comm, int *sts)
1542 {
1543     int tags[3];
1544     tags[0] = t; tags[1] = s; tags[2] = comm;
1545     AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
1546     return msg;
1547 }
1548
1549 AmpiMsg *ampi::makeAmpiMsg(int destIdx,
1550         int t,int sRank,const void *buf,int count,int type,MPI_Comm destcomm)
1551 {
1552   CkDDT_DataType *ddt = getDDT()->getType(type);
1553   int len = ddt->getSize(count);
1554   int sIdx=thisIndex;
1555   int seq = -1;
1556   if (destIdx>=0 && destcomm<=MPI_COMM_WORLD && t<=MPI_TAG_UB_VALUE) //Not cross-module: set seqno
1557   seq = oorder.nextOutgoing(destIdx);
1558   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, t, sIdx, sRank, len, destcomm);
1559   TCharm::activateVariable(buf);
1560   ddt->serialize((char*)buf, (char*)msg->data, count, 1);
1561   TCharm::deactivateVariable(buf);
1562   return msg;
1563 }
1564
1565 void
1566 ampi::comlibsend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1567 {
1568   delesend(t,sRank,buf,count,type,rank,destcomm,comlibProxy);
1569 }
1570
1571 void
1572 ampi::send(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm)
1573 {
1574   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1575   delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy());
1576 }
1577
1578 void
1579 ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx)
1580 {
1581   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, t, -1, sRank, len, MPI_COMM_WORLD);
1582   memcpy(msg->data, buf, len);
1583   CProxy_ampi pa(aid);
1584   pa[idx].generic(msg);
1585 }
1586
1587 void
1588 ampi::delesend(int t, int sRank, const void* buf, int count, int type,  int rank, MPI_Comm destcomm, CProxy_ampi arrproxy)
1589 {
1590   if(rank==MPI_PROC_NULL) return;
1591   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1592   int destIdx = dest.getIndexForRank(rank);
1593   if(isInter()){
1594     sRank = parent->thisIndex;
1595     destcomm = MPI_COMM_FIRST_INTER;
1596     destIdx = dest.getIndexForRemoteRank(rank);
1597     arrproxy = remoteProxy;
1598   }
1599  MSG_ORDER_DEBUG(
1600   CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
1601  )
1602
1603   arrproxy[destIdx].generic(makeAmpiMsg(destIdx,t,sRank,buf,count,type,destcomm));
1604
1605 #ifndef CMK_OPTIMIZE
1606   int size=0;
1607   MPI_Type_size(type,&size);
1608   _LOG_E_AMPI_MSG_SEND(t,destIdx,count,size)
1609 #endif
1610 }
1611
1612 int
1613 ampi::processMessage(AmpiMsg *msg, int t, int s, void* buf, int count, int type)
1614 {
1615   CkDDT_DataType *ddt = getDDT()->getType(type);
1616   int len = ddt->getSize(count);
1617   
1618   if (msg->length > len && msg->length-len!=sizeof(AmpiOpHeader))
1619   { /* Received more data than we were expecting */
1620     char einfo[1024];
1621     sprintf(einfo, "FATAL ERROR in rank %d MPI_Recv (tag=%d, source=%d)\n"
1622         "  Expecting only %d bytes (%d items of type %d), \n"
1623         "  but received %d bytes from rank %d\nAMPI> MPI_Send was longer than matching MPI_Recv.",
1624             thisIndex,t,s,
1625             len, count, type,
1626             msg->length, msg->srcRank);
1627     CkAbort(einfo);
1628     return -1;
1629   }else if(msg->length < len){ // only at rare case shall we reset count by using divide
1630     count = msg->length/(ddt->getSize(1));
1631   }
1632   //
1633   TCharm::activateVariable(buf);
1634   if (msg->length-len==sizeof(AmpiOpHeader)) {  // reduction msg
1635     ddt->serialize((char*)buf, (char*)msg->data+sizeof(AmpiOpHeader), count, (-1));
1636   } else {
1637     ddt->serialize((char*)buf, (char*)msg->data, count, (-1));
1638   }
1639   TCharm::deactivateVariable(buf);
1640   return 0;
1641 }
1642
1643 int
1644 ampi::recv(int t, int s, void* buf, int count, int type, int comm, int *sts)
1645 {
1646   MPI_Comm disComm = myComm.getComm();
1647   if(s==MPI_PROC_NULL) {
1648     ((MPI_Status *)sts)->MPI_SOURCE = MPI_PROC_NULL;
1649     ((MPI_Status *)sts)->MPI_TAG = MPI_ANY_TAG;
1650     ((MPI_Status *)sts)->MPI_LENGTH = 0;
1651     return 0;
1652   }
1653   _LOG_E_END_AMPI_PROCESSING(thisIndex)
1654 #if CMK_BLUEGENE_CHARM
1655   void *curLog;         // store current log in timeline
1656   _TRACE_BG_TLINE_END(&curLog);
1657 //  TRACE_BG_AMPI_SUSPEND();
1658 #endif
1659
1660   if(isInter()){
1661     s = myComm.getIndexForRemoteRank(s);
1662     comm = MPI_COMM_FIRST_INTER;
1663   }
1664
1665   int tags[3];
1666   AmpiMsg *msg = 0;
1667   
1668  MSG_ORDER_DEBUG(
1669   CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
1670  )
1671
1672   resumeOnRecv=true;
1673   ampi *dis = getAmpiInstance(disComm);
1674   while(1) {
1675       //This is done to take into account the case in which an ampi 
1676       // thread has migrated while waiting for a message
1677     tags[0] = t; tags[1] = s; tags[2] = comm;
1678     msg = (AmpiMsg *) CmmGet(dis->msgs, 3, tags, sts);
1679     if (msg) break;
1680     dis->thread->suspend();
1681     dis = getAmpiInstance(disComm);
1682   }
1683   dis->resumeOnRecv=false;
1684
1685   if(sts)
1686     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1687   int status = dis->processMessage(msg, t, s, buf, count, type);
1688   if (status != 0) return status;
1689
1690   _LOG_E_BEGIN_AMPI_PROCESSING(thisIndex,s,count)
1691 #if CMK_BLUEGENE_CHARM
1692   //TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "RECV_RESUME", &curLog, 1);
1693   TRACE_BG_AMPI_BREAK(NULL, "RECV_RESUME", NULL, 0);
1694   _TRACE_BG_SET_INFO((char *)msg, "RECV_RESUME",  &curLog, 1);
1695 #endif
1696
1697   delete msg;
1698   return 0;
1699 }
1700
1701 void
1702 ampi::probe(int t, int s, int comm, int *sts)
1703 {
1704   int tags[3];
1705 #if CMK_BLUEGENE_CHARM
1706   void *curLog;         // store current log in timeline
1707   _TRACE_BG_TLINE_END(&curLog);
1708 //  TRACE_BG_AMPI_SUSPEND();
1709 #endif
1710
1711   AmpiMsg *msg = 0;
1712   resumeOnRecv=true;
1713   while(1) {
1714     tags[0] = t; tags[1] = s; tags[2] = comm;
1715     msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
1716     if (msg) break;
1717     thread->suspend();
1718   }
1719   resumeOnRecv=false;
1720   if(sts)
1721     ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1722 #if CMK_BLUEGENE_CHARM
1723 //  TRACE_BG_AMPI_RESUME(thread->getThread(), msg, "PROBE_RESUME", curLog);
1724   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
1725 #endif
1726 }
1727
1728 int
1729 ampi::iprobe(int t, int s, int comm, int *sts)
1730 {
1731   int tags[3];
1732   AmpiMsg *msg = 0;
1733   tags[0] = t; tags[1] = s; tags[2] = comm;
1734   msg = (AmpiMsg *) CmmProbe(msgs, 3, tags, sts);
1735   if (msg) {
1736     if(sts)
1737       ((MPI_Status*)sts)->MPI_LENGTH = msg->length;
1738     return 1;
1739   }
1740 #if CMK_BLUEGENE_CHARM
1741   void *curLog;         // store current log in timeline
1742   _TRACE_BG_TLINE_END(&curLog);
1743 //  TRACE_BG_AMPI_SUSPEND();
1744 #endif
1745   thread->schedule();
1746 #if CMK_BLUEGENE_CHARM
1747   //_TRACE_BG_BEGIN_EXECUTE_NOMSG("IPROBE_RESUME", &curLog);
1748   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
1749 #endif
1750   return 0;
1751 }
1752
1753
1754 const int MPI_BCAST_COMM=MPI_COMM_WORLD+1000;
1755 void
1756 ampi::bcast(int root, void* buf, int count, int type,MPI_Comm destcomm)
1757 {
1758   const ampiCommStruct &dest=comm2CommStruct(destcomm);
1759   int rootIdx=dest.getIndexForRank(root);
1760   if(rootIdx==thisIndex) {
1761 #if 0//AMPI_COMLIB
1762     ciBcast.beginIteration();
1763     comlibProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
1764 #else
1765     thisProxy.generic(makeAmpiMsg(-1,MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM));
1766 #endif
1767   }
1768   if(-1==recv(MPI_BCAST_TAG,0, buf,count,type, MPI_BCAST_COMM)) CkAbort("AMPI> Error in broadcast");
1769   nbcasts++;
1770 }
1771
1772 void
1773 ampi::bcastraw(void* buf, int len, CkArrayID aid)
1774 {
1775   AmpiMsg *msg = new (len, 0) AmpiMsg(-1, MPI_BCAST_TAG, -1, 0, len, 0);
1776   memcpy(msg->data, buf, len);
1777   CProxy_ampi pa(aid);
1778   pa.generic(msg);
1779 }
1780
1781
1782 AmpiMsg* 
1783 ampi::Alltoall_RemoteIGet(int disp, int cnt, MPI_Datatype type, int tag)
1784 {
1785   CkAssert(tag==MPI_ATA_TAG && AlltoallGetFlag);
1786   int unit;
1787   CkDDT_DataType *ddt = getDDT()->getType(type);
1788   unit = ddt->getSize(1);
1789   int totalsize = unit*cnt;
1790
1791   AmpiMsg *msg = new (totalsize, 0) AmpiMsg(-1, -1, -1, thisIndex,totalsize,myComm.getComm());
1792   char* addr = (char*)Alltoallbuff+disp*unit;
1793   ddt->serialize((char*)msg->data, addr, cnt, (-1));
1794   return msg;
1795 }
1796
1797 int MPI_null_copy_fn (MPI_Comm comm, int keyval, void *extra_state,
1798                         void *attr_in, void *attr_out, int *flag){
1799   (*flag) = 0;
1800   return (MPI_SUCCESS);
1801 }
1802 int MPI_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
1803                         void *attr_in, void *attr_out, int *flag){
1804   (*(void **)attr_out) = attr_in;
1805   (*flag) = 1;
1806   return (MPI_SUCCESS);
1807 }
1808 int MPI_null_delete_fn (MPI_Comm comm, int keyval, void *attr, void *extra_state ){
1809   return (MPI_SUCCESS);
1810 }
1811
1812
1813 void AmpiSeqQ::init(int numP) 
1814 {
1815   elements.init(numP);
1816 }
1817
1818 AmpiSeqQ::~AmpiSeqQ () {
1819 }
1820   
1821 void AmpiSeqQ::pup(PUP::er &p) {
1822   p|out;
1823   p|elements;
1824 }
1825
1826 void AmpiSeqQ::putOutOfOrder(int srcIdx, AmpiMsg *msg)
1827 {
1828   AmpiOtherElement &el=elements[srcIdx];
1829 #ifndef CMK_OPTIMIZE
1830   if (msg->seq<el.seqIncoming)
1831     CkAbort("AMPI Logic error: received late out-of-order message!\n");
1832 #endif
1833   out.enq(msg);
1834   el.nOut++; // We have another message in the out-of-order queue
1835 }
1836
1837 AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
1838 {
1839   AmpiOtherElement &el=elements[srcIdx];
1840   if (el.nOut==0) return 0; // No more out-of-order left.
1841   // Walk through our out-of-order queue, searching for our next message:
1842   for (int i=0;i<out.length();i++) {
1843     AmpiMsg *msg=out.deq();
1844     if (msg->srcIdx==srcIdx && msg->seq==el.seqIncoming) {
1845       el.seqIncoming++;
1846       el.nOut--; // We have one less message out-of-order
1847       return msg;
1848     }
1849     else
1850       out.enq(msg);
1851   }
1852   // We walked the whole queue-- ours is not there.
1853   return 0;
1854 }
1855
1856
1857 void AmpiRequestList::pup(PUP::er &p) { 
1858         CmiMemoryCheck();
1859         if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
1860                 return;
1861         }
1862
1863         p(blklen); //Allocated size of block
1864         p(len); //Number of used elements in block
1865         if(p.isUnpacking()){
1866                 makeBlock(blklen,len);
1867         }
1868         int count=0;
1869         for(int i=0;i<len;i++){
1870                 char nonnull;
1871                 if(!p.isUnpacking()){
1872                         if(block[i] == NULL){
1873                                 nonnull = 0;
1874                         }else{
1875                                 nonnull = block[i]->getType();
1876                         }
1877                 }       
1878                 p(nonnull);
1879                 if(nonnull != 0){
1880                         if(p.isUnpacking()){
1881                                 switch(nonnull){
1882                                         case 1:
1883                                                 block[i] = new PersReq;
1884                                                 break;
1885                                         case 2: 
1886                                                 block[i] = new IReq;
1887                                                 break;
1888                                         case 3: 
1889                                                 block[i] = new ATAReq;
1890                                                 break;
1891                                 }
1892                         }       
1893                         block[i]->pup(p);
1894                         count++;
1895                 }else{
1896                         block[i] = 0;
1897                 }
1898         }
1899         if(p.isDeleting()){
1900                 freeBlock();
1901         }
1902         CmiMemoryCheck();
1903 }
1904
1905 //------------------ External Interface -----------------
1906 ampiParent *getAmpiParent(void) {
1907   ampiParent *p = CtvAccess(ampiPtr);
1908 #ifndef CMK_OPTIMIZE
1909   if (p==NULL) CkAbort("Cannot call MPI routines before AMPI is initialized.\n");
1910 #endif
1911   return p;
1912 }
1913
1914 ampi *getAmpiInstance(MPI_Comm comm) {
1915   ampi *ptr=getAmpiParent()->comm2ampi(comm);
1916 #ifndef CMK_OPTIMIZE
1917   if (ptr==NULL) CkAbort("AMPI's getAmpiInstance> null pointer\n");
1918 #endif
1919   return ptr;
1920 }
1921
1922 inline static AmpiRequestList *getReqs(void) {
1923   return &(getAmpiParent()->ampiReqs);
1924 }
1925
1926 inline void checkComm(MPI_Comm comm){
1927 #ifndef CMK_OPTIMIZE
1928   getAmpiParent()->checkComm(comm);
1929 #endif
1930 }
1931
1932 inline void checkRequest(MPI_Request req){
1933 #ifndef CMK_OPTIMIZE
1934   getReqs()->checkRequest(req);
1935 #endif
1936 }
1937
1938 inline void checkRequests(int n, MPI_Request* reqs){
1939 #ifndef CMK_OPTIMIZE
1940   AmpiRequestList* reqlist = getReqs();
1941   for(int i=0;i<n;i++)
1942     reqlist->checkRequest(reqs[i]);
1943 #endif
1944 }
1945
1946 CDECL void AMPI_Migrate(void)
1947 {
1948   AMPIAPI("AMPI_Migrate");
1949 #if 0
1950 #if CMK_BLUEGENE_CHARM
1951   TRACE_BG_AMPI_SUSPEND();
1952 #endif
1953 #endif
1954   TCHARM_Migrate();
1955 #if CMK_BLUEGENE_CHARM
1956 //  TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
1957   TRACE_BG_ADD_TAG("AMPI_MIGRATE");
1958 #endif
1959 }
1960
1961
1962 CDECL void AMPI_Evacuate(void)
1963 {
1964   TCHARM_Evacuate();
1965 }
1966
1967
1968
1969 CDECL void AMPI_Migrateto(int destPE)
1970 {
1971   AMPIAPI("AMPI_MigrateTo");
1972 #if 0
1973 #if CMK_BLUEGENE_CHARM
1974   TRACE_BG_AMPI_SUSPEND();
1975 #endif
1976 #endif
1977   TCHARM_Migrate_to(destPE);
1978 #if CMK_BLUEGENE_CHARM
1979   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATETO")
1980   TRACE_BG_ADD_TAG("AMPI_MIGRATETO");
1981 #endif
1982 }
1983
1984 CDECL void AMPI_MigrateTo(int destPE)
1985 {
1986         AMPI_Migrateto(destPE);
1987 }
1988
1989 CDECL void AMPI_Async_Migrate(void)
1990 {
1991   AMPIAPI("AMPI_Async_Migrate");
1992 #if 0
1993 #if CMK_BLUEGENE_CHARM
1994   TRACE_BG_AMPI_SUSPEND();
1995 #endif
1996 #endif
1997   TCHARM_Async_Migrate();
1998 #if CMK_BLUEGENE_CHARM
1999   //TRACE_BG_AMPI_START(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "AMPI_MIGRATE")
2000   TRACE_BG_ADD_TAG("AMPI_ASYNC_MIGRATE");
2001 #endif
2002 }
2003
2004 CDECL void AMPI_Allow_Migrate(void)
2005 {
2006   AMPIAPI("AMPI_Allow_Migrate");
2007 #if 0
2008 #if CMK_BLUEGENE_CHARM
2009   TRACE_BG_AMPI_SUSPEND();
2010 #endif
2011 #endif
2012   TCHARM_Allow_Migrate();
2013 #if CMK_BLUEGENE_CHARM
2014   TRACE_BG_ADD_TAG("AMPI_ALLOW_MIGRATE");
2015 #endif
2016 }
2017
2018 CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
2019 #if CMK_LBDB_ON
2020   //AMPIAPI("AMPI_Setmigratable");
2021   ampi *ptr=getAmpiInstance(comm);
2022   ptr->setMigratable(mig);
2023 #else
2024   CkPrintf("Warning: MPI_Setmigratable and load balancing are not supported in this version.\n");
2025 #endif
2026 }
2027
2028 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
2029 {
2030   if (nodeinit_has_been_called) {
2031     AMPIAPI("AMPI_Init");
2032     char **argv;
2033     if (p_argv) argv=*p_argv;
2034     else argv=CkGetArgv();
2035     ampiInit(argv);
2036     if (p_argc) *p_argc=CmiGetArgc(argv);
2037   }
2038   else
2039   { /* Charm hasn't been started yet! */
2040     CkAbort("AMPI_Init> Charm is not initialized!");
2041   }
2042   return 0;
2043 }
2044
2045 CDECL int AMPI_Initialized(int *isInit)
2046 {
2047   if (nodeinit_has_been_called) {
2048         AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2049         *isInit=CtvAccess(ampiInitDone);
2050   }
2051   else /* !nodeinit_has_been_called */ {
2052         *isInit=nodeinit_has_been_called;
2053   }
2054   return 0;
2055 }
2056
2057 CDECL int AMPI_Finalized(int *isFinalized)
2058 {
2059     AMPIAPI("AMPI_Initialized");     /* in case charm init not called */
2060     *isFinalized=CtvAccess(ampiFinalized);
2061     return 0;
2062 }
2063
2064 CDECL int AMPI_Comm_rank(MPI_Comm comm, int *rank)
2065 {
2066   //AMPIAPI("AMPI_Comm_rank");
2067
2068 #ifdef AMPIMSGLOG
2069   ampiParent* pptr = getAmpiParent();
2070   if(msgLogRead){
2071     PUParray(*(pptr->fromPUPer), (char*)rank, sizeof(int));
2072     return 0;
2073   }
2074 #endif
2075
2076   *rank = getAmpiInstance(comm)->getRank(comm);
2077
2078 #ifdef AMPIMSGLOG
2079   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2080     PUParray(*(pptr->toPUPer), (char*)rank, sizeof(int));
2081   }
2082 #endif
2083   return 0;
2084 }
2085
2086 CDECL
2087 int AMPI_Comm_size(MPI_Comm comm, int *size)
2088 {
2089   //AMPIAPI("AMPI_Comm_size");
2090 #ifdef AMPIMSGLOG
2091   ampiParent* pptr = getAmpiParent();
2092   if(msgLogRead){
2093     PUParray(*(pptr->fromPUPer), (char*)size, sizeof(int));
2094     return 0;
2095   }
2096 #endif
2097
2098   *size = getAmpiInstance(comm)->getSize(comm);
2099
2100 #ifdef AMPIMSGLOG
2101   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2102     PUParray(*(pptr->toPUPer), (char*)size, sizeof(int));
2103   }
2104 #endif
2105
2106   return 0;
2107 }
2108
2109 CDECL
2110 int AMPI_Comm_compare(MPI_Comm comm1,MPI_Comm comm2, int *result)
2111 {
2112   AMPIAPI("AMPI_Comm_compare");
2113   if(comm1==comm2) *result=MPI_IDENT;
2114   else{
2115     int equal=1;
2116     CkVec<int> ind1, ind2;
2117     ind1 = getAmpiInstance(comm1)->getIndices();
2118     ind2 = getAmpiInstance(comm2)->getIndices();
2119     if(ind1.size()==ind2.size()){
2120       for(int i=0;i<ind1.size();i++)
2121         if(ind1[i] != ind2[i]) { equal=0; break; }
2122     }
2123     if(equal==1) *result=MPI_CONGRUENT;
2124     else *result=MPI_UNEQUAL;
2125   }
2126   return 0;
2127 }
2128
2129 CDECL void AMPI_Exit(int /*exitCode*/)
2130 {
2131   AMPIAPI("AMPI_Exit");
2132   TCHARM_Done();
2133 }
2134 FDECL void FTN_NAME(MPI_EXIT,mpi_exit)(int *exitCode)
2135 {
2136   AMPI_Exit(*exitCode);
2137 }
2138
2139 CDECL
2140 int AMPI_Finalize(void)
2141 {
2142   AMPIAPI("AMPI_Finalize");
2143 #if PRINT_IDLE
2144   CkPrintf("[%d] Idle time %fs.\n", CkMyPe(), totalidle);
2145 #endif
2146 #if AMPI_COUNTER
2147     getAmpiParent()->counters.output(getAmpiInstance(MPI_COMM_WORLD)->getRank(MPI_COMM_WORLD));
2148 #endif
2149   CtvAccess(ampiFinalized)=1;
2150 #if 0
2151 #if CMK_BLUEGENE_CHARM
2152   TRACE_BG_AMPI_SUSPEND();
2153 #endif
2154 #endif
2155 //  getAmpiInstance(MPI_COMM_WORLD)->outputCounter();
2156   AMPI_Exit(0);
2157   return 0;
2158 }
2159
2160 CDECL
2161 int AMPI_Send(void *msg, int count, MPI_Datatype type, int dest,
2162                         int tag, MPI_Comm comm)
2163 {
2164   AMPIAPI("AMPI_Send");
2165 #ifdef AMPIMSGLOG
2166   if(msgLogRead){
2167     return 0;
2168   }
2169 #endif
2170
2171   ampi *ptr = getAmpiInstance(comm);
2172 #if AMPI_COMLIB
2173   if(enableStreaming){  
2174     ptr->getStreaming().beginIteration();
2175     ptr->comlibsend(tag,ptr->getRank(comm),msg,count,type,dest,comm);
2176   } else
2177 #endif
2178     ptr->send(tag, ptr->getRank(comm), msg, count, type, dest, comm);
2179 #if AMPI_COUNTER
2180   getAmpiParent()->counters.send++;
2181 #endif
2182   return 0;
2183 }
2184
2185 CDECL
2186 int AMPI_Recv(void *msg, int count, MPI_Datatype type, int src, int tag,
2187               MPI_Comm comm, MPI_Status *status)
2188 {
2189   AMPIAPI("AMPI_Recv");
2190
2191 #ifdef AMPIMSGLOG
2192   ampiParent* pptr = getAmpiParent();
2193   if(msgLogRead){
2194     (*(pptr->fromPUPer))|(pptr->pupBytes);
2195     PUParray(*(pptr->fromPUPer), (char *)msg, (pptr->pupBytes));
2196     PUParray(*(pptr->fromPUPer), (char *)status, sizeof(MPI_Status));
2197     return 0;
2198   }
2199 #endif
2200
2201   ampi *ptr = getAmpiInstance(comm);
2202   if(-1==ptr->recv(tag,src,msg,count,type, comm, (int*) status)) CkAbort("AMPI> Error in MPI_Recv");
2203   
2204 #if AMPI_COUNTER
2205   getAmpiParent()->counters.recv++;
2206 #endif
2207  
2208 #ifdef AMPIMSGLOG
2209   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2210     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2211     (*(pptr->toPUPer))|(pptr->pupBytes);
2212     PUParray(*(pptr->toPUPer), (char *)msg, (pptr->pupBytes));
2213     PUParray(*(pptr->toPUPer), (char *)status, sizeof(MPI_Status));
2214   }
2215 #endif
2216   
2217   return 0;
2218 }
2219
2220 CDECL
2221 int AMPI_Probe(int src, int tag, MPI_Comm comm, MPI_Status *status)
2222 {
2223   AMPIAPI("AMPI_Probe");
2224   ampi *ptr = getAmpiInstance(comm);
2225   ptr->probe(tag,src, comm, (int*) status);
2226   return 0;
2227 }
2228
2229 CDECL
2230 int AMPI_Iprobe(int src,int tag,MPI_Comm comm,int *flag,MPI_Status *status)
2231 {
2232   AMPIAPI("AMPI_Iprobe");
2233   ampi *ptr = getAmpiInstance(comm);
2234   *flag = ptr->iprobe(tag,src,comm,(int*) status);
2235   return 0;
2236 }
2237
2238 CDECL
2239 int AMPI_Sendrecv(void *sbuf, int scount, int stype, int dest,
2240                   int stag, void *rbuf, int rcount, int rtype,
2241                   int src, int rtag, MPI_Comm comm, MPI_Status *sts)
2242 {
2243   AMPIAPI("AMPI_Sendrecv");
2244   int se=MPI_Send(sbuf,scount,stype,dest,stag,comm);
2245   int re=MPI_Recv(rbuf,rcount,rtype,src,rtag,comm,sts);
2246   if (se) return se;
2247   else return re;
2248 }
2249
2250 CDECL
2251 int AMPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype,
2252                          int dest, int sendtag, int source, int recvtag,
2253                          MPI_Comm comm, MPI_Status *status)
2254 {
2255   AMPIAPI("AMPI_Sendrecv_replace");
2256   return AMPI_Sendrecv(buf, count, datatype, dest, sendtag,
2257                       buf, count, datatype, source, recvtag, comm, status);
2258 }
2259
2260
2261 CDECL
2262 int AMPI_Barrier(MPI_Comm comm)
2263 {
2264   AMPIAPI("AMPI_Barrier");
2265
2266   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Barrier not allowed for Inter-communicator!");
2267 #if CMK_BLUEGENE_CHARM
2268   void *barrierLog;             // store current log in timeline
2269   _TRACE_BG_TLINE_END(&barrierLog); 
2270   //TRACE_BG_AMPI_BARRIER_START(barrierLog);
2271   TRACE_BG_AMPI_BREAK(NULL, "AMPI_Barrier", NULL, 0);
2272 #endif
2273   //HACK: Use collective operation as a barrier.
2274   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
2275 #if CMK_BLUEGENE_CHARM
2276   //TRACE_BG_AMPI_BARRIER_END(barrierLog);
2277   _TRACE_BG_SET_INFO(NULL, "AMPI_Barrier_END",  &barrierLog, 1);
2278 #endif
2279 #if AMPI_COUNTER
2280   getAmpiParent()->counters.barrier++;
2281 #endif
2282   return 0;
2283 }
2284
2285 CDECL
2286 int AMPI_Bcast(void *buf, int count, MPI_Datatype type, int root,
2287                          MPI_Comm comm)
2288 {
2289   AMPIAPI("AMPI_Bcast");
2290   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Bcast not allowed for Inter-communicator!");
2291   if(comm==MPI_COMM_SELF) return 0;
2292
2293 #ifdef AMPIMSGLOG
2294   ampiParent* pptr = getAmpiParent();
2295   if(msgLogRead){
2296     (*(pptr->fromPUPer))|(pptr->pupBytes);
2297     PUParray(*(pptr->fromPUPer), (char *)buf, (pptr->pupBytes));
2298     return 0;
2299   }
2300 #endif
2301
2302   ampi* ptr = getAmpiInstance(comm);
2303   ptr->bcast(root, buf, count, type,comm);
2304 #if AMPI_COUNTER
2305   getAmpiParent()->counters.bcast++;
2306 #endif
2307
2308 #ifdef AMPIMSGLOG
2309   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2310     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2311     (*(pptr->toPUPer))|(pptr->pupBytes);
2312     PUParray(*(pptr->toPUPer), (char *)buf, (pptr->pupBytes));
2313   }
2314 #endif
2315
2316   return 0;
2317 }
2318
2319 /// This routine is called with the results of a Reduce or AllReduce
2320 const int MPI_REDUCE_SOURCE=0;
2321 const int MPI_REDUCE_COMM=MPI_COMM_WORLD;
2322 void ampi::reduceResult(CkReductionMsg *msg)
2323 {
2324         MSG_ORDER_DEBUG(printf("[%d] reduceResult called \n",thisIndex));
2325   ampi::sendraw(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, msg->getData(), msg->getSize(),
2326              thisArrayID,thisIndex);
2327   delete msg;
2328 }
2329
2330 static CkReductionMsg *makeRednMsg(CkDDT_DataType *ddt,const void *inbuf,int count,int type,MPI_Op op)
2331 {
2332   int szdata = ddt->getSize(count);
2333   int szhdr = sizeof(AmpiOpHeader);
2334   AmpiOpHeader newhdr(op,type,count,szdata); 
2335   CkReductionMsg *msg=CkReductionMsg::buildNew(szdata+szhdr,NULL,AmpiReducer);
2336   memcpy(msg->getData(),&newhdr,szhdr);
2337   TCharm::activateVariable(inbuf);
2338   ddt->serialize((char*)inbuf, (char*)msg->getData()+szhdr, count, 1);
2339   TCharm::deactivateVariable(inbuf);
2340   return msg;
2341 }
2342
2343 // Copy the MPI datatype "type" from inbuf to outbuf
2344 static int copyDatatype(MPI_Comm comm,MPI_Datatype type,int count,const void *inbuf,void *outbuf) {
2345   // ddts don't have "copy", so fake it by serializing into a temp buffer, then
2346   //  deserializing into the output.
2347   ampi *ptr = getAmpiInstance(comm);
2348   CkDDT_DataType *ddt=ptr->getDDT()->getType(type);
2349   int len=ddt->getSize(count);
2350   char *serialized=new char[len];
2351   TCharm::activateVariable(inbuf);
2352   TCharm::activateVariable(outbuf);
2353   ddt->serialize((char*)inbuf,(char*)serialized,count,1);
2354   ddt->serialize((char*)outbuf,(char*)serialized,count,-1); 
2355   TCharm::deactivateVariable(outbuf);
2356   TCharm::deactivateVariable(inbuf);
2357   delete [] serialized;         // < memory leak!  // gzheng 
2358   
2359   return MPI_SUCCESS;
2360 }
2361
2362 CDECL
2363 int AMPI_Reduce(void *inbuf, void *outbuf, int count, int type, MPI_Op op,
2364                int root, MPI_Comm comm)
2365 {
2366   AMPIAPI("AMPI_Reduce");
2367   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2368
2369 #ifdef AMPIMSGLOG
2370   ampiParent* pptr = getAmpiParent();
2371   if(msgLogRead){
2372     (*(pptr->fromPUPer))|(pptr->pupBytes);
2373     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2374     return 0;
2375   }
2376 #endif
2377   
2378   ampi *ptr = getAmpiInstance(comm);
2379   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
2380   if(op == MPI_OP_NULL) CkAbort("MPI_Reduce called with MPI_OP_NULL!!!");
2381   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce not allowed for Inter-communicator!");
2382
2383   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2384
2385   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
2386   msg->setCallback(reduceCB);
2387         MSG_ORDER_DEBUG(CkPrintf("[%d] AMPI_Reduce called on comm %d root %d \n",ptr->thisIndex,comm,rootIdx));
2388   ptr->contribute(msg);
2389   if (ptr->thisIndex == rootIdx){
2390     /*HACK: Use recv() to block until reduction data comes back*/
2391     if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2392       CkAbort("AMPI>MPI_Reduce called with different values on different processors!");
2393   }
2394 #if AMPI_COUNTER
2395   getAmpiParent()->counters.reduce++;
2396 #endif
2397
2398 #ifdef AMPIMSGLOG
2399   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2400     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2401     (*(pptr->toPUPer))|(pptr->pupBytes);
2402     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
2403   }
2404 #endif
2405   
2406   return 0;
2407 }
2408
2409 CDECL
2410 int AMPI_Allreduce(void *inbuf, void *outbuf, int count, int type,
2411                   MPI_Op op, MPI_Comm comm)
2412 {
2413   AMPIAPI("AMPI_Allreduce");
2414   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2415
2416 #ifdef AMPIMSGLOG
2417   ampiParent* pptr = getAmpiParent();
2418   if(msgLogRead){
2419     (*(pptr->fromPUPer))|(pptr->pupBytes);
2420     PUParray(*(pptr->fromPUPer), (char *)outbuf, (pptr->pupBytes));
2421     //    CkExit();
2422     return 0;
2423   }
2424 #endif
2425
2426   ampi *ptr = getAmpiInstance(comm);
2427   if(op == MPI_OP_NULL) CkAbort("MPI_Allreduce called with MPI_OP_NULL!!!");
2428   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allreduce not allowed for Inter-communicator!");
2429
2430   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2431   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2432   msg->setCallback(allreduceCB);
2433   ptr->contribute(msg);
2434
2435   /*HACK: Use recv() to block until the reduction data comes back*/
2436   if(-1==ptr->recv(MPI_REDUCE_TAG, MPI_REDUCE_SOURCE, outbuf, count, type, MPI_REDUCE_COMM))
2437     CkAbort("AMPI> MPI_Allreduce called with different values on different processors!");
2438 #if AMPI_COUNTER
2439   getAmpiParent()->counters.allreduce++;
2440 #endif
2441
2442 #ifdef AMPIMSGLOG
2443   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2444     (pptr->pupBytes) = getDDT()->getSize(type) * count;
2445     (*(pptr->toPUPer))|(pptr->pupBytes);
2446     PUParray(*(pptr->toPUPer), (char *)outbuf, (pptr->pupBytes));
2447     //    CkExit();
2448   }
2449 #endif
2450
2451   return 0;
2452 }
2453
2454 CDECL
2455 int AMPI_Iallreduce(void *inbuf, void *outbuf, int count, int type,
2456                    MPI_Op op, MPI_Comm comm, MPI_Request* request)
2457 {
2458   AMPIAPI("AMPI_Allreduce");
2459   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,inbuf,outbuf);
2460
2461   checkRequest(*request);
2462   if(op == MPI_OP_NULL) CkAbort("MPI_Iallreduce called with MPI_OP_NULL!!!");
2463   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallreduce not allowed for Inter-communicator!");
2464   ampi *ptr = getAmpiInstance(comm);
2465
2466   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),inbuf,count,type,op);
2467   CkCallback allreduceCB(CkIndex_ampi::reduceResult(0),ptr->getProxy());
2468   msg->setCallback(allreduceCB);
2469   ptr->contribute(msg);
2470
2471   // using irecv instead recv to non-block the call and get request pointer
2472   AmpiRequestList* reqs = getReqs();
2473   IReq *newreq = new IReq(outbuf,count,type,MPI_REDUCE_SOURCE,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
2474   *request = reqs->insert(newreq);
2475   return 0;
2476 }
2477
2478 CDECL
2479 int AMPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts,
2480                        MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
2481 {
2482   AMPIAPI("AMPI_Reduce_scatter");
2483   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Reduce_scatter not allowed for Inter-communicator!");
2484   if(comm==MPI_COMM_SELF) return copyDatatype(comm,datatype,recvcounts[0],sendbuf,recvbuf);
2485   ampi *ptr = getAmpiInstance(comm);
2486   int size = ptr->getSize(comm);
2487   int count=0;
2488   int *displs = new int [size];
2489   int len;
2490   void *tmpbuf;
2491
2492   //under construction
2493   for(int i=0;i<size;i++){
2494     displs[i] = count;
2495     count+= recvcounts[i];
2496   }
2497   len = ptr->getDDT()->getType(datatype)->getSize(count);
2498   tmpbuf = malloc(len);
2499   AMPI_Reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
2500   AMPI_Scatterv(tmpbuf, recvcounts, displs, datatype,
2501                recvbuf, recvcounts[ptr->getRank(comm)], datatype, 0, comm);
2502   free(tmpbuf);
2503   delete [] displs;     // < memory leak ! // gzheng
2504   return 0;
2505 }
2506
2507 /***** MPI_Scan algorithm (from MPICH) *******
2508    recvbuf = sendbuf;
2509    partial_scan = sendbuf;
2510    mask = 0x1;
2511    while (mask < size) {
2512       dst = rank^mask;
2513       if (dst < size) {
2514          send partial_scan to dst;
2515          recv from dst into tmp_buf;
2516          if (rank > dst) {
2517             partial_scan = tmp_buf + partial_scan;
2518             recvbuf = tmp_buf + recvbuf;
2519          }
2520          else {
2521             if (op is commutative)
2522                partial_scan = tmp_buf + partial_scan;
2523             else {
2524                tmp_buf = partial_scan + tmp_buf;
2525                partial_scan = tmp_buf;
2526             }
2527          }
2528       }
2529       mask <<= 1;
2530    }
2531  ***** MPI_Scan algorithm (from MPICH) *******/
2532
2533 void applyOp(MPI_Datatype datatype, MPI_Op op, int count, void* invec, void* inoutvec) { // inoutvec[i] = invec[i] op inoutvec[i]
2534   (op)(invec,inoutvec,&count,&datatype);
2535 }
2536 CDECL
2537 int AMPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm ){
2538   AMPIAPI("AMPI_Scan");
2539   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scan not allowed for Inter-communicator!");
2540   MPI_Status sts;
2541   ampi *ptr = getAmpiInstance(comm);
2542   int size = ptr->getSize(comm);
2543   int blklen = ptr->getDDT()->getType(datatype)->getSize(count);
2544   int rank = ptr->getRank(comm);
2545   int mask = 0x1;
2546   int dst;
2547   void* tmp_buf = malloc(blklen);
2548   void* partial_scan = malloc(blklen);
2549   
2550   memcpy(recvbuf, sendbuf, blklen);
2551   memcpy(partial_scan, sendbuf, blklen);
2552   while(mask < size){
2553     dst = rank^mask;
2554     if(dst < size){
2555       AMPI_Sendrecv(partial_scan,count,datatype,dst,MPI_SCAN_TAG,
2556                    tmp_buf,count,datatype,dst,MPI_SCAN_TAG,comm,&sts);
2557       if(rank > dst){
2558         (op)(tmp_buf,partial_scan,&count,&datatype);
2559         (op)(tmp_buf,recvbuf,&count,&datatype);
2560       }else {
2561         (op)(partial_scan,tmp_buf,&count,&datatype);
2562         memcpy(partial_scan,tmp_buf,blklen);
2563       }
2564     }
2565     mask <<= 1;
2566
2567   }
2568
2569   free(tmp_buf);
2570   free(partial_scan);
2571 #if AMPI_COUNTER
2572   getAmpiParent()->counters.scan++;
2573 #endif
2574   return 0;
2575 }
2576
2577 CDECL
2578 int AMPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op){
2579   //AMPIAPI("AMPI_Op_create");
2580   *op = function;
2581   return 0;
2582 }
2583
2584 CDECL
2585 int AMPI_Op_free(MPI_Op *op){
2586   //AMPIAPI("AMPI_Op_free");
2587   *op = MPI_OP_NULL;
2588   return 0;
2589 }
2590
2591
2592 CDECL
2593 double AMPI_Wtime(void)
2594 {
2595 //  AMPIAPI("AMPI_Wtime");
2596
2597 #ifdef AMPIMSGLOG
2598   double ret=TCHARM_Wall_timer();
2599   ampiParent* pptr = getAmpiParent();
2600   if(msgLogRead){
2601     (*(pptr->fromPUPer))|ret;
2602     return ret;
2603   }
2604
2605   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2606     (*(pptr->toPUPer))|ret;
2607   }
2608 #endif
2609
2610 #if CMK_BLUEGENE_CHARM
2611   return BgGetTime();
2612 #else
2613   return TCHARM_Wall_timer();
2614 #endif
2615 }
2616
2617 CDECL
2618 double AMPI_Wtick(void){
2619   //AMPIAPI("AMPI_Wtick");
2620   return 1e-6;
2621 }
2622
2623 int PersReq::start(){
2624   if(sndrcv == 1) { // send request
2625     ampi *ptr=getAmpiInstance(comm);
2626     ptr->send(tag, ptr->getRank(comm), buf, count, type, src, comm);
2627   }
2628   return 0;
2629 }
2630
2631 CDECL
2632 int AMPI_Start(MPI_Request *request)
2633 {
2634   AMPIAPI("AMPI_Start");
2635   checkRequest(*request);
2636   AmpiRequestList *reqs = getReqs();
2637   if(-1==(*reqs)[*request]->start()){
2638     CkAbort("MPI_Start could be used only on persistent communication requests!");
2639   }
2640   return 0;
2641 }
2642
2643 CDECL
2644 int AMPI_Startall(int count, MPI_Request *requests){
2645   AMPIAPI("AMPI_Startall");
2646   checkRequests(count,requests);
2647   AmpiRequestList *reqs = getReqs();
2648   for(int i=0;i<count;i++){
2649     if(-1==(*reqs)[requests[i]]->start())
2650       CkAbort("MPI_Start could be used only on persistent communication requests!");
2651   }
2652   return 0;
2653 }
2654
2655 /* organize the indices of requests into a vector of a vector: 
2656  * level 1 is different msg envelope matches
2657  * level 2 is (posting) ordered requests of with envelope
2658  * each time multiple completion call loop over first elem of level 1
2659  * and move the matched to the NULL request slot.   
2660  * warning: this does not work with I-Alltoall requests */
2661 inline int areInactiveReqs(int count, MPI_Request* reqs){ // if count==0 then all inactive
2662   for(int i=0;i<count;i++){
2663     if(reqs[i]!=MPI_REQUEST_NULL)
2664       return 0;
2665   }
2666   return 1;
2667 }
2668 inline int matchReq(MPI_Request ia, MPI_Request ib){
2669   checkRequest(ia);  
2670   checkRequest(ib);
2671   AmpiRequestList* reqs = getReqs();
2672   AmpiRequest *a, *b;
2673   if(ia==MPI_REQUEST_NULL && ib==MPI_REQUEST_NULL) return 1;
2674   if(ia==MPI_REQUEST_NULL || ib==MPI_REQUEST_NULL) return 0;
2675   a=(*reqs)[ia];  b=(*reqs)[ib];
2676   if(a->tag != b->tag) return 0;
2677   if(a->src != b->src) return 0;
2678   if(a->comm != b->comm) return 0;
2679   return 1;
2680 }
2681 inline void swapInt(int& a,int& b){
2682   int tmp;
2683   tmp=a; a=b; b=tmp;
2684 }
2685 inline void sortedIndex(int n, int* arr, int* idx){
2686   int i,j;
2687   for(i=0;i<n;i++) 
2688     idx[i]=i;
2689   for (i=0; i<n-1; i++) 
2690     for (j=0; j<n-1-i; j++)
2691       if (arr[idx[j+1]] < arr[idx[j]]) 
2692         swapInt(idx[j+1],idx[j]);
2693 }
2694 CkVec<CkVec<int> > *vecIndex(int count, int* arr){
2695   CkAssert(count!=0);
2696   int *newidx = new int [count];
2697   int flag;
2698   sortedIndex(count,arr,newidx);
2699   CkVec<CkVec<int> > *vec = new CkVec<CkVec<int> >;
2700   CkVec<int> slot;
2701   vec->push_back(slot);
2702   (*vec)[0].push_back(newidx[0]);
2703   for(int i=1;i<count;i++){
2704     flag=0;
2705     for(int j=0;j<vec->size();j++){
2706       if(matchReq(arr[newidx[i]],arr[((*vec)[j])[0]])){
2707         ((*vec)[j]).push_back(newidx[i]);
2708         flag++;
2709       }
2710     }
2711     if(!flag){
2712       CkVec<int> newslot;
2713       newslot.push_back(newidx[i]);
2714       vec->push_back(newslot);
2715     }else{
2716       CkAssert(flag==1);
2717     }
2718   }
2719   delete [] newidx;
2720   return vec;
2721 }
2722 void vecPrint(CkVec<CkVec<int> > vec, int* arr){
2723   printf("vec content: ");
2724   for(int i=0;i<vec.size();i++){
2725     printf("{");
2726     for(int j=0;j<(vec[i]).size();j++){
2727       printf(" %d ",arr[(vec[i])[j]]);
2728     }
2729     printf("} ");
2730   }
2731   printf("\n");
2732 }
2733
2734 int PersReq::wait(MPI_Status *sts){
2735         if(sndrcv == 2) {
2736                 if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
2737                         CkAbort("AMPI> Error in persistent request wait");
2738 #if CMK_BLUEGENE_CHARM
2739                 _TRACE_BG_TLINE_END(&event);
2740 #endif
2741         }
2742         return 0;
2743 }
2744
2745 int IReq::wait(MPI_Status *sts){
2746   if (CpvAccess(CmiPICMethod) != 2) 
2747   {
2748           // optimization for Irecv
2749           // generic() writes directly to the buffer, so the only thing we
2750           // do here is to wait
2751         ampi *ptr = getAmpiInstance(comm);
2752         while (status == false) {
2753                 ptr->resumeOnRecv=true;
2754                 ptr->block();
2755         }
2756         ptr->resumeOnRecv=false;
2757         if(sts) {
2758           sts->MPI_TAG = tag;
2759           sts->MPI_SOURCE = src;
2760           sts->MPI_COMM = comm;
2761           sts->MPI_LENGTH = length;
2762         }
2763   }
2764   else {
2765         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
2766                 CkAbort("AMPI> Error in non-blocking request wait");
2767   }
2768 #if CMK_BLUEGENE_CHARM
2769         _TRACE_BG_TLINE_END(&event);
2770 #endif
2771         return 0;
2772 }
2773
2774 int ATAReq::wait(MPI_Status *sts){
2775         int i;
2776         for(i=0;i<count;i++){
2777                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
2778                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int *)sts))
2779                         CkAbort("AMPI> Error in alltoall request wait");
2780 #if CMK_BLUEGENE_CHARM
2781                 _TRACE_BG_TLINE_END(&myreqs[i].event);
2782 #endif
2783         }
2784 #if CMK_BLUEGENE_CHARM
2785         //TRACE_BG_AMPI_NEWSTART(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq", NULL, 0);
2786         TRACE_BG_AMPI_BREAK(getAmpiInstance(MPI_COMM_WORLD)->getThread(), "ATAReq_wait", NULL, 0);
2787         for (i=0; i<count; i++)
2788           _TRACE_BG_ADD_BACKWARD_DEP(myreqs[i].event);
2789         _TRACE_BG_TLINE_END(&event);
2790 #endif
2791         return 0;
2792 }
2793
2794 CDECL
2795 int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
2796 {
2797   AMPIAPI("AMPI_Wait");
2798   if(*request == MPI_REQUEST_NULL){
2799     stsempty(*sts);
2800     return 0;
2801   }
2802   checkRequest(*request);
2803   AmpiRequestList* reqs = getReqs();
2804
2805 #ifdef AMPIMSGLOG
2806   ampiParent* pptr = getAmpiParent();
2807   if(msgLogRead){
2808     (*(pptr->fromPUPer))|(pptr->pupBytes);
2809     PUParray(*(pptr->fromPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
2810     PUParray(*(pptr->fromPUPer), (char *)sts, sizeof(MPI_Status));
2811     return 0;
2812   }
2813 #endif
2814   
2815   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
2816   (*reqs)[*request]->wait(sts);
2817
2818 #ifdef AMPIMSGLOG
2819   if(msgLogWrite && pptr->thisIndex == msgLogRank){
2820     (pptr->pupBytes) = getDDT()->getSize((*reqs)[*request]->type) * ((*reqs)[*request]->count);
2821     (*(pptr->toPUPer))|(pptr->pupBytes);
2822     PUParray(*(pptr->toPUPer), (char *)((*reqs)[*request]->buf), (pptr->pupBytes));
2823     PUParray(*(pptr->toPUPer), (char *)sts, sizeof(MPI_Status));
2824   }
2825 #endif
2826   
2827   if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
2828     reqs->free(*request);
2829     *request = MPI_REQUEST_NULL;
2830   }
2831
2832   return 0;
2833 }
2834
2835 CDECL
2836 int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
2837 {
2838   AMPIAPI("AMPI_Waitall");
2839   if(count==0) return MPI_SUCCESS;
2840   checkRequests(count,request);
2841   int i,j,oldPe;
2842   AmpiRequestList* reqs = getReqs();
2843   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);\
2844
2845 #ifdef AMPIMSGLOG
2846   ampiParent* pptr = getAmpiParent();
2847   if(msgLogRead){
2848     for(i=0;i<reqvec->size();i++){
2849       for(j=0;j<((*reqvec)[i]).size();j++){
2850         if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
2851           stsempty(sts[((*reqvec)[i])[j]]);
2852           continue;
2853         }
2854         AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
2855         (*(pptr->fromPUPer))|(pptr->pupBytes);
2856         PUParray(*(pptr->fromPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
2857         PUParray(*(pptr->fromPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
2858       }
2859     }
2860     return 0;
2861   }
2862 #endif
2863
2864 #if CMK_BLUEGENE_CHARM
2865   void *curLog;         // store current log in timeline
2866   _TRACE_BG_TLINE_END(&curLog);
2867 #if 0
2868   TRACE_BG_AMPI_SUSPEND();
2869 #endif
2870 #endif
2871   for(i=0;i<reqvec->size();i++){
2872     for(j=0;j<((*reqvec)[i]).size();j++){
2873       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
2874         stsempty(sts[((*reqvec)[i])[j]]);
2875         continue;
2876       }
2877       oldPe = CkMyPe();
2878       AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
2879       waitReq->wait(&sts[((*reqvec)[i])[j]]);
2880
2881 #ifdef AMPIMSGLOG
2882       if(msgLogWrite && pptr->thisIndex == msgLogRank){
2883         (pptr->pupBytes) = getDDT()->getSize(waitReq->type) * (waitReq->count);
2884         (*(pptr->toPUPer))|(pptr->pupBytes);
2885         PUParray(*(pptr->toPUPer), (char *)(waitReq->buf), (pptr->pupBytes));
2886         PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
2887       }
2888 #endif
2889
2890 #if 1
2891       if(oldPe != CkMyPe()){
2892                         reqs = getReqs();
2893                         reqvec  = vecIndex(count,request);
2894       }
2895 #endif
2896     }
2897   }
2898 #if CMK_BLUEGENE_CHARM
2899   TRACE_BG_AMPI_WAITALL(reqs);   // setup forward and backward dependence
2900 #endif
2901   // free memory of requests
2902   for(i=0;i<count;i++){ 
2903     if(request[i] == MPI_REQUEST_NULL)
2904       continue;
2905     if((*reqs)[request[i]]->getType() != 1) { // only free non-blocking request
2906       reqs->free(request[i]);
2907       request[i] = MPI_REQUEST_NULL;
2908     }
2909   }
2910   delete reqvec;
2911   return 0;
2912 }
2913
2914 CDECL
2915 int AMPI_Waitany(int count, MPI_Request *request, int *idx, MPI_Status *sts)
2916 {
2917   AMPIAPI("AMPI_Waitany");
2918   
2919   USER_CALL_DEBUG("AMPI_Waitany("<<count<<")");
2920   checkRequests(count,request);
2921   if(areInactiveReqs(count,request)){
2922     *idx=MPI_UNDEFINED;
2923     stsempty(*sts);
2924     return MPI_SUCCESS;
2925   }
2926   int flag=0;
2927   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
2928   while(count>0){ /* keep looping until some request finishes: */
2929     for(int i=0;i<reqvec->size();i++){
2930       AMPI_Test(&request[((*reqvec)[i])[0]], &flag, sts);
2931       if(flag == 1 && sts->MPI_COMM != 0){ // to skip MPI_REQUEST_NULL
2932         *idx = ((*reqvec)[i])[0];
2933         USER_CALL_DEBUG("AMPI_Waitany returning "<<*idx);
2934         return 0;
2935       }
2936     }
2937     /* no requests have finished yet-- schedule and try again */
2938     AMPI_Yield(MPI_COMM_WORLD);
2939   }
2940   *idx = MPI_UNDEFINED;
2941   USER_CALL_DEBUG("AMPI_Waitany returning UNDEFINED");
2942         delete reqvec;
2943   return 0;
2944 }
2945
2946 CDECL
2947 int AMPI_Waitsome(int incount, MPI_Request *array_of_requests, int *outcount,
2948                  int *array_of_indices, MPI_Status *array_of_statuses)
2949 {
2950   AMPIAPI("AMPI_Waitsome");
2951   checkRequests(incount,array_of_requests);
2952   if(areInactiveReqs(incount,array_of_requests)){
2953     *outcount=MPI_UNDEFINED;
2954     return MPI_SUCCESS;
2955   }
2956   MPI_Status sts;
2957   int i;
2958   int flag=0, realflag=0;
2959   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
2960   *outcount = 0;
2961   while(1){
2962     for(i=0;i<reqvec->size();i++){
2963       AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
2964       if(flag == 1){ 
2965         array_of_indices[(*outcount)]=((*reqvec)[i])[0];
2966         array_of_statuses[(*outcount)++]=sts;
2967         if(sts.MPI_COMM != 0)
2968           realflag=1; // there is real(non null) request
2969       }
2970     }
2971     if(realflag && outcount>0) break;
2972   }
2973         delete reqvec;
2974   return 0;
2975 }
2976
2977 CmiBool PersReq::test(MPI_Status *sts){
2978         if(sndrcv == 2)         // recv request
2979                 return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
2980         else                    // send request
2981                 return 1;
2982
2983 }
2984 void PersReq::complete(MPI_Status *sts){
2985         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
2986                 CkAbort("AMPI> Error in persistent request complete");
2987 }
2988
2989 CmiBool IReq::test(MPI_Status *sts){
2990         if (status == true) {           
2991           if(sts)
2992             sts->MPI_LENGTH = length;           
2993           return true;
2994         }
2995         else {
2996           getAmpiInstance(comm)->yield();
2997           return false;
2998         }
2999 /*
3000         return getAmpiInstance(comm)->iprobe(tag, src, comm, (int*)sts);
3001 */
3002 }
3003 void IReq::complete(MPI_Status *sts){
3004         wait(sts);
3005 /*
3006         if(-1==getAmpiInstance(comm)->recv(tag, src, buf, count, type, comm, (int*)sts))
3007                 CkAbort("AMPI> Error in non-blocking request complete");
3008 */
3009 }
3010
3011 void IReq::receive(ampi *ptr, AmpiMsg *msg)
3012 {
3013     int sts = ptr->processMessage(msg, tag, src, buf, count, type);
3014     status = (sts == 0);
3015     length = msg->length;
3016     delete msg;
3017 }
3018
3019 CmiBool ATAReq::test(MPI_Status *sts){
3020         int i, flag=1;
3021         for(i=0;i<count;i++){
3022                 flag *= getAmpiInstance(myreqs[i].comm)->iprobe(myreqs[i].tag, myreqs[i].src,
3023                                         myreqs[i].comm, (int*) sts);
3024         }
3025         return flag;
3026 }
3027 void ATAReq::complete(MPI_Status *sts){
3028         int i;
3029         for(i=0;i<count;i++){
3030                 if(-1==getAmpiInstance(myreqs[i].comm)->recv(myreqs[i].tag, myreqs[i].src, myreqs[i].buf,
3031                                                 myreqs[i].count, myreqs[i].type, myreqs[i].comm, (int*)sts))
3032                         CkAbort("AMPI> Error in alltoall request complete");
3033         }
3034 }
3035
3036 CDECL
3037 int AMPI_Test(MPI_Request *request, int *flag, MPI_Status *sts)
3038 {
3039   AMPIAPI("AMPI_Test");
3040   checkRequest(*request);
3041   if(*request==MPI_REQUEST_NULL) {
3042     *flag = 1;
3043     stsempty(*sts);
3044     return 0;
3045   }
3046   AmpiRequestList* reqs = getReqs();
3047   if(1 == (*flag = (*reqs)[*request]->test(sts))){
3048     if((*reqs)[*request]->getType() != 1) { // only free non-blocking request
3049       (*reqs)[*request]->complete(sts);
3050       reqs->free(*request);
3051       *request = MPI_REQUEST_NULL;
3052     }
3053   }
3054   return 0;
3055 }
3056
3057 CDECL
3058 int AMPI_Testany(int count, MPI_Request *request, int *index, int *flag, MPI_Status *sts){
3059   AMPIAPI("AMPI_Testany");
3060   checkRequests(count,request);
3061   if(areInactiveReqs(count,request)){
3062     *flag=1;
3063     *index=MPI_UNDEFINED;
3064     stsempty(*sts);
3065     return MPI_SUCCESS;
3066   }
3067   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3068   *flag=0;
3069   for(int i=0;i<reqvec->size();i++){
3070     AMPI_Test(&request[((*reqvec)[i])[0]], flag, sts);
3071     if(*flag==1 && sts->MPI_COMM!=0){ // skip MPI_REQUEST_NULL
3072       *index = ((*reqvec)[i])[0];
3073       return 0;
3074     }
3075   }
3076   *index = MPI_UNDEFINED;
3077         delete reqvec;
3078   return 0;
3079 }
3080
3081 CDECL
3082 int AMPI_Testall(int count, MPI_Request *request, int *flag, MPI_Status *sts)
3083 {
3084   AMPIAPI("AMPI_Testall");
3085   if(count==0) return MPI_SUCCESS;
3086   checkRequests(count,request);
3087   int tmpflag;
3088   int i,j;
3089   AmpiRequestList* reqs = getReqs();
3090   CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
3091   *flag = 1;  
3092   for(i=0;i<reqvec->size();i++){
3093     for(j=0;j<((*reqvec)[i]).size();j++){
3094       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL)
3095         continue;
3096       tmpflag = (*reqs)[request[((*reqvec)[i])[j]]]->test(&sts[((*reqvec)[i])[j]]);
3097       *flag *= tmpflag;
3098     }
3099   }
3100   if(flag) 
3101     MPI_Waitall(count,request,sts);
3102         delete reqvec;  
3103   return 0;
3104 }
3105
3106 CDECL
3107 int AMPI_Testsome(int incount, MPI_Request *array_of_requests, int *outcount,
3108                  int *array_of_indices, MPI_Status *array_of_statuses)
3109 {
3110   AMPIAPI("AMPI_Testsome");
3111   checkRequests(incount,array_of_requests);
3112   if(areInactiveReqs(incount,array_of_requests)){
3113     *outcount=MPI_UNDEFINED;
3114     return MPI_SUCCESS;
3115   }
3116   MPI_Status sts;
3117   int flag;
3118   int i;
3119   CkVec<CkVec<int> > *reqvec = vecIndex(incount,array_of_requests);
3120   *outcount = 0;
3121   for(i=0;i<reqvec->size();i++){
3122     AMPI_Test(&array_of_requests[((*reqvec)[i])[0]], &flag, &sts);
3123     if(flag == 1){
3124       array_of_indices[(*outcount)]=((*reqvec)[i])[0];
3125       array_of_statuses[(*outcount)++]=sts;
3126     }
3127   }
3128         delete reqvec;
3129   return 0;
3130 }
3131
3132 CDECL
3133 int AMPI_Request_free(MPI_Request *request){
3134   AMPIAPI("AMPI_Request_free");
3135   checkRequest(*request);
3136   if(*request==MPI_REQUEST_NULL) return 0;
3137   AmpiRequestList* reqs = getReqs();
3138   reqs->free(*request);
3139   return 0;
3140 }
3141
3142 CDECL
3143 int AMPI_Cancel(MPI_Request *request){
3144   AMPIAPI("AMPI_Request_free");
3145   return AMPI_Request_free(request);
3146 }
3147
3148 CDECL
3149 int AMPI_Recv_init(void *buf, int count, int type, int src, int tag,
3150                    MPI_Comm comm, MPI_Request *req)
3151 {
3152   AMPIAPI("AMPI_Recv_init");
3153   AmpiRequestList* reqs = getReqs();
3154   PersReq *newreq = new PersReq(buf,count,type,src,tag,comm,2);
3155   *req = reqs->insert(newreq);
3156   return 0;
3157 }
3158
3159 CDECL
3160 int AMPI_Send_init(void *buf, int count, int type, int dest, int tag,
3161                    MPI_Comm comm, MPI_Request *req)
3162 {
3163   AMPIAPI("AMPI_Send_init");
3164   AmpiRequestList* reqs = getReqs();
3165   PersReq *newreq = new PersReq(buf,count,type,dest,tag,comm,1);
3166   *req = reqs->insert(newreq);
3167   return 0;
3168 }
3169
3170 CDECL
3171 int AMPI_Type_contiguous(int count, MPI_Datatype oldtype,
3172                          MPI_Datatype *newtype)
3173 {
3174   AMPIAPI("AMPI_Type_contiguous");
3175   getDDT()->newContiguous(count, oldtype, newtype);
3176   return 0;
3177 }
3178
3179 CDECL
3180 int AMPI_Type_vector(int count, int blocklength, int stride,
3181                      MPI_Datatype oldtype, MPI_Datatype*  newtype)
3182 {
3183   AMPIAPI("AMPI_Type_vector");
3184   getDDT()->newVector(count, blocklength, stride, oldtype, newtype);
3185   return 0 ;
3186 }
3187
3188 CDECL
3189 int AMPI_Type_hvector(int count, int blocklength, MPI_Aint stride, 
3190                       MPI_Datatype oldtype, MPI_Datatype*  newtype)
3191 {
3192   AMPIAPI("AMPI_Type_hvector");
3193   getDDT()->newHVector(count, blocklength, stride, oldtype, newtype);
3194   return 0 ;
3195 }
3196
3197 CDECL
3198 int AMPI_Type_indexed(int count, int* arrBlength, int* arrDisp, 
3199                       MPI_Datatype oldtype, MPI_Datatype*  newtype)
3200 {
3201   AMPIAPI("AMPI_Type_indexed");
3202   getDDT()->newIndexed(count, arrBlength, arrDisp, oldtype, newtype);
3203   return 0 ;
3204 }
3205
3206 CDECL
3207 int AMPI_Type_hindexed(int count, int* arrBlength, MPI_Aint* arrDisp,
3208                        MPI_Datatype oldtype, MPI_Datatype*  newtype)
3209 {
3210   AMPIAPI("AMPI_Type_hindexed");
3211   getDDT()->newHIndexed(count, arrBlength, arrDisp, oldtype, newtype);
3212   return 0 ;
3213 }
3214
3215 CDECL
3216 int AMPI_Type_struct(int count, int* arrBlength, int* arrDisp, 
3217                      MPI_Datatype* oldtype, MPI_Datatype*  newtype)
3218 {
3219   AMPIAPI("AMPI_Type_struct");
3220   getDDT()->newStruct(count, arrBlength, arrDisp, oldtype, newtype);
3221   return 0 ;
3222 }
3223
3224 CDECL
3225 int AMPI_Type_commit(MPI_Datatype *datatype)
3226 {
3227   AMPIAPI("AMPI_Type_commit");
3228   return 0;
3229 }
3230
3231 CDECL
3232 int AMPI_Type_free(MPI_Datatype *datatype)
3233 {
3234   AMPIAPI("AMPI_Type_free");
3235   getDDT()->freeType(datatype);
3236   return 0;
3237 }
3238
3239
3240 CDECL
3241 int AMPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent)
3242 {
3243   AMPIAPI("AMPI_Type_extent");
3244   *extent = getDDT()->getExtent(datatype);
3245   return 0;
3246 }
3247
3248 CDECL
3249 int AMPI_Type_size(MPI_Datatype datatype, int *size)
3250 {
3251   AMPIAPI("AMPI_Type_size");
3252   *size=getDDT()->getSize(datatype);
3253   return 0;
3254 }
3255
3256 CDECL
3257 int AMPI_Isend(void *buf, int count, MPI_Datatype type, int dest,
3258               int tag, MPI_Comm comm, MPI_Request *request)
3259 {
3260   AMPIAPI("AMPI_Isend");
3261
3262 #ifdef AMPIMSGLOG
3263   ampiParent* pptr = getAmpiParent();
3264   if(msgLogRead){
3265     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
3266     return 0;
3267   }
3268 #endif
3269
3270   USER_CALL_DEBUG("AMPI_Isend("<<type<<","<<dest<<","<<tag<<","<<comm<<")");
3271   ampi *ptr = getAmpiInstance(comm);
3272 #if AMPI_COMLIB
3273   if(enableStreaming){
3274     ptr->getStreaming().beginIteration();
3275     ptr->comlibsend(tag,ptr->getRank(comm),buf,count,type,dest,comm);
3276   } else
3277 #endif
3278   ptr->send(tag, ptr->getRank(comm), buf, count, type, dest, comm);
3279   *request = MPI_REQUEST_NULL;
3280 #if AMPI_COUNTER
3281   getAmpiParent()->counters.isend++;
3282 #endif
3283
3284 #ifdef AMPIMSGLOG
3285   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3286     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
3287   }
3288 #endif
3289
3290   return 0;
3291 }
3292
3293 CDECL
3294 int AMPI_Irecv(void *buf, int count, MPI_Datatype type, int src,
3295               int tag, MPI_Comm comm, MPI_Request *request)
3296 {
3297   AMPIAPI("AMPI_Irecv");
3298
3299   if(src==MPI_PROC_NULL) { *request = MPI_REQUEST_NULL; return 0; }
3300   USER_CALL_DEBUG("AMPI_Irecv("<<type<<","<<src<<","<<tag<<","<<comm<<")");
3301   AmpiRequestList* reqs = getReqs();
3302   IReq *newreq = new IReq(buf,count,type,src,tag,comm);
3303   *request = reqs->insert(newreq);
3304
3305 #ifdef AMPIMSGLOG
3306   ampiParent* pptr = getAmpiParent();
3307   if(msgLogRead){
3308     PUParray(*(pptr->fromPUPer), (char *)request, sizeof(MPI_Request));
3309     return 0;
3310   }
3311 #endif
3312  
3313   // if msg is here, shall we do receive right away ??
3314   // posted irecv
3315   // message already arraved?
3316   ampi *ptr = getAmpiInstance(comm);
3317   AmpiMsg *msg = NULL;
3318   if (CpvAccess(CmiPICMethod) != 2)       // not copyglobals 
3319   {
3320     msg = ptr->getMessage(tag, src, comm, &newreq->tag);
3321   }
3322   if (msg) {
3323     newreq->receive(ptr, msg);
3324   } else {
3325       // post receive
3326     int tags[3];
3327     tags[0] = tag; tags[1] = src; tags[2] = comm;
3328     CmmPut(ptr->posted_ireqs, 3, tags, newreq);
3329   }
3330   
3331 #if AMPI_COUNTER
3332   getAmpiParent()->counters.irecv++;
3333 #endif
3334
3335 #ifdef AMPIMSGLOG
3336   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3337     PUParray(*(pptr->toPUPer), (char *)request, sizeof(MPI_Request));
3338   }
3339 #endif
3340   
3341   return 0;
3342 }
3343
3344 CDECL
3345 int AMPI_Ireduce(void *sendbuf, void *recvbuf, int count, int type, MPI_Op op,
3346                  int root, MPI_Comm comm, MPI_Request *request)
3347 {
3348   AMPIAPI("AMPI_Ireduce");
3349   if(op == MPI_OP_NULL) CkAbort("MPI_Ireduce called with MPI_OP_NULL!!!");
3350   if(comm==MPI_COMM_SELF) return copyDatatype(comm,type,count,sendbuf,recvbuf);
3351   ampi *ptr = getAmpiInstance(comm);
3352   CkReductionMsg *msg=makeRednMsg(ptr->getDDT()->getType(type),sendbuf,count,type,op);
3353   int rootIdx=ptr->comm2CommStruct(comm).getIndexForRank(root);
3354   CkCallback reduceCB(CkIndex_ampi::reduceResult(0),CkArrayIndex1D(rootIdx),ptr->getProxy(),true);
3355   msg->setCallback(reduceCB);
3356   ptr->contribute(msg);
3357
3358   if (ptr->thisIndex == rootIdx){
3359     // using irecv instead recv to non-block the call and get request pointer
3360     AmpiRequestList* reqs = getReqs();
3361     IReq *newreq = new IReq(recvbuf,count,type,0,MPI_REDUCE_TAG,MPI_REDUCE_COMM);
3362     *request = reqs->insert(newreq);
3363   }
3364   return 0;
3365 }
3366
3367 CDECL
3368 int AMPI_Allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3369                   void *recvbuf, int recvcount, MPI_Datatype recvtype,
3370                   MPI_Comm comm)
3371 {
3372   AMPIAPI("AMPI_Allgather");
3373   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgather not allowed for Inter-communicator!");
3374   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3375
3376   ampi *ptr = getAmpiInstance(comm);
3377   int size = ptr->getSize(comm);
3378   int i;
3379 #if AMPI_COMLIB
3380   if(comm == MPI_COMM_WORLD) {
3381       // commlib support
3382       ptr->getAllgather().beginIteration();
3383       for(i=0;i<size;i++) {
3384           ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3385                         sendtype, i, comm, ptr->getComlibProxy());
3386       }
3387       ptr->getAllgather().endIteration();
3388   } else 
3389 #endif
3390       for(i=0;i<size;i++) {
3391           ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3392                     sendtype, i, comm);
3393       }
3394
3395   MPI_Status status;
3396   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
3397   int itemsize = dttype->getSize(recvcount) ;
3398
3399   for(i=0;i<size;i++) {
3400     AMPI_Recv(((char*)recvbuf)+(itemsize*i), recvcount, recvtype,
3401              i, MPI_GATHER_TAG, comm, &status);
3402   }
3403 #if AMPI_COUNTER
3404   getAmpiParent()->counters.allgather++;
3405 #endif
3406   return 0;
3407 }
3408
3409 CDECL
3410 int AMPI_Iallgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3411                     void *recvbuf, int recvcount, MPI_Datatype recvtype,
3412                     MPI_Comm comm, MPI_Request* request)
3413 {
3414   AMPIAPI("AMPI_Iallgather");
3415   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Iallgather not allowed for Inter-communicator!");
3416   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3417
3418   ampi *ptr = getAmpiInstance(comm);
3419   int size = ptr->getSize(comm);
3420   int i;
3421 #if AMPI_COMLIB
3422   if(comm == MPI_COMM_WORLD) {
3423       // commlib support
3424       ptr->getAllgather().beginIteration();
3425       for(i=0;i<size;i++) {
3426           ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3427                         sendtype, i, comm, ptr->getComlibProxy());
3428       }
3429       ptr->getAllgather().endIteration();
3430   } else 
3431 #endif
3432       for(i=0;i<size;i++) {
3433           ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3434                     sendtype, i, comm);
3435       }
3436
3437   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
3438   int itemsize = dttype->getSize(recvcount) ;
3439
3440   // copy+paste from MPI_Irecv
3441   AmpiRequestList* reqs = getReqs();
3442   ATAReq *newreq = new ATAReq(size);
3443   for(i=0;i<size;i++){
3444     if(newreq->addReq(((char*)recvbuf)+(itemsize*i),recvcount,recvtype,i,MPI_GATHER_TAG,comm)!=(i+1))
3445       CkAbort("MPI_Iallgather: Error adding requests into ATAReq!");
3446   }
3447   *request = reqs->insert(newreq);
3448   AMPI_DEBUG("MPI_Iallgather: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
3449
3450   return 0;
3451 }
3452
3453 CDECL
3454 int AMPI_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3455                    void *recvbuf, int *recvcounts, int *displs,
3456                    MPI_Datatype recvtype, MPI_Comm comm)
3457 {
3458   AMPIAPI("AMPI_Allgatherv");
3459   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Allgatherv not allowed for Inter-communicator!");
3460   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3461
3462   ampi *ptr = getAmpiInstance(comm);
3463   int size = ptr->getSize(comm);
3464   int i;
3465 #if AMPI_COMLIB
3466   if(comm == MPI_COMM_WORLD) {
3467       // commlib support
3468       ptr->getAllgather().beginIteration();
3469       for(i=0;i<size;i++) {
3470           ptr->delesend(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3471                         sendtype, i, comm, ptr->getComlibProxy());
3472       }
3473       ptr->getAllgather().endIteration();
3474   } else
3475 #endif 
3476       for(i=0;i<size;i++) {
3477           ptr->send(MPI_GATHER_TAG, ptr->getRank(comm), sendbuf, sendcount,
3478                     sendtype, i, comm);
3479       }
3480
3481   MPI_Status status;
3482   CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
3483   int itemsize = dttype->getSize() ;
3484
3485   for(i=0;i<size;i++) {
3486     AMPI_Recv(((char*)recvbuf)+(itemsize*displs[i]), recvcounts[i], recvtype,
3487              i, MPI_GATHER_TAG, comm, &status);
3488   }
3489 #if AMPI_COUNTER
3490   getAmpiParent()->counters.allgather++;
3491 #endif
3492   return 0;
3493 }
3494
3495 CDECL
3496 int AMPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3497                void *recvbuf, int recvcount, MPI_Datatype recvtype,
3498                int root, MPI_Comm comm)
3499 {
3500   AMPIAPI("AMPI_Gather");
3501   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3502
3503 #ifdef AMPIMSGLOG
3504   ampiParent* pptr = getAmpiParent();
3505   if(msgLogRead){
3506     (*(pptr->fromPUPer))|(pptr->pupBytes);
3507     PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
3508     return 0;
3509   }
3510 #endif
3511
3512   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gather not allowed for Inter-communicator!");
3513
3514   ampi *ptr = getAmpiInstance(comm);
3515   int size = ptr->getSize(comm);
3516   int i;
3517   AMPI_Send(sendbuf, sendcount, sendtype, root, MPI_GATHER_TAG, comm);
3518
3519   if(ptr->getRank(comm)==root) {
3520     MPI_Status status;
3521     CkDDT_DataType* dttype = ptr->getDDT()->getType(recvtype) ;
3522     int itemsize = dttype->getSize(recvcount) ;
3523
3524     for(i=0;i<size;i++) {
3525       if(-1==ptr->recv(MPI_GATHER_TAG, i, (void*)(((char*)recvbuf)+(itemsize*i)), recvcount, recvtype, comm, (int*)(&status)))
3526         CkAbort("AMPI> Error in MPI_Gather recv");
3527     }
3528   }
3529 #if AMPI_COUNTER
3530   getAmpiParent()->counters.gather++;
3531 #endif
3532
3533 #ifdef AMPIMSGLOG
3534   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3535     (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcount * size;
3536     (*(pptr->toPUPer))|(pptr->pupBytes);
3537     PUParray(*(pptr->toPUPer), (char *)recvbuf, (pptr->pupBytes));
3538   }
3539 #endif
3540   
3541   return 0;
3542 }
3543
3544 CDECL
3545 int AMPI_Gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3546                 void *recvbuf, int *recvcounts, int *displs,
3547                 MPI_Datatype recvtype, int root, MPI_Comm comm)
3548 {
3549   AMPIAPI("AMPI_Gatherv");
3550   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3551
3552   int itemsize = getDDT()->getSize(recvtype);
3553
3554 #ifdef AMPIMSGLOG
3555   ampiParent* pptr = getAmpiParent();
3556   if(msgLogRead){
3557     int commsize;
3558     (*(pptr->fromPUPer))|commsize;
3559     for(int i=0;i<commsize;i++){
3560       (*(pptr->fromPUPer))|(pptr->pupBytes);
3561       PUParray(*(pptr->fromPUPer), (char *)(((char*)recvbuf)+(itemsize*displs[i])), (pptr->pupBytes));
3562     }
3563     return 0;
3564   }
3565 #endif
3566
3567   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Gatherv not allowed for Inter-communicator!");
3568
3569   AMPI_Send(sendbuf, sendcount, sendtype, root, MPI_GATHER_TAG, comm);
3570
3571   ampi *ptr = getAmpiInstance(comm);
3572   int size = ptr->getSize(comm);
3573   if(ptr->getRank(comm) == root) {
3574     MPI_Status status;
3575     for(int i=0;i<size;i++) {
3576       if(-1==ptr->recv(MPI_GATHER_TAG, i, (void*)(((char*)recvbuf)+(itemsize*displs[i])), recvcounts[i], recvtype, comm, (int*)(&status)))
3577         CkAbort("AMPI> Error in MPI_Gatherv recv");
3578     }
3579   }
3580 #if AMPI_COUNTER
3581   getAmpiParent()->counters.gather++;
3582 #endif
3583
3584 #ifdef AMPIMSGLOG
3585   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3586     for(int i=0;i<size;i++){
3587       (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcounts[i];
3588       (*(pptr->toPUPer))|(pptr->pupBytes);
3589       PUParray(*(pptr->toPUPer), (char *)(((char*)recvbuf)+(itemsize*displs[i])), (pptr->pupBytes));
3590     }
3591   }
3592 #endif
3593     
3594   return 0;
3595 }
3596
3597 CDECL
3598 int AMPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3599                 void *recvbuf, int recvcount, MPI_Datatype recvtype,
3600                 int root, MPI_Comm comm)
3601 {
3602   AMPIAPI("AMPI_Scatter");
3603   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scatter not allowed for Inter-communicator!");
3604   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3605
3606 #ifdef AMPIMSGLOG
3607   ampiParent* pptr = getAmpiParent();
3608   if(msgLogRead){
3609     (*(pptr->fromPUPer))|(pptr->pupBytes);
3610     PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
3611     return 0;
3612   }
3613 #endif
3614
3615   ampi *ptr = getAmpiInstance(comm);
3616   int size = ptr->getSize(comm);
3617   int i;
3618
3619   if(ptr->getRank(comm)==root) {
3620       CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
3621       int itemsize = dttype->getSize(sendcount) ;
3622       for(i=0;i<size;i++) {
3623           ptr->send(MPI_SCATTER_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i),
3624                     sendcount, sendtype, i, comm);
3625       }
3626       //ptr->mcomlibEnd();
3627   }
3628   
3629   MPI_Status status;
3630   if(-1==ptr->recv(MPI_SCATTER_TAG, root, recvbuf, recvcount, recvtype, comm, (int*)(&status)))
3631     CkAbort("AMPI> Error in MPI_Scatter recv");
3632
3633 #if AMPI_COUNTER
3634   getAmpiParent()->counters.scatter++;
3635 #endif
3636
3637 #ifdef AMPIMSGLOG
3638   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3639     (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcount;
3640     (*(pptr->toPUPer))|(pptr->pupBytes);
3641     PUParray(*(pptr->toPUPer), (char *)recvbuf, (pptr->pupBytes));
3642   }
3643 #endif
3644
3645   return 0;
3646 }
3647
3648 CDECL
3649 int AMPI_Scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype,
3650                  void *recvbuf, int recvcount, MPI_Datatype recvtype,
3651                  int root, MPI_Comm comm)
3652 {
3653   AMPIAPI("AMPI_Scatterv");
3654   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Scatterv not allowed for Inter-communicator!");
3655   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcounts[0],sendbuf,recvbuf);
3656
3657 #ifdef AMPIMSGLOG
3658   ampiParent* pptr = getAmpiParent();
3659   if(msgLogRead){
3660     (*(pptr->fromPUPer))|(pptr->pupBytes);
3661     PUParray(*(pptr->fromPUPer), (char *)recvbuf, (pptr->pupBytes));
3662     return 0;
3663   }
3664 #endif
3665
3666   ampi *ptr = getAmpiInstance(comm);
3667   int size = ptr->getSize(comm);
3668   int i;
3669
3670   if(ptr->getRank(comm) == root) {
3671       CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
3672       int itemsize = dttype->getSize() ;
3673       for(i=0;i<size;i++) {
3674           ptr->send(MPI_SCATTER_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*displs[i]),
3675                     sendcounts[i], sendtype, i, comm);
3676       }
3677       //ptr->mcomlibEnd();
3678   }
3679   
3680   MPI_Status status;
3681   if(-1==ptr->recv(MPI_SCATTER_TAG, root, recvbuf, recvcount, recvtype, comm, (int*)(&status)))
3682     CkAbort("AMPI> Error in MPI_Scatterv recv");
3683   
3684 #if AMPI_COUNTER
3685   getAmpiParent()->counters.scatter++;
3686 #endif
3687
3688 #ifdef AMPIMSGLOG
3689   if(msgLogWrite && pptr->thisIndex == msgLogRank){
3690     (pptr->pupBytes) = getDDT()->getSize(recvtype) * recvcount;
3691     (*(pptr->toPUPer))|(pptr->pupBytes);
3692     PUParray(*(pptr->toPUPer), (char *)recvbuf, (pptr->pupBytes));
3693   }
3694 #endif
3695
3696   return 0;
3697 }
3698
3699 CDECL
3700 int AMPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3701                  void *recvbuf, int recvcount, MPI_Datatype recvtype,
3702                  MPI_Comm comm)
3703 {
3704   AMPIAPI("AMPI_Alltoall");
3705   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Alltoall not allowed for Inter-communicator!");
3706   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3707   ampi *ptr = getAmpiInstance(comm);
3708   int size = ptr->getSize(comm);
3709   CkDDT_DataType *dttype;
3710   int itemsize;
3711   int i;
3712
3713 #if 0
3714     // use irecv to post receives
3715   dttype = ptr->getDDT()->getType(recvtype) ;
3716   itemsize = dttype->getSize(recvcount) ;
3717   int rank = ptr->getRank(comm);
3718
3719   MPI_Request *reqs = new MPI_Request[size];
3720   for(i=0;i<size;i++) {
3721         int dst = (rank+i) % size;
3722         AMPI_Irecv(((char*)recvbuf)+(itemsize*dst), recvcount, recvtype,
3723               dst, MPI_ATA_TAG, comm, &reqs[i]);
3724   }
3725   //AMPI_Yield(comm);
3726   //AMPI_Barrier(comm);
3727
3728   dttype = ptr->getDDT()->getType(sendtype) ;
3729   itemsize = dttype->getSize(sendcount) ;
3730 #if AMPI_COMLIB
3731   if(comm == MPI_COMM_WORLD) {
3732       // commlib support
3733       ptr->getAlltoall().beginIteration();
3734       for(i=0;i<size;i++) {
3735           ptr->delesend(MPI_ATA_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
3736                         sendtype, i, comm, ptr->getComlibProxy());
3737       }
3738       ptr->getAlltoall().endIteration();
3739   } else
3740 #endif 
3741   {
3742       for(i=0;i<size;i++) {
3743           int dst = (rank+i) % size;
3744           ptr->send(MPI_ATA_TAG, rank, ((char*)sendbuf)+(itemsize*dst), sendcount,
3745                     sendtype, dst, comm);
3746       }
3747   }
3748   
3749   // can use waitall if it is fixed for memory
3750   MPI_Status status;
3751   for (i=0;i<size;i++) AMPI_Wait(&reqs[i], &status);
3752 /*
3753   MPI_Status *status = new MPI_Status[size];
3754   AMPI_Waitall(size, reqs, status);
3755   delete [] status;
3756 */
3757
3758   delete [] reqs;
3759 #else
3760     // use blocking recv
3761   dttype = ptr->getDDT()->getType(sendtype) ;
3762   itemsize = dttype->getSize(sendcount) ;
3763   int rank = ptr->getRank(comm);
3764   int comm_size = size;
3765   MPI_Status status;
3766
3767   if( itemsize <= AMPI_ALLTOALL_SHORT_MSG ){
3768     /* Short message. Use recursive doubling. Each process sends all
3769        its data at each step along with all data it received in
3770        previous steps. */
3771     
3772     /* need to allocate temporary buffer of size
3773        sendbuf_extent*comm_size */
3774     
3775     int sendtype_extent = getDDT()->getExtent(sendtype);
3776     int recvtype_extent = getDDT()->getExtent(recvtype);
3777     int sendbuf_extent = sendcount * comm_size * sendtype_extent;
3778
3779     void* tmp_buf = malloc(sendbuf_extent*comm_size);
3780
3781     /* copy local sendbuf into tmp_buf at location indexed by rank */
3782     int curr_cnt = sendcount*comm_size;
3783     copyDatatype(comm, sendtype, curr_cnt, sendbuf,
3784                  ((char *)tmp_buf + rank*sendbuf_extent));
3785
3786     int mask = 0x1;
3787     int src,dst,tree_root,dst_tree_root,my_tree_root;
3788     int last_recv_cnt,nprocs_completed;
3789     int j,k,tmp_mask;
3790     i = 0;
3791     while (mask < comm_size) {
3792       dst = rank ^ mask;
3793       
3794       dst_tree_root = dst >> i;
3795       dst_tree_root <<= i;
3796       
3797       my_tree_root = rank >> i;
3798       my_tree_root <<= i;
3799       
3800       if (dst < comm_size) {
3801         MPI_Sendrecv(((char *)tmp_buf +
3802                       my_tree_root*sendbuf_extent),
3803                      curr_cnt, sendtype,
3804                      dst, MPI_ATA_TAG, 
3805                      ((char *)tmp_buf +
3806                       dst_tree_root*sendbuf_extent),
3807                      sendcount*comm_size*mask,
3808                      sendtype, dst, MPI_ATA_TAG, 
3809                      comm, &status);
3810         
3811         /* in case of non-power-of-two nodes, less data may be
3812            received than specified */
3813         MPI_Get_count(&status, sendtype, &last_recv_cnt);
3814         curr_cnt += last_recv_cnt;
3815       }
3816       
3817       /* if some processes in this process's subtree in this step
3818          did not have any destination process to communicate with
3819          because of non-power-of-two, we need to send them the
3820          result. We use a logarithmic recursive-halfing algorithm
3821          for this. */
3822       
3823       if (dst_tree_root + mask > comm_size) {
3824         nprocs_completed = comm_size - my_tree_root - mask;
3825         /* nprocs_completed is the number of processes in this
3826            subtree that have all the data. Send data to others
3827            in a tree fashion. First find root of current tree
3828            that is being divided into two. k is the number of
3829            least-significant bits in this process's rank that
3830            must be zeroed out to find the rank of the root */ 
3831         j = mask;
3832         k = 0;
3833         while (j) {
3834           j >>= 1;
3835           k++;
3836         }
3837         k--;
3838         
3839         tmp_mask = mask >> 1;
3840         while (tmp_mask) {
3841           dst = rank ^ tmp_mask;
3842           
3843           tree_root = rank >> k;
3844           tree_root <<= k;
3845           
3846           /* send only if this proc has data and destination
3847              doesn't have data. at any step, multiple processes
3848              can send if they have the data */
3849           if ((dst > rank) && 
3850               (rank < tree_root + nprocs_completed)
3851               && (dst >= tree_root + nprocs_completed)) {
3852             /* send the data received in this step above */
3853             MPI_Send(((char *)tmp_buf +
3854                       dst_tree_root*sendbuf_extent),
3855                      last_recv_cnt, sendtype,
3856                      dst, MPI_ATA_TAG,
3857                      comm);  
3858           }
3859           /* recv only if this proc. doesn't have data and sender
3860              has data */
3861           else if ((dst < rank) && 
3862                    (dst < tree_root + nprocs_completed) &&
3863                    (rank >= tree_root + nprocs_completed)) {
3864             MPI_Recv(((char *)tmp_buf +
3865                       dst_tree_root*sendbuf_extent),
3866                      sendcount*comm_size*mask, 
3867                      sendtype,   
3868                      dst, MPI_ATA_TAG,
3869                      comm, &status); 
3870             MPI_Get_count(&status, sendtype, &last_recv_cnt);
3871             curr_cnt += last_recv_cnt;
3872           }
3873           tmp_mask >>= 1;
3874           k--;
3875         }
3876       }
3877       
3878       mask <<= 1;
3879       i++;
3880     }
3881     
3882     /* now copy everyone's contribution from tmp_buf to recvbuf */
3883     for (int p=0; p<comm_size; p++) {
3884       copyDatatype(comm,sendtype,sendcount,
3885                    ((char *)tmp_buf +
3886                     p*sendbuf_extent +
3887                     rank*sendcount*sendtype_extent),
3888                    ((char*)recvbuf +
3889                     p*recvcount*recvtype_extent));
3890     }
3891     
3892     free((char *)tmp_buf); 
3893     
3894   }else if ( itemsize <= AMPI_ALLTOALL_MEDIUM_MSG ){
3895 #if AMPI_COMLIB
3896     if(comm == MPI_COMM_WORLD) {
3897       // commlib support
3898       ptr->getAlltoall().beginIteration();
3899       for(i=0;i<size;i++) {
3900         ptr->delesend(MPI_ATA_TAG, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
3901                       sendtype, i, comm, ptr->getComlibProxy());
3902       }
3903       ptr->getAlltoall().endIteration();
3904     } else
3905 #endif 
3906       {
3907         for(i=0;i<size;i++) {
3908           int dst = (rank+i) % size;
3909           ptr->send(MPI_ATA_TAG, rank, ((char*)sendbuf)+(itemsize*dst), sendcount,
3910                     sendtype, dst, comm);
3911         }
3912       }
3913     dttype = ptr->getDDT()->getType(recvtype) ;
3914     itemsize = dttype->getSize(recvcount) ;
3915     MPI_Status status;
3916     for(i=0;i<size;i++) {
3917       int dst = (rank+i) % size;
3918       AMPI_Recv(((char*)recvbuf)+(itemsize*dst), recvcount, recvtype,
3919                 dst, MPI_ATA_TAG, comm, &status);
3920     }
3921   } else {    // large messages
3922     /* Long message. Use pairwise exchange. If comm_size is a
3923        power-of-two, use exclusive-or to create pairs. Else send
3924        to rank+i, receive from rank-i. */
3925     
3926     int pof2;
3927     int src, dst;
3928     /* Is comm_size a power-of-two? */
3929     i = 1;
3930     while (i < size)
3931       i *= 2;
3932     if (i == size)
3933       pof2 = 1;
3934     else
3935       pof2 = 0;
3936     
3937     /* The i=0 case takes care of moving local data into recvbuf */
3938     for (i=0; i<size; i++) {
3939       if (pof2 == 1) {
3940         /* use exclusive-or algorithm */
3941         src = dst = rank ^ i;
3942       }
3943       else {
3944         src = (rank - i + size) % size;
3945         dst = (rank + i) % size;
3946       }
3947       
3948       MPI_Status status;
3949       MPI_Sendrecv(((char *)sendbuf + dst*itemsize),
3950                    sendcount, sendtype, dst,
3951                    MPI_ATA_TAG,
3952                    ((char *)recvbuf + src*itemsize),
3953                    recvcount, recvtype, src,
3954                    MPI_ATA_TAG, comm, &status);
3955     }   // end of large message
3956   }
3957 #endif
3958
3959 #if AMPI_COUNTER
3960   getAmpiParent()->counters.alltoall++;
3961 #endif
3962   return 0;
3963 }
3964
3965 CDECL
3966 int AMPI_Alltoall2(void *sendbuf, int sendcount, MPI_Datatype sendtype,
3967                  void *recvbuf, int recvcount, MPI_Datatype recvtype,
3968                  MPI_Comm comm)
3969 {
3970   AMPIAPI("AMPI_Alltoall2");
3971   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Alltoall not allowed for Inter-communicator!");
3972   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
3973   ampi *ptr = getAmpiInstance(comm);
3974   CProxy_ampi pa(ptr->ckGetArrayID());
3975   int size = ptr->getSize(comm);
3976   CkDDT_DataType *dttype;
3977   int itemsize;
3978   int recvdisp;
3979   int myrank;
3980   int i;
3981   // Set flags for others to get
3982   ptr->setA2AIGetFlag((void*)sendbuf);
3983   MPI_Comm_rank(comm,&myrank);
3984   recvdisp = myrank*recvcount;
3985
3986   AMPI_Barrier(comm);
3987     // post receives
3988   MPI_Request *reqs = new MPI_Request[size];
3989   for(i=0;i<size;i++) {
3990           reqs[i] = pa[i].Alltoall_RemoteIGet(recvdisp, recvcount, recvtype,
3991 MPI_ATA_TAG);
3992   }
3993
3994   dttype = ptr->getDDT()->getType(recvtype) ;
3995   itemsize = dttype->getSize(recvcount) ;
3996   AmpiMsg *msg;
3997   for(i=0;i<size;i++) {
3998           msg = (AmpiMsg*)CkWaitReleaseFuture(reqs[i]);
3999           memcpy((char*)recvbuf+(itemsize*i), msg->data,itemsize);
4000           delete msg;
4001   }
4002   
4003   delete [] reqs;
4004   AMPI_Barrier(comm);
4005
4006   // Reset flags 
4007   ptr->resetA2AIGetFlag();
4008   
4009 #if AMPI_COUNTER
4010   getAmpiParent()->counters.alltoall++;
4011 #endif
4012   return 0;
4013 }
4014
4015 CDECL
4016 int AMPI_Ialltoall(void *sendbuf, int sendcount, MPI_Datatype sendtype,
4017                  void *recvbuf, int recvcount, MPI_Datatype recvtype,
4018                  MPI_Comm comm, MPI_Request *request)
4019 {
4020   AMPIAPI("AMPI_Ialltoall");
4021   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Ialltoall not allowed for Inter-communicator!");
4022   if(comm==MPI_COMM_SELF) return copyDatatype(comm,sendtype,sendcount,sendbuf,recvbuf);
4023   ampi *ptr = getAmpiInstance(comm);
4024   AmpiRequestList* reqs = getReqs();
4025   int size = ptr->getSize(comm);
4026   int reqsSize = reqs->size();
4027   CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
4028   int itemsize = dttype->getSize(sendcount) ;
4029   int i;
4030
4031 #if AMPI_COMLIB
4032   if(comm == MPI_COMM_WORLD) {
4033       // commlib support
4034       ptr->getAlltoall().beginIteration();
4035       for(i=0;i<size;i++) {
4036           ptr->delesend(MPI_ATA_TAG+reqsSize, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
4037                         sendtype, i, comm, ptr->getComlibProxy());
4038       }
4039       ptr->getAlltoall().endIteration();
4040   } else
4041 #endif
4042     for(i=0;i<size;i++) {
4043       ptr->send(MPI_ATA_TAG+reqsSize, ptr->getRank(comm), ((char*)sendbuf)+(itemsize*i), sendcount,
4044                 sendtype, i, comm);
4045     }
4046   
4047   // copy+paste from MPI_Irecv
4048   ATAReq *newreq = new ATAReq(size);
4049   for(i=0;i<size;i++){
4050     if(newreq->addReq(((char*)recvbuf)+(itemsize*i),recvcount,recvtype,i,MPI_ATA_TAG+reqsSize,comm)!=(i+1))
4051       CkAbort("MPI_Ialltoall: Error adding requests into ATAReq!");
4052   }
4053   *request = reqs->insert(newreq);
4054   AMPI_DEBUG("MPI_Ialltoall: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
4055   return 0;
4056 }
4057
4058 CDECL
4059 int AMPI_Alltoallv(void *sendbuf, int *sendcounts_, int *sdispls_,
4060                   MPI_Datatype sendtype, void *recvbuf, int *recvcounts_,
4061                   int *rdispls_, MPI_Datatype recvtype, MPI_Comm comm)
4062 {
4063   if(getAmpiParent()->isInter(comm)) CkAbort("MPI_Alltoallv not allowed for Inter-communicator!");
4064   if(comm==MPI_COMM_SELF) return 0;
4065   ampi *ptr = getAmpiInstance(comm);
4066   int size = ptr->getSize(comm);
4067   int *sendcounts = sendcounts_;
4068   int *sdispls = sdispls_;
4069   int *recvcounts = recvcounts_;
4070   int *rdispls = rdispls_;
4071   if (CpvAccess(CmiPICMethod) == 2)       // copyglobals make separate copy
4072   {
4073       // FIXME: we don't need to make copy if it is not global variable
4074     sendcounts = new int[size];
4075     sdispls = new int[size];
4076     recvcounts = new int[size];
4077     rdispls = new int[size];
4078     for (int i=0; i<size; i++) {
4079       sendcounts[i] = sendcounts_[i];
4080       sdispls[i] = sdispls_[i];
4081       recvcounts[i] = recvcounts_[i];
4082       rdispls[i] = rdispls_[i];
4083     }
4084   }
4085   AMPIAPI("AMPI_Alltoallv");
4086   CkDDT_DataType* dttype = ptr->getDDT()->getType(sendtype) ;
4087   int itemsize = dttype->getSize() ;
4088   int i;
4089 #if AMPI_COMLIB
4090   if(comm == MPI_COMM_WORLD) {
4091       // commlib support
4092       ptr->getAlltoall().beginIteration();
4093       for(i=0;i<size;i++) {
4094           ptr->delesend(MPI_GATHER_TAG,ptr->getRank(comm),((char*)sendbuf)+(itemsize*sdispls[i]),sendcounts[i],
4095                         sendtype, i, comm, ptr->getComlibProxy());
4096       }
4097       ptr->getAlltoall().endIteration();
4098   } else
4099 #endif
4100   {
4101       for(i=0;i<size;i++)  {
4102         ptr->send(MPI_GATHER_TAG,ptr->getRank(comm),((char*)sendbuf)+(itemsize*sdispls[i]),sendcounts[i],
4103                     sendtype, i, comm);
4104       }
4105   }
4106   MPI_Status status;
4107   dttype = ptr->getDDT()->getType(recvtype) ;
4108   itemsize = dttype->getSize() ;
4109
4110   for(i=0;i<size;i++) {
4111     AMPI_Recv(((char*)recvbuf)+(itemsize*rdispls[i]), recvcounts[i], recvtype,
4112              i, MPI_GATHER_TAG, comm, &status);
4113   }
4114 #if AMPI_COUNTER
4115   getAmpiParent()->counters.alltoall++;
4116 #endif
4117   if (CpvAccess(CmiPICMethod) == 2)       // copyglobals
4118   {
4119     delete [] sendcounts;
4120     delete [] sdispls;
4121     delete [] recvcounts;
4122     delete [] rdispls;
4123   }
4124   return 0;
4125 }
4126
4127 CDECL
4128 int AMPI_Comm_dup(int comm, int *newcomm)
4129 {
4130   AMPIAPI("AMPI_Comm_dup");
4131   *newcomm = comm;
4132   return 0;
4133 }
4134
4135 CDECL
4136 int AMPI_Comm_split(int src,int color,int key,int *dest)
4137 {
4138   AMPIAPI("AMPI_Comm_split");
4139 #ifdef AMPIMSGLOG
4140   ampiParent* pptr = getAmpiParent();
4141   if(msgLogRead){
4142     PUParray(*(pptr->fromPUPer), (char *)dest, sizeof(int));
4143     return 0;
4144   }
4145 #endif
4146
4147   getAmpiInstance(src)->split(color,key,dest, 0);
4148   AMPI_Barrier(src);  // to prevent race condition in the new comm
4149
4150 #ifdef AMPIMSGLOG
4151   if(msgLogWrite && pptr->thisIndex == msgLogRank){
4152     PUParray(*(pptr->toPUPer), (char *)dest, sizeof(int));
4153   }
4154 #endif
4155
4156   return 0;
4157 }
4158
4159 CDECL
4160 int AMPI_Comm_free(int *comm)
4161 {
4162   AMPIAPI("AMPI_Comm_free");
4163   return 0;
4164 }
4165
4166 CDECL
4167 int AMPI_Comm_test_inter(MPI_Comm comm, int *flag){
4168   AMPIAPI("AMPI_Comm_test_inter");
4169   *flag = getAmpiParent()->isInter(comm);
4170   return 0;
4171 }
4172
4173 CDECL
4174 int AMPI_Comm_remote_size(MPI_Comm comm, int *size){
4175   AMPIAPI("AMPI_Comm_remote_size");
4176   *size = getAmpiParent()->getRemoteSize(comm);
4177   return 0;
4178 }
4179
4180 CDECL
4181 int AMPI_Comm_remote_group(MPI_Comm comm, MPI_Group *group){
4182   AMPIAPI("AMPI_Comm_remote_group");
4183   *group = getAmpiParent()->getRemoteGroup(comm);
4184   return 0;
4185 }
4186
4187 CDECL
4188 int AMPI_Intercomm_create(MPI_Comm lcomm, int lleader, MPI_Comm rcomm, int rleader, int tag, MPI_Comm *newintercomm){
4189   AMPIAPI("AMPI_Intercomm_create");
4190   ampi *ptr = getAmpiInstance(lcomm);
4191   int root = ptr->getIndexForRank(lleader);
4192   CkVec<int> rvec;
4193   int lrank;
4194   AMPI_Comm_rank(lcomm,&lrank);
4195
4196   if(lrank==lleader){
4197     int lsize, rsize;
4198     lsize = ptr->getSize(lcomm);
4199     int *larr = new int [lsize];
4200     int *rarr;
4201     CkVec<int> lvec = ptr->getIndices();
4202     MPI_Status sts;
4203
4204     // local leader exchanges groupStruct with remote leader
4205     int i;
4206     for(i=0;i<lsize;i++)
4207       larr[i] = lvec[i];
4208     AMPI_Send(&lsize,1,MPI_INT,rleader,tag,rcomm);
4209     AMPI_Recv(&rsize,1,MPI_INT,rleader,tag,rcomm,&sts);
4210
4211     rarr = new int [rsize];
4212     AMPI_Send(larr,lsize,MPI_INT,rleader,tag+1,rcomm);
4213     AMPI_Recv(rarr,rsize,MPI_INT,rleader,tag+1,rcomm,&sts);
4214     for(i=0;i<rsize;i++)
4215       rvec.push_back(rarr[i]);
4216
4217     delete [] larr;
4218     delete [] rarr;
4219
4220     if(rsize==0) CkAbort("MPI_Intercomm_create: remote size = 0! Does it really make sense to create an empty communicator?\n");
4221   }
4222   
4223   ptr->intercommCreate(rvec,root,newintercomm);
4224   return 0;
4225 }
4226
4227 CDECL
4228 int AMPI_Intercomm_merge(MPI_Comm intercomm, int high, MPI_Comm *newintracomm){
4229   AMPIAPI("AMPI_Intercomm_merge");
4230   ampi *ptr = getAmpiInstance(intercomm);
4231   int lroot, rroot, lrank, lhigh, rhigh, first;
4232   lroot = ptr->getIndexForRank(0);
4233   rroot = ptr->getIndexForRemoteRank(0);
4234   lhigh = high;
4235   lrank = ptr->getRank(intercomm);
4236
4237   if(lrank==0){
4238     MPI_Status sts;
4239     AMPI_Send(&lhigh,1,MPI_INT,0,10010,intercomm);
4240     AMPI_Recv(&rhigh,1,MPI_INT,0,10010,intercomm,&sts);
4241
4242     if((lhigh && rhigh) || (!lhigh && !rhigh)){ // same value: smaller root goes first (first=1 if local goes first)
4243       first = (lroot < rroot);
4244     }else{ // different values, then high=false goes first
4245       first = (lhigh == false);
4246     }
4247   }
4248
4249   ptr->intercommMerge(first, newintracomm);
4250   return 0;
4251 }
4252
4253 CDECL
4254 int AMPI_Abort(int comm, int errorcode)
4255 {
4256   AMPIAPI("AMPI_Abort");
4257   CkAbort("AMPI: User called MPI_Abort!\n");
4258   return errorcode;
4259 }
4260
4261 CDECL
4262 int AMPI_Get_count(MPI_Status *sts, MPI_Datatype dtype, int *count){
4263   AMPIAPI("AMPI_Get_count");
4264   CkDDT_DataType* dttype = getDDT()->getType(dtype);
4265   int itemsize = dttype->getSize() ;
4266   *count = sts->MPI_LENGTH/itemsize;
4267   return 0;
4268 }
4269
4270 CDECL
4271 int AMPI_Type_lb(MPI_Datatype dtype, MPI_Aint* displacement){
4272   AMPIAPI("AMPI_Type_lb");
4273   *displacement = getDDT()->getLB(dtype);
4274   return 0;
4275 }
4276
4277 CDECL
4278 int AMPI_Type_ub(MPI_Datatype dtype, MPI_Aint* displacement){
4279   AMPIAPI("AMPI_Type_ub");
4280   *displacement = getDDT()->getUB(dtype);
4281   return 0;
4282 }
4283
4284 CDECL
4285 int AMPI_Address(void* location, MPI_Aint *address){
4286   AMPIAPI("AMPI_Address");
4287   *address = (MPI_Aint)(unsigned long)(char *)location;
4288   return 0;
4289 }
4290
4291 CDECL
4292 int AMPI_Get_elements(MPI_Status *sts, MPI_Datatype dtype, int *count){
4293   AMPIAPI("AMPI_Get_elements");
4294   CkDDT_DataType* dttype = getDDT()->getType(dtype) ;
4295   int basesize = dttype->getBaseSize() ;
4296   if(basesize==0) basesize=dttype->getSize();
4297   *count = sts->MPI_LENGTH/basesize;
4298   return 0;
4299 }
4300
4301 CDECL
4302 int AMPI_Pack(void *inbuf, int incount, MPI_Datatype dtype, void *outbuf,
4303               int outsize, int *position, MPI_Comm comm)
4304 {
4305   AMPIAPI("AMPI_Pack");
4306   CkDDT_DataType* dttype = getDDT()->getType(dtype) ;
4307   int itemsize = dttype->getSize();
4308   dttype->serialize((char*)inbuf, ((char*)outbuf)+(*position), incount, 1);
4309   *position += (itemsize*incount);
4310   return 0;
4311 }
4312
4313 CDECL
4314 int AMPI_Unpack(void *inbuf, int insize, int *position, void *outbuf,
4315               int outcount, MPI_Datatype dtype, MPI_Comm comm)
4316 {
4317   AMPIAPI("AMPI_Unpack");
4318   CkDDT_DataType* dttype = getDDT()->getType(dtype) ;
4319   int itemsize = dttype->getSize();
4320   dttype->serialize(((char*)inbuf+(*position)), (char*)outbuf, outcount, 1);
4321   *position += (itemsize*outcount);
4322   return 0;
4323 }
4324
4325 CDECL
4326 int AMPI_Pack_size(int incount,MPI_Datatype datatype,MPI_Comm comm,int *sz)
4327 {
4328   AMPIAPI("AMPI_Pack_size");
4329   CkDDT_DataType* dttype = getDDT()->getType(datatype) ;
4330   *sz = incount*dttype->getSize() ;
4331   return 0;
4332 }
4333
4334 CDECL
4335 int AMPI_Get_processor_name(char *name, int *resultlen){
4336   AMPIAPI("AMPI_Get_processor_name");
4337   ampiParent *ptr = getAmpiParent();
4338   sprintf(name,"AMPI_VP[%d]_PE[%d]",ptr->thisIndex,ptr->getMyPe());
4339   *resultlen = strlen(name);
4340   return 0;
4341 }
4342
4343 /* Error handling */
4344 #if defined(USE_STDARG)
4345 void error_handler(MPI_Comm *, int *, ...);
4346 #else
4347 void error_handler ( MPI_Comm *, int * );
4348 #endif
4349
4350 CDECL
4351 int AMPI_Errhandler_create(MPI_Handler_function *function, MPI_Errhandler *errhandler){
4352         AMPIAPI("AMPI_Errhandler_create");
4353         return MPI_SUCCESS;
4354 }
4355
4356 CDECL
4357 int AMPI_Errhandler_set(MPI_Comm comm, MPI_Errhandler errhandler){
4358         AMPIAPI("AMPI_Errhandler_set");
4359         return MPI_SUCCESS;
4360 }