ffb8d8ee0e7e8028ef648d70535f123c4ef2bb40
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1 #ifndef AMPI_PRINT_MSG_SIZES
2 #define AMPI_PRINT_MSG_SIZES 0 // Record and print comm routines used & message sizes
3 #endif
4
5 #define AMPIMSGLOG    0
6 #define AMPI_PRINT_IDLE 0
7
8 #include "ampiimpl.h"
9 #include "tcharm.h"
10
11 #if CMK_BIGSIM_CHARM
12 #include "bigsim_logs.h"
13 #endif
14
15 #if CMK_TRACE_ENABLED
16 #include "register.h" // for _chareTable, _entryTable
17 #endif
18
19 // Default is to abort on error, but users can build
20 // AMPI with -DAMPI_ERRHANDLER_RETURN=1 to change it:
21 #if AMPI_ERRHANDLER_RETURN
22 #define AMPI_ERRHANDLER MPI_ERRORS_RETURN
23 #else
24 #define AMPI_ERRHANDLER MPI_ERRORS_ARE_FATAL
25 #endif
26
27 /* change this define to "x" to trace all send/recv's */
28 #define MSG_ORDER_DEBUG(x) //x /* empty */
29 /* change this define to "x" to trace user calls */
30 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl;
31 #define STARTUP_DEBUG(x) //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl;
32 #define FUNCCALL_DEBUG(x) //x /* empty */
33
34 /* For MPI_Get_library_version */
35 extern const char * const CmiCommitID;
36
37 static CkDDT *getDDT() noexcept {
38   return &getAmpiParent()->myDDT;
39 }
40
41 /* if error checking is disabled, ampiErrhandler is defined as a macro in ampiimpl.h */
42 #if AMPI_ERROR_CHECKING
43 int ampiErrhandler(const char* func, int errcode) noexcept {
44   if (AMPI_ERRHANDLER == MPI_ERRORS_ARE_FATAL && errcode != MPI_SUCCESS) {
45     // Abort with a nice message of the form: 'func' failed with error code 'errstr'.
46     //  where 'func' is the name of the failed AMPI_ function and 'errstr'
47     //  is the string returned by AMPI_Error_string for errcode.
48     int funclen = strlen(func);
49     const char* filler = " failed with error code ";
50     int fillerlen = strlen(filler);
51     int errstrlen;
52     char errstr[MPI_MAX_ERROR_STRING];
53     MPI_Error_string(errcode, errstr, &errstrlen);
54     vector<char> str(funclen + fillerlen + errstrlen);
55     strcpy(str.data(), func);
56     strcat(str.data(), filler);
57     strcat(str.data(), errstr);
58     CkAbort(str.data());
59   }
60   return errcode;
61 }
62 #endif
63
64 #if AMPI_PRINT_MSG_SIZES
65 #if !AMPI_ERROR_CHECKING
66 #error "AMPI_PRINT_MSG_SIZES requires AMPI error checking to be enabled!\n"
67 #endif
68 #include <string>
69 #include <sstream>
70 #include "ckliststring.h"
71 CkpvDeclare(CkListString, msgSizesRanks);
72
73 bool ampiParent::isRankRecordingMsgSizes() noexcept {
74   return (!CkpvAccess(msgSizesRanks).isEmpty() && CkpvAccess(msgSizesRanks).includes(thisIndex));
75 }
76
77 void ampiParent::recordMsgSize(const char* func, int msgSize) noexcept {
78   if (isRankRecordingMsgSizes()) {
79     msgSizes[func][msgSize]++;
80   }
81 }
82
83 typedef std::unordered_map<std::string, std::map<int, int> >::iterator outer_itr_t;
84 typedef std::map<int, int>::iterator inner_itr_t;
85
86 void ampiParent::printMsgSizes() noexcept {
87   if (isRankRecordingMsgSizes()) {
88     // Prints msgSizes in the form: "AMPI_Routine: [ (num_msgs: msg_size) ... ]".
89     // Each routine has its messages sorted by size, smallest to largest.
90     std::stringstream ss;
91     ss << std::endl << "Rank " << thisIndex << ":" << std::endl;
92     for (outer_itr_t i = msgSizes.begin(); i != msgSizes.end(); ++i) {
93       ss << i->first << ": [ ";
94       for (inner_itr_t j = i->second.begin(); j != i->second.end(); ++j) {
95         ss << "(" << j->second << ": " << j->first << " B) ";
96       }
97       ss << "]" << std::endl;
98     }
99     CkPrintf("%s", ss.str().c_str());
100   }
101 }
102 #endif //AMPI_PRINT_MSG_SIZES
103
104 inline int checkCommunicator(const char* func, MPI_Comm comm) noexcept {
105   if (comm == MPI_COMM_NULL)
106     return ampiErrhandler(func, MPI_ERR_COMM);
107   return MPI_SUCCESS;
108 }
109
110 inline int checkCount(const char* func, int count) noexcept {
111   if (count < 0)
112     return ampiErrhandler(func, MPI_ERR_COUNT);
113   return MPI_SUCCESS;
114 }
115
116 inline int checkData(const char* func, MPI_Datatype data) noexcept {
117   if (data == MPI_DATATYPE_NULL)
118     return ampiErrhandler(func, MPI_ERR_TYPE);
119   return MPI_SUCCESS;
120 }
121
122 inline int checkTag(const char* func, int tag) noexcept {
123   if (tag != MPI_ANY_TAG && (tag < 0 || tag > MPI_TAG_UB_VALUE))
124     return ampiErrhandler(func, MPI_ERR_TAG);
125   return MPI_SUCCESS;
126 }
127
128 inline int checkRank(const char* func, int rank, MPI_Comm comm) noexcept {
129   int size = (comm == MPI_COMM_NULL) ? 0 : getAmpiInstance(comm)->getSize();
130   if (((rank >= 0) && (rank < size)) ||
131       (rank == MPI_ANY_SOURCE)       ||
132       (rank == MPI_PROC_NULL)        ||
133       (rank == MPI_ROOT))
134     return MPI_SUCCESS;
135   return ampiErrhandler(func, MPI_ERR_RANK);
136 }
137
138 inline int checkBuf(const char* func, const void *buf, int count) noexcept {
139   if ((count != 0 && buf == NULL) || buf == MPI_IN_PLACE)
140     return ampiErrhandler(func, MPI_ERR_BUFFER);
141   return MPI_SUCCESS;
142 }
143
144 int errorCheck(const char* func, MPI_Comm comm, bool ifComm, int count,
145                bool ifCount, MPI_Datatype data, bool ifData, int tag,
146                bool ifTag, int rank, bool ifRank, const void *buf1,
147                bool ifBuf1, const void *buf2=nullptr, bool ifBuf2=false) noexcept {
148   int ret;
149   if (ifComm) {
150     ret = checkCommunicator(func, comm);
151     if (ret != MPI_SUCCESS)
152       return ampiErrhandler(func, ret);
153   }
154   if (ifCount) {
155     ret = checkCount(func, count);
156     if (ret != MPI_SUCCESS)
157       return ampiErrhandler(func, ret);
158   }
159   if (ifData) {
160     ret = checkData(func, data);
161     if (ret != MPI_SUCCESS)
162       return ampiErrhandler(func, ret);
163   }
164   if (ifTag) {
165     ret = checkTag(func, tag);
166     if (ret != MPI_SUCCESS)
167       return ampiErrhandler(func, ret);
168   }
169   if (ifRank) {
170     ret = checkRank(func, rank, comm);
171     if (ret != MPI_SUCCESS)
172       return ampiErrhandler(func, ret);
173   }
174   if (ifBuf1 && ifData) {
175     ret = checkBuf(func, buf1, count*getDDT()->getSize(data));
176     if (ret != MPI_SUCCESS)
177       return ampiErrhandler(func, ret);
178   }
179   if (ifBuf2 && ifData) {
180     ret = checkBuf(func, buf2, count*getDDT()->getSize(data));
181     if (ret != MPI_SUCCESS)
182       return ampiErrhandler(func, ret);
183   }
184 #if AMPI_PRINT_MSG_SIZES
185   getAmpiParent()->recordMsgSize(func, getDDT()->getSize(data) * count);
186 #endif
187   return MPI_SUCCESS;
188 }
189
190 //------------- startup -------------
191 static mpi_comm_worlds mpi_worlds;
192
193 int _mpi_nworlds; /*Accessed by ampif*/
194 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
195
196 class AmpiComplex {
197  public:
198   float re, im;
199   void operator+=(const AmpiComplex &a) noexcept {
200     re+=a.re;
201     im+=a.im;
202   }
203   void operator*=(const AmpiComplex &a) noexcept {
204     float nu_re=re*a.re-im*a.im;
205     im=re*a.im+im*a.re;
206     re=nu_re;
207   }
208   int operator>(const AmpiComplex &a) noexcept {
209     CkAbort("AMPI> Cannot compare complex numbers with MPI_MAX\n");
210     return 0;
211   }
212   int operator<(const AmpiComplex &a) noexcept {
213     CkAbort("AMPI> Cannot compare complex numbers with MPI_MIN\n");
214     return 0;
215   }
216 };
217
218 class AmpiDoubleComplex {
219  public:
220   double re, im;
221   void operator+=(const AmpiDoubleComplex &a) noexcept {
222     re+=a.re;
223     im+=a.im;
224   }
225   void operator*=(const AmpiDoubleComplex &a) noexcept {
226     double nu_re=re*a.re-im*a.im;
227     im=re*a.im+im*a.re;
228     re=nu_re;
229   }
230   int operator>(const AmpiDoubleComplex &a) noexcept {
231     CkAbort("AMPI> Cannot compare double complex numbers with MPI_MAX\n");
232     return 0;
233   }
234   int operator<(const AmpiDoubleComplex &a) noexcept {
235     CkAbort("AMPI> Cannot compare double complex numbers with MPI_MIN\n");
236     return 0;
237   }
238 };
239
240 class AmpiLongDoubleComplex {
241  public:
242   long double re, im;
243   void operator+=(const AmpiLongDoubleComplex &a) noexcept {
244     re+=a.re;
245     im+=a.im;
246   }
247   void operator*=(const AmpiLongDoubleComplex &a) noexcept {
248     long double nu_re=re*a.re-im*a.im;
249     im=re*a.im+im*a.re;
250     re=nu_re;
251   }
252   int operator>(const AmpiLongDoubleComplex &a) noexcept {
253     CkAbort("AMPI> Cannot compare long double complex numbers with MPI_MAX\n");
254     return 0;
255   }
256   int operator<(const AmpiLongDoubleComplex &a) noexcept {
257     CkAbort("AMPI> Cannot compare long double complex numbers with MPI_MIN\n");
258     return 0;
259   }
260 };
261
262 typedef struct { float val; int idx; } FloatInt;
263 typedef struct { double val; int idx; } DoubleInt;
264 typedef struct { long val; int idx; } LongInt;
265 typedef struct { int val; int idx; } IntInt;
266 typedef struct { short val; int idx; } ShortInt;
267 typedef struct { long double val; int idx; } LongdoubleInt;
268 typedef struct { float val; float idx; } FloatFloat;
269 typedef struct { double val; double idx; } DoubleDouble;
270
271 /* For MPI_MAX, MPI_MIN, MPI_SUM, and MPI_PROD: */
272 #define MPI_OP_SWITCH(OPNAME) \
273   int i; \
274 switch (*datatype) { \
275   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
276   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed short int); } break; \
277   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed int); } break; \
278   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long); } break; \
279   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
280   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
281   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
282   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long); } break; \
283   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
284   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
285   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
286   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiDoubleComplex); } break; \
287   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long long); } break; \
288   case MPI_SIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed char); } break; \
289   case MPI_UNSIGNED_LONG_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long long); } break; \
290   case MPI_WCHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(wchar_t); } break; \
291   case MPI_INT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int8_t); } break; \
292   case MPI_INT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int16_t); } break; \
293   case MPI_INT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int32_t); } break; \
294   case MPI_INT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int64_t); } break; \
295   case MPI_UINT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint8_t); } break; \
296   case MPI_UINT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint16_t); } break; \
297   case MPI_UINT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint32_t); } break; \
298   case MPI_UINT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint64_t); } break; \
299   case MPI_FLOAT_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
300   case MPI_LONG_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiLongDoubleComplex); } break; \
301   case MPI_AINT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(MPI_Aint); } break; \
302   default: \
303            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
304   CkAbort("Unsupported MPI datatype for MPI Op"); \
305 };\
306
307 /* For MPI_LAND, MPI_LOR, and MPI_LXOR: */
308 #define MPI_LOGICAL_OP_SWITCH(OPNAME) \
309   int i; \
310 switch (*datatype) { \
311   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed short int); } break; \
312   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed int); } break; \
313   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long); } break; \
314   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
315   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
316   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
317   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long); } break; \
318   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long long); } break; \
319   case MPI_SIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed char); } break; \
320   case MPI_UNSIGNED_LONG_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long long); } break; \
321   case MPI_INT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int8_t); } break; \
322   case MPI_INT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int16_t); } break; \
323   case MPI_INT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int32_t); } break; \
324   case MPI_INT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int64_t); } break; \
325   case MPI_UINT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint8_t); } break; \
326   case MPI_UINT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint16_t); } break; \
327   case MPI_UINT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint32_t); } break; \
328   case MPI_UINT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint64_t); } break; \
329   case MPI_LOGICAL: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
330   case MPI_C_BOOL: for(i=0;i<(*len);i++) { MPI_OP_IMPL(bool); } break; \
331   case MPI_AINT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(MPI_Aint); } break; \
332   default: \
333            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
334   CkAbort("Unsupported MPI datatype for MPI Op"); \
335 };\
336
337 /* For MPI_BAND, MPI_BOR, and MPI_BXOR: */
338 #define MPI_BITWISE_OP_SWITCH(OPNAME) \
339   int i; \
340 switch (*datatype) { \
341   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed short int); } break; \
342   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed int); } break; \
343   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long); } break; \
344   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
345   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
346   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
347   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long); } break; \
348   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long long); } break; \
349   case MPI_SIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed char); } break; \
350   case MPI_UNSIGNED_LONG_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long long); } break; \
351   case MPI_INT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int8_t); } break; \
352   case MPI_INT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int16_t); } break; \
353   case MPI_INT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int32_t); } break; \
354   case MPI_INT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int64_t); } break; \
355   case MPI_UINT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint8_t); } break; \
356   case MPI_UINT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint16_t); } break; \
357   case MPI_UINT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint32_t); } break; \
358   case MPI_UINT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint64_t); } break; \
359   case MPI_BYTE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
360   case MPI_AINT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(MPI_Aint); } break; \
361   default: \
362            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
363   CkAbort("Unsupported MPI datatype for MPI Op"); \
364 };\
365
366 void MPI_MAX_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
367 #define MPI_OP_IMPL(type) \
368   if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
369   MPI_OP_SWITCH(MPI_MAX)
370 #undef MPI_OP_IMPL
371 }
372
373 void MPI_MIN_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
374 #define MPI_OP_IMPL(type) \
375   if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
376   MPI_OP_SWITCH(MPI_MIN)
377 #undef MPI_OP_IMPL
378 }
379
380 void MPI_SUM_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
381 #define MPI_OP_IMPL(type) \
382   ((type *)inoutvec)[i] += ((type *)invec)[i];
383   MPI_OP_SWITCH(MPI_SUM)
384 #undef MPI_OP_IMPL
385 }
386
387 void MPI_PROD_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
388 #define MPI_OP_IMPL(type) \
389   ((type *)inoutvec)[i] *= ((type *)invec)[i];
390   MPI_OP_SWITCH(MPI_PROD)
391 #undef MPI_OP_IMPL
392 }
393
394 void MPI_REPLACE_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
395 #define MPI_OP_IMPL(type) \
396   ((type *)inoutvec)[i] = ((type *)invec)[i];
397   MPI_OP_SWITCH(MPI_REPLACE)
398 #undef MPI_OP_IMPL
399 }
400
401 void MPI_NO_OP_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
402   /* no-op */
403 }
404
405 void MPI_LAND_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
406 #define MPI_OP_IMPL(type) \
407   ((type *)inoutvec)[i] = ((type *)inoutvec)[i] && ((type *)invec)[i];
408   MPI_LOGICAL_OP_SWITCH(MPI_LAND)
409 #undef MPI_OP_IMPL
410 }
411
412 void MPI_BAND_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
413 #define MPI_OP_IMPL(type) \
414   ((type *)inoutvec)[i] = ((type *)inoutvec)[i] & ((type *)invec)[i];
415   MPI_BITWISE_OP_SWITCH(MPI_BAND)
416 #undef MPI_OP_IMPL
417 }
418
419 void MPI_LOR_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
420 #define MPI_OP_IMPL(type) \
421   ((type *)inoutvec)[i] = ((type *)inoutvec)[i] || ((type *)invec)[i];
422   MPI_LOGICAL_OP_SWITCH(MPI_LOR)
423 #undef MPI_OP_IMPL
424 }
425
426 void MPI_BOR_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
427 #define MPI_OP_IMPL(type) \
428   ((type *)inoutvec)[i] = ((type *)inoutvec)[i] | ((type *)invec)[i];
429   MPI_BITWISE_OP_SWITCH(MPI_BOR)
430 #undef MPI_OP_IMPL
431 }
432
433 void MPI_LXOR_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
434 #define MPI_OP_IMPL(type) \
435   ((type *)inoutvec)[i] = (((type *)inoutvec)[i]&&(!((type *)invec)[i]))||(!(((type *)inoutvec)[i])&&((type *)invec)[i]);
436   MPI_LOGICAL_OP_SWITCH(MPI_LXOR)
437 #undef MPI_OP_IMPL
438 }
439
440 void MPI_BXOR_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
441 #define MPI_OP_IMPL(type) \
442   ((type *)inoutvec)[i] = ((type *)inoutvec)[i] ^ ((type *)invec)[i];
443   MPI_BITWISE_OP_SWITCH(MPI_BXOR)
444 #undef MPI_OP_IMPL
445 }
446
447 #ifndef MIN
448 #define MIN(a,b) (a < b ? a : b)
449 #endif
450
451 void MPI_MAXLOC_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
452   int i;
453
454   switch (*datatype) {
455     case MPI_FLOAT_INT:
456       for(i=0;i<(*len);i++){
457         if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
458           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
459         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
460           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
461       }
462       break;
463     case MPI_DOUBLE_INT:
464       for(i=0;i<(*len);i++){
465         if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
466           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
467         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
468           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
469       }
470       break;
471     case MPI_LONG_INT:
472       for(i=0;i<(*len);i++){
473         if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
474           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
475         else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
476           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
477       }
478       break;
479     case MPI_2INT:
480       for(i=0;i<(*len);i++){
481         if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
482           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
483         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
484           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
485       }
486       break;
487     case MPI_SHORT_INT:
488       for(i=0;i<(*len);i++){
489         if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
490           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
491         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
492           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
493       }
494       break;
495     case MPI_LONG_DOUBLE_INT:
496       for(i=0;i<(*len);i++){
497         if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
498           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
499         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
500           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
501       }
502       break;
503     case MPI_2FLOAT:
504       for(i=0;i<(*len);i++){
505         if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
506           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
507         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
508           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
509       }
510       break;
511     case MPI_2DOUBLE:
512       for(i=0;i<(*len);i++){
513         if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
514           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
515         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
516           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
517       }
518       break;
519     default:
520       ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
521       CkAbort("exiting");
522   }
523 }
524
525 void MPI_MINLOC_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
526   int i;
527   switch (*datatype) {
528     case MPI_FLOAT_INT:
529       for(i=0;i<(*len);i++){
530         if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
531           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
532         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
533           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
534       }
535       break;
536     case MPI_DOUBLE_INT:
537       for(i=0;i<(*len);i++){
538         if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
539           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
540         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
541           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
542       }
543       break;
544     case MPI_LONG_INT:
545       for(i=0;i<(*len);i++){
546         if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
547           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
548         else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
549           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
550       }
551       break;
552     case MPI_2INT:
553       for(i=0;i<(*len);i++){
554         if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
555           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
556         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
557           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
558       }
559       break;
560     case MPI_SHORT_INT:
561       for(i=0;i<(*len);i++){
562         if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
563           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
564         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
565           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
566       }
567       break;
568     case MPI_LONG_DOUBLE_INT:
569       for(i=0;i<(*len);i++){
570         if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
571           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
572         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
573           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
574       }
575       break;
576     case MPI_2FLOAT:
577       for(i=0;i<(*len);i++){
578         if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
579           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
580         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
581           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
582       }
583       break;
584     case MPI_2DOUBLE:
585       for(i=0;i<(*len);i++){
586         if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
587           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
588         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
589           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
590       }
591       break;
592     default:
593       ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
594       CkAbort("exiting");
595   }
596 }
597
598 /*
599  * AMPI's generic reducer type, AmpiReducer, is used only
600  * for MPI_Op/MPI_Datatype combinations that Charm++ does
601  * not have built-in support for. AmpiReducer reduction
602  * contributions all contain an AmpiOpHeader, that contains
603  * the function pointer to an MPI_User_function* that is
604  * applied to all contributions in AmpiReducerFunc().
605  *
606  * If AmpiReducer is used, the final reduction message will
607  * have an additional sizeof(AmpiOpHeader) bytes in the
608  * buffer before any user data. ampi::processRednMsg() strips
609  * the header.
610  *
611  * If a non-commutative (user-defined) reduction is used,
612  * ampi::processNoncommutativeRednMsg() strips the headers
613  * and applies the op to all contributions in rank order.
614  */
615 CkReduction::reducerType AmpiReducer;
616
617 // every msg contains a AmpiOpHeader structure before user data
618 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs) noexcept {
619   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
620   MPI_Datatype dtype;
621   int szhdr, szdata, len;
622   MPI_User_function* func;
623   func = hdr->func;
624   dtype = hdr->dtype;
625   szdata = hdr->szdata;
626   len = hdr->len;
627   szhdr = sizeof(AmpiOpHeader);
628
629   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,NULL,AmpiReducer,msgs[0]);
630   void *retPtr = (char *)retmsg->getData() + szhdr;
631   for(int i=1;i<nMsg;i++){
632     (*func)((void *)((char *)msgs[i]->getData()+szhdr),retPtr,&len,&dtype);
633   }
634   return retmsg;
635 }
636
637 static CkReduction::reducerType getBuiltinReducerType(MPI_Datatype type, MPI_Op op) noexcept
638 {
639   switch (type) {
640     case MPI_INT32_T:
641       if (getDDT()->getSize(MPI_INT32_T) != getDDT()->getSize(MPI_INT)) break;
642       // else: fall thru to MPI_INT
643     case MPI_INT:
644       switch (op) {
645         case MPI_MAX:  return CkReduction::max_int;
646         case MPI_MIN:  return CkReduction::min_int;
647         case MPI_SUM:  return CkReduction::sum_int;
648         case MPI_PROD: return CkReduction::product_int;
649         case MPI_LAND: return CkReduction::logical_and_int;
650         case MPI_LOR:  return CkReduction::logical_or_int;
651         case MPI_LXOR: return CkReduction::logical_xor_int;
652         case MPI_BAND: return CkReduction::bitvec_and_int;
653         case MPI_BOR:  return CkReduction::bitvec_or_int;
654         case MPI_BXOR: return CkReduction::bitvec_xor_int;
655         default:       break;
656       }
657     case MPI_FLOAT:
658       switch (op) {
659         case MPI_MAX:  return CkReduction::max_float;
660         case MPI_MIN:  return CkReduction::min_float;
661         case MPI_SUM:  return CkReduction::sum_float;
662         case MPI_PROD: return CkReduction::product_float;
663         default:       break;
664       }
665     case MPI_DOUBLE:
666       switch (op) {
667         case MPI_MAX:  return CkReduction::max_double;
668         case MPI_MIN:  return CkReduction::min_double;
669         case MPI_SUM:  return CkReduction::sum_double;
670         case MPI_PROD: return CkReduction::product_double;
671         default:       break;
672       }
673     case MPI_INT8_T:
674       if (getDDT()->getSize(MPI_INT8_T) != getDDT()->getSize(MPI_CHAR)) break;
675       // else: fall thru to MPI_CHAR
676     case MPI_CHAR:
677       switch (op) {
678         case MPI_MAX:  return CkReduction::max_char;
679         case MPI_MIN:  return CkReduction::min_char;
680         case MPI_SUM:  return CkReduction::sum_char;
681         case MPI_PROD: return CkReduction::product_char;
682         default:       break;
683       }
684     case MPI_INT16_T:
685       if (getDDT()->getSize(MPI_INT16_T) != getDDT()->getSize(MPI_SHORT)) break;
686       // else: fall thru to MPI_SHORT
687     case MPI_SHORT:
688       switch (op) {
689         case MPI_MAX:  return CkReduction::max_short;
690         case MPI_MIN:  return CkReduction::min_short;
691         case MPI_SUM:  return CkReduction::sum_short;
692         case MPI_PROD: return CkReduction::product_short;
693         default:       break;
694       }
695     case MPI_LONG:
696       switch (op) {
697         case MPI_MAX:  return CkReduction::max_long;
698         case MPI_MIN:  return CkReduction::min_long;
699         case MPI_SUM:  return CkReduction::sum_long;
700         case MPI_PROD: return CkReduction::product_long;
701         default:       break;
702       }
703     case MPI_INT64_T:
704       if (getDDT()->getSize(MPI_INT64_T) != getDDT()->getSize(MPI_LONG_LONG)) break;
705       // else: fall thru to MPI_LONG_LONG
706     case MPI_LONG_LONG:
707       switch (op) {
708         case MPI_MAX:  return CkReduction::max_long_long;
709         case MPI_MIN:  return CkReduction::min_long_long;
710         case MPI_SUM:  return CkReduction::sum_long_long;
711         case MPI_PROD: return CkReduction::product_long_long;
712         default:       break;
713       }
714     case MPI_UINT8_T:
715       if (getDDT()->getSize(MPI_UINT8_T) != getDDT()->getSize(MPI_UNSIGNED_CHAR)) break;
716       // else: fall thru to MPI_UNSIGNED_CHAR
717     case MPI_UNSIGNED_CHAR:
718       switch (op) {
719         case MPI_MAX:  return CkReduction::max_uchar;
720         case MPI_MIN:  return CkReduction::min_uchar;
721         case MPI_SUM:  return CkReduction::sum_uchar;
722         case MPI_PROD: return CkReduction::product_uchar;
723         default:       break;
724       }
725     case MPI_UINT16_T:
726       if (getDDT()->getSize(MPI_UINT16_T) != getDDT()->getSize(MPI_UNSIGNED_SHORT)) break;
727       // else: fall thru to MPI_UNSIGNED_SHORT
728     case MPI_UNSIGNED_SHORT:
729       switch (op) {
730         case MPI_MAX:  return CkReduction::max_ushort;
731         case MPI_MIN:  return CkReduction::min_ushort;
732         case MPI_SUM:  return CkReduction::sum_ushort;
733         case MPI_PROD: return CkReduction::product_ushort;
734         default:       break;
735       }
736     case MPI_UINT32_T:
737       if (getDDT()->getSize(MPI_UINT32_T) != getDDT()->getSize(MPI_UNSIGNED)) break;
738       // else: fall thru to MPI_UNSIGNED
739     case MPI_UNSIGNED:
740       switch (op) {
741         case MPI_MAX:  return CkReduction::max_uint;
742         case MPI_MIN:  return CkReduction::min_uint;
743         case MPI_SUM:  return CkReduction::sum_uint;
744         case MPI_PROD: return CkReduction::product_uint;
745         default:       break;
746       }
747     case MPI_UNSIGNED_LONG:
748       switch (op) {
749         case MPI_MAX:  return CkReduction::max_ulong;
750         case MPI_MIN:  return CkReduction::min_ulong;
751         case MPI_SUM:  return CkReduction::sum_ulong;
752         case MPI_PROD: return CkReduction::product_ulong;
753         default:       break;
754       }
755     case MPI_UINT64_T:
756       if (getDDT()->getSize(MPI_UINT64_T) != getDDT()->getSize(MPI_UNSIGNED_LONG_LONG)) break;
757       // else: fall thru to MPI_UNSIGNED_LONG_LONG
758     case MPI_UNSIGNED_LONG_LONG:
759       switch (op) {
760         case MPI_MAX:  return CkReduction::max_ulong_long;
761         case MPI_MIN:  return CkReduction::min_ulong_long;
762         case MPI_SUM:  return CkReduction::sum_ulong_long;
763         case MPI_PROD: return CkReduction::product_ulong_long;
764         default:       break;
765       }
766     case MPI_C_BOOL:
767       switch (op) {
768         case MPI_LAND: return CkReduction::logical_and_bool;
769         case MPI_LOR:  return CkReduction::logical_or_bool;
770         case MPI_LXOR: return CkReduction::logical_xor_bool;
771         default:       break;
772       }
773     case MPI_LOGICAL:
774       switch (op) {
775         case MPI_LAND: return CkReduction::logical_and_int;
776         case MPI_LOR:  return CkReduction::logical_or_int;
777         case MPI_LXOR: return CkReduction::logical_xor_int;
778         default:       break;
779       }
780     case MPI_BYTE:
781       switch (op) {
782         case MPI_BAND: return CkReduction::bitvec_and_bool;
783         case MPI_BOR:  return CkReduction::bitvec_or_bool;
784         case MPI_BXOR: return CkReduction::bitvec_xor_bool;
785         default:       break;
786       }
787     default:
788       break;
789   }
790   return CkReduction::invalid;
791 }
792
793 class Builtin_kvs{
794  public:
795   int tag_ub,host,io,wtime_is_global,appnum,lastusedcode,universe_size;
796   int win_disp_unit,win_create_flavor,win_model;
797   int ampi_tmp;
798   void* win_base;
799   MPI_Aint win_size;
800   Builtin_kvs() noexcept {
801     tag_ub = MPI_TAG_UB_VALUE;
802     host = MPI_PROC_NULL;
803     io = 0;
804     wtime_is_global = 0;
805     appnum = 0;
806     lastusedcode = MPI_ERR_LASTCODE;
807     universe_size = 0;
808     win_base = NULL;
809     win_size = 0;
810     win_disp_unit = 0;
811     win_create_flavor = MPI_WIN_FLAVOR_CREATE;
812     win_model = MPI_WIN_SEPARATE;
813     ampi_tmp = 0;
814   }
815 };
816
817 // ------------ startup support -----------
818 int _ampi_fallback_setup_count = -1;
819 CLINKAGE void AMPI_Setup(void);
820 FLINKAGE void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
821
822 FLINKAGE void FTN_NAME(MPI_MAIN,mpi_main)(void);
823
824 /*Main routine used when missing MPI_Setup routine*/
825 CLINKAGE
826 void AMPI_Fallback_Main(int argc,char **argv)
827 {
828   AMPI_Main_cpp();
829   AMPI_Main_cpp(argc,argv);
830   AMPI_Main_c(argc,argv);
831   FTN_NAME(MPI_MAIN,mpi_main)();
832 }
833
834 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
835 /*Startup routine used if user *doesn't* write
836   a TCHARM_User_setup routine.
837  */
838 CLINKAGE
839 void AMPI_Setup_Switch(void) {
840   _ampi_fallback_setup_count=0;
841   FTN_NAME(AMPI_SETUP,ampi_setup)();
842   AMPI_Setup();
843   if (_ampi_fallback_setup_count==2)
844   { //Missing AMPI_Setup in both C and Fortran:
845     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
846   }
847 }
848
849 int AMPI_RDMA_THRESHOLD = AMPI_RDMA_THRESHOLD_DEFAULT;
850 int AMPI_SMP_RDMA_THRESHOLD = AMPI_SMP_RDMA_THRESHOLD_DEFAULT;
851 static bool nodeinit_has_been_called=false;
852 CtvDeclare(ampiParent*, ampiPtr);
853 CtvDeclare(bool, ampiInitDone);
854 CtvDeclare(void*,stackBottom);
855 CtvDeclare(bool, ampiFinalized);
856 CkpvDeclare(Builtin_kvs, bikvs);
857 CkpvDeclare(int, ampiThreadLevel);
858 CkpvDeclare(AmpiMsgPool, msgPool);
859
860 CLINKAGE
861 long ampiCurrentStackUsage(void){
862   int localVariable;
863
864   unsigned long p1 =  (unsigned long)(uintptr_t)((void*)&localVariable);
865   unsigned long p2 =  (unsigned long)(uintptr_t)(CtvAccess(stackBottom));
866
867   if(p1 > p2)
868     return p1 - p2;
869   else
870     return  p2 - p1;
871 }
872
873 FLINKAGE
874 void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
875   long usage = ampiCurrentStackUsage();
876   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
877 }
878
879 CLINKAGE
880 void AMPI_threadstart(void *data);
881 static int AMPI_threadstart_idx = -1;
882
883 #if CMK_TRACE_ENABLED
884 CsvExtern(funcmap*, tcharm_funcmap);
885 #endif
886
887 // Predefined datatype's and op's are readonly, so store them only once per process here:
888 static const std::array<const CkDDT_DataType *, AMPI_MAX_PREDEFINED_TYPE+1> ampiPredefinedTypes = CkDDT::createPredefinedTypes();
889
890 static constexpr std::array<MPI_User_function*, AMPI_MAX_PREDEFINED_OP+1> ampiPredefinedOps = {{
891   MPI_MAX_USER_FN,
892   MPI_MIN_USER_FN,
893   MPI_SUM_USER_FN,
894   MPI_PROD_USER_FN,
895   MPI_LAND_USER_FN,
896   MPI_BAND_USER_FN,
897   MPI_LOR_USER_FN,
898   MPI_BOR_USER_FN,
899   MPI_LXOR_USER_FN,
900   MPI_BXOR_USER_FN,
901   MPI_MAXLOC_USER_FN,
902   MPI_MINLOC_USER_FN,
903   MPI_REPLACE_USER_FN,
904   MPI_NO_OP_USER_FN
905 }};
906
907 static void ampiNodeInit() noexcept
908 {
909 #if CMK_TRACE_ENABLED
910   TCharm::nodeInit(); // make sure tcharm_funcmap is set up
911   int funclength = sizeof(funclist)/sizeof(char*);
912   for (int i=0; i<funclength; i++) {
913     int event_id = traceRegisterUserEvent(funclist[i], -1);
914     CsvAccess(tcharm_funcmap)->insert(std::pair<std::string, int>(funclist[i], event_id));
915   }
916
917   // rename chare & function to something reasonable
918   // TODO: find a better way to do this
919   for (int i=0; i<_chareTable.size(); i++){
920     if (strcmp(_chareTable[i]->name, "dummy_thread_chare") == 0)
921       _chareTable[i]->name = "AMPI";
922   }
923   for (int i=0; i<_entryTable.size(); i++){
924     if (strcmp(_entryTable[i]->name, "dummy_thread_ep") == 0)
925       _entryTable[i]->setName("rank");
926   }
927 #endif
928
929   _mpi_nworlds=0;
930   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
931   {
932     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
933   }
934   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
935
936   /* read AMPI environment variables */
937   char *value;
938   bool rdmaSet = false;
939   if ((value = getenv("AMPI_RDMA_THRESHOLD"))) {
940     AMPI_RDMA_THRESHOLD = atoi(value);
941     rdmaSet = true;
942   }
943   if ((value = getenv("AMPI_SMP_RDMA_THRESHOLD"))) {
944     AMPI_SMP_RDMA_THRESHOLD = atoi(value);
945     rdmaSet = true;
946   }
947   if (rdmaSet && CkMyNode() == 0) {
948 #if AMPI_RDMA_IMPL
949     CkPrintf("AMPI> RDMA threshold is %d Bytes and SMP RDMA threshold is %d Bytes.\n", AMPI_RDMA_THRESHOLD, AMPI_SMP_RDMA_THRESHOLD);
950 #else
951     CkPrintf("Warning: AMPI RDMA threshold ignored since AMPI RDMA is disabled.\n");
952 #endif
953   }
954
955   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc, true /*streamable*/, "AmpiReducerFunc");
956
957   CkAssert(AMPI_threadstart_idx == -1);    // only initialize once
958   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
959
960   nodeinit_has_been_called=true;
961
962    // ASSUME NO ANYTIME MIGRATION and STATIC INSERTON
963   _isAnytimeMigration = false;
964   _isStaticInsertion = true;
965 }
966
967 #if AMPI_PRINT_IDLE
968 static double totalidle=0.0, startT=0.0;
969 static int beginHandle, endHandle;
970 static void BeginIdle(void *dummy,double curWallTime) noexcept
971 {
972   startT = curWallTime;
973 }
974 static void EndIdle(void *dummy,double curWallTime) noexcept
975 {
976   totalidle += curWallTime - startT;
977 }
978 #endif
979
980 static void ampiProcInit() noexcept {
981   CtvInitialize(ampiParent*, ampiPtr);
982   CtvInitialize(bool,ampiInitDone);
983   CtvInitialize(bool,ampiFinalized);
984   CtvInitialize(void*,stackBottom);
985
986   CkpvInitialize(int, ampiThreadLevel);
987   CkpvAccess(ampiThreadLevel) = MPI_THREAD_SINGLE;
988
989   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
990   CkpvAccess(bikvs) = Builtin_kvs();
991
992   CkpvInitialize(AmpiMsgPool, msgPool); // pool of small AmpiMsg's
993   CkpvAccess(msgPool) = AmpiMsgPool(AMPI_MSG_POOL_SIZE, AMPI_POOLED_MSG_SIZE);
994
995 #if AMPIMSGLOG
996   char **argv=CkGetArgv();
997   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
998   if (CmiGetArgIntDesc(argv,"+msgLogRead", &msgLogRank, "Re-play message processing order for AMPI")) {
999     msgLogRead = 1;
1000   }
1001   char *procs = NULL;
1002   if (CmiGetArgStringDesc(argv, "+msgLogRanks", &procs, "A list of AMPI processors to record , e.g. 0,10,20-30")) {
1003     msgLogRanks.set(procs);
1004   }
1005   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
1006   if (CkMyPe() == 0) {
1007     if (msgLogWrite) CkPrintf("Writing AMPI messages of rank %s to log: %s\n", procs?procs:"", msgLogFilename);
1008     if (msgLogRead) CkPrintf("Reading AMPI messages of rank %s from log: %s\n", procs?procs:"", msgLogFilename);
1009   }
1010 #endif
1011
1012 #if AMPI_PRINT_MSG_SIZES
1013   // Only record and print message sizes if this option is given, and only for those ranks.
1014   // Running with the '+syncprint' option is recommended if printing from multiple ranks.
1015   char *ranks = NULL;
1016   CkpvInitialize(CkListString, msgSizesRanks);
1017   if (CmiGetArgStringDesc(CkGetArgv(), "+msgSizesRanks", &ranks,
1018       "A list of AMPI ranks to record and print message sizes on, e.g. 0,10,20-30")) {
1019     CkpvAccess(msgSizesRanks).set(ranks);
1020   }
1021 #endif
1022 }
1023
1024 #if AMPIMSGLOG
1025 static inline int record_msglog(int rank) noexcept {
1026   return msgLogRanks.includes(rank);
1027 }
1028 #endif
1029
1030 PUPfunctionpointer(MPI_MainFn)
1031
1032 class MPI_threadstart_t {
1033  public:
1034   MPI_MainFn fn;
1035   MPI_threadstart_t() noexcept {}
1036   MPI_threadstart_t(MPI_MainFn fn_) noexcept :fn(fn_) {}
1037   void start() {
1038     char **argv=CmiCopyArgs(CkGetArgv());
1039     int argc=CkGetArgc();
1040
1041     // Set a pointer to somewhere close to the bottom of the stack.
1042     // This is used for roughly estimating the stack usage later.
1043     CtvAccess(stackBottom) = &argv;
1044
1045 #if !CMK_NO_BUILD_SHARED
1046     // If charm++ is built with shared libraries, it does not support
1047     // a custom AMPI_Setup method and always uses AMPI_Fallback_Main.
1048     // Works around bug #1508.
1049     if (_ampi_fallback_setup_count != -1 && _ampi_fallback_setup_count != 2 && CkMyPe() == 0) {
1050       CkAbort("AMPI> The application provided a custom AMPI_Setup() method, "
1051       "but AMPI is built with shared library support. This is an unsupported "
1052       "configuration. Please recompile charm++/AMPI without `-build-shared` or "
1053       "remove the AMPI_Setup() function from your application.\n");
1054     }
1055     AMPI_Fallback_Main(argc,argv);
1056 #else
1057     (fn)(argc,argv);
1058 #endif
1059   }
1060   void pup(PUP::er &p) noexcept {
1061     p|fn;
1062   }
1063 };
1064 PUPmarshall(MPI_threadstart_t)
1065
1066 CLINKAGE
1067 void AMPI_threadstart(void *data)
1068 {
1069   STARTUP_DEBUG("MPI_threadstart")
1070   MPI_threadstart_t t;
1071   pupFromBuf(data,t);
1072 #if CMK_TRACE_IN_CHARM
1073   if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
1074 #endif
1075   t.start();
1076 }
1077
1078 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
1079 {
1080   STARTUP_DEBUG("ampiCreateMain")
1081   int _nchunks=TCHARM_Get_num_chunks();
1082   //Make a new threads array:
1083   MPI_threadstart_t s(mainFn);
1084   memBuf b; pupIntoBuf(b,s);
1085   TCHARM_Create_data(_nchunks,AMPI_threadstart_idx,
1086                      b.getData(), b.getSize());
1087 }
1088
1089 /* TCharm Semaphore ID's for AMPI startup */
1090 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
1091 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
1092
1093 static CProxy_ampiWorlds ampiWorldsGroup;
1094
1095 // Create MPI_COMM_SELF from MPI_COMM_WORLD
1096 static void createCommSelf() noexcept {
1097   STARTUP_DEBUG("ampiInit> creating MPI_COMM_SELF")
1098   MPI_Comm selfComm;
1099   MPI_Group worldGroup, selfGroup;
1100   int ranks[1] = { getAmpiInstance(MPI_COMM_WORLD)->getRank() };
1101
1102   MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
1103   MPI_Group_incl(worldGroup, 1, ranks, &selfGroup);
1104   MPI_Comm_create(MPI_COMM_WORLD, selfGroup, &selfComm);
1105   MPI_Comm_set_name(selfComm, "MPI_COMM_SELF");
1106
1107   CkAssert(selfComm == MPI_COMM_SELF);
1108   STARTUP_DEBUG("ampiInit> created MPI_COMM_SELF")
1109 }
1110
1111 /*
1112    Called from MPI_Init, a collective initialization call:
1113    creates a new AMPI array and attaches it to the current
1114    set of TCHARM threads.
1115  */
1116 static ampi *ampiInit(char **argv) noexcept
1117 {
1118   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CkMyPe(), TCHARM_Element());)
1119   if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
1120   STARTUP_DEBUG("ampiInit> begin")
1121
1122   MPI_Comm new_world;
1123   int _nchunks;
1124   CkArrayOptions opts;
1125   CProxy_ampiParent parent;
1126   if (TCHARM_Element()==0) //the rank of a tcharm object
1127   { /* I'm responsible for building the arrays: */
1128     STARTUP_DEBUG("ampiInit> creating arrays")
1129
1130     // FIXME: Need to serialize global communicator allocation in one place.
1131     //Allocate the next communicator
1132     if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
1133     {
1134       CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
1135     }
1136     int new_idx=_mpi_nworlds;
1137     new_world=MPI_COMM_WORLD+new_idx;
1138
1139     //Create and attach the ampiParent array
1140     CkArrayID threads;
1141     opts=TCHARM_Attach_start(&threads,&_nchunks);
1142     opts.setSectionAutoDelegate(false);
1143     CkArrayCreatedMsg *m;
1144     CProxy_ampiParent::ckNew(new_world, threads, _nchunks, opts, CkCallbackResumeThread((void*&)m));
1145     parent = CProxy_ampiParent(m->aid);
1146     delete m;
1147     STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
1148   }
1149   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
1150
1151   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
1152
1153   if (TCHARM_Element()==0)
1154   {
1155     //Make a new ampi array
1156     CkArrayID empty;
1157
1158     ampiCommStruct worldComm(new_world,empty,_nchunks);
1159     CProxy_ampi arr;
1160     CkArrayCreatedMsg *m;
1161     CProxy_ampi::ckNew(parent, worldComm, opts, CkCallbackResumeThread((void*&)m));
1162     arr = CProxy_ampi(m->aid);
1163     delete m;
1164
1165     //Broadcast info. to the mpi_worlds array
1166     // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
1167     ampiCommStruct newComm(new_world,arr,_nchunks);
1168     if (ampiWorldsGroup.ckGetGroupID().isZero())
1169       ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
1170     else
1171       ampiWorldsGroup.add(newComm);
1172     STARTUP_DEBUG("ampiInit> arrays created")
1173   }
1174
1175   // Find our ampi object:
1176   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
1177   CtvAccess(ampiInitDone)=true;
1178   CtvAccess(ampiFinalized)=false;
1179   STARTUP_DEBUG("ampiInit> complete")
1180 #if CMK_BIGSIM_CHARM
1181     //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
1182     TRACE_BG_ADD_TAG("AMPI_START");
1183 #endif
1184
1185   ampiParent* pptr = getAmpiParent();
1186   vector<int>& keyvals = pptr->getKeyvals(MPI_COMM_WORLD);
1187   pptr->setAttr(MPI_COMM_WORLD, keyvals, MPI_UNIVERSE_SIZE, &_nchunks);
1188   ptr->setCommName("MPI_COMM_WORLD");
1189
1190   pptr->ampiInitCallDone = 0;
1191
1192   CProxy_ampi cbproxy = ptr->getProxy();
1193   CkCallback cb(CkReductionTarget(ampi, allInitDone), cbproxy[0]);
1194   ptr->contribute(cb);
1195
1196   ampiParent *thisParent = getAmpiParent();
1197   while(thisParent->ampiInitCallDone!=1){
1198     thisParent->getTCharmThread()->stop();
1199     /*
1200      * thisParent needs to be updated in case of the parent is being pupped.
1201      * In such case, thisParent got changed
1202      */
1203     thisParent = getAmpiParent();
1204   }
1205
1206   createCommSelf();
1207
1208 #if CMK_BIGSIM_CHARM
1209   BgSetStartOutOfCore();
1210 #endif
1211
1212   return ptr;
1213 }
1214
1215 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
1216 class ampiWorlds : public CBase_ampiWorlds {
1217  public:
1218   ampiWorlds(const ampiCommStruct &nextWorld) noexcept {
1219     ampiWorldsGroup=thisgroup;
1220     add(nextWorld);
1221   }
1222   ampiWorlds(CkMigrateMessage *m) noexcept : CBase_ampiWorlds(m) {}
1223   void pup(PUP::er &p) noexcept { }
1224   void add(const ampiCommStruct &nextWorld) noexcept {
1225     int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD);
1226     mpi_worlds[new_idx]=nextWorld;
1227     if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
1228     STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
1229   }
1230 };
1231
1232 //-------------------- ampiParent -------------------------
1233 ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_,int nRanks_) noexcept
1234   : threads(threads_), ampiReqs(64, &reqPool), myDDT(ampiPredefinedTypes),
1235     worldNo(worldNo_), predefinedOps(ampiPredefinedOps), isTmpRProxySet(false)
1236 {
1237   int barrier = 0x1234;
1238   STARTUP_DEBUG("ampiParent> starting up")
1239   thread=NULL;
1240   worldPtr=NULL;
1241   userAboutToMigrateFn=NULL;
1242   userJustMigratedFn=NULL;
1243   prepareCtv();
1244
1245   // Allocate an empty groupStruct to represent MPI_EMPTY_GROUP
1246   groups.push_back(new groupStruct);
1247
1248   init();
1249
1250   //ensure MPI_INFO_ENV will always be first info object
1251   defineInfoEnv(nRanks_);
1252   // define Info objects for AMPI_Migrate calls
1253   defineInfoMigration();
1254
1255   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
1256
1257 #if CMK_FAULT_EVAC
1258   AsyncEvacuate(false);
1259 #endif
1260 }
1261
1262 ampiParent::ampiParent(CkMigrateMessage *msg) noexcept
1263   : CBase_ampiParent(msg), myDDT(ampiPredefinedTypes), predefinedOps(ampiPredefinedOps)
1264 {
1265   thread=NULL;
1266   worldPtr=NULL;
1267
1268   init();
1269
1270 #if CMK_FAULT_EVAC
1271   AsyncEvacuate(false);
1272 #endif
1273 }
1274
1275 PUPfunctionpointer(MPI_MigrateFn)
1276
1277 void ampiParent::pup(PUP::er &p) noexcept {
1278   p|threads;
1279   p|worldNo;
1280   p|myDDT;
1281   p|splitComm;
1282   p|groupComm;
1283   p|cartComm;
1284   p|graphComm;
1285   p|distGraphComm;
1286   p|interComm;
1287   p|intraComm;
1288
1289   p|groups;
1290   p|winStructList;
1291   p|infos;
1292   p|userOps;
1293
1294   p|reqPool;
1295   ampiReqs.pup(p, &reqPool);
1296
1297   p|kvlist;
1298   p|isTmpRProxySet;
1299   p|tmpRProxy;
1300
1301   p|userAboutToMigrateFn;
1302   p|userJustMigratedFn;
1303
1304   p|ampiInitCallDone;
1305   p|resumeOnRecv;
1306   p|resumeOnColl;
1307   p|numBlockedReqs;
1308   p|bsendBufferSize;
1309   p((char *)&bsendBuffer, sizeof(void *));
1310
1311   // pup blockingReq
1312   AmpiReqType reqType;
1313   if (!p.isUnpacking()) {
1314     if (blockingReq) {
1315       reqType = blockingReq->getType();
1316     } else {
1317       reqType = AMPI_INVALID_REQ;
1318     }
1319   }
1320   p|reqType;
1321   if (reqType != AMPI_INVALID_REQ) {
1322     if (p.isUnpacking()) {
1323       switch (reqType) {
1324         case AMPI_I_REQ:
1325           blockingReq = new IReq;
1326           break;
1327         case AMPI_REDN_REQ:
1328           blockingReq = new RednReq;
1329           break;
1330         case AMPI_GATHER_REQ:
1331           blockingReq = new GatherReq;
1332           break;
1333         case AMPI_GATHERV_REQ:
1334           blockingReq = new GathervReq;
1335           break;
1336         case AMPI_SEND_REQ:
1337           blockingReq = new SendReq;
1338           break;
1339         case AMPI_SSEND_REQ:
1340           blockingReq = new SsendReq;
1341           break;
1342         case AMPI_ATA_REQ:
1343           blockingReq = new ATAReq;
1344           break;
1345         case AMPI_G_REQ:
1346           blockingReq = new GReq;
1347           break;
1348 #if CMK_CUDA
1349         case AMPI_GPU_REQ:
1350           CkAbort("AMPI> error trying to PUP a non-migratable GPU request!");
1351           break;
1352 #endif
1353         case AMPI_INVALID_REQ:
1354           CkAbort("AMPI> error trying to PUP an invalid request!");
1355           break;
1356       }
1357     }
1358     blockingReq->pup(p);
1359   } else {
1360     blockingReq = NULL;
1361   }
1362   if (p.isDeleting()) {
1363     delete blockingReq; blockingReq = NULL;
1364   }
1365
1366 #if AMPI_PRINT_MSG_SIZES
1367   p|msgSizes;
1368 #endif
1369 }
1370
1371 void ampiParent::prepareCtv() noexcept {
1372   thread=threads[thisIndex].ckLocal();
1373   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
1374   CtvAccessOther(thread->getThread(),ampiPtr) = this;
1375   STARTUP_DEBUG("ampiParent> found TCharm")
1376 }
1377
1378 void ampiParent::init() noexcept{
1379   resumeOnRecv = false;
1380   resumeOnColl = false;
1381   numBlockedReqs = 0;
1382   bsendBufferSize = 0;
1383   bsendBuffer = NULL;
1384   blockingReq = NULL;
1385 #if AMPIMSGLOG
1386   if(msgLogWrite && record_msglog(thisIndex)){
1387     char fname[128];
1388     sprintf(fname, "%s.%d", msgLogFilename,thisIndex);
1389 #if CMK_USE_ZLIB && 0
1390     fMsgLog = gzopen(fname,"wb");
1391     toPUPer = new PUP::tozDisk(fMsgLog);
1392 #else
1393     fMsgLog = fopen(fname,"wb");
1394     CkAssert(fMsgLog != NULL);
1395     toPUPer = new PUP::toDisk(fMsgLog);
1396 #endif
1397   }else if(msgLogRead){
1398     char fname[128];
1399     sprintf(fname, "%s.%d", msgLogFilename,msgLogRank);
1400 #if CMK_USE_ZLIB && 0
1401     fMsgLog = gzopen(fname,"rb");
1402     fromPUPer = new PUP::fromzDisk(fMsgLog);
1403 #else
1404     fMsgLog = fopen(fname,"rb");
1405     CkAssert(fMsgLog != NULL);
1406     fromPUPer = new PUP::fromDisk(fMsgLog);
1407 #endif
1408     CkPrintf("AMPI> opened message log file: %s for replay\n", fname);
1409   }
1410 #endif
1411 }
1412
1413 void ampiParent::finalize() noexcept {
1414 #if AMPIMSGLOG
1415   if(msgLogWrite && record_msglog(thisIndex)){
1416     delete toPUPer;
1417 #if CMK_USE_ZLIB && 0
1418     gzclose(fMsgLog);
1419 #else
1420     fclose(fMsgLog);
1421 #endif
1422   }else if(msgLogRead){
1423     delete fromPUPer;
1424 #if CMK_USE_ZLIB && 0
1425     gzclose(fMsgLog);
1426 #else
1427     fclose(fMsgLog);
1428 #endif
1429   }
1430 #endif
1431 }
1432
1433 void ampiParent::setUserAboutToMigrateFn(MPI_MigrateFn f) noexcept {
1434   userAboutToMigrateFn = f;
1435 }
1436
1437 void ampiParent::setUserJustMigratedFn(MPI_MigrateFn f) noexcept {
1438   userJustMigratedFn = f;
1439 }
1440
1441 void ampiParent::ckAboutToMigrate() noexcept {
1442   if (userAboutToMigrateFn) {
1443     (*userAboutToMigrateFn)();
1444   }
1445 }
1446
1447 void ampiParent::ckJustMigrated() noexcept {
1448   ArrayElement1D::ckJustMigrated();
1449   prepareCtv();
1450   if (userJustMigratedFn) {
1451     (*userJustMigratedFn)();
1452   }
1453 }
1454
1455 void ampiParent::ckJustRestored() noexcept {
1456   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
1457   ArrayElement1D::ckJustRestored();
1458   prepareCtv();
1459 }
1460
1461 ampiParent::~ampiParent() noexcept {
1462   STARTUP_DEBUG("ampiParent> destructor called");
1463   finalize();
1464 }
1465
1466 const ampiCommStruct& ampiParent::getWorldStruct() const noexcept {
1467   return worldPtr->getCommStruct();
1468 }
1469
1470 //Children call this when they are first created or just migrated
1471 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration) noexcept
1472 {
1473   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
1474
1475   if (s.getComm()>=MPI_COMM_WORLD)
1476   { //We now have our COMM_WORLD-- register it
1477     //Note that split communicators don't keep a raw pointer, so
1478     //they don't need to re-register on migration.
1479     if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
1480     worldPtr=ptr;
1481   }
1482
1483   if (forMigration) { //Restore AmpiRequest*'s in postedReqs:
1484     AmmEntry<AmpiRequest *> *e = ptr->postedReqs.first;
1485     while (e) {
1486       // AmmPupPostedReqs() packed these as MPI_Requests
1487       MPI_Request reqIdx = (MPI_Request)(intptr_t)e->msg;
1488       CkAssert(reqIdx != MPI_REQUEST_NULL);
1489       AmpiRequest* req = ampiReqs[reqIdx];
1490       CkAssert(req);
1491       e->msg = req;
1492       e = e->next;
1493     }
1494   }
1495   else { //Register the new communicator:
1496     MPI_Comm comm = s.getComm();
1497     STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
1498     if (comm>=MPI_COMM_WORLD) {
1499       // Pass the new ampi to the waiting ampiInit
1500       thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
1501     } else if (isSplit(comm)) {
1502       splitChildRegister(s);
1503     } else if (isGroup(comm)) {
1504       groupChildRegister(s);
1505     } else if (isCart(comm)) {
1506       cartChildRegister(s);
1507     } else if (isGraph(comm)) {
1508       graphChildRegister(s);
1509     } else if (isDistGraph(comm)) {
1510       distGraphChildRegister(s);
1511     } else if (isInter(comm)) {
1512       interChildRegister(s);
1513     } else if (isIntra(comm)) {
1514       intraChildRegister(s);
1515     }else
1516       CkAbort("ampiParent received child with bad communicator");
1517   }
1518
1519   return thread;
1520 }
1521
1522 // reduction client data - preparation for checkpointing
1523 class ckptClientStruct {
1524  public:
1525   const char *dname;
1526   ampiParent *ampiPtr;
1527   ckptClientStruct(const char *s, ampiParent *a) noexcept : dname(s), ampiPtr(a) {}
1528 };
1529
1530 static void checkpointClient(void *param,void *msg) noexcept
1531 {
1532   ckptClientStruct *client = (ckptClientStruct*)param;
1533   const char *dname = client->dname;
1534   ampiParent *ampiPtr = client->ampiPtr;
1535   ampiPtr->Checkpoint(strlen(dname), dname);
1536   delete client;
1537 }
1538
1539 void ampiParent::startCheckpoint(const char* dname) noexcept {
1540   if (thisIndex==0) {
1541     ckptClientStruct *clientData = new ckptClientStruct(dname, this);
1542     CkCallback *cb = new CkCallback(checkpointClient, clientData);
1543     thisProxy.ckSetReductionClient(cb);
1544   }
1545   contribute();
1546
1547   thread->stop();
1548
1549 #if CMK_BIGSIM_CHARM
1550   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
1551 #endif
1552 }
1553
1554 void ampiParent::Checkpoint(int len, const char* dname) noexcept {
1555   if (len == 0) {
1556     // memory checkpoint
1557     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1558     CkStartMemCheckpoint(cb);
1559   }
1560   else {
1561     char dirname[256];
1562     strncpy(dirname,dname,len);
1563     dirname[len]='\0';
1564     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1565     CkStartCheckpoint(dirname,cb);
1566   }
1567 }
1568
1569 void ampiParent::ResumeThread() noexcept {
1570   thread->resume();
1571 }
1572
1573 int ampiParent::createKeyval(MPI_Comm_copy_attr_function *copy_fn, MPI_Comm_delete_attr_function *delete_fn,
1574                              int *keyval, void* extra_state) noexcept {
1575   KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1576   int idx = kvlist.size();
1577   kvlist.resize(idx+1);
1578   kvlist[idx] = newnode;
1579   *keyval = idx;
1580   return 0;
1581 }
1582
1583 int ampiParent::setUserKeyval(int context, int keyval, void *attribute_val) noexcept {
1584 #if AMPI_ERROR_CHECKING
1585   if (keyval < 0 || keyval >= kvlist.size() || kvlist[keyval] == NULL) {
1586     return MPI_ERR_KEYVAL;
1587   }
1588 #endif
1589   KeyvalNode &kv = *kvlist[keyval];
1590   if (kv.hasVal()) {
1591     int ret = (*kv.delete_fn)(context, keyval, kv.val, kv.extra_state);
1592     if (ret != MPI_SUCCESS) {
1593       return ret;
1594     }
1595   }
1596   kvlist[keyval]->setVal(attribute_val);
1597   return MPI_SUCCESS;
1598 }
1599
1600 int ampiParent::setAttr(int context, vector<int>& keyvals, int keyval, void* attribute_val) noexcept {
1601   if (kv_set_builtin(keyval, attribute_val)) {
1602     return MPI_SUCCESS;
1603   }
1604   keyvals.push_back(keyval);
1605   kvlist[keyval]->incRefCount();
1606   return setUserKeyval(context, keyval, attribute_val);
1607 }
1608
1609 bool ampiParent::kv_set_builtin(int keyval, void* attribute_val) noexcept {
1610   switch(keyval) {
1611     case MPI_TAG_UB:            /*immutable*/ return false;
1612     case MPI_HOST:              /*immutable*/ return false;
1613     case MPI_IO:                /*immutable*/ return false;
1614     case MPI_WTIME_IS_GLOBAL:   /*immutable*/ return false;
1615     case MPI_APPNUM:            /*immutable*/ return false;
1616     case MPI_LASTUSEDCODE:      /*immutable*/ return false;
1617     case MPI_UNIVERSE_SIZE:     (CkpvAccess(bikvs).universe_size)     = *((int*)attribute_val);      return true;
1618     case MPI_WIN_BASE:          (CkpvAccess(bikvs).win_base)          = attribute_val;               return true;
1619     case MPI_WIN_SIZE:          (CkpvAccess(bikvs).win_size)          = *((MPI_Aint*)attribute_val); return true;
1620     case MPI_WIN_DISP_UNIT:     (CkpvAccess(bikvs).win_disp_unit)     = *((int*)attribute_val);      return true;
1621     case MPI_WIN_CREATE_FLAVOR: (CkpvAccess(bikvs).win_create_flavor) = *((int*)attribute_val);      return true;
1622     case MPI_WIN_MODEL:         (CkpvAccess(bikvs).win_model)         = *((int*)attribute_val);      return true;
1623     case AMPI_MY_WTH:           /*immutable*/ return false;
1624     case AMPI_NUM_WTHS:         /*immutable*/ return false;
1625     case AMPI_MY_PROCESS:       /*immutable*/ return false;
1626     case AMPI_NUM_PROCESSES:    /*immutable*/ return false;
1627     default: return false;
1628   };
1629 }
1630
1631 bool ampiParent::kv_get_builtin(int keyval) noexcept {
1632   switch(keyval) {
1633     case MPI_TAG_UB:            kv_builtin_storage = &(CkpvAccess(bikvs).tag_ub);             return true;
1634     case MPI_HOST:              kv_builtin_storage = &(CkpvAccess(bikvs).host);               return true;
1635     case MPI_IO:                kv_builtin_storage = &(CkpvAccess(bikvs).io);                 return true;
1636     case MPI_WTIME_IS_GLOBAL:   kv_builtin_storage = &(CkpvAccess(bikvs).wtime_is_global);    return true;
1637     case MPI_APPNUM:            kv_builtin_storage = &(CkpvAccess(bikvs).appnum);             return true;
1638     case MPI_LASTUSEDCODE:      kv_builtin_storage = &(CkpvAccess(bikvs).lastusedcode);       return true;
1639     case MPI_UNIVERSE_SIZE:     kv_builtin_storage = &(CkpvAccess(bikvs).universe_size);      return true;
1640     case MPI_WIN_BASE:          win_base_storage   = &(CkpvAccess(bikvs).win_base);           return true;
1641     case MPI_WIN_SIZE:          win_size_storage   = &(CkpvAccess(bikvs).win_size);           return true;
1642     case MPI_WIN_DISP_UNIT:     kv_builtin_storage = &(CkpvAccess(bikvs).win_disp_unit);      return true;
1643     case MPI_WIN_CREATE_FLAVOR: kv_builtin_storage = &(CkpvAccess(bikvs).win_create_flavor);  return true;
1644     case MPI_WIN_MODEL:         kv_builtin_storage = &(CkpvAccess(bikvs).win_model);          return true;
1645     default: return false;
1646   };
1647 }
1648
1649 bool ampiParent::getBuiltinKeyval(int keyval, void *attribute_val) noexcept {
1650   if (kv_get_builtin(keyval)){
1651     /* All builtin keyvals are ints except MPI_WIN_BASE, which is a pointer
1652      * to the window's base address in C but an integer representation of
1653      * the base address in Fortran.
1654      * Also, MPI_WIN_SIZE is an MPI_Aint. */
1655     if (keyval == MPI_WIN_BASE)
1656       *((void**)attribute_val) = *win_base_storage;
1657     else if (keyval == MPI_WIN_SIZE)
1658       *(MPI_Aint**)attribute_val = win_size_storage;
1659     else
1660       *(int **)attribute_val = kv_builtin_storage;
1661     return true;
1662   } else {
1663     switch(keyval) {
1664       case AMPI_MY_WTH: *(int *)attribute_val = CkMyPe(); return true;
1665       case AMPI_NUM_WTHS: *(int *)attribute_val = CkNumPes(); return true;
1666       case AMPI_MY_PROCESS: *(int *)attribute_val = CkMyNode(); return true;
1667       case AMPI_NUM_PROCESSES: *(int *)attribute_val = CkNumNodes(); return true;
1668     }
1669   }
1670   return false;
1671 }
1672
1673 // Call copy_fn for each user-defined keyval in old_comm.
1674 int ampiParent::dupUserKeyvals(MPI_Comm old_comm, MPI_Comm new_comm) noexcept {
1675   ampiCommStruct &old_cs = *(ampiCommStruct *)&comm2CommStruct(old_comm);
1676   for (int i=0; i<old_cs.getKeyvals().size(); i++) {
1677     int keyval = old_cs.getKeyvals()[i];
1678     void *val_out;
1679     int flag = 0;
1680     bool isValid = (keyval != MPI_KEYVAL_INVALID && kvlist[keyval] != NULL);
1681     if (isValid) {
1682       // Call the user's copy_fn
1683       KeyvalNode& kv = *kvlist[keyval];
1684       int ret = (*kv.copy_fn)(old_comm, keyval, kv.extra_state, kv.val, &val_out, &flag);
1685       if (ret != MPI_SUCCESS) {
1686         return ret;
1687       }
1688       if (flag == 1) {
1689         // Set keyval in new_comm
1690         ampiCommStruct &cs = *(ampiCommStruct *)&comm2CommStruct(new_comm);
1691         cs.getKeyvals().push_back(keyval);
1692         kv.incRefCount();
1693       }
1694     }
1695   }
1696   return MPI_SUCCESS;
1697 }
1698
1699 int ampiParent::freeUserKeyval(int context, vector<int>& keyvals, int* keyval) noexcept {
1700   if (*keyval < 0 || *keyval >= kvlist.size()) {
1701     return MPI_SUCCESS;
1702   }
1703   // Call the user's delete_fn
1704   KeyvalNode& kv = *kvlist[*keyval];
1705   int ret = (*kv.delete_fn)(context, *keyval, kv.val, kv.extra_state);
1706   if (ret != MPI_SUCCESS) {
1707     return ret;
1708   }
1709   // Remove keyval from comm/win/type keyvals list
1710   kv.clearVal();
1711   for (int i=0; i<keyvals.size(); i++) {
1712     if (keyvals[i] == *keyval) {
1713       keyvals[*keyval] = MPI_KEYVAL_INVALID;
1714     }
1715   }
1716   if (!keyvals.empty()) {
1717     while (keyvals.back() == MPI_KEYVAL_INVALID) keyvals.pop_back();
1718   }
1719   // Remove keyval from parent kvlist if no remaining references to it
1720   if (kv.decRefCount() == 0) {
1721     delete kvlist[*keyval];
1722     kvlist[*keyval] = NULL;
1723   }
1724   *keyval = MPI_KEYVAL_INVALID;
1725   return MPI_SUCCESS;
1726 }
1727
1728 int ampiParent::freeUserKeyvals(int context, vector<int>& keyvals) noexcept {
1729   for (int i=0; i<keyvals.size(); i++) {
1730     int keyval = keyvals[i];
1731     // Call the user's delete_fn
1732     KeyvalNode& kv = *kvlist[keyval];
1733     int ret = (*kv.delete_fn)(context, keyval, kv.val, kv.extra_state);
1734     if (ret != MPI_SUCCESS) {
1735       return ret;
1736     }
1737     kv.clearVal();
1738     keyvals[i] = MPI_KEYVAL_INVALID;
1739     // Remove keyval from parent kvlist if no remaining references to it
1740     if (kv.decRefCount() == 0) {
1741       delete kvlist[keyval];
1742       kvlist[keyval] = NULL;
1743     }
1744   }
1745   keyvals.clear();
1746   return MPI_SUCCESS;
1747 }
1748
1749 bool ampiParent::getUserKeyval(MPI_Comm comm, vector<int>& keyvals, int keyval, void *attribute_val, int *flag) noexcept {
1750   if (keyval < 0 || keyval >= kvlist.size() || kvlist[keyval] == NULL) {
1751     *flag = 0;
1752     return false;
1753   }
1754   else {
1755     for (int i=0; i<keyvals.size(); i++) {
1756       int kv = keyvals[i];
1757       if (keyval == kv) { // Found a matching keyval
1758         *(void **)attribute_val = kvlist[keyval]->getVal();
1759         *flag = 1;
1760         return true;
1761       }
1762     }
1763     *flag = 0;
1764     return false;
1765   }
1766 }
1767
1768 int ampiParent::getAttr(int context, vector<int>& keyvals, int keyval, void *attribute_val, int *flag) noexcept {
1769   if (keyval == MPI_KEYVAL_INVALID) {
1770     *flag = 0;
1771     return MPI_ERR_KEYVAL;
1772   }
1773   else if (getBuiltinKeyval(keyval, attribute_val)) {
1774     *flag = 1;
1775     return MPI_SUCCESS;
1776   }
1777   else if (getUserKeyval(context, keyvals, keyval, attribute_val, flag)) {
1778     *flag = 1;
1779     return MPI_SUCCESS;
1780   }
1781   else {
1782     *flag = 0;
1783     return MPI_SUCCESS;
1784   }
1785 }
1786
1787 int ampiParent::deleteAttr(int context, vector<int>& keyvals, int keyval) noexcept {
1788   return freeUserKeyval(context, keyvals, &keyval);
1789 }
1790
1791 /*
1792  * AMPI Message Matching (Amm) queues:
1793  *   AmpiMsg*'s and AmpiRequest*'s are matched based on 2 ints: [tag, src].
1794  */
1795
1796 // Pt2pt msg queues:
1797 template class Amm<AmpiMsg *, AMPI_AMM_PT2PT_POOL_SIZE>;
1798 template class Amm<AmpiRequest *, AMPI_AMM_PT2PT_POOL_SIZE>;
1799
1800 // Bcast msg queues:
1801 template class Amm<AmpiMsg *, AMPI_AMM_COLL_POOL_SIZE>;
1802 template class Amm<AmpiRequest *, AMPI_AMM_COLL_POOL_SIZE>;
1803
1804 /* free all table entries but not the space pointed to by 'msg' */
1805 template<typename T, size_t N>
1806 void Amm<T, N>::freeAll() noexcept
1807 {
1808   AmmEntry<T>* cur = first;
1809   while (cur) {
1810     AmmEntry<T>* toDel = cur;
1811     cur = cur->next;
1812     deleteEntry(toDel);
1813   }
1814 }
1815
1816 /* free all msgs */
1817 template<typename T, size_t N>
1818 void Amm<T, N>::flushMsgs() noexcept
1819 {
1820   T msg = get(MPI_ANY_TAG, MPI_ANY_SOURCE);
1821   while (msg) {
1822     delete msg;
1823     msg = get(MPI_ANY_TAG, MPI_ANY_SOURCE);
1824   }
1825 }
1826
1827 template<typename T, size_t N>
1828 void Amm<T, N>::put(T msg) noexcept
1829 {
1830   AmmEntry<T>* e = newEntry(msg);
1831   *lasth = e;
1832   lasth = &e->next;
1833 }
1834
1835 template<typename T, size_t N>
1836 void Amm<T, N>::put(int tag, int src, T msg) noexcept
1837 {
1838   AmmEntry<T>* e = newEntry(tag, src, msg);
1839   *lasth = e;
1840   lasth = &e->next;
1841 }
1842
1843 template<typename T, size_t N>
1844 bool Amm<T, N>::match(const int tags1[AMM_NTAGS], const int tags2[AMM_NTAGS]) const noexcept
1845 {
1846   if (tags1[AMM_TAG]==tags2[AMM_TAG] && tags1[AMM_SRC]==tags2[AMM_SRC]) {
1847     // tag and src match
1848     return true;
1849   }
1850   else if (tags1[AMM_TAG]==tags2[AMM_TAG] && (tags1[AMM_SRC]==MPI_ANY_SOURCE || tags2[AMM_SRC]==MPI_ANY_SOURCE)) {
1851     // tag matches, src is MPI_ANY_SOURCE
1852     return true;
1853   }
1854   else if (tags1[AMM_SRC]==tags2[AMM_SRC] && (tags1[AMM_TAG]==MPI_ANY_TAG || tags2[AMM_TAG]==MPI_ANY_TAG)) {
1855     // src matches, tag is MPI_ANY_TAG
1856     return true;
1857   }
1858   else if ((tags1[AMM_SRC]==MPI_ANY_SOURCE || tags2[AMM_SRC]==MPI_ANY_SOURCE) && (tags1[AMM_TAG]==MPI_ANY_TAG || tags2[AMM_TAG]==MPI_ANY_TAG)) {
1859     // src and tag are MPI_ANY
1860     return true;
1861   }
1862   else {
1863     // no match
1864     return false;
1865   }
1866 }
1867
1868 template<typename T, size_t N>
1869 T Amm<T, N>::get(int tag, int src, int* rtags) noexcept
1870 {
1871   AmmEntry<T> *ent, **enth;
1872   T msg;
1873   int tags[AMM_NTAGS] = { tag, src };
1874
1875   enth = &first;
1876   while (true) {
1877     ent = *enth;
1878     if (!ent) return NULL;
1879     if (match(tags, ent->tags)) {
1880       if (rtags) memcpy(rtags, ent->tags, sizeof(int)*AMM_NTAGS);
1881       msg = ent->msg;
1882       // unlike probe, delete the matched entry:
1883       AmmEntry<T>* next = ent->next;
1884       *enth = next;
1885       if (!next) lasth = enth;
1886       deleteEntry(ent);
1887       return msg;
1888     }
1889     enth = &ent->next;
1890   }
1891 }
1892
1893 template<typename T, size_t N>
1894 T Amm<T, N>::probe(int tag, int src, int* rtags) noexcept
1895 {
1896   AmmEntry<T> *ent, **enth;
1897   T msg;
1898   int tags[AMM_NTAGS] = { tag, src };
1899   CkAssert(rtags);
1900
1901   enth = &first;
1902   while (true) {
1903     ent = *enth;
1904     if (!ent) return NULL;
1905     if (match(tags, ent->tags)) {
1906       memcpy(rtags, ent->tags, sizeof(int)*AMM_NTAGS);
1907       msg = ent->msg;
1908       return msg;
1909     }
1910     enth = &ent->next;
1911   }
1912 }
1913
1914 template<typename T, size_t N>
1915 int Amm<T, N>::size() const noexcept
1916 {
1917   int n = 0;
1918   AmmEntry<T> *e = first;
1919   while (e) {
1920     e = e->next;
1921     n++;
1922   }
1923   return n;
1924 }
1925
1926 template<typename T, size_t N>
1927 void Amm<T, N>::pup(PUP::er& p, AmmPupMessageFn msgpup) noexcept
1928 {
1929   int sz;
1930   if (!p.isUnpacking()) {
1931     sz = size();
1932     p|sz;
1933     AmmEntry<T> *doomed, *e = first;
1934     while (e) {
1935       pup_ints(&p, e->tags, AMM_NTAGS);
1936       msgpup(p, (void**)&e->msg);
1937       doomed = e;
1938       e = e->next;
1939       if (p.isDeleting()) {
1940         deleteEntry(doomed);
1941       }
1942     }
1943   } else { // unpacking
1944     p|sz;
1945     for (int i=0; i<sz; i++) {
1946       T msg;
1947       int tags[AMM_NTAGS];
1948       pup_ints(&p, tags, AMM_NTAGS);
1949       msgpup(p, (void**)&msg);
1950       put(tags[0], tags[1], msg);
1951     }
1952   }
1953 }
1954
1955 //----------------------- ampi -------------------------
1956 void ampi::init() noexcept {
1957   parent=NULL;
1958   thread=NULL;
1959
1960 #if CMK_FAULT_EVAC
1961   AsyncEvacuate(false);
1962 #endif
1963 }
1964
1965 ampi::ampi() noexcept
1966 {
1967   /* this constructor only exists so we can create an empty array during split */
1968   CkAbort("Default ampi constructor should never be called");
1969 }
1970
1971 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s) noexcept :parentProxy(parent_), oorder(s.getSize())
1972 {
1973   init();
1974
1975   myComm=s; myComm.setArrayID(thisArrayID);
1976   myRank=myComm.getRankForIndex(thisIndex);
1977
1978   findParent(false);
1979 }
1980
1981 ampi::ampi(CkMigrateMessage *msg) noexcept : CBase_ampi(msg)
1982 {
1983   init();
1984 }
1985
1986 void ampi::ckJustMigrated() noexcept
1987 {
1988   findParent(true);
1989   ArrayElement1D::ckJustMigrated();
1990 }
1991
1992 void ampi::ckJustRestored() noexcept
1993 {
1994   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
1995   findParent(true);
1996   ArrayElement1D::ckJustRestored();
1997 }
1998
1999 void ampi::findParent(bool forMigration) noexcept {
2000   STARTUP_DEBUG("ampi> finding my parent")
2001   parent=parentProxy[thisIndex].ckLocal();
2002 #if CMK_ERROR_CHECKING
2003   if (parent==NULL) CkAbort("AMPI can't find its parent!");
2004 #endif
2005   thread=parent->registerAmpi(this,myComm,forMigration);
2006 #if CMK_ERROR_CHECKING
2007   if (thread==NULL) CkAbort("AMPI can't find its thread!");
2008 #endif
2009 }
2010
2011 //The following method should be called on the first element of the
2012 //ampi array
2013 void ampi::allInitDone() noexcept {
2014   FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
2015   thisProxy.setInitDoneFlag();
2016 }
2017
2018 void ampi::setInitDoneFlag() noexcept {
2019   parent->ampiInitCallDone=1;
2020   parent->getTCharmThread()->start();
2021 }
2022
2023 static void AmmPupUnexpectedMsgs(PUP::er& p,void **msg) noexcept {
2024   CkPupMessage(p,msg,1);
2025   if (p.isDeleting()) delete (AmpiMsg *)*msg;
2026 }
2027
2028 static void AmmPupPostedReqs(PUP::er& p,void **msg) noexcept {
2029   // AmpiRequests objects are PUPed by AmpiRequestList, so here we pack
2030   // the reqIdx of posted requests and in ampiParent::registerAmpi we
2031   // lookup the AmpiRequest*'s using the indices. That is necessary because
2032   // the ampiParent object is unpacked after the ampi objects.
2033   if (p.isPacking()) {
2034     int reqIdx = ((AmpiRequest*)*msg)->getReqIdx();
2035     CkAssert(reqIdx != MPI_REQUEST_NULL);
2036     *msg = (void*)(intptr_t)reqIdx;
2037   }
2038   pup_pointer(&p, msg);
2039 #if CMK_ERROR_CHECKING
2040   if (p.isUnpacking()) {
2041     MPI_Request reqIdx = (MPI_Request)(intptr_t)*msg;
2042     CkAssert(reqIdx != MPI_REQUEST_NULL);
2043   }
2044 #endif
2045 }
2046
2047 void ampi::pup(PUP::er &p) noexcept
2048 {
2049   p|parentProxy;
2050   p|myComm;
2051   p|myRank;
2052   p|tmpVec;
2053   p|remoteProxy;
2054   unexpectedMsgs.pup(p, AmmPupUnexpectedMsgs);
2055   postedReqs.pup(p, AmmPupPostedReqs);
2056   unexpectedBcastMsgs.pup(p, AmmPupUnexpectedMsgs);
2057   postedBcastReqs.pup(p, AmmPupPostedReqs);
2058   p|greq_classes;
2059   p|oorder;
2060 }
2061
2062 ampi::~ampi() noexcept
2063 {
2064   if (CkInRestarting() || _BgOutOfCoreFlag==1) {
2065     // in restarting, we need to flush messages
2066     unexpectedMsgs.flushMsgs();
2067     postedReqs.freeAll();
2068     unexpectedBcastMsgs.flushMsgs();
2069     postedBcastReqs.freeAll();
2070   }
2071 }
2072
2073 //------------------------ Communicator Splitting ---------------------
2074 class ampiSplitKey {
2075  public:
2076   int nextSplitComm;
2077   int color; //New class of processes we'll belong to
2078   int key; //To determine rank in new ordering
2079   int rank; //Rank in old ordering
2080   ampiSplitKey() noexcept {}
2081   ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_) noexcept
2082     :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
2083 };
2084
2085 #define MPI_INTER 10
2086
2087 /* "type" may indicate whether call is for a cartesian topology etc. */
2088 void ampi::split(int color,int key,MPI_Comm *dest, int type) noexcept
2089 {
2090 #if CMK_BIGSIM_CHARM
2091   void *curLog; // store current log in timeline
2092   _TRACE_BG_TLINE_END(&curLog);
2093 #endif
2094   if (type == MPI_CART) {
2095     ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
2096     int rootIdx=myComm.getIndexForRank(0);
2097     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
2098     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
2099
2100     thread->suspend(); //Resumed by ampiParent::cartChildRegister
2101     MPI_Comm newComm=parent->getNextCart()-1;
2102     *dest=newComm;
2103   }
2104   else if (type == MPI_GRAPH) {
2105     ampiSplitKey splitKey(parent->getNextGraph(),color,key,myRank);
2106     int rootIdx=myComm.getIndexForRank(0);
2107     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
2108     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
2109
2110     thread->suspend(); //Resumed by ampiParent::graphChildRegister
2111     MPI_Comm newComm=parent->getNextGraph()-1;
2112     *dest=newComm;
2113   }
2114   else if (type == MPI_DIST_GRAPH) {
2115     ampiSplitKey splitKey(parent->getNextDistGraph(),color,key,myRank);
2116     int rootIdx=myComm.getIndexForRank(0);
2117     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
2118     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
2119
2120     thread->suspend(); //Resumed by ampiParent::distGraphChildRegister
2121     MPI_Comm newComm=parent->getNextDistGraph()-1;
2122     *dest=newComm;
2123   }
2124   else if (type == MPI_INTER) {
2125     ampiSplitKey splitKey(parent->getNextInter(),color,key,myRank);
2126     int rootIdx=myComm.getIndexForRank(0);
2127     CkCallback cb(CkIndex_ampi::splitPhaseInter(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
2128     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
2129
2130     thread->suspend(); //Resumed by ampiParent::interChildRegister
2131     MPI_Comm newComm=parent->getNextInter()-1;
2132     *dest=newComm;
2133   }
2134   else {
2135     ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
2136     int rootIdx=myComm.getIndexForRank(0);
2137     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
2138     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
2139
2140     thread->suspend(); //Resumed by ampiParent::splitChildRegister
2141     MPI_Comm newComm=parent->getNextSplit()-1;
2142     *dest=newComm;
2143   }
2144 #if CMK_BIGSIM_CHARM
2145   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
2146 #endif
2147 }
2148
2149 CLINKAGE
2150 int compareAmpiSplitKey(const void *a_, const void *b_) {
2151   const ampiSplitKey *a=(const ampiSplitKey *)a_;
2152   const ampiSplitKey *b=(const ampiSplitKey *)b_;
2153   if (a->color!=b->color) return a->color-b->color;
2154   if (a->key!=b->key) return a->key-b->key;
2155   return a->rank-b->rank;
2156 }
2157
2158 // Caller needs to eventually call newAmpi.doneInserting()
2159 CProxy_ampi ampi::createNewChildAmpiSync() noexcept {
2160   CkArrayOptions opts;
2161   opts.bindTo(parentProxy);
2162   opts.setSectionAutoDelegate(false);
2163   opts.setNumInitial(0);
2164   CkArrayID unusedAID;
2165   ampiCommStruct unusedComm;
2166   CkCallback cb(CkCallback::resumeThread);
2167   CProxy_ampi::ckNew(unusedAID, unusedComm, opts, cb);
2168   CkArrayCreatedMsg *newAmpiMsg = static_cast<CkArrayCreatedMsg*>(cb.thread_delay());
2169   CProxy_ampi newAmpi = newAmpiMsg->aid;
2170   delete newAmpiMsg;
2171   return newAmpi;
2172 }
2173
2174 void ampi::splitPhase1(CkReductionMsg *msg) noexcept
2175 {
2176   //Order the keys, which orders the ranks properly:
2177   int nKeys=msg->getSize()/sizeof(ampiSplitKey);
2178   ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
2179   if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
2180   qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
2181
2182   MPI_Comm newComm = -1;
2183   for(int i=0;i<nKeys;i++){
2184     if(keys[i].nextSplitComm>newComm)
2185       newComm = keys[i].nextSplitComm;
2186   }
2187
2188   //Loop over the sorted keys, which gives us the new arrays:
2189   int lastColor=keys[0].color-1; //The color we're building an array for
2190   CProxy_ampi lastAmpi; //The array for lastColor
2191   int lastRoot=0; //C value for new rank 0 process for latest color
2192   ampiCommStruct lastComm; //Communicator info. for latest color
2193   for (int c=0;c<nKeys;c++) {
2194     if (keys[c].color!=lastColor)
2195     { //Hit a new color-- need to build a new communicator and array
2196       lastColor=keys[c].color;
2197       lastRoot=c;
2198
2199       if (c!=0) lastAmpi.doneInserting();
2200       lastAmpi = createNewChildAmpiSync();
2201
2202       vector<int> indices; //Maps rank to array indices for new array
2203       for (int i=c;i<nKeys;i++) {
2204         if (keys[i].color!=lastColor) break; //Done with this color
2205         int idx=myComm.getIndexForRank(keys[i].rank);
2206         indices.push_back(idx);
2207       }
2208
2209       //FIXME: create a new communicator for each color, instead of
2210       // (confusingly) re-using the same MPI_Comm number for each.
2211       lastComm=ampiCommStruct(newComm,lastAmpi,indices);
2212     }
2213     int newRank=c-lastRoot;
2214     int newIdx=lastComm.getIndexForRank(newRank);
2215
2216     lastAmpi[newIdx].insert(parentProxy,lastComm);
2217   }
2218   lastAmpi.doneInserting();
2219
2220   delete msg;
2221 }
2222
2223 void ampi::splitPhaseInter(CkReductionMsg *msg) noexcept
2224 {
2225   //Order the keys, which orders the ranks properly:
2226   int nKeys=msg->getSize()/sizeof(ampiSplitKey);
2227   ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
2228   if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
2229   qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
2230
2231   MPI_Comm newComm = -1;
2232   for(int i=0;i<nKeys;i++){
2233     if(keys[i].nextSplitComm>newComm)
2234       newComm = keys[i].nextSplitComm; // FIXME: use nextSplitr instead of nextInter?
2235   }
2236
2237   //Loop over the sorted keys, which gives us the new arrays:
2238   int lastColor=keys[0].color-1; //The color we're building an array for
2239   CProxy_ampi lastAmpi; //The array for lastColor
2240   int lastRoot=0; //C value for new rank 0 process for latest color
2241   ampiCommStruct lastComm; //Communicator info. for latest color
2242
2243   lastAmpi = createNewChildAmpiSync();
2244
2245   for (int c=0;c<nKeys;c++) {
2246     vector<int> indices; // Maps rank to array indices for new array
2247     if (keys[c].color!=lastColor)
2248     { //Hit a new color-- need to build a new communicator and array
2249       lastColor=keys[c].color;
2250       lastRoot=c;
2251
2252       for (int i=c;i<nKeys;i++) {
2253         if (keys[i].color!=lastColor) break; //Done with this color
2254         int idx=myComm.getIndexForRank(keys[i].rank);
2255         indices.push_back(idx);
2256       }
2257
2258       if (c==0) {
2259         lastComm=ampiCommStruct(newComm,lastAmpi,indices, myComm.getRemoteIndices());
2260         for (int i=0; i<indices.size(); i++) {
2261           lastAmpi[indices[i]].insert(parentProxy,lastComm);
2262         }
2263         lastAmpi.doneInserting();
2264       }
2265     }
2266   }
2267
2268   parentProxy[0].ExchangeProxy(lastAmpi);
2269   delete msg;
2270 }
2271
2272 //...newly created array elements register with the parent, which calls:
2273 void ampiParent::splitChildRegister(const ampiCommStruct &s) noexcept {
2274   int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
2275   if (splitComm.size()<=idx) splitComm.resize(idx+1);
2276   splitComm[idx]=new ampiCommStruct(s);
2277   thread->resume(); //Matches suspend at end of ampi::split
2278 }
2279
2280 //-----------------create communicator from group--------------
2281 // The procedure is like that of comm_split very much,
2282 // so the code is shamelessly copied from above
2283 //   1. reduction to make sure all members have called
2284 //   2. the root in the old communicator create the new array
2285 //   3. ampiParent::register is called to register new array as new comm
2286 void ampi::commCreate(const vector<int>& vec,MPI_Comm* newcomm) noexcept {
2287   int rootIdx=vec[0];
2288   tmpVec = vec;
2289   CkCallback cb(CkReductionTarget(ampi,commCreatePhase1),CkArrayIndex1D(rootIdx),myComm.getProxy());
2290   MPI_Comm nextgroup = parent->getNextGroup();
2291   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
2292
2293   if(getPosOp(thisIndex,vec)>=0){
2294     thread->suspend(); //Resumed by ampiParent::groupChildRegister
2295     MPI_Comm retcomm = parent->getNextGroup()-1;
2296     *newcomm = retcomm;
2297   }else{
2298     *newcomm = MPI_COMM_NULL;
2299   }
2300 }
2301
2302 void ampi::insertNewChildAmpiElements(MPI_Comm nextComm, CProxy_ampi newAmpi) noexcept {
2303   ampiCommStruct newCommStruct = ampiCommStruct(nextComm, newAmpi, tmpVec);
2304   for (int i = 0; i < tmpVec.size(); ++i)
2305     newAmpi[tmpVec[i]].insert(parentProxy, newCommStruct);
2306   newAmpi.doneInserting();
2307 }
2308
2309 void ampi::commCreatePhase1(MPI_Comm nextGroupComm) noexcept {
2310   CProxy_ampi newAmpi = createNewChildAmpiSync();
2311   insertNewChildAmpiElements(nextGroupComm, newAmpi);
2312 }
2313
2314 void ampiParent::groupChildRegister(const ampiCommStruct &s) noexcept {
2315   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
2316   if (groupComm.size()<=idx) groupComm.resize(idx+1);
2317   groupComm[idx]=new ampiCommStruct(s);
2318   thread->resume(); //Matches suspend at end of ampi::split
2319 }
2320
2321 /* Virtual topology communicator creation */
2322
2323 // 0-dimensional cart comm: rank 0 creates a dup of COMM_SELF with topo info.
2324 MPI_Comm ampi::cartCreate0D() noexcept {
2325   if (getRank() == 0) {
2326     tmpVec.clear();
2327     tmpVec.push_back(0);
2328     commCreatePhase1(parent->getNextCart());
2329     MPI_Comm newComm = parent->getNextCart()-1;
2330     ampiCommStruct &newCommStruct = getAmpiParent()->getCart(newComm);
2331     ampiTopology *newTopo = newCommStruct.getTopology();
2332     newTopo->setndims(0);
2333     return newComm;
2334   }
2335   else {
2336     return MPI_COMM_NULL;
2337   }
2338 }
2339
2340 MPI_Comm ampi::cartCreate(vector<int>& vec, int ndims, const int* dims) noexcept {
2341   if (ndims == 0) {
2342     return cartCreate0D();
2343   }
2344
2345   // Subtract out ranks from the group that won't be in the new comm
2346   int newsize = dims[0];
2347   for (int i = 1; i < ndims; i++) {
2348     newsize *= dims[i];
2349   }
2350   for (int i = vec.size(); i > newsize; i--) {
2351     vec.pop_back();
2352   }
2353
2354   int rootIdx = vec[0];
2355   tmpVec = vec;
2356   CkCallback cb(CkReductionTarget(ampi,commCreatePhase1),CkArrayIndex1D(rootIdx),myComm.getProxy());
2357
2358   MPI_Comm nextcart = parent->getNextCart();
2359   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
2360
2361   if (getPosOp(thisIndex,vec)>=0) {
2362     thread->suspend(); //Resumed by ampiParent::cartChildRegister
2363     return parent->getNextCart()-1;
2364   } else {
2365     return MPI_COMM_NULL;
2366   }
2367 }
2368
2369 void ampiParent::cartChildRegister(const ampiCommStruct &s) noexcept {
2370   int idx=s.getComm()-MPI_COMM_FIRST_CART;
2371   if (cartComm.size()<=idx) {
2372     cartComm.resize(idx+1);
2373     cartComm.length()=idx+1;
2374   }
2375   cartComm[idx]=new ampiCommStruct(s,MPI_CART);
2376   thread->resume(); //Matches suspend at end of ampi::cartCreate
2377 }
2378
2379 void ampi::graphCreate(const vector<int>& vec,MPI_Comm* newcomm) noexcept {
2380   int rootIdx=vec[0];
2381   tmpVec = vec;
2382   CkCallback cb(CkReductionTarget(ampi,commCreatePhase1),CkArrayIndex1D(rootIdx),
2383       myComm.getProxy());
2384   MPI_Comm nextgraph = parent->getNextGraph();
2385   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
2386
2387   if(getPosOp(thisIndex,vec)>=0){
2388     thread->suspend(); //Resumed by ampiParent::graphChildRegister
2389     MPI_Comm retcomm = parent->getNextGraph()-1;
2390     *newcomm = retcomm;
2391   }else
2392     *newcomm = MPI_COMM_NULL;
2393 }
2394
2395 void ampiParent::graphChildRegister(const ampiCommStruct &s) noexcept {
2396   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
2397   if (graphComm.size()<=idx) {
2398     graphComm.resize(idx+1);
2399     graphComm.length()=idx+1;
2400   }
2401   graphComm[idx]=new ampiCommStruct(s,MPI_GRAPH);
2402   thread->resume(); //Matches suspend at end of ampi::graphCreate
2403 }
2404
2405 void ampi::distGraphCreate(const vector<int>& vec, MPI_Comm* newcomm) noexcept
2406 {
2407   int rootIdx = vec[0];
2408   tmpVec = vec;
2409   CkCallback cb(CkReductionTarget(ampi,commCreatePhase1), CkArrayIndex1D(rootIdx), myComm.getProxy());
2410   MPI_Comm nextDistGraph = parent->getNextDistGraph();
2411   contribute(sizeof(nextDistGraph), &nextDistGraph, CkReduction::max_int, cb);
2412
2413   if (getPosOp(thisIndex,vec) >= 0) {
2414     thread->suspend(); //Resumed by ampiParent::distGraphChildRegister
2415     MPI_Comm retcomm = parent->getNextDistGraph()-1;
2416     *newcomm = retcomm;
2417   }
2418   else {
2419     *newcomm = MPI_COMM_NULL;
2420   }
2421 }
2422
2423 void ampiParent::distGraphChildRegister(const ampiCommStruct &s) noexcept
2424 {
2425   int idx = s.getComm()-MPI_COMM_FIRST_DIST_GRAPH;
2426   if (distGraphComm.size() <= idx) {
2427     distGraphComm.resize(idx+1);
2428     distGraphComm.length() = idx+1;
2429   }
2430   distGraphComm[idx] = new ampiCommStruct(s,MPI_DIST_GRAPH);
2431   thread->resume(); //Matches suspend at end of ampi::distGraphCreate
2432 }
2433
2434 void ampi::intercommCreate(const vector<int>& remoteVec, const int root, MPI_Comm tcomm, MPI_Comm *ncomm) noexcept {
2435   if (thisIndex==root) { // not everybody gets the valid rvec
2436     tmpVec = remoteVec;
2437   }
2438   CkCallback cb(CkReductionTarget(ampi, intercommCreatePhase1),CkArrayIndex1D(root),myComm.getProxy());
2439   MPI_Comm nextinter = parent->getNextInter();
2440   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
2441   thread->suspend(); //Not resumed by ampiParent::interChildRegister. Resumed by ExchangeProxy.
2442   *ncomm = parent->getNextInter()-1;
2443 }
2444
2445 void ampi::intercommCreatePhase1(MPI_Comm nextInterComm) noexcept {
2446
2447   CProxy_ampi newAmpi = createNewChildAmpiSync();
2448   const vector<int>& lgroup = myComm.getIndices();
2449   ampiCommStruct newCommstruct = ampiCommStruct(nextInterComm,newAmpi,lgroup,tmpVec);
2450   for(int i=0;i<lgroup.size();i++){
2451     int newIdx=lgroup[i];
2452     newAmpi[newIdx].insert(parentProxy,newCommstruct);
2453   }
2454   newAmpi.doneInserting();
2455
2456   parentProxy[0].ExchangeProxy(newAmpi);
2457 }
2458
2459 void ampiParent::interChildRegister(const ampiCommStruct &s) noexcept {
2460   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
2461   if (interComm.size()<=idx) interComm.resize(idx+1);
2462   interComm[idx]=new ampiCommStruct(s);
2463   // don't resume the thread yet, till parent set remote proxy
2464 }
2465
2466 void ampi::intercommMerge(int first, MPI_Comm *ncomm) noexcept { // first valid only at local root
2467   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
2468     vector<int> lvec = myComm.getIndices();
2469     vector<int> rvec = myComm.getRemoteIndices();
2470     int rsize = rvec.size();
2471     tmpVec = lvec;
2472     for(int i=0;i<rsize;i++)
2473       tmpVec.push_back(rvec[i]);
2474     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
2475   }else{
2476     tmpVec.resize(0);
2477   }
2478
2479   int rootIdx=myComm.getIndexForRank(0);
2480   CkCallback cb(CkReductionTarget(ampi, intercommMergePhase1),CkArrayIndex1D(rootIdx),myComm.getProxy());
2481   MPI_Comm nextintra = parent->getNextIntra();
2482   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
2483
2484   thread->suspend(); //Resumed by ampiParent::interChildRegister
2485   MPI_Comm newcomm=parent->getNextIntra()-1;
2486   *ncomm=newcomm;
2487 }
2488
2489 void ampi::intercommMergePhase1(MPI_Comm nextIntraComm) noexcept {
2490   // gets called on two roots, first root creates the comm
2491   if(tmpVec.size()==0) return;
2492   CProxy_ampi newAmpi = createNewChildAmpiSync();
2493   insertNewChildAmpiElements(nextIntraComm, newAmpi);
2494 }
2495
2496 void ampiParent::intraChildRegister(const ampiCommStruct &s) noexcept {
2497   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
2498   if (intraComm.size()<=idx) intraComm.resize(idx+1);
2499   intraComm[idx]=new ampiCommStruct(s);
2500   thread->resume(); //Matches suspend at end of ampi::split
2501 }
2502
2503 void ampi::topoDup(int topoType, int rank, MPI_Comm comm, MPI_Comm *newComm) noexcept
2504 {
2505   if (getAmpiParent()->isInter(comm)) {
2506     split(0, rank, newComm, MPI_INTER);
2507   } else {
2508     split(0, rank, newComm, topoType);
2509
2510     if (topoType != MPI_UNDEFINED) {
2511       ampiTopology *topo, *newTopo;
2512       if (topoType == MPI_CART) {
2513         topo = getAmpiParent()->getCart(comm).getTopology();
2514         newTopo = getAmpiParent()->getCart(*newComm).getTopology();
2515       } else if (topoType == MPI_GRAPH) {
2516         topo = getAmpiParent()->getGraph(comm).getTopology();
2517         newTopo = getAmpiParent()->getGraph(*newComm).getTopology();
2518       } else {
2519         CkAssert(topoType == MPI_DIST_GRAPH);
2520         topo = getAmpiParent()->getDistGraph(comm).getTopology();
2521         newTopo = getAmpiParent()->getDistGraph(*newComm).getTopology();
2522       }
2523       newTopo->dup(topo);
2524     }
2525   }
2526 }
2527
2528 //------------------------ communication -----------------------
2529 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo) noexcept
2530 {
2531   if (universeNo>MPI_COMM_WORLD) {
2532     int worldDex=universeNo-MPI_COMM_WORLD-1;
2533     if (worldDex>=_mpi_nworlds)
2534       CkAbort("Bad world communicator passed to universeComm2CommStruct");
2535     return mpi_worlds[worldDex];
2536   }
2537   CkAbort("Bad communicator passed to universeComm2CommStruct");
2538   return mpi_worlds[0]; // meaningless return
2539 }
2540
2541 void ampiParent::block() noexcept {
2542   thread->suspend();
2543 }
2544
2545 void ampiParent::yield() noexcept {
2546   thread->schedule();
2547 }
2548
2549 void ampi::unblock() noexcept {
2550   thread->resume();
2551 }
2552
2553 ampiParent* ampiParent::blockOnRecv() noexcept {
2554   resumeOnRecv = true;
2555   // In case this thread is migrated while suspended,
2556   // save myComm to get the ampi instance back. Then
2557   // return "dis" in case the caller needs it.
2558   thread->suspend();
2559   ampiParent* dis = getAmpiParent();
2560   dis->resumeOnRecv = false;
2561   return dis;
2562 }
2563
2564 ampi* ampi::blockOnRecv() noexcept {
2565   parent->resumeOnRecv = true;
2566   // In case this thread is migrated while suspended,
2567   // save myComm to get the ampi instance back. Then
2568   // return "dis" in case the caller needs it.
2569   MPI_Comm comm = myComm.getComm();
2570   thread->suspend();
2571   ampi *dis = getAmpiInstance(comm);
2572   dis->parent->resumeOnRecv = false;
2573   return dis;
2574 }
2575
2576 void ampi::setBlockingReq(AmpiRequest *req) noexcept {
2577   CkAssert(parent->blockingReq == NULL);
2578   CkAssert(parent->resumeOnColl == false);
2579   parent->blockingReq = req;
2580   parent->resumeOnColl = true;
2581 }
2582
2583 // block on (All)Reduce or (All)Gather(v)
2584 ampi* ampi::blockOnColl() noexcept {
2585 #if CMK_BIGSIM_CHARM
2586   void *curLog; // store current log in timeline
2587   _TRACE_BG_TLINE_END(&curLog);
2588 #if CMK_TRACE_IN_CHARM
2589   if(CpvAccess(traceOn)) traceSuspend();
2590 #endif
2591 #endif
2592
2593   CkAssert(parent->resumeOnColl == true);
2594   MPI_Comm comm = myComm.getComm();
2595   thread->suspend();
2596   ampi *dis = getAmpiInstance(comm);
2597   dis->parent->resumeOnColl = false;
2598
2599 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2600   CpvAccess(_currentObj) = dis;
2601 #endif
2602 #if CMK_BIGSIM_CHARM
2603 #if CMK_TRACE_IN_CHARM
2604   if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
2605 #endif
2606   TRACE_BG_AMPI_BREAK(dis->thread->getThread(), "RECV_RESUME", NULL, 0, 0);
2607   if (dis->parent->blockingReq->eventPe == CkMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(dis->parent->blockingReq->event);
2608 #endif
2609
2610   delete dis->parent->blockingReq; dis->parent->blockingReq = NULL;
2611   return dis;
2612 }
2613
2614 void ampi::ssend_ack(int sreq_idx) noexcept {
2615   if (sreq_idx == 1)
2616     thread->resume();           // MPI_Ssend
2617   else {
2618     sreq_idx -= 2;              // start from 2
2619     AmpiRequestList& reqs = getReqs();
2620     AmpiRequest *sreq = reqs[sreq_idx];
2621     sreq->complete = true;
2622     handleBlockedReq(sreq);
2623     resumeThreadIfReady();
2624   }
2625 }
2626
2627 void ampi::injectMsg(int size, char* buf) noexcept
2628 {
2629   generic(makeAmpiMsg(thisIndex, 0, thisIndex, (void*)buf, size, MPI_CHAR, MPI_COMM_WORLD, 0));
2630 }
2631
2632 void ampi::generic(AmpiMsg* msg) noexcept
2633 {
2634   MSG_ORDER_DEBUG(
2635     CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d (seq %d) resumeOnRecv %d\n",
2636              thisIndex, msg->getTag(), msg->getSrcRank(), getComm(), msg->getSeq(), parent->resumeOnRecv);
2637   )
2638 #if CMK_BIGSIM_CHARM
2639   TRACE_BG_ADD_TAG("AMPI_generic");
2640   msg->event = NULL;
2641 #endif
2642
2643   if(msg->getSeq() != 0) {
2644     int seqIdx = msg->getSeqIdx();
2645     int n=oorder.put(seqIdx,msg);
2646     if (n>0) { // This message was in-order
2647       inorder(msg);
2648       if (n>1) { // It enables other, previously out-of-order messages
2649         while((msg=oorder.getOutOfOrder(seqIdx))!=0) {
2650           inorder(msg);
2651         }
2652       }
2653     }
2654   } else { //Cross-world or system messages are unordered
2655     inorder(msg);
2656   }
2657   // msg may be free'ed from calling inorder()
2658
2659   resumeThreadIfReady();
2660 }
2661
2662 // Same as ampi::generic except it's [nokeep] and msg is sequenced
2663 void ampi::bcastResult(AmpiMsg* msg) noexcept
2664 {
2665   MSG_ORDER_DEBUG(
2666     CkPrintf("AMPI vp %d bcast arrival: tag=%d, src=%d, comm=%d (seq %d) resumeOnRecv %d\n",
2667              thisIndex, msg->getTag(), msg->getSrcRank(), getComm(), msg->getSeq(), parent->resumeOnRecv);
2668   )
2669 #if CMK_BIGSIM_CHARM
2670   TRACE_BG_ADD_TAG("AMPI_generic");
2671   msg->event = NULL;
2672 #endif
2673
2674   CkAssert(msg->getSeq() != 0);
2675   int seqIdx = msg->getSeqIdx();
2676   int n=oorder.put(seqIdx,msg);
2677   if (n>0) { // This message was in-order
2678     inorderBcast(msg, false); // inorderBcast() is [nokeep]-aware, unlike inorder()
2679     if (n>1) { // It enables other, previously out-of-order messages
2680       while((msg=oorder.getOutOfOrder(seqIdx))!=0) {
2681         inorderBcast(msg, true);
2682       }
2683     }
2684   }
2685   // [nokeep] entry method, so do not delete msg
2686   resumeThreadIfReady();
2687 }
2688
2689 inline static AmpiRequestList &getReqs() noexcept;
2690
2691 void AmpiRequestList::freeNonPersReq(int &idx) noexcept {
2692   ampiParent* pptr = getAmpiParent();
2693   if (!reqs[idx]->isPersistent()) {
2694     free(pptr->reqPool, idx, pptr->getDDT());
2695     idx = MPI_REQUEST_NULL;
2696   }
2697 }
2698
2699 void AmpiRequestList::free(AmpiRequestPool &reqPool, int idx, CkDDT *ddt) noexcept {
2700   if (idx < 0) return;
2701   reqs[idx]->free(ddt);
2702   reqPool.deleteReq(reqs[idx]);
2703   reqs[idx] = NULL;
2704   startIdx = std::min(idx, startIdx);
2705 }
2706
2707 void ampi::inorder(AmpiMsg* msg) noexcept
2708 {
2709   MSG_ORDER_DEBUG(
2710     CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d (seq %d)\n",
2711              thisIndex, msg->getTag(), msg->getSrcRank(), getComm(), msg->getSeq());
2712   )
2713
2714 #if CMK_BIGSIM_CHARM
2715   _TRACE_BG_TLINE_END(&msg->event); // store current log
2716   msg->eventPe = CkMyPe();
2717 #endif
2718
2719   //Check posted recvs:
2720   int tag = msg->getTag();
2721   int srcRank = msg->getSrcRank();
2722   AmpiRequest* req = postedReqs.get(tag, srcRank);
2723   if (req) { // receive posted
2724     handleBlockedReq(req);
2725     req->receive(this, msg);
2726   } else {
2727     unexpectedMsgs.put(msg);
2728   }
2729 }
2730
2731 void ampi::inorderBcast(AmpiMsg* msg, bool deleteMsg) noexcept
2732 {
2733   MSG_ORDER_DEBUG(
2734     CkPrintf("AMPI vp %d inorder bcast: tag=%d, src=%d, comm=%d (seq %d)\n",
2735              thisIndex, msg->getTag(), msg->getSrcRank(), getComm(), msg->getSeq());
2736   )
2737
2738 #if CMK_BIGSIM_CHARM
2739   _TRACE_BG_TLINE_END(&msg->event); // store current log
2740   msg->eventPe = CkMyPe();
2741 #endif
2742
2743   //Check posted recvs:
2744   int tag = msg->getTag();
2745   int srcRank = msg->getSrcRank();
2746   AmpiRequest* req = postedBcastReqs.get(tag, srcRank);
2747   if (req) { // receive posted
2748     handleBlockedReq(req);
2749     req->receive(this, msg, deleteMsg);
2750   } else {
2751     // Reference the [nokeep] msg so it isn't freed by the runtime
2752     CmiReference(UsrToEnv(msg));
2753     unexpectedBcastMsgs.put(msg);
2754   }
2755 }
2756
2757 static inline AmpiMsg* rdma2AmpiMsg(char *buf, int size, CMK_REFNUM_TYPE seq, int tag, int srcRank,
2758                                     int ssendReq) noexcept
2759 {
2760   // Convert an Rdma message (parameter marshalled buffer) to an AmpiMsg
2761   AmpiMsg* msg = new (size, 0) AmpiMsg(seq, ssendReq, tag, srcRank, size);
2762   memcpy(msg->data, buf, size); // Assumes the buffer is contiguous
2763   return msg;
2764 }
2765
2766 // RDMA version of ampi::generic
2767 void ampi::genericRdma(char* buf, int size, CMK_REFNUM_TYPE seq, int tag, int srcRank, MPI_Comm destcomm, int ssendReq) noexcept
2768 {
2769   MSG_ORDER_DEBUG(
2770     CkPrintf("[%d] in ampi::genericRdma on index %d, size=%d, seq=%d, srcRank=%d, tag=%d, comm=%d, ssendReq=%d\n",
2771              CkMyPe(), getIndexForRank(getRank()), size, seq, srcRank, tag, destcomm, ssendReq);
2772   )
2773
2774   if (seq != 0) {
2775     int seqIdx = srcRank;
2776     int n = oorder.isInOrder(seqIdx, seq);
2777     if (n > 0) { // This message was in-order
2778       inorderRdma(buf, size, seq, tag, srcRank, destcomm, ssendReq);
2779       if (n > 1) { // It enables other, previously out-of-order messages
2780         AmpiMsg *msg = NULL;
2781         while ((msg = oorder.getOutOfOrder(seqIdx)) != 0) {
2782           inorder(msg);
2783         }
2784       }
2785     } else { // This message was out-of-order: stash it (as an AmpiMsg)
2786       AmpiMsg *msg = rdma2AmpiMsg(buf, size, seq, tag, srcRank, ssendReq);
2787       oorder.putOutOfOrder(seqIdx, msg);
2788     }
2789   } else { // Cross-world or system messages are unordered
2790     inorderRdma(buf, size, seq, tag, srcRank, destcomm, ssendReq);
2791   }
2792
2793   resumeThreadIfReady();
2794 }
2795
2796 // RDMA version of ampi::inorder
2797 void ampi::inorderRdma(char* buf, int size, CMK_REFNUM_TYPE seq, int tag, int srcRank,
2798                        MPI_Comm comm, int ssendReq) noexcept
2799 {
2800   MSG_ORDER_DEBUG(
2801     CkPrintf("AMPI vp %d inorderRdma: tag=%d, src=%d, comm=%d  (seq %d)\n",
2802              thisIndex, tag, srcRank, comm, seq);
2803   )
2804
2805   //Check posted recvs:
2806   AmpiRequest* req = postedReqs.get(tag, srcRank);
2807   if (req) { // receive posted
2808     handleBlockedReq(req);
2809     req->receiveRdma(this, buf, size, ssendReq, srcRank, comm);
2810   } else {
2811     AmpiMsg* msg = rdma2AmpiMsg(buf, size, seq, tag, srcRank, ssendReq);
2812     unexpectedMsgs.put(msg);
2813   }
2814 }
2815
2816 // Callback from ampi::genericRdma() signaling that the send buffer is now safe to re-use
2817 void ampi::completedRdmaSend(CkDataMsg *msg) noexcept
2818 {
2819   // refnum is the index into reqList for this SendReq
2820   int reqIdx = CkGetRefNum(msg);
2821
2822   MSG_ORDER_DEBUG(
2823     CkPrintf("[%d] in ampi::completedRdmaSend on index %d, reqIdx = %d\n",
2824              CkMyPe(), parent->thisIndex, reqIdx);
2825   )
2826
2827   AmpiRequestList& reqList = getReqs();
2828   AmpiRequest* sreq = reqList[reqIdx];
2829   sreq->complete = true;
2830
2831   handleBlockedReq(sreq);
2832   resumeThreadIfReady();
2833   // CkDataMsg is allocated & freed by the runtime, so do not delete msg
2834 }
2835
2836 void handle_MPI_BOTTOM(void* &buf, MPI_Datatype type) noexcept
2837 {
2838   if (buf == MPI_BOTTOM) {
2839     buf = (void*)getDDT()->getType(type)->getLB();
2840     getDDT()->getType(type)->setAbsolute(true);
2841   }
2842 }
2843
2844 void handle_MPI_BOTTOM(void* &buf1, MPI_Datatype type1, void* &buf2, MPI_Datatype type2) noexcept
2845 {
2846   if (buf1 == MPI_BOTTOM) {
2847     buf1 = (void*)getDDT()->getType(type1)->getLB();
2848     getDDT()->getType(type1)->setAbsolute(true);
2849   }
2850   if (buf2 == MPI_BOTTOM) {
2851     buf2 = (void*)getDDT()->getType(type2)->getLB();
2852     getDDT()->getType(type2)->setAbsolute(true);
2853   }
2854 }
2855
2856 AmpiMsg *ampi::makeBcastMsg(const void *buf,int count,MPI_Datatype type,int root,MPI_Comm destcomm) noexcept
2857 {
2858   CkDDT_DataType *ddt = getDDT()->getType(type);
2859   int len = ddt->getSize(count);
2860   CMK_REFNUM_TYPE seq = getSeqNo(root, destcomm, MPI_BCAST_TAG);
2861   // Do not use the msg pool for bcasts:
2862   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, MPI_REQUEST_NULL, MPI_BCAST_TAG, root, len);
2863   ddt->serialize((char*)buf, msg->getData(), count, msg->getLength(), PACK);
2864   return msg;
2865 }
2866
2867 AmpiMsg *ampi::makeAmpiMsg(int destRank,int t,int sRank,const void *buf,int count,
2868                            MPI_Datatype type,MPI_Comm destcomm, int ssendReq/*=0*/) noexcept
2869 {
2870   CkDDT_DataType *ddt = getDDT()->getType(type);
2871   int len = ddt->getSize(count);
2872   CMK_REFNUM_TYPE seq = getSeqNo(destRank, destcomm, t);
2873   AmpiMsg *msg = CkpvAccess(msgPool).newAmpiMsg(seq, ssendReq, t, sRank, len);
2874   ddt->serialize((char*)buf, msg->getData(), count, msg->getLength(), PACK);
2875   return msg;
2876 }
2877
2878 MPI_Request ampi::send(int t, int sRank, const void* buf, int count, MPI_Datatype type,
2879                        int rank, MPI_Comm destcomm, int ssendReq/*=0*/, AmpiSendType sendType/*=BLOCKING_SEND*/) noexcept
2880 {
2881 #if CMK_TRACE_IN_CHARM
2882   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
2883 #endif
2884
2885 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2886   MPI_Comm disComm = myComm.getComm();
2887   ampi *dis = getAmpiInstance(disComm);
2888   CpvAccess(_currentObj) = dis;
2889 #endif
2890
2891   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2892   MPI_Request req = delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),ssendReq,sendType);
2893   if (sendType == BLOCKING_SEND && req != MPI_REQUEST_NULL) {
2894     AmpiRequestList& reqList = getReqs();
2895     AmpiRequest *sreq = reqList[req];
2896     sreq->wait(MPI_STATUS_IGNORE);
2897     reqList.free(parent->reqPool, req, parent->getDDT());
2898     req = MPI_REQUEST_NULL;
2899   }
2900
2901 #if CMK_TRACE_IN_CHARM
2902   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
2903 #endif
2904
2905   if (ssendReq == 1) {
2906     // waiting for receiver side
2907     parent->resumeOnRecv = false;            // so no one else awakes it
2908     parent->block();
2909   }
2910
2911   return req;
2912 }
2913
2914 void ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx) noexcept
2915 {
2916   AmpiMsg *msg = new (len, 0) AmpiMsg(0, 0, t, sRank, len);
2917   memcpy(msg->getData(), buf, len);
2918   CProxy_ampi pa(aid);
2919   pa[idx].generic(msg);
2920 }
2921
2922 CMK_REFNUM_TYPE ampi::getSeqNo(int destRank, MPI_Comm destcomm, int tag) noexcept {
2923   int seqIdx = (tag >= MPI_BCAST_TAG) ? COLL_SEQ_IDX : destRank;
2924   CMK_REFNUM_TYPE seq = 0;
2925   if (destcomm<=MPI_COMM_WORLD && tag<=MPI_BCAST_TAG) { //Not cross-module: set seqno
2926     seq = oorder.nextOutgoing(seqIdx);
2927   }
2928   return seq;
2929 }
2930
2931 MPI_Request ampi::sendRdmaMsg(int t, int sRank, const void* buf, int size, MPI_Datatype type, int destIdx,
2932                               int destRank, MPI_Comm destcomm, CProxy_ampi arrProxy, int ssendReq) noexcept
2933 {
2934   CMK_REFNUM_TYPE seq = getSeqNo(destRank, destcomm, t);
2935
2936   if (ssendReq) { // Using a SsendReq to track matching receive, so no need for SendReq here
2937     arrProxy[destIdx].genericRdma(CkSendBuffer(buf), size, seq, t, sRank, destcomm, ssendReq);
2938     return MPI_REQUEST_NULL;
2939   }
2940   else { // Set up a SendReq to track completion of the send buffer
2941     MPI_Request req = postReq(parent->reqPool.newReq<SendReq>(type, destcomm, getDDT()));
2942     CkCallback completedSendCB(CkIndex_ampi::completedRdmaSend(NULL), thisProxy[thisIndex], true/*inline*/);
2943     completedSendCB.setRefnum(req);
2944
2945     arrProxy[destIdx].genericRdma(CkSendBuffer(buf, completedSendCB), size, seq, t, sRank, destcomm, ssendReq);
2946     return req;
2947   }
2948 }
2949
2950 // Call genericRdma inline on the local destination object
2951 MPI_Request ampi::sendLocalMsg(int t, int sRank, const void* buf, int size, MPI_Datatype type, int destRank,
2952                                MPI_Comm destcomm, ampi* destPtr, int ssendReq, AmpiSendType sendType) noexcept
2953 {
2954   CMK_REFNUM_TYPE seq = getSeqNo(destRank, destcomm, t);
2955
2956   destPtr->genericRdma((char*)buf, size, seq, t, sRank, destcomm, ssendReq);
2957
2958   if (ssendReq || sendType == BLOCKING_SEND) {
2959     return MPI_REQUEST_NULL;
2960   }
2961   else { // SendReq is pre-completed since we directly copied the send buffer
2962     return postReq(parent->reqPool.newReq<SendReq>(type, destcomm, getDDT(), AMPI_REQ_COMPLETED));
2963   }
2964 }
2965
2966 MPI_Request ampi::delesend(int t, int sRank, const void* buf, int count, MPI_Datatype type,
2967                            int rank, MPI_Comm destcomm, CProxy_ampi arrProxy, int ssendReq,
2968                            AmpiSendType sendType) noexcept
2969 {
2970   if (rank==MPI_PROC_NULL) return MPI_REQUEST_NULL;
2971   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2972   int destIdx;
2973   if(isInter()){
2974     sRank = thisIndex;
2975     destIdx = dest.getIndexForRemoteRank(rank);
2976     arrProxy = remoteProxy;
2977   } else {
2978     destIdx = dest.getIndexForRank(rank);
2979   }
2980
2981   MSG_ORDER_DEBUG(
2982     CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
2983   )
2984
2985   ampi *destPtr = arrProxy[destIdx].ckLocal();
2986   CkDDT_DataType *ddt = getDDT()->getType(type);
2987   int size = ddt->getSize(count);
2988   if (ddt->isContig()) {
2989 #if AMPI_LOCAL_IMPL
2990     if (destPtr != NULL) {
2991       return sendLocalMsg(t, sRank, buf, size, type, rank, destcomm, destPtr, ssendReq, sendType);
2992     }
2993 #endif
2994 #if AMPI_RDMA_IMPL
2995     if (size >= AMPI_RDMA_THRESHOLD ||
2996        (size >= AMPI_SMP_RDMA_THRESHOLD && destLikelyWithinProcess(arrProxy, destIdx)))
2997     {
2998       return sendRdmaMsg(t, sRank, buf, size, type, destIdx, rank, destcomm, arrProxy, ssendReq);
2999     }
3000 #endif
3001   }
3002 #if AMPI_LOCAL_IMPL
3003   if (destPtr != NULL) {
3004     destPtr->generic(makeAmpiMsg(rank, t, sRank, buf, count, type, destcomm, ssendReq));
3005     return MPI_REQUEST_NULL;
3006   } else
3007 #endif
3008   {
3009     arrProxy[destIdx].generic(makeAmpiMsg(rank, t, sRank, buf, count, type, destcomm, ssendReq));
3010     return MPI_REQUEST_NULL;
3011   }
3012 }
3013
3014 void ampi::processAmpiMsg(AmpiMsg *msg, void* buf, MPI_Datatype type, int count) noexcept
3015 {
3016   int ssendReq = msg->getSsendReq();
3017   if (ssendReq > 0) { // send an ack to sender
3018     int srcRank = msg->getSrcRank();
3019     int srcIdx = getIndexForRank(srcRank);
3020     thisProxy[srcIdx].ssend_ack(ssendReq);
3021   }
3022
3023   CkDDT_DataType *ddt = getDDT()->getType(type);
3024
3025   ddt->serialize((char*)buf, msg->getData(), count, msg->getLength(), UNPACK);
3026 }
3027
3028 // RDMA version of ampi::processAmpiMsg
3029 void ampi::processRdmaMsg(const void *sbuf, int slength, int ssendReq, int srank, void* rbuf,
3030                           int rcount, MPI_Datatype rtype, MPI_Comm comm) noexcept
3031 {
3032   if (ssendReq > 0) { // send an ack to sender
3033     int srcIdx = getIndexForRank(srank);
3034     thisProxy[srcIdx].ssend_ack(ssendReq);
3035   }
3036
3037   CkDDT_DataType *ddt = getDDT()->getType(rtype);
3038
3039   ddt->serialize((char*)rbuf, (char*)sbuf, rcount, slength, UNPACK);
3040 }
3041
3042 void ampi::processRednMsg(CkReductionMsg *msg, void* buf, MPI_Datatype type, int count) noexcept
3043 {
3044   // The first sizeof(AmpiOpHeader) bytes in the redn msg data are reserved
3045   // for an AmpiOpHeader if our custom AmpiReducer type was used.
3046   int szhdr = (msg->getReducer() == AmpiReducer) ? sizeof(AmpiOpHeader) : 0;
3047   getDDT()->getType(type)->serialize((char*)buf, (char*)msg->getData()+szhdr, count, msg->getLength()-szhdr, UNPACK);
3048 }
3049
3050 void ampi::processNoncommutativeRednMsg(CkReductionMsg *msg, void* buf, MPI_Datatype type, int count, MPI_User_function* func) noexcept
3051 {
3052   CkReduction::tupleElement* results = NULL;
3053   int numReductions = 0;
3054   msg->toTuple(&results, &numReductions);
3055
3056   // Contributions are unordered and consist of a (srcRank, data) tuple
3057   char *data           = (char*)(results[1].data);
3058   CkDDT_DataType *ddt  = getDDT()->getType(type);
3059   int contributionSize = ddt->getSize(count);
3060   int commSize         = getSize();
3061
3062   // Store pointers to each contribution's data at index 'srcRank' in contributionData
3063   // If the max rank value fits into an unsigned short int, srcRanks are those, otherwise int's
3064   vector<void *> contributionData(commSize);
3065   if (commSize < std::numeric_limits<unsigned short int>::max()) {
3066     unsigned short int *srcRank = (unsigned short int*)(results[0].data);
3067     for (int i=0; i<commSize; i++) {
3068       contributionData[srcRank[i]] = &data[i * contributionSize];
3069     }
3070   }
3071   else {
3072     int *srcRank = (int*)(results[0].data);
3073     for (int i=0; i<commSize; i++) {
3074       contributionData[srcRank[i]] = &data[i * contributionSize];
3075     }
3076   }
3077
3078   if (ddt->isContig()) {
3079     // Copy rank 0's contribution into buf first
3080     memcpy(buf, contributionData[0], contributionSize);
3081
3082     // Invoke the MPI_User_function on the contributions in 'rank' order
3083     for (int i=1; i<commSize; i++) {
3084       (*func)(contributionData[i], buf, &count, &type);
3085     }
3086   }
3087   else {
3088     int contributionExtent = ddt->getExtent() * count;
3089
3090     // Deserialize rank 0's contribution into buf first
3091     ddt->serialize((char*)contributionData[0], (char*)buf, count, contributionExtent, UNPACK);
3092
3093     // Invoke the MPI_User_function on the deserialized contributions in 'rank' order
3094     vector<char> deserializedBuf(contributionExtent);
3095     for (int i=1; i<commSize; i++) {
3096       ddt->serialize((char*)contributionData[i], deserializedBuf.data(), count, contributionExtent, UNPACK);
3097       (*func)(deserializedBuf.data(), buf, &count, &type);
3098     }
3099   }
3100   delete [] results;
3101 }
3102
3103 void ampi::processGatherMsg(CkReductionMsg *msg, void* buf, MPI_Datatype type, int recvCount) noexcept
3104 {
3105   CkReduction::tupleElement* results = NULL;
3106   int numReductions = 0;
3107   msg->toTuple(&results, &numReductions);
3108   CkAssert(numReductions == 2);
3109
3110   // Re-order the gather data based on the rank of the contributor
3111   char *data             = (char*)(results[1].data);
3112   CkDDT_DataType *ddt    = getDDT()->getType(type);
3113   int contributionSize   = ddt->getSize(recvCount);
3114   int contributionExtent = ddt->getExtent()*recvCount;
3115   int commSize           = getSize();
3116
3117   // If the max rank value fits into an unsigned short int, srcRanks are those, otherwise int's
3118   if (commSize < std::numeric_limits<unsigned short int>::max()) {
3119     unsigned short int *srcRank = (unsigned short int*)(results[0].data);
3120     for (int i=0; i<commSize; i++) {
3121       ddt->serialize(&(((char*)buf)[srcRank[i] * contributionExtent]),
3122                      &data[i * contributionSize],
3123                      recvCount,
3124                      contributionSize,
3125                      UNPACK);
3126     }
3127   }
3128   else {
3129     int *srcRank = (int*)(results[0].data);
3130     for (int i=0; i<commSize; i++) {
3131       ddt->serialize(&(((char*)buf)[srcRank[i] * contributionExtent]),
3132                      &data[i * contributionSize],
3133                      recvCount,
3134                      contributionSize,
3135                      UNPACK);
3136     }
3137   }
3138   delete [] results;
3139 }
3140
3141 void ampi::processGathervMsg(CkReductionMsg *msg, void* buf, MPI_Datatype type,
3142                              int* recvCounts, int* displs) noexcept
3143 {
3144   CkReduction::tupleElement* results = NULL;
3145   int numReductions = 0;
3146   msg->toTuple(&results, &numReductions);
3147   CkAssert(numReductions == 3);
3148
3149   // Re-order the gather data based on the rank of the contributor
3150   int *dataSize          = (int*)(results[1].data);
3151   char *data             = (char*)(results[2].data);
3152   CkDDT_DataType *ddt    = getDDT()->getType(type);
3153   int contributionSize   = ddt->getSize();
3154   int contributionExtent = ddt->getExtent();
3155   int commSize           = getSize();
3156   int currDataOffset     = 0;
3157
3158   // If the max rank value fits into an unsigned short int, srcRanks are those, otherwise int's
3159   if (commSize < std::numeric_limits<unsigned short int>::max()) {
3160     unsigned short int *srcRank = (unsigned short int*)(results[0].data);
3161     for (int i=0; i<commSize; i++) {
3162       ddt->serialize(&((char*)buf)[displs[srcRank[i]] * contributionExtent],
3163                      &data[currDataOffset],
3164                      recvCounts[srcRank[i]],
3165                      contributionSize * recvCounts[srcRank[i]],
3166                      UNPACK);
3167       currDataOffset += dataSize[i];
3168     }
3169   }
3170   else {
3171     int *srcRank = (int*)(results[0].data);
3172     for (int i=0; i<commSize; i++) {
3173       ddt->serialize(&((char*)buf)[displs[srcRank[i]] * contributionExtent],
3174                      &data[currDataOffset],
3175                      recvCounts[srcRank[i]],
3176                      contributionSize * recvCounts[srcRank[i]],
3177                      UNPACK);
3178       currDataOffset += dataSize[i];
3179     }
3180   }
3181   delete [] results;
3182 }
3183
3184 static inline void clearStatus(MPI_Status *sts) noexcept {
3185   if (sts != MPI_STATUS_IGNORE) {
3186     sts->MPI_TAG    = MPI_ANY_TAG;
3187     sts->MPI_SOURCE = MPI_ANY_SOURCE;
3188     sts->MPI_COMM   = MPI_COMM_NULL;
3189     sts->MPI_LENGTH = 0;
3190     sts->MPI_ERROR  = MPI_SUCCESS;
3191     sts->MPI_CANCEL = 0;
3192   }
3193 }
3194
3195 static inline void clearStatus(MPI_Status sts[], int idx) noexcept {
3196   if (sts != MPI_STATUSES_IGNORE) {
3197     clearStatus(&sts[idx]);
3198   }
3199 }
3200
3201 static inline bool handle_MPI_PROC_NULL(int src, MPI_Comm comm, MPI_Status* sts) noexcept
3202 {
3203   if (src == MPI_PROC_NULL) {
3204     clearStatus(sts);
3205     if (sts != MPI_STATUS_IGNORE) sts->MPI_SOURCE = MPI_PROC_NULL;
3206     return true;
3207   }
3208   return false;
3209 }
3210
3211 int ampi::recv(int t, int s, void* buf, int count, MPI_Datatype type, MPI_Comm comm, MPI_Status *sts) noexcept
3212 {
3213   MPI_Comm disComm = myComm.getComm();
3214   if (handle_MPI_PROC_NULL(s, disComm, sts)) return 0;
3215
3216 #if CMK_BIGSIM_CHARM
3217    void *curLog; // store current log in timeline
3218   _TRACE_BG_TLINE_END(&curLog);
3219 #if CMK_TRACE_IN_CHARM
3220   if(CpvAccess(traceOn)) traceSuspend();
3221 #endif
3222 #endif
3223
3224   if (isInter()) {
3225     s = myComm.getIndexForRemoteRank(s);
3226   }
3227
3228   MSG_ORDER_DEBUG(
3229     CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
3230   )
3231
3232   ampi *dis = getAmpiInstance(disComm);
3233   MPI_Status tmpStatus;
3234   AmpiMsg* msg = unexpectedMsgs.get(t, s, (sts == MPI_STATUS_IGNORE) ? (int*)&tmpStatus : (int*)sts);
3235   if (msg) { // the matching message has already arrived
3236     if (sts != MPI_STATUS_IGNORE) {
3237       sts->MPI_SOURCE = msg->getSrcRank();
3238       sts->MPI_TAG    = msg->getTag();
3239       sts->MPI_COMM   = comm;
3240       sts->MPI_LENGTH = msg->getLength();
3241       sts->MPI_CANCEL = 0;
3242     }
3243     processAmpiMsg(msg, buf, type, count);
3244 #if CMK_BIGSIM_CHARM
3245     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
3246     if (msg->eventPe == CkMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
3247 #endif
3248     CkpvAccess(msgPool).deleteAmpiMsg(msg);
3249   }
3250   else { // post a request and block until the matching message arrives
3251     int request = postReq(dis->parent->reqPool.newReq<IReq>(buf, count, type, s, t, comm, getDDT(), AMPI_REQ_BLOCKED));
3252     CkAssert(parent->numBlockedReqs == 0);
3253     parent->numBlockedReqs = 1;
3254     dis = dis->blockOnRecv(); // "dis" is updated in case an ampi thread is migrated while waiting for a message
3255     parent = dis->parent;
3256     AmpiRequestList& reqs = parent->getReqs();
3257     if (sts != MPI_STATUS_IGNORE) {
3258       AmpiRequest& req = *reqs[request];
3259       sts->MPI_SOURCE = req.src;
3260       sts->MPI_TAG    = req.tag;
3261       sts->MPI_COMM   = req.comm;
3262       sts->MPI_LENGTH = req.getNumReceivedBytes(getDDT());
3263       sts->MPI_CANCEL = 0;
3264     }
3265     reqs.freeNonPersReq(request);
3266   }
3267
3268 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3269   CpvAccess(_currentObj) = dis;
3270   MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
3271 #endif
3272 #if CMK_BIGSIM_CHARM && CMK_TRACE_IN_CHARM
3273   //Due to the reason mentioned the in the else-statement above, we need to
3274   //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
3275   if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
3276 #endif
3277
3278   return 0;
3279 }
3280
3281 void ampi::probe(int t, int s, MPI_Comm comm, MPI_Status *sts) noexcept
3282 {
3283   if (handle_MPI_PROC_NULL(s, comm, sts)) return;
3284
3285 #if CMK_BIGSIM_CHARM
3286   void *curLog; // store current log in timeline
3287   _TRACE_BG_TLINE_END(&curLog);
3288 #endif
3289
3290   ampi *dis = getAmpiInstance(comm);
3291   AmpiMsg *msg = NULL;
3292   while(1) {
3293     MPI_Status tmpStatus;
3294     msg = unexpectedMsgs.probe(t, s, (sts == MPI_STATUS_IGNORE) ? (int*)&tmpStatus : (int*)sts);
3295     if (msg) break;
3296     // "dis" is updated in case an ampi thread is migrated while waiting for a message
3297     dis = dis->blockOnRecv();
3298   }
3299
3300   if (sts != MPI_STATUS_IGNORE) {
3301     sts->MPI_SOURCE = msg->getSrcRank();
3302     sts->MPI_TAG    = msg->getTag();
3303     sts->MPI_COMM   = comm;
3304     sts->MPI_LENGTH = msg->getLength();
3305     sts->MPI_CANCEL = 0;
3306   }
3307
3308 #if CMK_BIGSIM_CHARM
3309   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
3310 #endif
3311 }
3312
3313 void ampi::mprobe(int t, int s, MPI_Comm comm, MPI_Status *sts, MPI_Message *message) noexcept
3314 {
3315   if (handle_MPI_PROC_NULL(s, comm, sts)) {
3316     *message = MPI_MESSAGE_NO_PROC;
3317     return;
3318   }
3319
3320 #if CMK_BIGSIM_CHARM
3321   void *curLog; // store current log in timeline
3322   _TRACE_BG_TLINE_END(&curLog);
3323 #endif
3324
3325   ampi *dis = this;
3326   AmpiMsg *msg = NULL;
3327   while(1) {
3328     MPI_Status tmpStatus;
3329     // We call get() rather than probe() here because we want to remove this msg
3330     // from ampi::unexpectedMsgs and then insert it into ampiParent::matchedMsgs
3331     msg = unexpectedMsgs.get(t, s, (sts == MPI_STATUS_IGNORE) ? (int*)&tmpStatus : (int*)sts);
3332     if (msg)
3333       break;
3334     // "dis" is updated in case an ampi thread is migrated while waiting for a message
3335     dis = dis->blockOnRecv();
3336   }
3337
3338   msg->setComm(comm);
3339   *message = parent->putMatchedMsg(msg);
3340
3341   if (sts != MPI_STATUS_IGNORE) {
3342     sts->MPI_SOURCE = msg->getSrcRank();
3343     sts->MPI_TAG    = msg->getTag();
3344     sts->MPI_COMM   = msg->getComm();
3345     sts->MPI_LENGTH = msg->getLength();
3346     sts->MPI_CANCEL = 0;
3347   }
3348
3349 #if CMK_BIGSIM_CHARM
3350   _TRACE_BG_SET_INFO((char *)msg, "MPROBE_RESUME",  &curLog, 1);
3351 #endif
3352 }
3353
3354 int ampi::iprobe(int t, int s, MPI_Comm comm, MPI_Status *sts) noexcept
3355 {
3356   if (handle_MPI_PROC_NULL(s, comm, sts)) return 1;
3357
3358   MPI_Status tmpStatus;
3359   AmpiMsg* msg = unexpectedMsgs.probe(t, s, (sts == MPI_STATUS_IGNORE) ? (int*)&tmpStatus : (int*)sts);
3360   if (msg) {
3361     msg->setComm(comm);
3362     if (sts != MPI_STATUS_IGNORE) {
3363       sts->MPI_SOURCE = msg->getSrcRank();
3364       sts->MPI_TAG    = msg->getTag();
3365       sts->MPI_COMM   = msg->getComm();
3366       sts->MPI_LENGTH = msg->getLength();
3367       sts->MPI_CANCEL = 0;
3368     }
3369     return 1;
3370   }
3371 #if CMK_BIGSIM_CHARM
3372   void *curLog; // store current log in timeline
3373   _TRACE_BG_TLINE_END(&curLog);
3374 #endif
3375   thread->schedule();
3376 #if CMK_BIGSIM_CHARM
3377   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
3378 #endif
3379   return 0;
3380 }
3381
3382 int ampi::improbe(int tag, int source, MPI_Comm comm, MPI_Status *sts,
3383                   MPI_Message *message) noexcept
3384 {
3385   if (handle_MPI_PROC_NULL(source, comm, sts)) {
3386     *message = MPI_MESSAGE_NO_PROC;
3387     return 1;
3388   }
3389
3390   MPI_Status tmpStatus;
3391   // We call get() rather than probe() here because we want to remove this msg
3392   // from ampi::unexpectedMsgs and then insert it into ampiParent::matchedMsgs
3393   AmpiMsg* msg = unexpectedMsgs.get(tag, source, (sts == MPI_STATUS_IGNORE) ? (int*)&tmpStatus : (int*)sts);
3394   if (msg) {
3395     msg->setComm(comm);
3396     *message = parent->putMatchedMsg(msg);
3397     if (sts != MPI_STATUS_IGNORE) {
3398       sts->MPI_SOURCE = msg->getSrcRank();
3399       sts->MPI_TAG    = msg->getTag();
3400       sts->MPI_COMM   = comm;
3401       sts->MPI_LENGTH = msg->getLength();
3402       sts->MPI_CANCEL = 0;
3403     }
3404     return 1;
3405   }
3406
3407 #if CMK_BIGSIM_CHARM
3408   void *curLog; // store current log in timeline
3409   _TRACE_BG_TLINE_END(&curLog);
3410 #endif
3411   thread->schedule();
3412 #if CMK_BIGSIM_CHARM
3413   _TRACE_BG_SET_INFO(NULL, "IMPROBE_RESUME",  &curLog, 1);
3414 #endif
3415   return 0;
3416 }
3417
3418 void ampi::bcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm destcomm) noexcept
3419 {
3420   MPI_Request req;
3421
3422   if (root==getRank()) {
3423 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3424     CpvAccess(_currentObj) = this;
3425 #endif
3426     irecvBcast(buf, count, type, root, destcomm, &req);
3427     thisProxy.bcastResult(makeBcastMsg(buf, count, type, root, destcomm));
3428   }
3429   else { // Non-root ranks need to increment the outgoing sequence number for collectives
3430     oorder.incCollSeqOutgoing();
3431     irecvBcast(buf, count, type, root, destcomm, &req);
3432   }
3433
3434   MPI_Wait(&req, MPI_STATUS_IGNORE);
3435 }
3436
3437 int ampi::intercomm_bcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm intercomm) noexcept
3438 {
3439   if (root==MPI_ROOT) {
3440 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3441     CpvAccess(_currentObj) = this;
3442 #endif
3443     remoteProxy.bcastResult(makeBcastMsg(buf, count, type, getRank(), intercomm));
3444   }
3445   else { // Non-root ranks need to increment the outgoing sequence number for collectives
3446     oorder.incCollSeqOutgoing();
3447   }
3448
3449   if (root!=MPI_PROC_NULL && root!=MPI_ROOT) {
3450     // remote group ranks
3451     MPI_Request req;
3452     irecvBcast(buf, count, type, root, intercomm, &req);
3453     MPI_Wait(&req, MPI_STATUS_IGNORE);
3454   }
3455   return MPI_SUCCESS;
3456 }
3457
3458 void ampi::ibcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm destcomm, MPI_Request* request) noexcept
3459 {
3460   if (root==getRank()) {
3461 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3462     CpvAccess(_currentObj) = this;
3463 #endif
3464     thisProxy.bcastResult(makeBcastMsg(buf, count, type, getRank(), destcomm));
3465   }
3466   else { // Non-root ranks need to increment the outgoing sequence number for collectives
3467     oorder.incCollSeqOutgoing();
3468   }
3469
3470   // call irecv to post an IReq and check for any pending messages
3471   irecvBcast(buf, count, type, root, destcomm, request);
3472 }
3473
3474 int ampi::intercomm_ibcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm intercomm, MPI_Request *request) noexcept
3475 {
3476   if (root==MPI_ROOT) {
3477 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3478     CpvAccess(_currentObj) = this;
3479 #endif
3480     remoteProxy.bcastResult(makeBcastMsg(buf, count, type, getRank(), intercomm));
3481   }
3482   else { // Non-root ranks need to increment the outgoing sequence number for collectives
3483     oorder.incCollSeqOutgoing();
3484   }
3485
3486   if (root!=MPI_PROC_NULL && root!=MPI_ROOT) {
3487     // call irecv to post IReq and process pending messages
3488     irecvBcast(buf, count, type, root, intercomm, request);
3489   }
3490   return MPI_SUCCESS;
3491 }
3492
3493 void ampi::bcastraw(void* buf, int len, CkArrayID aid) noexcept
3494 {
3495   AmpiMsg *msg = new (len, 0) AmpiMsg(0, 0, MPI_BCAST_TAG, 0, len);
3496   memcpy(msg->getData(), buf, len);
3497   CProxy_ampi pa(aid);
3498   pa.generic(msg);
3499 }
3500
3501 int ampi::intercomm_scatter(int root, const void *sendbuf, int sendcount, MPI_Datatype sendtype,
3502                             void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm intercomm) noexcept
3503 {
3504   if (root == MPI_ROOT) {
3505     int remote_size = getRemoteIndices().size();
3506
3507     CkDDT_DataType* dttype = getDDT()->getType(sendtype) ;
3508     int itemsize = dttype->getSize(sendcount) ;
3509     for(int i = 0; i < remote_size; i++) {
3510         send(MPI_SCATTER_TAG, getRank(), ((char*)sendbuf)+(itemsize*i),
3511              sendcount, sendtype, i, intercomm);
3512     }
3513   }
3514
3515   if (root!=MPI_PROC_NULL && root!=MPI_ROOT) { //remote group ranks
3516     if(-1==recv(MPI_SCATTER_TAG, root, recvbuf, recvcount, recvtype, intercomm))
3517       CkAbort("AMPI> Error in intercomm MPI_Scatter recv");
3518   }
3519
3520   return MPI_SUCCESS;
3521 }
3522
3523 int ampi::intercomm_iscatter(int root, const void *sendbuf, int sendcount, MPI_Datatype sendtype,
3524                              void *recvbuf, int recvcount, MPI_Datatype recvtype,
3525                              MPI_Comm intercomm, MPI_Request *request) noexcept
3526 {
3527   if (root == MPI_ROOT) {
3528     int remote_size = getRemoteIndices().size();
3529
3530     CkDDT_DataType* dttype = getDDT()->getType(sendtype) ;
3531     int itemsize = dttype->getSize(sendcount) ;
3532     // use an ATAReq to non-block the caller and get a request ptr
3533     ATAReq *newreq = new ATAReq(remote_size);
3534     for(int i = 0; i < remote_size; i++) {
3535       newreq->reqs[i] = send(MPI_SCATTER_TAG, getRank(), ((char*)sendbuf)+(itemsize*i),
3536                              sendcount, sendtype, i, intercomm, 0, I_SEND);
3537     }
3538     *request = postReq(newreq);
3539   }
3540
3541   if (root!=MPI_PROC_NULL && root!=MPI_ROOT) { //remote group ranks
3542     // call irecv to post an IReq and process any pending messages
3543     irecv(recvbuf,recvcount,recvtype,root,MPI_SCATTER_TAG,intercomm,request);
3544   }
3545
3546   return MPI_SUCCESS;
3547 }
3548
3549 int ampi::intercomm_scatterv(int root, const void* sendbuf, const int* sendcounts, const int* displs,
3550                              MPI_Datatype sendtype, void* recvbuf, int recvcount,
3551                              MPI_Datatype recvtype, MPI_Comm intercomm) noexcept
3552 {
3553   if (root == MPI_ROOT) {
3554     int remote_size = getRemoteIndices().size();
3555
3556     CkDDT_DataType* dttype = getDDT()->getType(sendtype);
3557     int itemsize = dttype->getSize();
3558     for (int i = 0; i < remote_size; i++) {
3559         send(MPI_SCATTER_TAG, getRank(), ((char*)sendbuf)+(itemsize*displs[i]),
3560              sendcounts[i], sendtype, i, intercomm);
3561     }
3562   }
3563
3564   if (root != MPI_PROC_NULL && root != MPI_ROOT) { // remote group ranks
3565     if (-1 == recv(MPI_SCATTER_TAG, root, recvbuf, recvcount, recvtype, intercomm))
3566       CkAbort("AMPI> Error in intercomm MPI_Scatterv recv");
3567   }
3568
3569   return MPI_SUCCESS;
3570 }
3571
3572 int ampi::intercomm_iscatterv(int root, const void* sendbuf, const int* sendcounts, const int* displs,
3573                               MPI_Datatype sendtype, void* recvbuf, int recvcount,
3574                               MPI_Datatype recvtype, MPI_Comm intercomm, MPI_Request* request) noexcept
3575 {
3576   if (root == MPI_ROOT) {
3577     int remote_size = getRemoteIndices().size();
3578
3579     CkDDT_DataType* dttype = getDDT()->getType(sendtype);
3580     int itemsize = dttype->getSize();
3581     // use an ATAReq to non-block the caller and get a request ptr
3582     ATAReq *newreq = new ATAReq(remote_size);
3583     for (int i = 0; i < remote_size; i++) {
3584       newreq->reqs[i] = send(MPI_SCATTER_TAG, getRank(), ((char*)sendbuf)+(itemsize*displs[i]),
3585                              sendcounts[i], sendtype, i, intercomm, 0, I_SEND);
3586     }
3587     *request = postReq(newreq);
3588   }
3589
3590   if (root != MPI_PROC_NULL && root != MPI_ROOT) { // remote group ranks
3591     // call irecv to post an IReq and process any pending messages
3592     irecv(recvbuf, recvcount, recvtype, root, MPI_SCATTER_TAG, intercomm, request);
3593   }
3594
3595   return MPI_SUCCESS;
3596 }
3597
3598 int MPI_comm_null_copy_fn(MPI_Comm comm, int keyval, void *extra_state,
3599                           void *attr_in, void *attr_out, int *flag){
3600   (*flag) = 0;
3601   return (MPI_SUCCESS);
3602 }
3603
3604 int MPI_comm_dup_fn(MPI_Comm comm, int keyval, void *extra_state,
3605                     void *attr_in, void *attr_out, int *flag){
3606   (*(void **)attr_out) = attr_in;
3607   (*flag) = 1;
3608   return (MPI_SUCCESS);
3609 }
3610
3611 int MPI_comm_null_delete_fn(MPI_Comm comm, int keyval, void *attr, void *extra_state){
3612   return (MPI_SUCCESS);
3613 }
3614
3615 int MPI_type_null_copy_fn(MPI_Datatype type, int keyval, void *extra_state,
3616                           void *attr_in, void *attr_out, int *flag){
3617   (*flag) = 0;
3618   return (MPI_SUCCESS);
3619 }
3620
3621 int MPI_type_dup_fn(MPI_Datatype type, int keyval, void *extra_state,
3622                     void *attr_in, void *attr_out, int *flag){
3623   (*(void **)attr_out) = attr_in;
3624   (*flag) = 1;
3625   return (MPI_SUCCESS);
3626 }
3627
3628 int MPI_type_null_delete_fn(MPI_Datatype type, int keyval, void *attr, void *extra_state){
3629   return (MPI_SUCCESS);
3630 }
3631
3632 void AmpiSeqQ::pup(PUP::er &p) noexcept {