51c1973dd71652e59c36f00b62f7528e72ad0e33
[charm.git] / src / libs / ck-libs / ampi / ampi.C
1 #ifndef AMPI_PRINT_MSG_SIZES
2 #define AMPI_PRINT_MSG_SIZES 0 // Record and print comm routines used & message sizes
3 #endif
4
5 #define AMPIMSGLOG    0
6 #define AMPI_PRINT_IDLE 0
7
8 #include "ampiimpl.h"
9 #include "tcharm.h"
10
11 #if CMK_BIGSIM_CHARM
12 #include "bigsim_logs.h"
13 #endif
14
15 #if CMK_TRACE_ENABLED
16 #include "register.h" // for _chareTable, _entryTable
17 #endif
18
19 // Default is to abort on error, but users can build
20 // AMPI with -DAMPI_ERRHANDLER_RETURN=1 to change it:
21 #if AMPI_ERRHANDLER_RETURN
22 #define AMPI_ERRHANDLER MPI_ERRORS_RETURN
23 #else
24 #define AMPI_ERRHANDLER MPI_ERRORS_ARE_FATAL
25 #endif
26
27 /* change this define to "x" to trace all send/recv's */
28 #define MSG_ORDER_DEBUG(x) //x /* empty */
29 /* change this define to "x" to trace user calls */
30 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl;
31 #define STARTUP_DEBUG(x) //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl;
32 #define FUNCCALL_DEBUG(x) //x /* empty */
33
34 /* For MPI_Get_library_version */
35 extern const char * const CmiCommitID;
36
37 static CkDDT *getDDT() noexcept {
38   return &getAmpiParent()->myDDT;
39 }
40
41 /* if error checking is disabled, ampiErrhandler is defined as a macro in ampiimpl.h */
42 #if AMPI_ERROR_CHECKING
43 int ampiErrhandler(const char* func, int errcode) noexcept {
44   if (AMPI_ERRHANDLER == MPI_ERRORS_ARE_FATAL && errcode != MPI_SUCCESS) {
45     // Abort with a nice message of the form: 'func' failed with error code 'errstr'.
46     //  where 'func' is the name of the failed AMPI_ function and 'errstr'
47     //  is the string returned by AMPI_Error_string for errcode.
48     int funclen = strlen(func);
49     const char* filler = " failed with error code ";
50     int fillerlen = strlen(filler);
51     int errstrlen;
52     char errstr[MPI_MAX_ERROR_STRING];
53     MPI_Error_string(errcode, errstr, &errstrlen);
54     vector<char> str(funclen + fillerlen + errstrlen);
55     strcpy(str.data(), func);
56     strcat(str.data(), filler);
57     strcat(str.data(), errstr);
58     CkAbort(str.data());
59   }
60   return errcode;
61 }
62 #endif
63
64 #if AMPI_PRINT_MSG_SIZES
65 #if !AMPI_ERROR_CHECKING
66 #error "AMPI_PRINT_MSG_SIZES requires AMPI error checking to be enabled!\n"
67 #endif
68 #include <string>
69 #include <sstream>
70 #include "ckliststring.h"
71 CkpvDeclare(CkListString, msgSizesRanks);
72
73 bool ampiParent::isRankRecordingMsgSizes() noexcept {
74   return (!CkpvAccess(msgSizesRanks).isEmpty() && CkpvAccess(msgSizesRanks).includes(thisIndex));
75 }
76
77 void ampiParent::recordMsgSize(const char* func, int msgSize) noexcept {
78   if (isRankRecordingMsgSizes()) {
79     msgSizes[func][msgSize]++;
80   }
81 }
82
83 typedef std::unordered_map<std::string, std::map<int, int> >::iterator outer_itr_t;
84 typedef std::map<int, int>::iterator inner_itr_t;
85
86 void ampiParent::printMsgSizes() noexcept {
87   if (isRankRecordingMsgSizes()) {
88     // Prints msgSizes in the form: "AMPI_Routine: [ (num_msgs: msg_size) ... ]".
89     // Each routine has its messages sorted by size, smallest to largest.
90     std::stringstream ss;
91     ss << std::endl << "Rank " << thisIndex << ":" << std::endl;
92     for (outer_itr_t i = msgSizes.begin(); i != msgSizes.end(); ++i) {
93       ss << i->first << ": [ ";
94       for (inner_itr_t j = i->second.begin(); j != i->second.end(); ++j) {
95         ss << "(" << j->second << ": " << j->first << " B) ";
96       }
97       ss << "]" << std::endl;
98     }
99     CkPrintf("%s", ss.str().c_str());
100   }
101 }
102 #endif //AMPI_PRINT_MSG_SIZES
103
104 inline int checkCommunicator(const char* func, MPI_Comm comm) noexcept {
105   if (comm == MPI_COMM_NULL)
106     return ampiErrhandler(func, MPI_ERR_COMM);
107   return MPI_SUCCESS;
108 }
109
110 inline int checkCount(const char* func, int count) noexcept {
111   if (count < 0)
112     return ampiErrhandler(func, MPI_ERR_COUNT);
113   return MPI_SUCCESS;
114 }
115
116 inline int checkData(const char* func, MPI_Datatype data) noexcept {
117   if (data == MPI_DATATYPE_NULL)
118     return ampiErrhandler(func, MPI_ERR_TYPE);
119   return MPI_SUCCESS;
120 }
121
122 inline int checkTag(const char* func, int tag) noexcept {
123   if (tag != MPI_ANY_TAG && (tag < 0 || tag > MPI_TAG_UB_VALUE))
124     return ampiErrhandler(func, MPI_ERR_TAG);
125   return MPI_SUCCESS;
126 }
127
128 inline int checkRank(const char* func, int rank, MPI_Comm comm) noexcept {
129   int size = (comm == MPI_COMM_NULL) ? 0 : getAmpiInstance(comm)->getSize();
130   if (((rank >= 0) && (rank < size)) ||
131       (rank == MPI_ANY_SOURCE)       ||
132       (rank == MPI_PROC_NULL)        ||
133       (rank == MPI_ROOT))
134     return MPI_SUCCESS;
135   return ampiErrhandler(func, MPI_ERR_RANK);
136 }
137
138 inline int checkBuf(const char* func, const void *buf, int count) noexcept {
139   if ((count != 0 && buf == NULL) || buf == MPI_IN_PLACE)
140     return ampiErrhandler(func, MPI_ERR_BUFFER);
141   return MPI_SUCCESS;
142 }
143
144 int errorCheck(const char* func, MPI_Comm comm, bool ifComm, int count,
145                bool ifCount, MPI_Datatype data, bool ifData, int tag,
146                bool ifTag, int rank, bool ifRank, const void *buf1,
147                bool ifBuf1, const void *buf2=nullptr, bool ifBuf2=false) noexcept {
148   int ret;
149   if (ifComm) {
150     ret = checkCommunicator(func, comm);
151     if (ret != MPI_SUCCESS)
152       return ampiErrhandler(func, ret);
153   }
154   if (ifCount) {
155     ret = checkCount(func, count);
156     if (ret != MPI_SUCCESS)
157       return ampiErrhandler(func, ret);
158   }
159   if (ifData) {
160     ret = checkData(func, data);
161     if (ret != MPI_SUCCESS)
162       return ampiErrhandler(func, ret);
163   }
164   if (ifTag) {
165     ret = checkTag(func, tag);
166     if (ret != MPI_SUCCESS)
167       return ampiErrhandler(func, ret);
168   }
169   if (ifRank) {
170     ret = checkRank(func, rank, comm);
171     if (ret != MPI_SUCCESS)
172       return ampiErrhandler(func, ret);
173   }
174   if (ifBuf1 && ifData) {
175     ret = checkBuf(func, buf1, count*getDDT()->getSize(data));
176     if (ret != MPI_SUCCESS)
177       return ampiErrhandler(func, ret);
178   }
179   if (ifBuf2 && ifData) {
180     ret = checkBuf(func, buf2, count*getDDT()->getSize(data));
181     if (ret != MPI_SUCCESS)
182       return ampiErrhandler(func, ret);
183   }
184 #if AMPI_PRINT_MSG_SIZES
185   getAmpiParent()->recordMsgSize(func, getDDT()->getSize(data) * count);
186 #endif
187   return MPI_SUCCESS;
188 }
189
190 //------------- startup -------------
191 static mpi_comm_worlds mpi_worlds;
192
193 int _mpi_nworlds; /*Accessed by ampif*/
194 int MPI_COMM_UNIVERSE[MPI_MAX_COMM_WORLDS]; /*Accessed by user code*/
195
196 class AmpiComplex {
197  public:
198   float re, im;
199   void operator+=(const AmpiComplex &a) noexcept {
200     re+=a.re;
201     im+=a.im;
202   }
203   void operator*=(const AmpiComplex &a) noexcept {
204     float nu_re=re*a.re-im*a.im;
205     im=re*a.im+im*a.re;
206     re=nu_re;
207   }
208   int operator>(const AmpiComplex &a) noexcept {
209     CkAbort("AMPI> Cannot compare complex numbers with MPI_MAX\n");
210     return 0;
211   }
212   int operator<(const AmpiComplex &a) noexcept {
213     CkAbort("AMPI> Cannot compare complex numbers with MPI_MIN\n");
214     return 0;
215   }
216 };
217
218 class AmpiDoubleComplex {
219  public:
220   double re, im;
221   void operator+=(const AmpiDoubleComplex &a) noexcept {
222     re+=a.re;
223     im+=a.im;
224   }
225   void operator*=(const AmpiDoubleComplex &a) noexcept {
226     double nu_re=re*a.re-im*a.im;
227     im=re*a.im+im*a.re;
228     re=nu_re;
229   }
230   int operator>(const AmpiDoubleComplex &a) noexcept {
231     CkAbort("AMPI> Cannot compare double complex numbers with MPI_MAX\n");
232     return 0;
233   }
234   int operator<(const AmpiDoubleComplex &a) noexcept {
235     CkAbort("AMPI> Cannot compare double complex numbers with MPI_MIN\n");
236     return 0;
237   }
238 };
239
240 class AmpiLongDoubleComplex {
241  public:
242   long double re, im;
243   void operator+=(const AmpiLongDoubleComplex &a) noexcept {
244     re+=a.re;
245     im+=a.im;
246   }
247   void operator*=(const AmpiLongDoubleComplex &a) noexcept {
248     long double nu_re=re*a.re-im*a.im;
249     im=re*a.im+im*a.re;
250     re=nu_re;
251   }
252   int operator>(const AmpiLongDoubleComplex &a) noexcept {
253     CkAbort("AMPI> Cannot compare long double complex numbers with MPI_MAX\n");
254     return 0;
255   }
256   int operator<(const AmpiLongDoubleComplex &a) noexcept {
257     CkAbort("AMPI> Cannot compare long double complex numbers with MPI_MIN\n");
258     return 0;
259   }
260 };
261
262 typedef struct { float val; int idx; } FloatInt;
263 typedef struct { double val; int idx; } DoubleInt;
264 typedef struct { long val; int idx; } LongInt;
265 typedef struct { int val; int idx; } IntInt;
266 typedef struct { short val; int idx; } ShortInt;
267 typedef struct { long double val; int idx; } LongdoubleInt;
268 typedef struct { float val; float idx; } FloatFloat;
269 typedef struct { double val; double idx; } DoubleDouble;
270
271 /* For MPI_MAX, MPI_MIN, MPI_SUM, and MPI_PROD: */
272 #define MPI_OP_SWITCH(OPNAME) \
273   int i; \
274 switch (*datatype) { \
275   case MPI_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
276   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed short int); } break; \
277   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed int); } break; \
278   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long); } break; \
279   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
280   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
281   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
282   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long); } break; \
283   case MPI_FLOAT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(float); } break; \
284   case MPI_DOUBLE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(double); } break; \
285   case MPI_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
286   case MPI_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiDoubleComplex); } break; \
287   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long long); } break; \
288   case MPI_SIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed char); } break; \
289   case MPI_UNSIGNED_LONG_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long long); } break; \
290   case MPI_WCHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(wchar_t); } break; \
291   case MPI_INT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int8_t); } break; \
292   case MPI_INT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int16_t); } break; \
293   case MPI_INT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int32_t); } break; \
294   case MPI_INT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int64_t); } break; \
295   case MPI_UINT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint8_t); } break; \
296   case MPI_UINT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint16_t); } break; \
297   case MPI_UINT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint32_t); } break; \
298   case MPI_UINT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint64_t); } break; \
299   case MPI_FLOAT_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiComplex); } break; \
300   case MPI_LONG_DOUBLE_COMPLEX: for(i=0;i<(*len);i++) { MPI_OP_IMPL(AmpiLongDoubleComplex); } break; \
301   case MPI_AINT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(MPI_Aint); } break; \
302   default: \
303            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
304   CkAbort("Unsupported MPI datatype for MPI Op"); \
305 };\
306
307 /* For MPI_LAND, MPI_LOR, and MPI_LXOR: */
308 #define MPI_LOGICAL_OP_SWITCH(OPNAME) \
309   int i; \
310 switch (*datatype) { \
311   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed short int); } break; \
312   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed int); } break; \
313   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long); } break; \
314   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
315   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
316   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
317   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long); } break; \
318   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long long); } break; \
319   case MPI_SIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed char); } break; \
320   case MPI_UNSIGNED_LONG_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long long); } break; \
321   case MPI_INT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int8_t); } break; \
322   case MPI_INT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int16_t); } break; \
323   case MPI_INT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int32_t); } break; \
324   case MPI_INT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int64_t); } break; \
325   case MPI_UINT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint8_t); } break; \
326   case MPI_UINT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint16_t); } break; \
327   case MPI_UINT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint32_t); } break; \
328   case MPI_UINT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint64_t); } break; \
329   case MPI_LOGICAL: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int); } break; \
330   case MPI_C_BOOL: for(i=0;i<(*len);i++) { MPI_OP_IMPL(bool); } break; \
331   case MPI_AINT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(MPI_Aint); } break; \
332   default: \
333            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
334   CkAbort("Unsupported MPI datatype for MPI Op"); \
335 };\
336
337 /* For MPI_BAND, MPI_BOR, and MPI_BXOR: */
338 #define MPI_BITWISE_OP_SWITCH(OPNAME) \
339   int i; \
340 switch (*datatype) { \
341   case MPI_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed short int); } break; \
342   case MPI_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed int); } break; \
343   case MPI_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long); } break; \
344   case MPI_UNSIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned char); } break; \
345   case MPI_UNSIGNED_SHORT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned short); } break; \
346   case MPI_UNSIGNED: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned int); } break; \
347   case MPI_UNSIGNED_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long); } break; \
348   case MPI_LONG_LONG_INT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed long long); } break; \
349   case MPI_SIGNED_CHAR: for(i=0;i<(*len);i++) { MPI_OP_IMPL(signed char); } break; \
350   case MPI_UNSIGNED_LONG_LONG: for(i=0;i<(*len);i++) { MPI_OP_IMPL(unsigned long long); } break; \
351   case MPI_INT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int8_t); } break; \
352   case MPI_INT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int16_t); } break; \
353   case MPI_INT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int32_t); } break; \
354   case MPI_INT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(int64_t); } break; \
355   case MPI_UINT8_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint8_t); } break; \
356   case MPI_UINT16_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint16_t); } break; \
357   case MPI_UINT32_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint32_t); } break; \
358   case MPI_UINT64_T: for(i=0;i<(*len);i++) { MPI_OP_IMPL(uint64_t); } break; \
359   case MPI_BYTE: for(i=0;i<(*len);i++) { MPI_OP_IMPL(char); } break; \
360   case MPI_AINT: for(i=0;i<(*len);i++) { MPI_OP_IMPL(MPI_Aint); } break; \
361   default: \
362            ckerr << "Type " << *datatype << " with Op "#OPNAME" not supported." << endl; \
363   CkAbort("Unsupported MPI datatype for MPI Op"); \
364 };\
365
366 void MPI_MAX_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
367 #define MPI_OP_IMPL(type) \
368   if(((type *)invec)[i] > ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
369   MPI_OP_SWITCH(MPI_MAX)
370 #undef MPI_OP_IMPL
371 }
372
373 void MPI_MIN_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
374 #define MPI_OP_IMPL(type) \
375   if(((type *)invec)[i] < ((type *)inoutvec)[i]) ((type *)inoutvec)[i] = ((type *)invec)[i];
376   MPI_OP_SWITCH(MPI_MIN)
377 #undef MPI_OP_IMPL
378 }
379
380 void MPI_SUM_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
381 #define MPI_OP_IMPL(type) \
382   ((type *)inoutvec)[i] += ((type *)invec)[i];
383   MPI_OP_SWITCH(MPI_SUM)
384 #undef MPI_OP_IMPL
385 }
386
387 void MPI_PROD_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
388 #define MPI_OP_IMPL(type) \
389   ((type *)inoutvec)[i] *= ((type *)invec)[i];
390   MPI_OP_SWITCH(MPI_PROD)
391 #undef MPI_OP_IMPL
392 }
393
394 void MPI_REPLACE_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
395 #define MPI_OP_IMPL(type) \
396   ((type *)inoutvec)[i] = ((type *)invec)[i];
397   MPI_OP_SWITCH(MPI_REPLACE)
398 #undef MPI_OP_IMPL
399 }
400
401 void MPI_NO_OP_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
402   /* no-op */
403 }
404
405 void MPI_LAND_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
406 #define MPI_OP_IMPL(type) \
407   ((type *)inoutvec)[i] = ((type *)inoutvec)[i] && ((type *)invec)[i];
408   MPI_LOGICAL_OP_SWITCH(MPI_LAND)
409 #undef MPI_OP_IMPL
410 }
411
412 void MPI_BAND_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
413 #define MPI_OP_IMPL(type) \
414   ((type *)inoutvec)[i] = ((type *)inoutvec)[i] & ((type *)invec)[i];
415   MPI_BITWISE_OP_SWITCH(MPI_BAND)
416 #undef MPI_OP_IMPL
417 }
418
419 void MPI_LOR_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
420 #define MPI_OP_IMPL(type) \
421   ((type *)inoutvec)[i] = ((type *)inoutvec)[i] || ((type *)invec)[i];
422   MPI_LOGICAL_OP_SWITCH(MPI_LOR)
423 #undef MPI_OP_IMPL
424 }
425
426 void MPI_BOR_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
427 #define MPI_OP_IMPL(type) \
428   ((type *)inoutvec)[i] = ((type *)inoutvec)[i] | ((type *)invec)[i];
429   MPI_BITWISE_OP_SWITCH(MPI_BOR)
430 #undef MPI_OP_IMPL
431 }
432
433 void MPI_LXOR_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
434 #define MPI_OP_IMPL(type) \
435   ((type *)inoutvec)[i] = (((type *)inoutvec)[i]&&(!((type *)invec)[i]))||(!(((type *)inoutvec)[i])&&((type *)invec)[i]);
436   MPI_LOGICAL_OP_SWITCH(MPI_LXOR)
437 #undef MPI_OP_IMPL
438 }
439
440 void MPI_BXOR_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
441 #define MPI_OP_IMPL(type) \
442   ((type *)inoutvec)[i] = ((type *)inoutvec)[i] ^ ((type *)invec)[i];
443   MPI_BITWISE_OP_SWITCH(MPI_BXOR)
444 #undef MPI_OP_IMPL
445 }
446
447 #ifndef MIN
448 #define MIN(a,b) (a < b ? a : b)
449 #endif
450
451 void MPI_MAXLOC_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
452   int i;
453
454   switch (*datatype) {
455     case MPI_FLOAT_INT:
456       for(i=0;i<(*len);i++){
457         if(((FloatInt *)invec)[i].val > ((FloatInt *)inoutvec)[i].val)
458           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
459         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
460           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
461       }
462       break;
463     case MPI_DOUBLE_INT:
464       for(i=0;i<(*len);i++){
465         if(((DoubleInt *)invec)[i].val > ((DoubleInt *)inoutvec)[i].val)
466           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
467         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
468           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
469       }
470       break;
471     case MPI_LONG_INT:
472       for(i=0;i<(*len);i++){
473         if(((LongInt *)invec)[i].val > ((LongInt *)inoutvec)[i].val)
474           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
475         else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
476           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
477       }
478       break;
479     case MPI_2INT:
480       for(i=0;i<(*len);i++){
481         if(((IntInt *)invec)[i].val > ((IntInt *)inoutvec)[i].val)
482           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
483         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
484           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
485       }
486       break;
487     case MPI_SHORT_INT:
488       for(i=0;i<(*len);i++){
489         if(((ShortInt *)invec)[i].val > ((ShortInt *)inoutvec)[i].val)
490           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
491         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
492           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
493       }
494       break;
495     case MPI_LONG_DOUBLE_INT:
496       for(i=0;i<(*len);i++){
497         if(((LongdoubleInt *)invec)[i].val > ((LongdoubleInt *)inoutvec)[i].val)
498           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
499         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
500           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
501       }
502       break;
503     case MPI_2FLOAT:
504       for(i=0;i<(*len);i++){
505         if(((FloatFloat *)invec)[i].val > ((FloatFloat *)inoutvec)[i].val)
506           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
507         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
508           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
509       }
510       break;
511     case MPI_2DOUBLE:
512       for(i=0;i<(*len);i++){
513         if(((DoubleDouble *)invec)[i].val > ((DoubleDouble *)inoutvec)[i].val)
514           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
515         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
516           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
517       }
518       break;
519     default:
520       ckerr << "Type " << *datatype << " with Op MPI_MAXLOC not supported." << endl;
521       CkAbort("exiting");
522   }
523 }
524
525 void MPI_MINLOC_USER_FN( void *invec, void *inoutvec, int *len, MPI_Datatype *datatype){
526   int i;
527   switch (*datatype) {
528     case MPI_FLOAT_INT:
529       for(i=0;i<(*len);i++){
530         if(((FloatInt *)invec)[i].val < ((FloatInt *)inoutvec)[i].val)
531           ((FloatInt *)inoutvec)[i] = ((FloatInt *)invec)[i];
532         else if(((FloatInt *)invec)[i].val == ((FloatInt *)inoutvec)[i].val)
533           ((FloatInt *)inoutvec)[i].idx = MIN(((FloatInt *)inoutvec)[i].idx, ((FloatInt *)invec)[i].idx);
534       }
535       break;
536     case MPI_DOUBLE_INT:
537       for(i=0;i<(*len);i++){
538         if(((DoubleInt *)invec)[i].val < ((DoubleInt *)inoutvec)[i].val)
539           ((DoubleInt *)inoutvec)[i] = ((DoubleInt *)invec)[i];
540         else if(((DoubleInt *)invec)[i].val == ((DoubleInt *)inoutvec)[i].val)
541           ((DoubleInt *)inoutvec)[i].idx = MIN(((DoubleInt *)inoutvec)[i].idx, ((DoubleInt *)invec)[i].idx);
542       }
543       break;
544     case MPI_LONG_INT:
545       for(i=0;i<(*len);i++){
546         if(((LongInt *)invec)[i].val < ((LongInt *)inoutvec)[i].val)
547           ((LongInt *)inoutvec)[i] = ((LongInt *)invec)[i];
548         else if(((LongInt *)invec)[i].val == ((LongInt *)inoutvec)[i].val)
549           ((LongInt *)inoutvec)[i].idx = MIN(((LongInt *)inoutvec)[i].idx, ((LongInt *)invec)[i].idx);
550       }
551       break;
552     case MPI_2INT:
553       for(i=0;i<(*len);i++){
554         if(((IntInt *)invec)[i].val < ((IntInt *)inoutvec)[i].val)
555           ((IntInt *)inoutvec)[i] = ((IntInt *)invec)[i];
556         else if(((IntInt *)invec)[i].val == ((IntInt *)inoutvec)[i].val)
557           ((IntInt *)inoutvec)[i].idx = MIN(((IntInt *)inoutvec)[i].idx, ((IntInt *)invec)[i].idx);
558       }
559       break;
560     case MPI_SHORT_INT:
561       for(i=0;i<(*len);i++){
562         if(((ShortInt *)invec)[i].val < ((ShortInt *)inoutvec)[i].val)
563           ((ShortInt *)inoutvec)[i] = ((ShortInt *)invec)[i];
564         else if(((ShortInt *)invec)[i].val == ((ShortInt *)inoutvec)[i].val)
565           ((ShortInt *)inoutvec)[i].idx = MIN(((ShortInt *)inoutvec)[i].idx, ((ShortInt *)invec)[i].idx);
566       }
567       break;
568     case MPI_LONG_DOUBLE_INT:
569       for(i=0;i<(*len);i++){
570         if(((LongdoubleInt *)invec)[i].val < ((LongdoubleInt *)inoutvec)[i].val)
571           ((LongdoubleInt *)inoutvec)[i] = ((LongdoubleInt *)invec)[i];
572         else if(((LongdoubleInt *)invec)[i].val == ((LongdoubleInt *)inoutvec)[i].val)
573           ((LongdoubleInt *)inoutvec)[i].idx = MIN(((LongdoubleInt *)inoutvec)[i].idx, ((LongdoubleInt *)invec)[i].idx);
574       }
575       break;
576     case MPI_2FLOAT:
577       for(i=0;i<(*len);i++){
578         if(((FloatFloat *)invec)[i].val < ((FloatFloat *)inoutvec)[i].val)
579           ((FloatFloat *)inoutvec)[i] = ((FloatFloat *)invec)[i];
580         else if(((FloatFloat *)invec)[i].val == ((FloatFloat *)inoutvec)[i].val)
581           ((FloatFloat *)inoutvec)[i].idx = MIN(((FloatFloat *)inoutvec)[i].idx, ((FloatFloat *)invec)[i].idx);
582       }
583       break;
584     case MPI_2DOUBLE:
585       for(i=0;i<(*len);i++){
586         if(((DoubleDouble *)invec)[i].val < ((DoubleDouble *)inoutvec)[i].val)
587           ((DoubleDouble *)inoutvec)[i] = ((DoubleDouble *)invec)[i];
588         else if(((DoubleDouble *)invec)[i].val == ((DoubleDouble *)inoutvec)[i].val)
589           ((DoubleDouble *)inoutvec)[i].idx = MIN(((DoubleDouble *)inoutvec)[i].idx, ((DoubleDouble *)invec)[i].idx);
590       }
591       break;
592     default:
593       ckerr << "Type " << *datatype << " with Op MPI_MINLOC not supported." << endl;
594       CkAbort("exiting");
595   }
596 }
597
598 /*
599  * AMPI's generic reducer type, AmpiReducer, is used only
600  * for MPI_Op/MPI_Datatype combinations that Charm++ does
601  * not have built-in support for. AmpiReducer reduction
602  * contributions all contain an AmpiOpHeader, that contains
603  * the function pointer to an MPI_User_function* that is
604  * applied to all contributions in AmpiReducerFunc().
605  *
606  * If AmpiReducer is used, the final reduction message will
607  * have an additional sizeof(AmpiOpHeader) bytes in the
608  * buffer before any user data. ampi::processRednMsg() strips
609  * the header.
610  *
611  * If a non-commutative (user-defined) reduction is used,
612  * ampi::processNoncommutativeRednMsg() strips the headers
613  * and applies the op to all contributions in rank order.
614  */
615 CkReduction::reducerType AmpiReducer;
616
617 // every msg contains a AmpiOpHeader structure before user data
618 CkReductionMsg *AmpiReducerFunc(int nMsg, CkReductionMsg **msgs) noexcept {
619   AmpiOpHeader *hdr = (AmpiOpHeader *)msgs[0]->getData();
620   MPI_Datatype dtype;
621   int szhdr, szdata, len;
622   MPI_User_function* func;
623   func = hdr->func;
624   dtype = hdr->dtype;
625   szdata = hdr->szdata;
626   len = hdr->len;
627   szhdr = sizeof(AmpiOpHeader);
628
629   CkReductionMsg *retmsg = CkReductionMsg::buildNew(szhdr+szdata,NULL,AmpiReducer,msgs[0]);
630   void *retPtr = (char *)retmsg->getData() + szhdr;
631   for(int i=1;i<nMsg;i++){
632     (*func)((void *)((char *)msgs[i]->getData()+szhdr),retPtr,&len,&dtype);
633   }
634   return retmsg;
635 }
636
637 static CkReduction::reducerType getBuiltinReducerType(MPI_Datatype type, MPI_Op op) noexcept
638 {
639   switch (type) {
640     case MPI_INT32_T:
641       if (getDDT()->getSize(MPI_INT32_T) != getDDT()->getSize(MPI_INT)) break;
642       // else: fall thru to MPI_INT
643     case MPI_INT:
644       switch (op) {
645         case MPI_MAX:  return CkReduction::max_int;
646         case MPI_MIN:  return CkReduction::min_int;
647         case MPI_SUM:  return CkReduction::sum_int;
648         case MPI_PROD: return CkReduction::product_int;
649         case MPI_LAND: return CkReduction::logical_and_int;
650         case MPI_LOR:  return CkReduction::logical_or_int;
651         case MPI_LXOR: return CkReduction::logical_xor_int;
652         case MPI_BAND: return CkReduction::bitvec_and_int;
653         case MPI_BOR:  return CkReduction::bitvec_or_int;
654         case MPI_BXOR: return CkReduction::bitvec_xor_int;
655         default:       break;
656       }
657     case MPI_FLOAT:
658       switch (op) {
659         case MPI_MAX:  return CkReduction::max_float;
660         case MPI_MIN:  return CkReduction::min_float;
661         case MPI_SUM:  return CkReduction::sum_float;
662         case MPI_PROD: return CkReduction::product_float;
663         default:       break;
664       }
665     case MPI_DOUBLE:
666       switch (op) {
667         case MPI_MAX:  return CkReduction::max_double;
668         case MPI_MIN:  return CkReduction::min_double;
669         case MPI_SUM:  return CkReduction::sum_double;
670         case MPI_PROD: return CkReduction::product_double;
671         default:       break;
672       }
673     case MPI_INT8_T:
674       if (getDDT()->getSize(MPI_INT8_T) != getDDT()->getSize(MPI_CHAR)) break;
675       // else: fall thru to MPI_CHAR
676     case MPI_CHAR:
677       switch (op) {
678         case MPI_MAX:  return CkReduction::max_char;
679         case MPI_MIN:  return CkReduction::min_char;
680         case MPI_SUM:  return CkReduction::sum_char;
681         case MPI_PROD: return CkReduction::product_char;
682         default:       break;
683       }
684     case MPI_INT16_T:
685       if (getDDT()->getSize(MPI_INT16_T) != getDDT()->getSize(MPI_SHORT)) break;
686       // else: fall thru to MPI_SHORT
687     case MPI_SHORT:
688       switch (op) {
689         case MPI_MAX:  return CkReduction::max_short;
690         case MPI_MIN:  return CkReduction::min_short;
691         case MPI_SUM:  return CkReduction::sum_short;
692         case MPI_PROD: return CkReduction::product_short;
693         default:       break;
694       }
695     case MPI_LONG:
696       switch (op) {
697         case MPI_MAX:  return CkReduction::max_long;
698         case MPI_MIN:  return CkReduction::min_long;
699         case MPI_SUM:  return CkReduction::sum_long;
700         case MPI_PROD: return CkReduction::product_long;
701         default:       break;
702       }
703     case MPI_INT64_T:
704       if (getDDT()->getSize(MPI_INT64_T) != getDDT()->getSize(MPI_LONG_LONG)) break;
705       // else: fall thru to MPI_LONG_LONG
706     case MPI_LONG_LONG:
707       switch (op) {
708         case MPI_MAX:  return CkReduction::max_long_long;
709         case MPI_MIN:  return CkReduction::min_long_long;
710         case MPI_SUM:  return CkReduction::sum_long_long;
711         case MPI_PROD: return CkReduction::product_long_long;
712         default:       break;
713       }
714     case MPI_UINT8_T:
715       if (getDDT()->getSize(MPI_UINT8_T) != getDDT()->getSize(MPI_UNSIGNED_CHAR)) break;
716       // else: fall thru to MPI_UNSIGNED_CHAR
717     case MPI_UNSIGNED_CHAR:
718       switch (op) {
719         case MPI_MAX:  return CkReduction::max_uchar;
720         case MPI_MIN:  return CkReduction::min_uchar;
721         case MPI_SUM:  return CkReduction::sum_uchar;
722         case MPI_PROD: return CkReduction::product_uchar;
723         default:       break;
724       }
725     case MPI_UINT16_T:
726       if (getDDT()->getSize(MPI_UINT16_T) != getDDT()->getSize(MPI_UNSIGNED_SHORT)) break;
727       // else: fall thru to MPI_UNSIGNED_SHORT
728     case MPI_UNSIGNED_SHORT:
729       switch (op) {
730         case MPI_MAX:  return CkReduction::max_ushort;
731         case MPI_MIN:  return CkReduction::min_ushort;
732         case MPI_SUM:  return CkReduction::sum_ushort;
733         case MPI_PROD: return CkReduction::product_ushort;
734         default:       break;
735       }
736     case MPI_UINT32_T:
737       if (getDDT()->getSize(MPI_UINT32_T) != getDDT()->getSize(MPI_UNSIGNED)) break;
738       // else: fall thru to MPI_UNSIGNED
739     case MPI_UNSIGNED:
740       switch (op) {
741         case MPI_MAX:  return CkReduction::max_uint;
742         case MPI_MIN:  return CkReduction::min_uint;
743         case MPI_SUM:  return CkReduction::sum_uint;
744         case MPI_PROD: return CkReduction::product_uint;
745         default:       break;
746       }
747     case MPI_UNSIGNED_LONG:
748       switch (op) {
749         case MPI_MAX:  return CkReduction::max_ulong;
750         case MPI_MIN:  return CkReduction::min_ulong;
751         case MPI_SUM:  return CkReduction::sum_ulong;
752         case MPI_PROD: return CkReduction::product_ulong;
753         default:       break;
754       }
755     case MPI_UINT64_T:
756       if (getDDT()->getSize(MPI_UINT64_T) != getDDT()->getSize(MPI_UNSIGNED_LONG_LONG)) break;
757       // else: fall thru to MPI_UNSIGNED_LONG_LONG
758     case MPI_UNSIGNED_LONG_LONG:
759       switch (op) {
760         case MPI_MAX:  return CkReduction::max_ulong_long;
761         case MPI_MIN:  return CkReduction::min_ulong_long;
762         case MPI_SUM:  return CkReduction::sum_ulong_long;
763         case MPI_PROD: return CkReduction::product_ulong_long;
764         default:       break;
765       }
766     case MPI_C_BOOL:
767       switch (op) {
768         case MPI_LAND: return CkReduction::logical_and_bool;
769         case MPI_LOR:  return CkReduction::logical_or_bool;
770         case MPI_LXOR: return CkReduction::logical_xor_bool;
771         default:       break;
772       }
773     case MPI_LOGICAL:
774       switch (op) {
775         case MPI_LAND: return CkReduction::logical_and_int;
776         case MPI_LOR:  return CkReduction::logical_or_int;
777         case MPI_LXOR: return CkReduction::logical_xor_int;
778         default:       break;
779       }
780     case MPI_BYTE:
781       switch (op) {
782         case MPI_BAND: return CkReduction::bitvec_and_bool;
783         case MPI_BOR:  return CkReduction::bitvec_or_bool;
784         case MPI_BXOR: return CkReduction::bitvec_xor_bool;
785         default:       break;
786       }
787     default:
788       break;
789   }
790   return CkReduction::invalid;
791 }
792
793 class Builtin_kvs{
794  public:
795   int tag_ub,host,io,wtime_is_global,appnum,lastusedcode,universe_size;
796   int win_disp_unit,win_create_flavor,win_model;
797   int ampi_tmp;
798   void* win_base;
799   MPI_Aint win_size;
800   Builtin_kvs() noexcept {
801     tag_ub = MPI_TAG_UB_VALUE;
802     host = MPI_PROC_NULL;
803     io = 0;
804     wtime_is_global = 0;
805     appnum = 0;
806     lastusedcode = MPI_ERR_LASTCODE;
807     universe_size = 0;
808     win_base = NULL;
809     win_size = 0;
810     win_disp_unit = 0;
811     win_create_flavor = MPI_WIN_FLAVOR_CREATE;
812     win_model = MPI_WIN_SEPARATE;
813     ampi_tmp = 0;
814   }
815 };
816
817 // ------------ startup support -----------
818 int _ampi_fallback_setup_count = -1;
819 CLINKAGE void AMPI_Setup(void);
820 FLINKAGE void FTN_NAME(AMPI_SETUP,ampi_setup)(void);
821
822 FLINKAGE void FTN_NAME(MPI_MAIN,mpi_main)(void);
823
824 /*Main routine used when missing MPI_Setup routine*/
825 CLINKAGE
826 void AMPI_Fallback_Main(int argc,char **argv)
827 {
828   AMPI_Main_cpp();
829   AMPI_Main_cpp(argc,argv);
830   AMPI_Main_c(argc,argv);
831   FTN_NAME(MPI_MAIN,mpi_main)();
832 }
833
834 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen);
835 /*Startup routine used if user *doesn't* write
836   a TCHARM_User_setup routine.
837  */
838 CLINKAGE
839 void AMPI_Setup_Switch(void) {
840   _ampi_fallback_setup_count=0;
841   FTN_NAME(AMPI_SETUP,ampi_setup)();
842   AMPI_Setup();
843   if (_ampi_fallback_setup_count==2)
844   { //Missing AMPI_Setup in both C and Fortran:
845     ampiCreateMain(AMPI_Fallback_Main,"default",strlen("default"));
846   }
847 }
848
849 int AMPI_RDMA_THRESHOLD = AMPI_RDMA_THRESHOLD_DEFAULT;
850 int AMPI_SMP_RDMA_THRESHOLD = AMPI_SMP_RDMA_THRESHOLD_DEFAULT;
851 static bool nodeinit_has_been_called=false;
852 CtvDeclare(ampiParent*, ampiPtr);
853 CtvDeclare(bool, ampiInitDone);
854 CtvDeclare(void*,stackBottom);
855 CtvDeclare(bool, ampiFinalized);
856 CkpvDeclare(Builtin_kvs, bikvs);
857 CkpvDeclare(int, ampiThreadLevel);
858 CkpvDeclare(AmpiMsgPool, msgPool);
859
860 CLINKAGE
861 long ampiCurrentStackUsage(void){
862   int localVariable;
863
864   unsigned long p1 =  (unsigned long)(uintptr_t)((void*)&localVariable);
865   unsigned long p2 =  (unsigned long)(uintptr_t)(CtvAccess(stackBottom));
866
867   if(p1 > p2)
868     return p1 - p2;
869   else
870     return  p2 - p1;
871 }
872
873 FLINKAGE
874 void FTN_NAME(AMPICURRENTSTACKUSAGE, ampicurrentstackusage)(void){
875   long usage = ampiCurrentStackUsage();
876   CkPrintf("[%d] Stack usage is currently %ld\n", CkMyPe(), usage);
877 }
878
879 CLINKAGE
880 void AMPI_threadstart(void *data);
881 static int AMPI_threadstart_idx = -1;
882
883 #if CMK_TRACE_ENABLED
884 CsvExtern(funcmap*, tcharm_funcmap);
885 #endif
886
887 // Predefined datatype's and op's are readonly, so store them only once per process here:
888 static const std::array<const CkDDT_DataType *, AMPI_MAX_PREDEFINED_TYPE+1> ampiPredefinedTypes = CkDDT::createPredefinedTypes();
889
890 static constexpr std::array<MPI_User_function*, AMPI_MAX_PREDEFINED_OP+1> ampiPredefinedOps = {{
891   MPI_MAX_USER_FN,
892   MPI_MIN_USER_FN,
893   MPI_SUM_USER_FN,
894   MPI_PROD_USER_FN,
895   MPI_LAND_USER_FN,
896   MPI_BAND_USER_FN,
897   MPI_LOR_USER_FN,
898   MPI_BOR_USER_FN,
899   MPI_LXOR_USER_FN,
900   MPI_BXOR_USER_FN,
901   MPI_MAXLOC_USER_FN,
902   MPI_MINLOC_USER_FN,
903   MPI_REPLACE_USER_FN,
904   MPI_NO_OP_USER_FN
905 }};
906
907 #if defined _WIN32
908 # ifndef WIN32_LEAN_AND_MEAN
909 #  define WIN32_LEAN_AND_MEAN
910 # endif
911 # ifndef NOMINMAX
912 #  define NOMINMAX
913 # endif
914 # include <windows.h>
915 #elif defined __APPLE__
916 # include <unistd.h>
917 # include <libproc.h>
918 #elif CMK_HAS_REALPATH || CMK_HAS_READLINK
919 # ifndef _GNU_SOURCE
920 #  define _GNU_SOURCE
921 # endif
922 # ifndef __USE_GNU
923 #  define __USE_GNU
924 # endif
925 # include <unistd.h>
926 #endif
927
928 char * ampi_binary_path;
929
930 static void getAmpiBinaryPath() noexcept
931 {
932 #if defined _WIN32
933   ssize_t bufsize = MAX_PATH;
934   DWORD n;
935   do
936   {
937     ampi_binary_path = (char *)realloc(ampi_binary_path, bufsize);
938     SetLastError(0);
939     n = GetModuleFileName(NULL, ampi_binary_path, bufsize);
940     bufsize *= 2;
941   }
942   while (n == bufsize || GetLastError() == ERROR_INSUFFICIENT_BUFFER);
943
944   if (n == 0)
945   {
946     CkError("ERROR> GetModuleFileName(): %d\n", (int)GetLastError());
947     free(ampi_binary_path);
948     ampi_binary_path = nullptr;
949   }
950 #elif defined __APPLE__
951   ampi_binary_path = (char *)malloc(PROC_PIDPATHINFO_MAXSIZE);
952   pid_t pid = getpid();
953   int n = proc_pidpath(pid, ampi_binary_path, PROC_PIDPATHINFO_MAXSIZE);
954
955   if (n == 0)
956   {
957     CkError("ERROR> proc_pidpath(): %s\n", strerror(errno));
958     free(ampi_binary_path);
959     ampi_binary_path = nullptr;
960   }
961 #elif CMK_HAS_REALPATH
962   ampi_binary_path = realpath("/proc/self/exe", nullptr);
963   if (ampi_binary_path == nullptr)
964     CkError("ERROR> realpath(): %s\n", strerror(errno));
965 #elif CMK_HAS_READLINK
966   ssize_t bufsize = 256;
967   ssize_t n;
968   do
969   {
970     ampi_binary_path = (char *)realloc(ampi_binary_path, bufsize);
971     n = readlink("/proc/self/exe", ampi_binary_path, bufsize-1);
972     bufsize *= 2;
973   }
974   while (n == bufsize-1);
975
976   if (n == -1)
977   {
978     CkError("ERROR> readlink(): %s\n", strerror(errno));
979     free(ampi_binary_path);
980     ampi_binary_path = nullptr;
981   }
982   else
983   {
984     ampi_binary_path[n] = '\0';
985   }
986 #else
987   CkAbort("Could not get path to current binary!");
988 #endif
989 }
990
991 static void ampiNodeInit() noexcept
992 {
993   getAmpiBinaryPath();
994
995 #if CMK_TRACE_ENABLED
996   TCharm::nodeInit(); // make sure tcharm_funcmap is set up
997   int funclength = sizeof(funclist)/sizeof(char*);
998   for (int i=0; i<funclength; i++) {
999     int event_id = traceRegisterUserEvent(funclist[i], -1);
1000     CsvAccess(tcharm_funcmap)->insert(std::pair<std::string, int>(funclist[i], event_id));
1001   }
1002
1003   // rename chare & function to something reasonable
1004   // TODO: find a better way to do this
1005   for (int i=0; i<_chareTable.size(); i++){
1006     if (strcmp(_chareTable[i]->name, "dummy_thread_chare") == 0)
1007       _chareTable[i]->name = "AMPI";
1008   }
1009   for (int i=0; i<_entryTable.size(); i++){
1010     if (strcmp(_entryTable[i]->name, "dummy_thread_ep") == 0)
1011       _entryTable[i]->setName("rank");
1012   }
1013 #endif
1014
1015   _mpi_nworlds=0;
1016   for(int i=0;i<MPI_MAX_COMM_WORLDS; i++)
1017   {
1018     MPI_COMM_UNIVERSE[i] = MPI_COMM_WORLD+1+i;
1019   }
1020   TCHARM_Set_fallback_setup(AMPI_Setup_Switch);
1021
1022   /* read AMPI environment variables */
1023   char *value;
1024   bool rdmaSet = false;
1025   if ((value = getenv("AMPI_RDMA_THRESHOLD"))) {
1026     AMPI_RDMA_THRESHOLD = atoi(value);
1027     rdmaSet = true;
1028   }
1029   if ((value = getenv("AMPI_SMP_RDMA_THRESHOLD"))) {
1030     AMPI_SMP_RDMA_THRESHOLD = atoi(value);
1031     rdmaSet = true;
1032   }
1033   if (rdmaSet && CkMyNode() == 0) {
1034 #if AMPI_RDMA_IMPL
1035     CkPrintf("AMPI> RDMA threshold is %d Bytes and SMP RDMA threshold is %d Bytes.\n", AMPI_RDMA_THRESHOLD, AMPI_SMP_RDMA_THRESHOLD);
1036 #else
1037     CkPrintf("Warning: AMPI RDMA threshold ignored since AMPI RDMA is disabled.\n");
1038 #endif
1039   }
1040
1041   AmpiReducer = CkReduction::addReducer(AmpiReducerFunc, true /*streamable*/, "AmpiReducerFunc");
1042
1043   CkAssert(AMPI_threadstart_idx == -1);    // only initialize once
1044   AMPI_threadstart_idx = TCHARM_Register_thread_function(AMPI_threadstart);
1045
1046   nodeinit_has_been_called=true;
1047
1048    // ASSUME NO ANYTIME MIGRATION and STATIC INSERTON
1049   _isAnytimeMigration = false;
1050   _isStaticInsertion = true;
1051 }
1052
1053 #if AMPI_PRINT_IDLE
1054 static double totalidle=0.0, startT=0.0;
1055 static int beginHandle, endHandle;
1056 static void BeginIdle(void *dummy,double curWallTime) noexcept
1057 {
1058   startT = curWallTime;
1059 }
1060 static void EndIdle(void *dummy,double curWallTime) noexcept
1061 {
1062   totalidle += curWallTime - startT;
1063 }
1064 #endif
1065
1066 static void ampiProcInit() noexcept {
1067   CtvInitialize(ampiParent*, ampiPtr);
1068   CtvInitialize(bool,ampiInitDone);
1069   CtvInitialize(bool,ampiFinalized);
1070   CtvInitialize(void*,stackBottom);
1071
1072   CkpvInitialize(int, ampiThreadLevel);
1073   CkpvAccess(ampiThreadLevel) = MPI_THREAD_SINGLE;
1074
1075   CkpvInitialize(Builtin_kvs, bikvs); // built-in key-values
1076   CkpvAccess(bikvs) = Builtin_kvs();
1077
1078   CkpvInitialize(AmpiMsgPool, msgPool); // pool of small AmpiMsg's
1079   CkpvAccess(msgPool) = AmpiMsgPool(AMPI_MSG_POOL_SIZE, AMPI_POOLED_MSG_SIZE);
1080
1081 #if AMPIMSGLOG
1082   char **argv=CkGetArgv();
1083   msgLogWrite = CmiGetArgFlag(argv, "+msgLogWrite");
1084   if (CmiGetArgIntDesc(argv,"+msgLogRead", &msgLogRank, "Re-play message processing order for AMPI")) {
1085     msgLogRead = 1;
1086   }
1087   char *procs = NULL;
1088   if (CmiGetArgStringDesc(argv, "+msgLogRanks", &procs, "A list of AMPI processors to record , e.g. 0,10,20-30")) {
1089     msgLogRanks.set(procs);
1090   }
1091   CmiGetArgString(argv, "+msgLogFilename", &msgLogFilename);
1092   if (CkMyPe() == 0) {
1093     if (msgLogWrite) CkPrintf("Writing AMPI messages of rank %s to log: %s\n", procs?procs:"", msgLogFilename);
1094     if (msgLogRead) CkPrintf("Reading AMPI messages of rank %s from log: %s\n", procs?procs:"", msgLogFilename);
1095   }
1096 #endif
1097
1098 #if AMPI_PRINT_MSG_SIZES
1099   // Only record and print message sizes if this option is given, and only for those ranks.
1100   // Running with the '+syncprint' option is recommended if printing from multiple ranks.
1101   char *ranks = NULL;
1102   CkpvInitialize(CkListString, msgSizesRanks);
1103   if (CmiGetArgStringDesc(CkGetArgv(), "+msgSizesRanks", &ranks,
1104       "A list of AMPI ranks to record and print message sizes on, e.g. 0,10,20-30")) {
1105     CkpvAccess(msgSizesRanks).set(ranks);
1106   }
1107 #endif
1108 }
1109
1110 #if AMPIMSGLOG
1111 static inline int record_msglog(int rank) noexcept {
1112   return msgLogRanks.includes(rank);
1113 }
1114 #endif
1115
1116 PUPfunctionpointer(MPI_MainFn)
1117
1118 class MPI_threadstart_t {
1119  public:
1120   MPI_MainFn fn;
1121   MPI_threadstart_t() noexcept {}
1122   MPI_threadstart_t(MPI_MainFn fn_) noexcept :fn(fn_) {}
1123   void start() {
1124     char **argv=CmiCopyArgs(CkGetArgv());
1125     int argc=CkGetArgc();
1126
1127     // Set a pointer to somewhere close to the bottom of the stack.
1128     // This is used for roughly estimating the stack usage later.
1129     CtvAccess(stackBottom) = &argv;
1130
1131 #if !CMK_NO_BUILD_SHARED
1132     // If charm++ is built with shared libraries, it does not support
1133     // a custom AMPI_Setup method and always uses AMPI_Fallback_Main.
1134     // Works around bug #1508.
1135     if (_ampi_fallback_setup_count != -1 && _ampi_fallback_setup_count != 2 && CkMyPe() == 0) {
1136       CkAbort("AMPI> The application provided a custom AMPI_Setup() method, "
1137       "but AMPI is built with shared library support. This is an unsupported "
1138       "configuration. Please recompile charm++/AMPI without `-build-shared` or "
1139       "remove the AMPI_Setup() function from your application.\n");
1140     }
1141     AMPI_Fallback_Main(argc,argv);
1142 #else
1143     (fn)(argc,argv);
1144 #endif
1145   }
1146   void pup(PUP::er &p) noexcept {
1147     p|fn;
1148   }
1149 };
1150 PUPmarshall(MPI_threadstart_t)
1151
1152 CLINKAGE
1153 void AMPI_threadstart(void *data)
1154 {
1155   STARTUP_DEBUG("MPI_threadstart")
1156   MPI_threadstart_t t;
1157   pupFromBuf(data,t);
1158 #if CMK_TRACE_IN_CHARM
1159   if(CpvAccess(traceOn)) CthTraceResume(CthSelf());
1160 #endif
1161   t.start();
1162 }
1163
1164 void ampiCreateMain(MPI_MainFn mainFn, const char *name,int nameLen)
1165 {
1166   STARTUP_DEBUG("ampiCreateMain")
1167   int _nchunks=TCHARM_Get_num_chunks();
1168   //Make a new threads array:
1169   MPI_threadstart_t s(mainFn);
1170   memBuf b; pupIntoBuf(b,s);
1171   TCHARM_Create_data(_nchunks,AMPI_threadstart_idx,
1172                      b.getData(), b.getSize());
1173 }
1174
1175 /* TCharm Semaphore ID's for AMPI startup */
1176 #define AMPI_TCHARM_SEMAID 0x00A34100 /* __AMPI__ */
1177 #define AMPI_BARRIER_SEMAID 0x00A34200 /* __AMPI__ */
1178
1179 static CProxy_ampiWorlds ampiWorldsGroup;
1180
1181 // Create MPI_COMM_SELF from MPI_COMM_WORLD
1182 static void createCommSelf() noexcept {
1183   STARTUP_DEBUG("ampiInit> creating MPI_COMM_SELF")
1184   MPI_Comm selfComm;
1185   MPI_Group worldGroup, selfGroup;
1186   int ranks[1] = { getAmpiInstance(MPI_COMM_WORLD)->getRank() };
1187
1188   MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
1189   MPI_Group_incl(worldGroup, 1, ranks, &selfGroup);
1190   MPI_Comm_create(MPI_COMM_WORLD, selfGroup, &selfComm);
1191   MPI_Comm_set_name(selfComm, "MPI_COMM_SELF");
1192
1193   CkAssert(selfComm == MPI_COMM_SELF);
1194   STARTUP_DEBUG("ampiInit> created MPI_COMM_SELF")
1195 }
1196
1197 /*
1198    Called from MPI_Init, a collective initialization call:
1199    creates a new AMPI array and attaches it to the current
1200    set of TCHARM threads.
1201  */
1202 static ampi *ampiInit(char **argv) noexcept
1203 {
1204   FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CkMyPe(), TCHARM_Element());)
1205   if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
1206   STARTUP_DEBUG("ampiInit> begin")
1207
1208   MPI_Comm new_world;
1209   int _nchunks;
1210   CkArrayOptions opts;
1211   CProxy_ampiParent parent;
1212   if (TCHARM_Element()==0) //the rank of a tcharm object
1213   { /* I'm responsible for building the arrays: */
1214     STARTUP_DEBUG("ampiInit> creating arrays")
1215
1216     // FIXME: Need to serialize global communicator allocation in one place.
1217     //Allocate the next communicator
1218     if(_mpi_nworlds == MPI_MAX_COMM_WORLDS)
1219     {
1220       CkAbort("AMPI> Number of registered comm_worlds exceeded limit.\n");
1221     }
1222     int new_idx=_mpi_nworlds;
1223     new_world=MPI_COMM_WORLD+new_idx;
1224
1225     //Create and attach the ampiParent array
1226     CkArrayID threads;
1227     opts=TCHARM_Attach_start(&threads,&_nchunks);
1228     opts.setSectionAutoDelegate(false);
1229     CkArrayCreatedMsg *m;
1230     CProxy_ampiParent::ckNew(new_world, threads, _nchunks, opts, CkCallbackResumeThread((void*&)m));
1231     parent = CProxy_ampiParent(m->aid);
1232     delete m;
1233     STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
1234   }
1235   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
1236
1237   FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
1238
1239   if (TCHARM_Element()==0)
1240   {
1241     //Make a new ampi array
1242     CkArrayID empty;
1243
1244     ampiCommStruct worldComm(new_world,empty,_nchunks);
1245     CProxy_ampi arr;
1246     CkArrayCreatedMsg *m;
1247     CProxy_ampi::ckNew(parent, worldComm, opts, CkCallbackResumeThread((void*&)m));
1248     arr = CProxy_ampi(m->aid);
1249     delete m;
1250
1251     //Broadcast info. to the mpi_worlds array
1252     // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
1253     ampiCommStruct newComm(new_world,arr,_nchunks);
1254     if (ampiWorldsGroup.ckGetGroupID().isZero())
1255       ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
1256     else
1257       ampiWorldsGroup.add(newComm);
1258     STARTUP_DEBUG("ampiInit> arrays created")
1259   }
1260
1261   // Find our ampi object:
1262   ampi *ptr=(ampi *)TCharm::get()->semaGet(AMPI_TCHARM_SEMAID);
1263   CtvAccess(ampiInitDone)=true;
1264   CtvAccess(ampiFinalized)=false;
1265   STARTUP_DEBUG("ampiInit> complete")
1266 #if CMK_BIGSIM_CHARM
1267     //  TRACE_BG_AMPI_START(ptr->getThread(), "AMPI_START");
1268     TRACE_BG_ADD_TAG("AMPI_START");
1269 #endif
1270
1271   ampiParent* pptr = getAmpiParent();
1272   vector<int>& keyvals = pptr->getKeyvals(MPI_COMM_WORLD);
1273   pptr->setAttr(MPI_COMM_WORLD, keyvals, MPI_UNIVERSE_SIZE, &_nchunks);
1274   ptr->setCommName("MPI_COMM_WORLD");
1275
1276   pptr->ampiInitCallDone = 0;
1277
1278   CProxy_ampi cbproxy = ptr->getProxy();
1279   CkCallback cb(CkReductionTarget(ampi, allInitDone), cbproxy[0]);
1280   ptr->contribute(cb);
1281
1282   ampiParent *thisParent = getAmpiParent();
1283   while(thisParent->ampiInitCallDone!=1){
1284     thisParent->getTCharmThread()->stop();
1285     /*
1286      * thisParent needs to be updated in case of the parent is being pupped.
1287      * In such case, thisParent got changed
1288      */
1289     thisParent = getAmpiParent();
1290   }
1291
1292   createCommSelf();
1293
1294 #if CMK_BIGSIM_CHARM
1295   BgSetStartOutOfCore();
1296 #endif
1297
1298   return ptr;
1299 }
1300
1301 /// This group is used to broadcast the MPI_COMM_UNIVERSE communicators.
1302 class ampiWorlds : public CBase_ampiWorlds {
1303  public:
1304   ampiWorlds(const ampiCommStruct &nextWorld) noexcept {
1305     ampiWorldsGroup=thisgroup;
1306     add(nextWorld);
1307   }
1308   ampiWorlds(CkMigrateMessage *m) noexcept : CBase_ampiWorlds(m) {}
1309   void pup(PUP::er &p) noexcept { }
1310   void add(const ampiCommStruct &nextWorld) noexcept {
1311     int new_idx=nextWorld.getComm()-(MPI_COMM_WORLD);
1312     mpi_worlds[new_idx]=nextWorld;
1313     if (_mpi_nworlds<=new_idx) _mpi_nworlds=new_idx+1;
1314     STARTUP_DEBUG("ampiInit> listed MPI_COMM_UNIVERSE "<<new_idx)
1315   }
1316 };
1317
1318 //-------------------- ampiParent -------------------------
1319 ampiParent::ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_,int nRanks_) noexcept
1320   : threads(threads_), ampiReqs(64, &reqPool), myDDT(ampiPredefinedTypes),
1321     worldNo(worldNo_), predefinedOps(ampiPredefinedOps), isTmpRProxySet(false)
1322 {
1323   int barrier = 0x1234;
1324   STARTUP_DEBUG("ampiParent> starting up")
1325   thread=NULL;
1326   worldPtr=NULL;
1327   userAboutToMigrateFn=NULL;
1328   userJustMigratedFn=NULL;
1329   prepareCtv();
1330
1331   // Allocate an empty groupStruct to represent MPI_EMPTY_GROUP
1332   groups.push_back(new groupStruct);
1333
1334   init();
1335
1336   //ensure MPI_INFO_ENV will always be first info object
1337   defineInfoEnv(nRanks_);
1338   // define Info objects for AMPI_Migrate calls
1339   defineInfoMigration();
1340
1341   thread->semaPut(AMPI_BARRIER_SEMAID,&barrier);
1342
1343 #if CMK_FAULT_EVAC
1344   AsyncEvacuate(false);
1345 #endif
1346 }
1347
1348 ampiParent::ampiParent(CkMigrateMessage *msg) noexcept
1349   : CBase_ampiParent(msg), myDDT(ampiPredefinedTypes), predefinedOps(ampiPredefinedOps)
1350 {
1351   thread=NULL;
1352   worldPtr=NULL;
1353
1354   init();
1355
1356 #if CMK_FAULT_EVAC
1357   AsyncEvacuate(false);
1358 #endif
1359 }
1360
1361 PUPfunctionpointer(MPI_MigrateFn)
1362
1363 void ampiParent::pup(PUP::er &p) noexcept {
1364   p|threads;
1365   p|worldNo;
1366   p|myDDT;
1367   p|splitComm;
1368   p|groupComm;
1369   p|cartComm;
1370   p|graphComm;
1371   p|distGraphComm;
1372   p|interComm;
1373   p|intraComm;
1374
1375   p|groups;
1376   p|winStructList;
1377   p|infos;
1378   p|userOps;
1379
1380   p|reqPool;
1381   ampiReqs.pup(p, &reqPool);
1382
1383   p|kvlist;
1384   p|isTmpRProxySet;
1385   p|tmpRProxy;
1386
1387   p|userAboutToMigrateFn;
1388   p|userJustMigratedFn;
1389
1390   p|ampiInitCallDone;
1391   p|resumeOnRecv;
1392   p|resumeOnColl;
1393   p|numBlockedReqs;
1394   p|bsendBufferSize;
1395   p((char *)&bsendBuffer, sizeof(void *));
1396
1397   // pup blockingReq
1398   AmpiReqType reqType;
1399   if (!p.isUnpacking()) {
1400     if (blockingReq) {
1401       reqType = blockingReq->getType();
1402     } else {
1403       reqType = AMPI_INVALID_REQ;
1404     }
1405   }
1406   p|reqType;
1407   if (reqType != AMPI_INVALID_REQ) {
1408     if (p.isUnpacking()) {
1409       switch (reqType) {
1410         case AMPI_I_REQ:
1411           blockingReq = new IReq;
1412           break;
1413         case AMPI_REDN_REQ:
1414           blockingReq = new RednReq;
1415           break;
1416         case AMPI_GATHER_REQ:
1417           blockingReq = new GatherReq;
1418           break;
1419         case AMPI_GATHERV_REQ:
1420           blockingReq = new GathervReq;
1421           break;
1422         case AMPI_SEND_REQ:
1423           blockingReq = new SendReq;
1424           break;
1425         case AMPI_SSEND_REQ:
1426           blockingReq = new SsendReq;
1427           break;
1428         case AMPI_ATA_REQ:
1429           blockingReq = new ATAReq;
1430           break;
1431         case AMPI_G_REQ:
1432           blockingReq = new GReq;
1433           break;
1434 #if CMK_CUDA
1435         case AMPI_GPU_REQ:
1436           CkAbort("AMPI> error trying to PUP a non-migratable GPU request!");
1437           break;
1438 #endif
1439         case AMPI_INVALID_REQ:
1440           CkAbort("AMPI> error trying to PUP an invalid request!");
1441           break;
1442       }
1443     }
1444     blockingReq->pup(p);
1445   } else {
1446     blockingReq = NULL;
1447   }
1448   if (p.isDeleting()) {
1449     delete blockingReq; blockingReq = NULL;
1450   }
1451
1452 #if AMPI_PRINT_MSG_SIZES
1453   p|msgSizes;
1454 #endif
1455 }
1456
1457 void ampiParent::prepareCtv() noexcept {
1458   thread=threads[thisIndex].ckLocal();
1459   if (thread==NULL) CkAbort("AMPIParent cannot find its thread!\n");
1460   CtvAccessOther(thread->getThread(),ampiPtr) = this;
1461   STARTUP_DEBUG("ampiParent> found TCharm")
1462 }
1463
1464 void ampiParent::init() noexcept{
1465   resumeOnRecv = false;
1466   resumeOnColl = false;
1467   numBlockedReqs = 0;
1468   bsendBufferSize = 0;
1469   bsendBuffer = NULL;
1470   blockingReq = NULL;
1471 #if AMPIMSGLOG
1472   if(msgLogWrite && record_msglog(thisIndex)){
1473     char fname[128];
1474     sprintf(fname, "%s.%d", msgLogFilename,thisIndex);
1475 #if CMK_USE_ZLIB && 0
1476     fMsgLog = gzopen(fname,"wb");
1477     toPUPer = new PUP::tozDisk(fMsgLog);
1478 #else
1479     fMsgLog = fopen(fname,"wb");
1480     CkAssert(fMsgLog != NULL);
1481     toPUPer = new PUP::toDisk(fMsgLog);
1482 #endif
1483   }else if(msgLogRead){
1484     char fname[128];
1485     sprintf(fname, "%s.%d", msgLogFilename,msgLogRank);
1486 #if CMK_USE_ZLIB && 0
1487     fMsgLog = gzopen(fname,"rb");
1488     fromPUPer = new PUP::fromzDisk(fMsgLog);
1489 #else
1490     fMsgLog = fopen(fname,"rb");
1491     CkAssert(fMsgLog != NULL);
1492     fromPUPer = new PUP::fromDisk(fMsgLog);
1493 #endif
1494     CkPrintf("AMPI> opened message log file: %s for replay\n", fname);
1495   }
1496 #endif
1497 }
1498
1499 void ampiParent::finalize() noexcept {
1500 #if AMPIMSGLOG
1501   if(msgLogWrite && record_msglog(thisIndex)){
1502     delete toPUPer;
1503 #if CMK_USE_ZLIB && 0
1504     gzclose(fMsgLog);
1505 #else
1506     fclose(fMsgLog);
1507 #endif
1508   }else if(msgLogRead){
1509     delete fromPUPer;
1510 #if CMK_USE_ZLIB && 0
1511     gzclose(fMsgLog);
1512 #else
1513     fclose(fMsgLog);
1514 #endif
1515   }
1516 #endif
1517 }
1518
1519 void ampiParent::setUserAboutToMigrateFn(MPI_MigrateFn f) noexcept {
1520   userAboutToMigrateFn = f;
1521 }
1522
1523 void ampiParent::setUserJustMigratedFn(MPI_MigrateFn f) noexcept {
1524   userJustMigratedFn = f;
1525 }
1526
1527 void ampiParent::ckAboutToMigrate() noexcept {
1528   if (userAboutToMigrateFn) {
1529     (*userAboutToMigrateFn)();
1530   }
1531 }
1532
1533 void ampiParent::ckJustMigrated() noexcept {
1534   ArrayElement1D::ckJustMigrated();
1535   prepareCtv();
1536   if (userJustMigratedFn) {
1537     (*userJustMigratedFn)();
1538   }
1539 }
1540
1541 void ampiParent::ckJustRestored() noexcept {
1542   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
1543   ArrayElement1D::ckJustRestored();
1544   prepareCtv();
1545 }
1546
1547 ampiParent::~ampiParent() noexcept {
1548   STARTUP_DEBUG("ampiParent> destructor called");
1549   finalize();
1550 }
1551
1552 const ampiCommStruct& ampiParent::getWorldStruct() const noexcept {
1553   return worldPtr->getCommStruct();
1554 }
1555
1556 //Children call this when they are first created or just migrated
1557 TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration) noexcept
1558 {
1559   if (thread==NULL) prepareCtv(); //Prevents CkJustMigrated race condition
1560
1561   if (s.getComm()>=MPI_COMM_WORLD)
1562   { //We now have our COMM_WORLD-- register it
1563     //Note that split communicators don't keep a raw pointer, so
1564     //they don't need to re-register on migration.
1565     if (worldPtr!=NULL) CkAbort("One ampiParent has two MPI_COMM_WORLDs");
1566     worldPtr=ptr;
1567   }
1568
1569   if (forMigration) { //Restore AmpiRequest*'s in postedReqs:
1570     AmmEntry<AmpiRequest *> *e = ptr->postedReqs.first;
1571     while (e) {
1572       // AmmPupPostedReqs() packed these as MPI_Requests
1573       MPI_Request reqIdx = (MPI_Request)(intptr_t)e->msg;
1574       CkAssert(reqIdx != MPI_REQUEST_NULL);
1575       AmpiRequest* req = ampiReqs[reqIdx];
1576       CkAssert(req);
1577       e->msg = req;
1578       e = e->next;
1579     }
1580   }
1581   else { //Register the new communicator:
1582     MPI_Comm comm = s.getComm();
1583     STARTUP_DEBUG("ampiParent> registering new communicator "<<comm)
1584     if (comm>=MPI_COMM_WORLD) {
1585       // Pass the new ampi to the waiting ampiInit
1586       thread->semaPut(AMPI_TCHARM_SEMAID, ptr);
1587     } else if (isSplit(comm)) {
1588       splitChildRegister(s);
1589     } else if (isGroup(comm)) {
1590       groupChildRegister(s);
1591     } else if (isCart(comm)) {
1592       cartChildRegister(s);
1593     } else if (isGraph(comm)) {
1594       graphChildRegister(s);
1595     } else if (isDistGraph(comm)) {
1596       distGraphChildRegister(s);
1597     } else if (isInter(comm)) {
1598       interChildRegister(s);
1599     } else if (isIntra(comm)) {
1600       intraChildRegister(s);
1601     }else
1602       CkAbort("ampiParent received child with bad communicator");
1603   }
1604
1605   return thread;
1606 }
1607
1608 // reduction client data - preparation for checkpointing
1609 class ckptClientStruct {
1610  public:
1611   const char *dname;
1612   ampiParent *ampiPtr;
1613   ckptClientStruct(const char *s, ampiParent *a) noexcept : dname(s), ampiPtr(a) {}
1614 };
1615
1616 static void checkpointClient(void *param,void *msg) noexcept
1617 {
1618   ckptClientStruct *client = (ckptClientStruct*)param;
1619   const char *dname = client->dname;
1620   ampiParent *ampiPtr = client->ampiPtr;
1621   ampiPtr->Checkpoint(strlen(dname), dname);
1622   delete client;
1623 }
1624
1625 void ampiParent::startCheckpoint(const char* dname) noexcept {
1626   if (thisIndex==0) {
1627     ckptClientStruct *clientData = new ckptClientStruct(dname, this);
1628     CkCallback *cb = new CkCallback(checkpointClient, clientData);
1629     thisProxy.ckSetReductionClient(cb);
1630   }
1631   contribute();
1632
1633   thread->stop();
1634
1635 #if CMK_BIGSIM_CHARM
1636   TRACE_BG_ADD_TAG("CHECKPOINT_RESUME");
1637 #endif
1638 }
1639
1640 void ampiParent::Checkpoint(int len, const char* dname) noexcept {
1641   if (len == 0) {
1642     // memory checkpoint
1643     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1644     CkStartMemCheckpoint(cb);
1645   }
1646   else {
1647     char dirname[256];
1648     strncpy(dirname,dname,len);
1649     dirname[len]='\0';
1650     CkCallback cb(CkIndex_ampiParent::ResumeThread(),thisArrayID);
1651     CkStartCheckpoint(dirname,cb);
1652   }
1653 }
1654
1655 void ampiParent::ResumeThread() noexcept {
1656   thread->resume();
1657 }
1658
1659 int ampiParent::createKeyval(MPI_Comm_copy_attr_function *copy_fn, MPI_Comm_delete_attr_function *delete_fn,
1660                              int *keyval, void* extra_state) noexcept {
1661   KeyvalNode* newnode = new KeyvalNode(copy_fn, delete_fn, extra_state);
1662   int idx = kvlist.size();
1663   kvlist.resize(idx+1);
1664   kvlist[idx] = newnode;
1665   *keyval = idx;
1666   return 0;
1667 }
1668
1669 int ampiParent::setUserKeyval(int context, int keyval, void *attribute_val) noexcept {
1670 #if AMPI_ERROR_CHECKING
1671   if (keyval < 0 || keyval >= kvlist.size() || kvlist[keyval] == NULL) {
1672     return MPI_ERR_KEYVAL;
1673   }
1674 #endif
1675   KeyvalNode &kv = *kvlist[keyval];
1676   if (kv.hasVal()) {
1677     int ret = (*kv.delete_fn)(context, keyval, kv.val, kv.extra_state);
1678     if (ret != MPI_SUCCESS) {
1679       return ret;
1680     }
1681   }
1682   kvlist[keyval]->setVal(attribute_val);
1683   return MPI_SUCCESS;
1684 }
1685
1686 int ampiParent::setAttr(int context, vector<int>& keyvals, int keyval, void* attribute_val) noexcept {
1687   if (kv_set_builtin(keyval, attribute_val)) {
1688     return MPI_SUCCESS;
1689   }
1690   keyvals.push_back(keyval);
1691   kvlist[keyval]->incRefCount();
1692   return setUserKeyval(context, keyval, attribute_val);
1693 }
1694
1695 bool ampiParent::kv_set_builtin(int keyval, void* attribute_val) noexcept {
1696   switch(keyval) {
1697     case MPI_TAG_UB:            /*immutable*/ return false;
1698     case MPI_HOST:              /*immutable*/ return false;
1699     case MPI_IO:                /*immutable*/ return false;
1700     case MPI_WTIME_IS_GLOBAL:   /*immutable*/ return false;
1701     case MPI_APPNUM:            /*immutable*/ return false;
1702     case MPI_LASTUSEDCODE:      /*immutable*/ return false;
1703     case MPI_UNIVERSE_SIZE:     (CkpvAccess(bikvs).universe_size)     = *((int*)attribute_val);      return true;
1704     case MPI_WIN_BASE:          (CkpvAccess(bikvs).win_base)          = attribute_val;               return true;
1705     case MPI_WIN_SIZE:          (CkpvAccess(bikvs).win_size)          = *((MPI_Aint*)attribute_val); return true;
1706     case MPI_WIN_DISP_UNIT:     (CkpvAccess(bikvs).win_disp_unit)     = *((int*)attribute_val);      return true;
1707     case MPI_WIN_CREATE_FLAVOR: (CkpvAccess(bikvs).win_create_flavor) = *((int*)attribute_val);      return true;
1708     case MPI_WIN_MODEL:         (CkpvAccess(bikvs).win_model)         = *((int*)attribute_val);      return true;
1709     case AMPI_MY_WTH:           /*immutable*/ return false;
1710     case AMPI_NUM_WTHS:         /*immutable*/ return false;
1711     case AMPI_MY_PROCESS:       /*immutable*/ return false;
1712     case AMPI_NUM_PROCESSES:    /*immutable*/ return false;
1713     default: return false;
1714   };
1715 }
1716
1717 bool ampiParent::kv_get_builtin(int keyval) noexcept {
1718   switch(keyval) {
1719     case MPI_TAG_UB:            kv_builtin_storage = &(CkpvAccess(bikvs).tag_ub);             return true;
1720     case MPI_HOST:              kv_builtin_storage = &(CkpvAccess(bikvs).host);               return true;
1721     case MPI_IO:                kv_builtin_storage = &(CkpvAccess(bikvs).io);                 return true;
1722     case MPI_WTIME_IS_GLOBAL:   kv_builtin_storage = &(CkpvAccess(bikvs).wtime_is_global);    return true;
1723     case MPI_APPNUM:            kv_builtin_storage = &(CkpvAccess(bikvs).appnum);             return true;
1724     case MPI_LASTUSEDCODE:      kv_builtin_storage = &(CkpvAccess(bikvs).lastusedcode);       return true;
1725     case MPI_UNIVERSE_SIZE:     kv_builtin_storage = &(CkpvAccess(bikvs).universe_size);      return true;
1726     case MPI_WIN_BASE:          win_base_storage   = &(CkpvAccess(bikvs).win_base);           return true;
1727     case MPI_WIN_SIZE:          win_size_storage   = &(CkpvAccess(bikvs).win_size);           return true;
1728     case MPI_WIN_DISP_UNIT:     kv_builtin_storage = &(CkpvAccess(bikvs).win_disp_unit);      return true;
1729     case MPI_WIN_CREATE_FLAVOR: kv_builtin_storage = &(CkpvAccess(bikvs).win_create_flavor);  return true;
1730     case MPI_WIN_MODEL:         kv_builtin_storage = &(CkpvAccess(bikvs).win_model);          return true;
1731     default: return false;
1732   };
1733 }
1734
1735 bool ampiParent::getBuiltinKeyval(int keyval, void *attribute_val) noexcept {
1736   if (kv_get_builtin(keyval)){
1737     /* All builtin keyvals are ints except MPI_WIN_BASE, which is a pointer
1738      * to the window's base address in C but an integer representation of
1739      * the base address in Fortran.
1740      * Also, MPI_WIN_SIZE is an MPI_Aint. */
1741     if (keyval == MPI_WIN_BASE)
1742       *((void**)attribute_val) = *win_base_storage;
1743     else if (keyval == MPI_WIN_SIZE)
1744       *(MPI_Aint**)attribute_val = win_size_storage;
1745     else
1746       *(int **)attribute_val = kv_builtin_storage;
1747     return true;
1748   } else {
1749     switch(keyval) {
1750       case AMPI_MY_WTH: *(int *)attribute_val = CkMyPe(); return true;
1751       case AMPI_NUM_WTHS: *(int *)attribute_val = CkNumPes(); return true;
1752       case AMPI_MY_PROCESS: *(int *)attribute_val = CkMyNode(); return true;
1753       case AMPI_NUM_PROCESSES: *(int *)attribute_val = CkNumNodes(); return true;
1754     }
1755   }
1756   return false;
1757 }
1758
1759 // Call copy_fn for each user-defined keyval in old_comm.
1760 int ampiParent::dupUserKeyvals(MPI_Comm old_comm, MPI_Comm new_comm) noexcept {
1761   ampiCommStruct &old_cs = *(ampiCommStruct *)&comm2CommStruct(old_comm);
1762   for (int i=0; i<old_cs.getKeyvals().size(); i++) {
1763     int keyval = old_cs.getKeyvals()[i];
1764     void *val_out;
1765     int flag = 0;
1766     bool isValid = (keyval != MPI_KEYVAL_INVALID && kvlist[keyval] != NULL);
1767     if (isValid) {
1768       // Call the user's copy_fn
1769       KeyvalNode& kv = *kvlist[keyval];
1770       int ret = (*kv.copy_fn)(old_comm, keyval, kv.extra_state, kv.val, &val_out, &flag);
1771       if (ret != MPI_SUCCESS) {
1772         return ret;
1773       }
1774       if (flag == 1) {
1775         // Set keyval in new_comm
1776         ampiCommStruct &cs = *(ampiCommStruct *)&comm2CommStruct(new_comm);
1777         cs.getKeyvals().push_back(keyval);
1778         kv.incRefCount();
1779       }
1780     }
1781   }
1782   return MPI_SUCCESS;
1783 }
1784
1785 int ampiParent::freeUserKeyval(int context, vector<int>& keyvals, int* keyval) noexcept {
1786   if (*keyval < 0 || *keyval >= kvlist.size()) {
1787     return MPI_SUCCESS;
1788   }
1789   // Call the user's delete_fn
1790   KeyvalNode& kv = *kvlist[*keyval];
1791   int ret = (*kv.delete_fn)(context, *keyval, kv.val, kv.extra_state);
1792   if (ret != MPI_SUCCESS) {
1793     return ret;
1794   }
1795   // Remove keyval from comm/win/type keyvals list
1796   kv.clearVal();
1797   for (int i=0; i<keyvals.size(); i++) {
1798     if (keyvals[i] == *keyval) {
1799       keyvals[*keyval] = MPI_KEYVAL_INVALID;
1800     }
1801   }
1802   if (!keyvals.empty()) {
1803     while (keyvals.back() == MPI_KEYVAL_INVALID) keyvals.pop_back();
1804   }
1805   // Remove keyval from parent kvlist if no remaining references to it
1806   if (kv.decRefCount() == 0) {
1807     delete kvlist[*keyval];
1808     kvlist[*keyval] = NULL;
1809   }
1810   *keyval = MPI_KEYVAL_INVALID;
1811   return MPI_SUCCESS;
1812 }
1813
1814 int ampiParent::freeUserKeyvals(int context, vector<int>& keyvals) noexcept {
1815   for (int i=0; i<keyvals.size(); i++) {
1816     int keyval = keyvals[i];
1817     // Call the user's delete_fn
1818     KeyvalNode& kv = *kvlist[keyval];
1819     int ret = (*kv.delete_fn)(context, keyval, kv.val, kv.extra_state);
1820     if (ret != MPI_SUCCESS) {
1821       return ret;
1822     }
1823     kv.clearVal();
1824     keyvals[i] = MPI_KEYVAL_INVALID;
1825     // Remove keyval from parent kvlist if no remaining references to it
1826     if (kv.decRefCount() == 0) {
1827       delete kvlist[keyval];
1828       kvlist[keyval] = NULL;
1829     }
1830   }
1831   keyvals.clear();
1832   return MPI_SUCCESS;
1833 }
1834
1835 bool ampiParent::getUserKeyval(MPI_Comm comm, vector<int>& keyvals, int keyval, void *attribute_val, int *flag) noexcept {
1836   if (keyval < 0 || keyval >= kvlist.size() || kvlist[keyval] == NULL) {
1837     *flag = 0;
1838     return false;
1839   }
1840   else {
1841     for (int i=0; i<keyvals.size(); i++) {
1842       int kv = keyvals[i];
1843       if (keyval == kv) { // Found a matching keyval
1844         *(void **)attribute_val = kvlist[keyval]->getVal();
1845         *flag = 1;
1846         return true;
1847       }
1848     }
1849     *flag = 0;
1850     return false;
1851   }
1852 }
1853
1854 int ampiParent::getAttr(int context, vector<int>& keyvals, int keyval, void *attribute_val, int *flag) noexcept {
1855   if (keyval == MPI_KEYVAL_INVALID) {
1856     *flag = 0;
1857     return MPI_ERR_KEYVAL;
1858   }
1859   else if (getBuiltinKeyval(keyval, attribute_val)) {
1860     *flag = 1;
1861     return MPI_SUCCESS;
1862   }
1863   else if (getUserKeyval(context, keyvals, keyval, attribute_val, flag)) {
1864     *flag = 1;
1865     return MPI_SUCCESS;
1866   }
1867   else {
1868     *flag = 0;
1869     return MPI_SUCCESS;
1870   }
1871 }
1872
1873 int ampiParent::deleteAttr(int context, vector<int>& keyvals, int keyval) noexcept {
1874   return freeUserKeyval(context, keyvals, &keyval);
1875 }
1876
1877 /*
1878  * AMPI Message Matching (Amm) queues:
1879  *   AmpiMsg*'s and AmpiRequest*'s are matched based on 2 ints: [tag, src].
1880  */
1881
1882 // Pt2pt msg queues:
1883 template class Amm<AmpiMsg *, AMPI_AMM_PT2PT_POOL_SIZE>;
1884 template class Amm<AmpiRequest *, AMPI_AMM_PT2PT_POOL_SIZE>;
1885
1886 // Bcast msg queues:
1887 template class Amm<AmpiMsg *, AMPI_AMM_COLL_POOL_SIZE>;
1888 template class Amm<AmpiRequest *, AMPI_AMM_COLL_POOL_SIZE>;
1889
1890 /* free all table entries but not the space pointed to by 'msg' */
1891 template<typename T, size_t N>
1892 void Amm<T, N>::freeAll() noexcept
1893 {
1894   AmmEntry<T>* cur = first;
1895   while (cur) {
1896     AmmEntry<T>* toDel = cur;
1897     cur = cur->next;
1898     deleteEntry(toDel);
1899   }
1900 }
1901
1902 /* free all msgs */
1903 template<typename T, size_t N>
1904 void Amm<T, N>::flushMsgs() noexcept
1905 {
1906   T msg = get(MPI_ANY_TAG, MPI_ANY_SOURCE);
1907   while (msg) {
1908     delete msg;
1909     msg = get(MPI_ANY_TAG, MPI_ANY_SOURCE);
1910   }
1911 }
1912
1913 template<typename T, size_t N>
1914 void Amm<T, N>::put(T msg) noexcept
1915 {
1916   AmmEntry<T>* e = newEntry(msg);
1917   *lasth = e;
1918   lasth = &e->next;
1919 }
1920
1921 template<typename T, size_t N>
1922 void Amm<T, N>::put(int tag, int src, T msg) noexcept
1923 {
1924   AmmEntry<T>* e = newEntry(tag, src, msg);
1925   *lasth = e;
1926   lasth = &e->next;
1927 }
1928
1929 template<typename T, size_t N>
1930 bool Amm<T, N>::match(const int tags1[AMM_NTAGS], const int tags2[AMM_NTAGS]) const noexcept
1931 {
1932   if (tags1[AMM_TAG]==tags2[AMM_TAG] && tags1[AMM_SRC]==tags2[AMM_SRC]) {
1933     // tag and src match
1934     return true;
1935   }
1936   else if (tags1[AMM_TAG]==tags2[AMM_TAG] && (tags1[AMM_SRC]==MPI_ANY_SOURCE || tags2[AMM_SRC]==MPI_ANY_SOURCE)) {
1937     // tag matches, src is MPI_ANY_SOURCE
1938     return true;
1939   }
1940   else if (tags1[AMM_SRC]==tags2[AMM_SRC] && (tags1[AMM_TAG]==MPI_ANY_TAG || tags2[AMM_TAG]==MPI_ANY_TAG)) {
1941     // src matches, tag is MPI_ANY_TAG
1942     return true;
1943   }
1944   else if ((tags1[AMM_SRC]==MPI_ANY_SOURCE || tags2[AMM_SRC]==MPI_ANY_SOURCE) && (tags1[AMM_TAG]==MPI_ANY_TAG || tags2[AMM_TAG]==MPI_ANY_TAG)) {
1945     // src and tag are MPI_ANY
1946     return true;
1947   }
1948   else {
1949     // no match
1950     return false;
1951   }
1952 }
1953
1954 template<typename T, size_t N>
1955 T Amm<T, N>::get(int tag, int src, int* rtags) noexcept
1956 {
1957   AmmEntry<T> *ent, **enth;
1958   T msg;
1959   int tags[AMM_NTAGS] = { tag, src };
1960
1961   enth = &first;
1962   while (true) {
1963     ent = *enth;
1964     if (!ent) return NULL;
1965     if (match(tags, ent->tags)) {
1966       if (rtags) memcpy(rtags, ent->tags, sizeof(int)*AMM_NTAGS);
1967       msg = ent->msg;
1968       // unlike probe, delete the matched entry:
1969       AmmEntry<T>* next = ent->next;
1970       *enth = next;
1971       if (!next) lasth = enth;
1972       deleteEntry(ent);
1973       return msg;
1974     }
1975     enth = &ent->next;
1976   }
1977 }
1978
1979 template<typename T, size_t N>
1980 T Amm<T, N>::probe(int tag, int src, int* rtags) noexcept
1981 {
1982   AmmEntry<T> *ent, **enth;
1983   T msg;
1984   int tags[AMM_NTAGS] = { tag, src };
1985   CkAssert(rtags);
1986
1987   enth = &first;
1988   while (true) {
1989     ent = *enth;
1990     if (!ent) return NULL;
1991     if (match(tags, ent->tags)) {
1992       memcpy(rtags, ent->tags, sizeof(int)*AMM_NTAGS);
1993       msg = ent->msg;
1994       return msg;
1995     }
1996     enth = &ent->next;
1997   }
1998 }
1999
2000 template<typename T, size_t N>
2001 int Amm<T, N>::size() const noexcept
2002 {
2003   int n = 0;
2004   AmmEntry<T> *e = first;
2005   while (e) {
2006     e = e->next;
2007     n++;
2008   }
2009   return n;
2010 }
2011
2012 template<typename T, size_t N>
2013 void Amm<T, N>::pup(PUP::er& p, AmmPupMessageFn msgpup) noexcept
2014 {
2015   int sz;
2016   if (!p.isUnpacking()) {
2017     sz = size();
2018     p|sz;
2019     AmmEntry<T> *doomed, *e = first;
2020     while (e) {
2021       pup_ints(&p, e->tags, AMM_NTAGS);
2022       msgpup(p, (void**)&e->msg);
2023       doomed = e;
2024       e = e->next;
2025       if (p.isDeleting()) {
2026         deleteEntry(doomed);
2027       }
2028     }
2029   } else { // unpacking
2030     p|sz;
2031     for (int i=0; i<sz; i++) {
2032       T msg;
2033       int tags[AMM_NTAGS];
2034       pup_ints(&p, tags, AMM_NTAGS);
2035       msgpup(p, (void**)&msg);
2036       put(tags[0], tags[1], msg);
2037     }
2038   }
2039 }
2040
2041 //----------------------- ampi -------------------------
2042 void ampi::init() noexcept {
2043   parent=NULL;
2044   thread=NULL;
2045
2046 #if CMK_FAULT_EVAC
2047   AsyncEvacuate(false);
2048 #endif
2049 }
2050
2051 ampi::ampi() noexcept
2052 {
2053   /* this constructor only exists so we can create an empty array during split */
2054   CkAbort("Default ampi constructor should never be called");
2055 }
2056
2057 ampi::ampi(CkArrayID parent_,const ampiCommStruct &s) noexcept :parentProxy(parent_), oorder(s.getSize())
2058 {
2059   init();
2060
2061   myComm=s; myComm.setArrayID(thisArrayID);
2062   myRank=myComm.getRankForIndex(thisIndex);
2063
2064   findParent(false);
2065 }
2066
2067 ampi::ampi(CkMigrateMessage *msg) noexcept : CBase_ampi(msg)
2068 {
2069   init();
2070 }
2071
2072 void ampi::ckJustMigrated() noexcept
2073 {
2074   findParent(true);
2075   ArrayElement1D::ckJustMigrated();
2076 }
2077
2078 void ampi::ckJustRestored() noexcept
2079 {
2080   FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
2081   findParent(true);
2082   ArrayElement1D::ckJustRestored();
2083 }
2084
2085 void ampi::findParent(bool forMigration) noexcept {
2086   STARTUP_DEBUG("ampi> finding my parent")
2087   parent=parentProxy[thisIndex].ckLocal();
2088 #if CMK_ERROR_CHECKING
2089   if (parent==NULL) CkAbort("AMPI can't find its parent!");
2090 #endif
2091   thread=parent->registerAmpi(this,myComm,forMigration);
2092 #if CMK_ERROR_CHECKING
2093   if (thread==NULL) CkAbort("AMPI can't find its thread!");
2094 #endif
2095 }
2096
2097 //The following method should be called on the first element of the
2098 //ampi array
2099 void ampi::allInitDone() noexcept {
2100   FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
2101   thisProxy.setInitDoneFlag();
2102 }
2103
2104 void ampi::setInitDoneFlag() noexcept {
2105   parent->ampiInitCallDone=1;
2106   parent->getTCharmThread()->start();
2107 }
2108
2109 static void AmmPupUnexpectedMsgs(PUP::er& p,void **msg) noexcept {
2110   CkPupMessage(p,msg,1);
2111   if (p.isDeleting()) delete (AmpiMsg *)*msg;
2112 }
2113
2114 static void AmmPupPostedReqs(PUP::er& p,void **msg) noexcept {
2115   // AmpiRequests objects are PUPed by AmpiRequestList, so here we pack
2116   // the reqIdx of posted requests and in ampiParent::registerAmpi we
2117   // lookup the AmpiRequest*'s using the indices. That is necessary because
2118   // the ampiParent object is unpacked after the ampi objects.
2119   if (p.isPacking()) {
2120     int reqIdx = ((AmpiRequest*)*msg)->getReqIdx();
2121     CkAssert(reqIdx != MPI_REQUEST_NULL);
2122     *msg = (void*)(intptr_t)reqIdx;
2123   }
2124   pup_pointer(&p, msg);
2125 #if CMK_ERROR_CHECKING
2126   if (p.isUnpacking()) {
2127     MPI_Request reqIdx = (MPI_Request)(intptr_t)*msg;
2128     CkAssert(reqIdx != MPI_REQUEST_NULL);
2129   }
2130 #endif
2131 }
2132
2133 void ampi::pup(PUP::er &p) noexcept
2134 {
2135   p|parentProxy;
2136   p|myComm;
2137   p|myRank;
2138   p|tmpVec;
2139   p|remoteProxy;
2140   unexpectedMsgs.pup(p, AmmPupUnexpectedMsgs);
2141   postedReqs.pup(p, AmmPupPostedReqs);
2142   unexpectedBcastMsgs.pup(p, AmmPupUnexpectedMsgs);
2143   postedBcastReqs.pup(p, AmmPupPostedReqs);
2144   p|greq_classes;
2145   p|oorder;
2146 }
2147
2148 ampi::~ampi() noexcept
2149 {
2150   if (CkInRestarting() || _BgOutOfCoreFlag==1) {
2151     // in restarting, we need to flush messages
2152     unexpectedMsgs.flushMsgs();
2153     postedReqs.freeAll();
2154     unexpectedBcastMsgs.flushMsgs();
2155     postedBcastReqs.freeAll();
2156   }
2157 }
2158
2159 //------------------------ Communicator Splitting ---------------------
2160 class ampiSplitKey {
2161  public:
2162   int nextSplitComm;
2163   int color; //New class of processes we'll belong to
2164   int key; //To determine rank in new ordering
2165   int rank; //Rank in old ordering
2166   ampiSplitKey() noexcept {}
2167   ampiSplitKey(int nextSplitComm_,int color_,int key_,int rank_) noexcept
2168     :nextSplitComm(nextSplitComm_), color(color_), key(key_), rank(rank_) {}
2169 };
2170
2171 #define MPI_INTER 10
2172
2173 /* "type" may indicate whether call is for a cartesian topology etc. */
2174 void ampi::split(int color,int key,MPI_Comm *dest, int type) noexcept
2175 {
2176 #if CMK_BIGSIM_CHARM
2177   void *curLog; // store current log in timeline
2178   _TRACE_BG_TLINE_END(&curLog);
2179 #endif
2180   if (type == MPI_CART) {
2181     ampiSplitKey splitKey(parent->getNextCart(),color,key,myRank);
2182     int rootIdx=myComm.getIndexForRank(0);
2183     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
2184     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
2185
2186     thread->suspend(); //Resumed by ampiParent::cartChildRegister
2187     MPI_Comm newComm=parent->getNextCart()-1;
2188     *dest=newComm;
2189   }
2190   else if (type == MPI_GRAPH) {
2191     ampiSplitKey splitKey(parent->getNextGraph(),color,key,myRank);
2192     int rootIdx=myComm.getIndexForRank(0);
2193     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
2194     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
2195
2196     thread->suspend(); //Resumed by ampiParent::graphChildRegister
2197     MPI_Comm newComm=parent->getNextGraph()-1;
2198     *dest=newComm;
2199   }
2200   else if (type == MPI_DIST_GRAPH) {
2201     ampiSplitKey splitKey(parent->getNextDistGraph(),color,key,myRank);
2202     int rootIdx=myComm.getIndexForRank(0);
2203     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
2204     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
2205
2206     thread->suspend(); //Resumed by ampiParent::distGraphChildRegister
2207     MPI_Comm newComm=parent->getNextDistGraph()-1;
2208     *dest=newComm;
2209   }
2210   else if (type == MPI_INTER) {
2211     ampiSplitKey splitKey(parent->getNextInter(),color,key,myRank);
2212     int rootIdx=myComm.getIndexForRank(0);
2213     CkCallback cb(CkIndex_ampi::splitPhaseInter(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
2214     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
2215
2216     thread->suspend(); //Resumed by ampiParent::interChildRegister
2217     MPI_Comm newComm=parent->getNextInter()-1;
2218     *dest=newComm;
2219   }
2220   else {
2221     ampiSplitKey splitKey(parent->getNextSplit(),color,key,myRank);
2222     int rootIdx=myComm.getIndexForRank(0);
2223     CkCallback cb(CkIndex_ampi::splitPhase1(0),CkArrayIndex1D(rootIdx),myComm.getProxy());
2224     contribute(sizeof(splitKey),&splitKey,CkReduction::concat,cb);
2225
2226     thread->suspend(); //Resumed by ampiParent::splitChildRegister
2227     MPI_Comm newComm=parent->getNextSplit()-1;
2228     *dest=newComm;
2229   }
2230 #if CMK_BIGSIM_CHARM
2231   _TRACE_BG_SET_INFO(NULL, "SPLIT_RESUME", NULL, 0);
2232 #endif
2233 }
2234
2235 CLINKAGE
2236 int compareAmpiSplitKey(const void *a_, const void *b_) {
2237   const ampiSplitKey *a=(const ampiSplitKey *)a_;
2238   const ampiSplitKey *b=(const ampiSplitKey *)b_;
2239   if (a->color!=b->color) return a->color-b->color;
2240   if (a->key!=b->key) return a->key-b->key;
2241   return a->rank-b->rank;
2242 }
2243
2244 // Caller needs to eventually call newAmpi.doneInserting()
2245 CProxy_ampi ampi::createNewChildAmpiSync() noexcept {
2246   CkArrayOptions opts;
2247   opts.bindTo(parentProxy);
2248   opts.setSectionAutoDelegate(false);
2249   opts.setNumInitial(0);
2250   CkArrayID unusedAID;
2251   ampiCommStruct unusedComm;
2252   CkCallback cb(CkCallback::resumeThread);
2253   CProxy_ampi::ckNew(unusedAID, unusedComm, opts, cb);
2254   CkArrayCreatedMsg *newAmpiMsg = static_cast<CkArrayCreatedMsg*>(cb.thread_delay());
2255   CProxy_ampi newAmpi = newAmpiMsg->aid;
2256   delete newAmpiMsg;
2257   return newAmpi;
2258 }
2259
2260 void ampi::splitPhase1(CkReductionMsg *msg) noexcept
2261 {
2262   //Order the keys, which orders the ranks properly:
2263   int nKeys=msg->getSize()/sizeof(ampiSplitKey);
2264   ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
2265   if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
2266   qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
2267
2268   MPI_Comm newComm = -1;
2269   for(int i=0;i<nKeys;i++){
2270     if(keys[i].nextSplitComm>newComm)
2271       newComm = keys[i].nextSplitComm;
2272   }
2273
2274   //Loop over the sorted keys, which gives us the new arrays:
2275   int lastColor=keys[0].color-1; //The color we're building an array for
2276   CProxy_ampi lastAmpi; //The array for lastColor
2277   int lastRoot=0; //C value for new rank 0 process for latest color
2278   ampiCommStruct lastComm; //Communicator info. for latest color
2279   for (int c=0;c<nKeys;c++) {
2280     if (keys[c].color!=lastColor)
2281     { //Hit a new color-- need to build a new communicator and array
2282       lastColor=keys[c].color;
2283       lastRoot=c;
2284
2285       if (c!=0) lastAmpi.doneInserting();
2286       lastAmpi = createNewChildAmpiSync();
2287
2288       vector<int> indices; //Maps rank to array indices for new array
2289       for (int i=c;i<nKeys;i++) {
2290         if (keys[i].color!=lastColor) break; //Done with this color
2291         int idx=myComm.getIndexForRank(keys[i].rank);
2292         indices.push_back(idx);
2293       }
2294
2295       //FIXME: create a new communicator for each color, instead of
2296       // (confusingly) re-using the same MPI_Comm number for each.
2297       lastComm=ampiCommStruct(newComm,lastAmpi,indices);
2298     }
2299     int newRank=c-lastRoot;
2300     int newIdx=lastComm.getIndexForRank(newRank);
2301
2302     lastAmpi[newIdx].insert(parentProxy,lastComm);
2303   }
2304   lastAmpi.doneInserting();
2305
2306   delete msg;
2307 }
2308
2309 void ampi::splitPhaseInter(CkReductionMsg *msg) noexcept
2310 {
2311   //Order the keys, which orders the ranks properly:
2312   int nKeys=msg->getSize()/sizeof(ampiSplitKey);
2313   ampiSplitKey *keys=(ampiSplitKey *)msg->getData();
2314   if (nKeys!=myComm.getSize()) CkAbort("ampi::splitReduce expected a split contribution from every rank!");
2315   qsort(keys,nKeys,sizeof(ampiSplitKey),compareAmpiSplitKey);
2316
2317   MPI_Comm newComm = -1;
2318   for(int i=0;i<nKeys;i++){
2319     if(keys[i].nextSplitComm>newComm)
2320       newComm = keys[i].nextSplitComm; // FIXME: use nextSplitr instead of nextInter?
2321   }
2322
2323   //Loop over the sorted keys, which gives us the new arrays:
2324   int lastColor=keys[0].color-1; //The color we're building an array for
2325   CProxy_ampi lastAmpi; //The array for lastColor
2326   int lastRoot=0; //C value for new rank 0 process for latest color
2327   ampiCommStruct lastComm; //Communicator info. for latest color
2328
2329   lastAmpi = createNewChildAmpiSync();
2330
2331   for (int c=0;c<nKeys;c++) {
2332     vector<int> indices; // Maps rank to array indices for new array
2333     if (keys[c].color!=lastColor)
2334     { //Hit a new color-- need to build a new communicator and array
2335       lastColor=keys[c].color;
2336       lastRoot=c;
2337
2338       for (int i=c;i<nKeys;i++) {
2339         if (keys[i].color!=lastColor) break; //Done with this color
2340         int idx=myComm.getIndexForRank(keys[i].rank);
2341         indices.push_back(idx);
2342       }
2343
2344       if (c==0) {
2345         lastComm=ampiCommStruct(newComm,lastAmpi,indices, myComm.getRemoteIndices());
2346         for (int i=0; i<indices.size(); i++) {
2347           lastAmpi[indices[i]].insert(parentProxy,lastComm);
2348         }
2349         lastAmpi.doneInserting();
2350       }
2351     }
2352   }
2353
2354   parentProxy[0].ExchangeProxy(lastAmpi);
2355   delete msg;
2356 }
2357
2358 //...newly created array elements register with the parent, which calls:
2359 void ampiParent::splitChildRegister(const ampiCommStruct &s) noexcept {
2360   int idx=s.getComm()-MPI_COMM_FIRST_SPLIT;
2361   if (splitComm.size()<=idx) splitComm.resize(idx+1);
2362   splitComm[idx]=new ampiCommStruct(s);
2363   thread->resume(); //Matches suspend at end of ampi::split
2364 }
2365
2366 //-----------------create communicator from group--------------
2367 // The procedure is like that of comm_split very much,
2368 // so the code is shamelessly copied from above
2369 //   1. reduction to make sure all members have called
2370 //   2. the root in the old communicator create the new array
2371 //   3. ampiParent::register is called to register new array as new comm
2372 void ampi::commCreate(const vector<int>& vec,MPI_Comm* newcomm) noexcept {
2373   int rootIdx=vec[0];
2374   tmpVec = vec;
2375   CkCallback cb(CkReductionTarget(ampi,commCreatePhase1),CkArrayIndex1D(rootIdx),myComm.getProxy());
2376   MPI_Comm nextgroup = parent->getNextGroup();
2377   contribute(sizeof(nextgroup), &nextgroup,CkReduction::max_int,cb);
2378
2379   if(getPosOp(thisIndex,vec)>=0){
2380     thread->suspend(); //Resumed by ampiParent::groupChildRegister
2381     MPI_Comm retcomm = parent->getNextGroup()-1;
2382     *newcomm = retcomm;
2383   }else{
2384     *newcomm = MPI_COMM_NULL;
2385   }
2386 }
2387
2388 void ampi::insertNewChildAmpiElements(MPI_Comm nextComm, CProxy_ampi newAmpi) noexcept {
2389   ampiCommStruct newCommStruct = ampiCommStruct(nextComm, newAmpi, tmpVec);
2390   for (int i = 0; i < tmpVec.size(); ++i)
2391     newAmpi[tmpVec[i]].insert(parentProxy, newCommStruct);
2392   newAmpi.doneInserting();
2393 }
2394
2395 void ampi::commCreatePhase1(MPI_Comm nextGroupComm) noexcept {
2396   CProxy_ampi newAmpi = createNewChildAmpiSync();
2397   insertNewChildAmpiElements(nextGroupComm, newAmpi);
2398 }
2399
2400 void ampiParent::groupChildRegister(const ampiCommStruct &s) noexcept {
2401   int idx=s.getComm()-MPI_COMM_FIRST_GROUP;
2402   if (groupComm.size()<=idx) groupComm.resize(idx+1);
2403   groupComm[idx]=new ampiCommStruct(s);
2404   thread->resume(); //Matches suspend at end of ampi::split
2405 }
2406
2407 /* Virtual topology communicator creation */
2408
2409 // 0-dimensional cart comm: rank 0 creates a dup of COMM_SELF with topo info.
2410 MPI_Comm ampi::cartCreate0D() noexcept {
2411   if (getRank() == 0) {
2412     tmpVec.clear();
2413     tmpVec.push_back(0);
2414     commCreatePhase1(parent->getNextCart());
2415     MPI_Comm newComm = parent->getNextCart()-1;
2416     ampiCommStruct &newCommStruct = getAmpiParent()->getCart(newComm);
2417     ampiTopology *newTopo = newCommStruct.getTopology();
2418     newTopo->setndims(0);
2419     return newComm;
2420   }
2421   else {
2422     return MPI_COMM_NULL;
2423   }
2424 }
2425
2426 MPI_Comm ampi::cartCreate(vector<int>& vec, int ndims, const int* dims) noexcept {
2427   if (ndims == 0) {
2428     return cartCreate0D();
2429   }
2430
2431   // Subtract out ranks from the group that won't be in the new comm
2432   int newsize = dims[0];
2433   for (int i = 1; i < ndims; i++) {
2434     newsize *= dims[i];
2435   }
2436   for (int i = vec.size(); i > newsize; i--) {
2437     vec.pop_back();
2438   }
2439
2440   int rootIdx = vec[0];
2441   tmpVec = vec;
2442   CkCallback cb(CkReductionTarget(ampi,commCreatePhase1),CkArrayIndex1D(rootIdx),myComm.getProxy());
2443
2444   MPI_Comm nextcart = parent->getNextCart();
2445   contribute(sizeof(nextcart), &nextcart,CkReduction::max_int,cb);
2446
2447   if (getPosOp(thisIndex,vec)>=0) {
2448     thread->suspend(); //Resumed by ampiParent::cartChildRegister
2449     return parent->getNextCart()-1;
2450   } else {
2451     return MPI_COMM_NULL;
2452   }
2453 }
2454
2455 void ampiParent::cartChildRegister(const ampiCommStruct &s) noexcept {
2456   int idx=s.getComm()-MPI_COMM_FIRST_CART;
2457   if (cartComm.size()<=idx) {
2458     cartComm.resize(idx+1);
2459     cartComm.length()=idx+1;
2460   }
2461   cartComm[idx]=new ampiCommStruct(s,MPI_CART);
2462   thread->resume(); //Matches suspend at end of ampi::cartCreate
2463 }
2464
2465 void ampi::graphCreate(const vector<int>& vec,MPI_Comm* newcomm) noexcept {
2466   int rootIdx=vec[0];
2467   tmpVec = vec;
2468   CkCallback cb(CkReductionTarget(ampi,commCreatePhase1),CkArrayIndex1D(rootIdx),
2469       myComm.getProxy());
2470   MPI_Comm nextgraph = parent->getNextGraph();
2471   contribute(sizeof(nextgraph), &nextgraph,CkReduction::max_int,cb);
2472
2473   if(getPosOp(thisIndex,vec)>=0){
2474     thread->suspend(); //Resumed by ampiParent::graphChildRegister
2475     MPI_Comm retcomm = parent->getNextGraph()-1;
2476     *newcomm = retcomm;
2477   }else
2478     *newcomm = MPI_COMM_NULL;
2479 }
2480
2481 void ampiParent::graphChildRegister(const ampiCommStruct &s) noexcept {
2482   int idx=s.getComm()-MPI_COMM_FIRST_GRAPH;
2483   if (graphComm.size()<=idx) {
2484     graphComm.resize(idx+1);
2485     graphComm.length()=idx+1;
2486   }
2487   graphComm[idx]=new ampiCommStruct(s,MPI_GRAPH);
2488   thread->resume(); //Matches suspend at end of ampi::graphCreate
2489 }
2490
2491 void ampi::distGraphCreate(const vector<int>& vec, MPI_Comm* newcomm) noexcept
2492 {
2493   int rootIdx = vec[0];
2494   tmpVec = vec;
2495   CkCallback cb(CkReductionTarget(ampi,commCreatePhase1), CkArrayIndex1D(rootIdx), myComm.getProxy());
2496   MPI_Comm nextDistGraph = parent->getNextDistGraph();
2497   contribute(sizeof(nextDistGraph), &nextDistGraph, CkReduction::max_int, cb);
2498
2499   if (getPosOp(thisIndex,vec) >= 0) {
2500     thread->suspend(); //Resumed by ampiParent::distGraphChildRegister
2501     MPI_Comm retcomm = parent->getNextDistGraph()-1;
2502     *newcomm = retcomm;
2503   }
2504   else {
2505     *newcomm = MPI_COMM_NULL;
2506   }
2507 }
2508
2509 void ampiParent::distGraphChildRegister(const ampiCommStruct &s) noexcept
2510 {
2511   int idx = s.getComm()-MPI_COMM_FIRST_DIST_GRAPH;
2512   if (distGraphComm.size() <= idx) {
2513     distGraphComm.resize(idx+1);
2514     distGraphComm.length() = idx+1;
2515   }
2516   distGraphComm[idx] = new ampiCommStruct(s,MPI_DIST_GRAPH);
2517   thread->resume(); //Matches suspend at end of ampi::distGraphCreate
2518 }
2519
2520 void ampi::intercommCreate(const vector<int>& remoteVec, const int root, MPI_Comm tcomm, MPI_Comm *ncomm) noexcept {
2521   if (thisIndex==root) { // not everybody gets the valid rvec
2522     tmpVec = remoteVec;
2523   }
2524   CkCallback cb(CkReductionTarget(ampi, intercommCreatePhase1),CkArrayIndex1D(root),myComm.getProxy());
2525   MPI_Comm nextinter = parent->getNextInter();
2526   contribute(sizeof(nextinter), &nextinter,CkReduction::max_int,cb);
2527   thread->suspend(); //Not resumed by ampiParent::interChildRegister. Resumed by ExchangeProxy.
2528   *ncomm = parent->getNextInter()-1;
2529 }
2530
2531 void ampi::intercommCreatePhase1(MPI_Comm nextInterComm) noexcept {
2532
2533   CProxy_ampi newAmpi = createNewChildAmpiSync();
2534   const vector<int>& lgroup = myComm.getIndices();
2535   ampiCommStruct newCommstruct = ampiCommStruct(nextInterComm,newAmpi,lgroup,tmpVec);
2536   for(int i=0;i<lgroup.size();i++){
2537     int newIdx=lgroup[i];
2538     newAmpi[newIdx].insert(parentProxy,newCommstruct);
2539   }
2540   newAmpi.doneInserting();
2541
2542   parentProxy[0].ExchangeProxy(newAmpi);
2543 }
2544
2545 void ampiParent::interChildRegister(const ampiCommStruct &s) noexcept {
2546   int idx=s.getComm()-MPI_COMM_FIRST_INTER;
2547   if (interComm.size()<=idx) interComm.resize(idx+1);
2548   interComm[idx]=new ampiCommStruct(s);
2549   // don't resume the thread yet, till parent set remote proxy
2550 }
2551
2552 void ampi::intercommMerge(int first, MPI_Comm *ncomm) noexcept { // first valid only at local root
2553   if(myRank == 0 && first == 1){ // first (lower) group creates the intracommunicator for the higher group
2554     vector<int> lvec = myComm.getIndices();
2555     vector<int> rvec = myComm.getRemoteIndices();
2556     int rsize = rvec.size();
2557     tmpVec = lvec;
2558     for(int i=0;i<rsize;i++)
2559       tmpVec.push_back(rvec[i]);
2560     if(tmpVec.size()==0) CkAbort("Error in ampi::intercommMerge: merging empty comms!\n");
2561   }else{
2562     tmpVec.resize(0);
2563   }
2564
2565   int rootIdx=myComm.getIndexForRank(0);
2566   CkCallback cb(CkReductionTarget(ampi, intercommMergePhase1),CkArrayIndex1D(rootIdx),myComm.getProxy());
2567   MPI_Comm nextintra = parent->getNextIntra();
2568   contribute(sizeof(nextintra), &nextintra,CkReduction::max_int,cb);
2569
2570   thread->suspend(); //Resumed by ampiParent::interChildRegister
2571   MPI_Comm newcomm=parent->getNextIntra()-1;
2572   *ncomm=newcomm;
2573 }
2574
2575 void ampi::intercommMergePhase1(MPI_Comm nextIntraComm) noexcept {
2576   // gets called on two roots, first root creates the comm
2577   if(tmpVec.size()==0) return;
2578   CProxy_ampi newAmpi = createNewChildAmpiSync();
2579   insertNewChildAmpiElements(nextIntraComm, newAmpi);
2580 }
2581
2582 void ampiParent::intraChildRegister(const ampiCommStruct &s) noexcept {
2583   int idx=s.getComm()-MPI_COMM_FIRST_INTRA;
2584   if (intraComm.size()<=idx) intraComm.resize(idx+1);
2585   intraComm[idx]=new ampiCommStruct(s);
2586   thread->resume(); //Matches suspend at end of ampi::split
2587 }
2588
2589 void ampi::topoDup(int topoType, int rank, MPI_Comm comm, MPI_Comm *newComm) noexcept
2590 {
2591   if (getAmpiParent()->isInter(comm)) {
2592     split(0, rank, newComm, MPI_INTER);
2593   } else {
2594     split(0, rank, newComm, topoType);
2595
2596     if (topoType != MPI_UNDEFINED) {
2597       ampiTopology *topo, *newTopo;
2598       if (topoType == MPI_CART) {
2599         topo = getAmpiParent()->getCart(comm).getTopology();
2600         newTopo = getAmpiParent()->getCart(*newComm).getTopology();
2601       } else if (topoType == MPI_GRAPH) {
2602         topo = getAmpiParent()->getGraph(comm).getTopology();
2603         newTopo = getAmpiParent()->getGraph(*newComm).getTopology();
2604       } else {
2605         CkAssert(topoType == MPI_DIST_GRAPH);
2606         topo = getAmpiParent()->getDistGraph(comm).getTopology();
2607         newTopo = getAmpiParent()->getDistGraph(*newComm).getTopology();
2608       }
2609       newTopo->dup(topo);
2610     }
2611   }
2612 }
2613
2614 //------------------------ communication -----------------------
2615 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo) noexcept
2616 {
2617   if (universeNo>MPI_COMM_WORLD) {
2618     int worldDex=universeNo-MPI_COMM_WORLD-1;
2619     if (worldDex>=_mpi_nworlds)
2620       CkAbort("Bad world communicator passed to universeComm2CommStruct");
2621     return mpi_worlds[worldDex];
2622   }
2623   CkAbort("Bad communicator passed to universeComm2CommStruct");
2624   return mpi_worlds[0]; // meaningless return
2625 }
2626
2627 void ampiParent::block() noexcept {
2628   thread->suspend();
2629 }
2630
2631 void ampiParent::yield() noexcept {
2632   thread->schedule();
2633 }
2634
2635 void ampi::unblock() noexcept {
2636   thread->resume();
2637 }
2638
2639 ampiParent* ampiParent::blockOnRecv() noexcept {
2640   resumeOnRecv = true;
2641   // In case this thread is migrated while suspended,
2642   // save myComm to get the ampi instance back. Then
2643   // return "dis" in case the caller needs it.
2644   thread->suspend();
2645   ampiParent* dis = getAmpiParent();
2646   dis->resumeOnRecv = false;
2647   return dis;
2648 }
2649
2650 ampi* ampi::blockOnRecv() noexcept {
2651   parent->resumeOnRecv = true;
2652   // In case this thread is migrated while suspended,
2653   // save myComm to get the ampi instance back. Then
2654   // return "dis" in case the caller needs it.
2655   MPI_Comm comm = myComm.getComm();
2656   thread->suspend();
2657   ampi *dis = getAmpiInstance(comm);
2658   dis->parent->resumeOnRecv = false;
2659   return dis;
2660 }
2661
2662 void ampi::setBlockingReq(AmpiRequest *req) noexcept {
2663   CkAssert(parent->blockingReq == NULL);
2664   CkAssert(parent->resumeOnColl == false);
2665   parent->blockingReq = req;
2666   parent->resumeOnColl = true;
2667 }
2668
2669 // block on (All)Reduce or (All)Gather(v)
2670 ampi* ampi::blockOnColl() noexcept {
2671 #if CMK_BIGSIM_CHARM
2672   void *curLog; // store current log in timeline
2673   _TRACE_BG_TLINE_END(&curLog);
2674 #if CMK_TRACE_IN_CHARM
2675   if(CpvAccess(traceOn)) traceSuspend();
2676 #endif
2677 #endif
2678
2679   CkAssert(parent->resumeOnColl == true);
2680   MPI_Comm comm = myComm.getComm();
2681   thread->suspend();
2682   ampi *dis = getAmpiInstance(comm);
2683   dis->parent->resumeOnColl = false;
2684
2685 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2686   CpvAccess(_currentObj) = dis;
2687 #endif
2688 #if CMK_BIGSIM_CHARM
2689 #if CMK_TRACE_IN_CHARM
2690   if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
2691 #endif
2692   TRACE_BG_AMPI_BREAK(dis->thread->getThread(), "RECV_RESUME", NULL, 0, 0);
2693   if (dis->parent->blockingReq->eventPe == CkMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(dis->parent->blockingReq->event);
2694 #endif
2695
2696   delete dis->parent->blockingReq; dis->parent->blockingReq = NULL;
2697   return dis;
2698 }
2699
2700 void ampi::ssend_ack(int sreq_idx) noexcept {
2701   if (sreq_idx == 1)
2702     thread->resume();           // MPI_Ssend
2703   else {
2704     sreq_idx -= 2;              // start from 2
2705     AmpiRequestList& reqs = getReqs();
2706     AmpiRequest *sreq = reqs[sreq_idx];
2707     sreq->complete = true;
2708     handleBlockedReq(sreq);
2709     resumeThreadIfReady();
2710   }
2711 }
2712
2713 void ampi::injectMsg(int size, char* buf) noexcept
2714 {
2715   generic(makeAmpiMsg(thisIndex, 0, thisIndex, (void*)buf, size, MPI_CHAR, MPI_COMM_WORLD, 0));
2716 }
2717
2718 void ampi::generic(AmpiMsg* msg) noexcept
2719 {
2720   MSG_ORDER_DEBUG(
2721     CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d (seq %d) resumeOnRecv %d\n",
2722              thisIndex, msg->getTag(), msg->getSrcRank(), getComm(), msg->getSeq(), parent->resumeOnRecv);
2723   )
2724 #if CMK_BIGSIM_CHARM
2725   TRACE_BG_ADD_TAG("AMPI_generic");
2726   msg->event = NULL;
2727 #endif
2728
2729   if(msg->getSeq() != 0) {
2730     int seqIdx = msg->getSeqIdx();
2731     int n=oorder.put(seqIdx,msg);
2732     if (n>0) { // This message was in-order
2733       inorder(msg);
2734       if (n>1) { // It enables other, previously out-of-order messages
2735         while((msg=oorder.getOutOfOrder(seqIdx))!=0) {
2736           inorder(msg);
2737         }
2738       }
2739     }
2740   } else { //Cross-world or system messages are unordered
2741     inorder(msg);
2742   }
2743   // msg may be free'ed from calling inorder()
2744
2745   resumeThreadIfReady();
2746 }
2747
2748 // Same as ampi::generic except it's [nokeep] and msg is sequenced
2749 void ampi::bcastResult(AmpiMsg* msg) noexcept
2750 {
2751   MSG_ORDER_DEBUG(
2752     CkPrintf("AMPI vp %d bcast arrival: tag=%d, src=%d, comm=%d (seq %d) resumeOnRecv %d\n",
2753              thisIndex, msg->getTag(), msg->getSrcRank(), getComm(), msg->getSeq(), parent->resumeOnRecv);
2754   )
2755 #if CMK_BIGSIM_CHARM
2756   TRACE_BG_ADD_TAG("AMPI_generic");
2757   msg->event = NULL;
2758 #endif
2759
2760   CkAssert(msg->getSeq() != 0);
2761   int seqIdx = msg->getSeqIdx();
2762   int n=oorder.put(seqIdx,msg);
2763   if (n>0) { // This message was in-order
2764     inorderBcast(msg, false); // inorderBcast() is [nokeep]-aware, unlike inorder()
2765     if (n>1) { // It enables other, previously out-of-order messages
2766       while((msg=oorder.getOutOfOrder(seqIdx))!=0) {
2767         inorderBcast(msg, true);
2768       }
2769     }
2770   }
2771   // [nokeep] entry method, so do not delete msg
2772   resumeThreadIfReady();
2773 }
2774
2775 inline static AmpiRequestList &getReqs() noexcept;
2776
2777 void AmpiRequestList::freeNonPersReq(int &idx) noexcept {
2778   ampiParent* pptr = getAmpiParent();
2779   if (!reqs[idx]->isPersistent()) {
2780     free(pptr->reqPool, idx, pptr->getDDT());
2781     idx = MPI_REQUEST_NULL;
2782   }
2783 }
2784
2785 void AmpiRequestList::free(AmpiRequestPool &reqPool, int idx, CkDDT *ddt) noexcept {
2786   if (idx < 0) return;
2787   reqs[idx]->free(ddt);
2788   reqPool.deleteReq(reqs[idx]);
2789   reqs[idx] = NULL;
2790   startIdx = std::min(idx, startIdx);
2791 }
2792
2793 void ampi::inorder(AmpiMsg* msg) noexcept
2794 {
2795   MSG_ORDER_DEBUG(
2796     CkPrintf("AMPI vp %d inorder: tag=%d, src=%d, comm=%d (seq %d)\n",
2797              thisIndex, msg->getTag(), msg->getSrcRank(), getComm(), msg->getSeq());
2798   )
2799
2800 #if CMK_BIGSIM_CHARM
2801   _TRACE_BG_TLINE_END(&msg->event); // store current log
2802   msg->eventPe = CkMyPe();
2803 #endif
2804
2805   //Check posted recvs:
2806   int tag = msg->getTag();
2807   int srcRank = msg->getSrcRank();
2808   AmpiRequest* req = postedReqs.get(tag, srcRank);
2809   if (req) { // receive posted
2810     handleBlockedReq(req);
2811     req->receive(this, msg);
2812   } else {
2813     unexpectedMsgs.put(msg);
2814   }
2815 }
2816
2817 void ampi::inorderBcast(AmpiMsg* msg, bool deleteMsg) noexcept
2818 {
2819   MSG_ORDER_DEBUG(
2820     CkPrintf("AMPI vp %d inorder bcast: tag=%d, src=%d, comm=%d (seq %d)\n",
2821              thisIndex, msg->getTag(), msg->getSrcRank(), getComm(), msg->getSeq());
2822   )
2823
2824 #if CMK_BIGSIM_CHARM
2825   _TRACE_BG_TLINE_END(&msg->event); // store current log
2826   msg->eventPe = CkMyPe();
2827 #endif
2828
2829   //Check posted recvs:
2830   int tag = msg->getTag();
2831   int srcRank = msg->getSrcRank();
2832   AmpiRequest* req = postedBcastReqs.get(tag, srcRank);
2833   if (req) { // receive posted
2834     handleBlockedReq(req);
2835     req->receive(this, msg, deleteMsg);
2836   } else {
2837     // Reference the [nokeep] msg so it isn't freed by the runtime
2838     CmiReference(UsrToEnv(msg));
2839     unexpectedBcastMsgs.put(msg);
2840   }
2841 }
2842
2843 static inline AmpiMsg* rdma2AmpiMsg(char *buf, int size, CMK_REFNUM_TYPE seq, int tag, int srcRank,
2844                                     int ssendReq) noexcept
2845 {
2846   // Convert an Rdma message (parameter marshalled buffer) to an AmpiMsg
2847   AmpiMsg* msg = new (size, 0) AmpiMsg(seq, ssendReq, tag, srcRank, size);
2848   memcpy(msg->data, buf, size); // Assumes the buffer is contiguous
2849   return msg;
2850 }
2851
2852 // RDMA version of ampi::generic
2853 void ampi::genericRdma(char* buf, int size, CMK_REFNUM_TYPE seq, int tag, int srcRank, MPI_Comm destcomm, int ssendReq) noexcept
2854 {
2855   MSG_ORDER_DEBUG(
2856     CkPrintf("[%d] in ampi::genericRdma on index %d, size=%d, seq=%d, srcRank=%d, tag=%d, comm=%d, ssendReq=%d\n",
2857              CkMyPe(), getIndexForRank(getRank()), size, seq, srcRank, tag, destcomm, ssendReq);
2858   )
2859
2860   if (seq != 0) {
2861     int seqIdx = srcRank;
2862     int n = oorder.isInOrder(seqIdx, seq);
2863     if (n > 0) { // This message was in-order
2864       inorderRdma(buf, size, seq, tag, srcRank, destcomm, ssendReq);
2865       if (n > 1) { // It enables other, previously out-of-order messages
2866         AmpiMsg *msg = NULL;
2867         while ((msg = oorder.getOutOfOrder(seqIdx)) != 0) {
2868           inorder(msg);
2869         }
2870       }
2871     } else { // This message was out-of-order: stash it (as an AmpiMsg)
2872       AmpiMsg *msg = rdma2AmpiMsg(buf, size, seq, tag, srcRank, ssendReq);
2873       oorder.putOutOfOrder(seqIdx, msg);
2874     }
2875   } else { // Cross-world or system messages are unordered
2876     inorderRdma(buf, size, seq, tag, srcRank, destcomm, ssendReq);
2877   }
2878
2879   resumeThreadIfReady();
2880 }
2881
2882 // RDMA version of ampi::inorder
2883 void ampi::inorderRdma(char* buf, int size, CMK_REFNUM_TYPE seq, int tag, int srcRank,
2884                        MPI_Comm comm, int ssendReq) noexcept
2885 {
2886   MSG_ORDER_DEBUG(
2887     CkPrintf("AMPI vp %d inorderRdma: tag=%d, src=%d, comm=%d  (seq %d)\n",
2888              thisIndex, tag, srcRank, comm, seq);
2889   )
2890
2891   //Check posted recvs:
2892   AmpiRequest* req = postedReqs.get(tag, srcRank);
2893   if (req) { // receive posted
2894     handleBlockedReq(req);
2895     req->receiveRdma(this, buf, size, ssendReq, srcRank, comm);
2896   } else {
2897     AmpiMsg* msg = rdma2AmpiMsg(buf, size, seq, tag, srcRank, ssendReq);
2898     unexpectedMsgs.put(msg);
2899   }
2900 }
2901
2902 // Callback from ampi::genericRdma() signaling that the send buffer is now safe to re-use
2903 void ampi::completedRdmaSend(CkDataMsg *msg) noexcept
2904 {
2905   // refnum is the index into reqList for this SendReq
2906   int reqIdx = CkGetRefNum(msg);
2907
2908   MSG_ORDER_DEBUG(
2909     CkPrintf("[%d] in ampi::completedRdmaSend on index %d, reqIdx = %d\n",
2910              CkMyPe(), parent->thisIndex, reqIdx);
2911   )
2912
2913   AmpiRequestList& reqList = getReqs();
2914   AmpiRequest* sreq = reqList[reqIdx];
2915   sreq->complete = true;
2916
2917   handleBlockedReq(sreq);
2918   resumeThreadIfReady();
2919   // CkDataMsg is allocated & freed by the runtime, so do not delete msg
2920 }
2921
2922 void handle_MPI_BOTTOM(void* &buf, MPI_Datatype type) noexcept
2923 {
2924   if (buf == MPI_BOTTOM) {
2925     buf = (void*)getDDT()->getType(type)->getLB();
2926     getDDT()->getType(type)->setAbsolute(true);
2927   }
2928 }
2929
2930 void handle_MPI_BOTTOM(void* &buf1, MPI_Datatype type1, void* &buf2, MPI_Datatype type2) noexcept
2931 {
2932   if (buf1 == MPI_BOTTOM) {
2933     buf1 = (void*)getDDT()->getType(type1)->getLB();
2934     getDDT()->getType(type1)->setAbsolute(true);
2935   }
2936   if (buf2 == MPI_BOTTOM) {
2937     buf2 = (void*)getDDT()->getType(type2)->getLB();
2938     getDDT()->getType(type2)->setAbsolute(true);
2939   }
2940 }
2941
2942 AmpiMsg *ampi::makeBcastMsg(const void *buf,int count,MPI_Datatype type,int root,MPI_Comm destcomm) noexcept
2943 {
2944   CkDDT_DataType *ddt = getDDT()->getType(type);
2945   int len = ddt->getSize(count);
2946   CMK_REFNUM_TYPE seq = getSeqNo(root, destcomm, MPI_BCAST_TAG);
2947   // Do not use the msg pool for bcasts:
2948   AmpiMsg *msg = new (len, 0) AmpiMsg(seq, MPI_REQUEST_NULL, MPI_BCAST_TAG, root, len);
2949   ddt->serialize((char*)buf, msg->getData(), count, msg->getLength(), PACK);
2950   return msg;
2951 }
2952
2953 AmpiMsg *ampi::makeAmpiMsg(int destRank,int t,int sRank,const void *buf,int count,
2954                            MPI_Datatype type,MPI_Comm destcomm, int ssendReq/*=0*/) noexcept
2955 {
2956   CkDDT_DataType *ddt = getDDT()->getType(type);
2957   int len = ddt->getSize(count);
2958   CMK_REFNUM_TYPE seq = getSeqNo(destRank, destcomm, t);
2959   AmpiMsg *msg = CkpvAccess(msgPool).newAmpiMsg(seq, ssendReq, t, sRank, len);
2960   ddt->serialize((char*)buf, msg->getData(), count, msg->getLength(), PACK);
2961   return msg;
2962 }
2963
2964 MPI_Request ampi::send(int t, int sRank, const void* buf, int count, MPI_Datatype type,
2965                        int rank, MPI_Comm destcomm, int ssendReq/*=0*/, AmpiSendType sendType/*=BLOCKING_SEND*/) noexcept
2966 {
2967 #if CMK_TRACE_IN_CHARM
2968   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND", NULL, 0, 1);
2969 #endif
2970
2971 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2972   MPI_Comm disComm = myComm.getComm();
2973   ampi *dis = getAmpiInstance(disComm);
2974   CpvAccess(_currentObj) = dis;
2975 #endif
2976
2977   const ampiCommStruct &dest=comm2CommStruct(destcomm);
2978   MPI_Request req = delesend(t,sRank,buf,count,type,rank,destcomm,dest.getProxy(),ssendReq,sendType);
2979   if (sendType == BLOCKING_SEND && req != MPI_REQUEST_NULL) {
2980     AmpiRequestList& reqList = getReqs();
2981     AmpiRequest *sreq = reqList[req];
2982     sreq->wait(MPI_STATUS_IGNORE);
2983     reqList.free(parent->reqPool, req, parent->getDDT());
2984     req = MPI_REQUEST_NULL;
2985   }
2986
2987 #if CMK_TRACE_IN_CHARM
2988   TRACE_BG_AMPI_BREAK(thread->getThread(), "AMPI_SEND_END", NULL, 0, 1);
2989 #endif
2990
2991   if (ssendReq == 1) {
2992     // waiting for receiver side
2993     parent->resumeOnRecv = false;            // so no one else awakes it
2994     parent->block();
2995   }
2996
2997   return req;
2998 }
2999
3000 void ampi::sendraw(int t, int sRank, void* buf, int len, CkArrayID aid, int idx) noexcept
3001 {
3002   AmpiMsg *msg = new (len, 0) AmpiMsg(0, 0, t, sRank, len);
3003   memcpy(msg->getData(), buf, len);
3004   CProxy_ampi pa(aid);
3005   pa[idx].generic(msg);
3006 }
3007
3008 CMK_REFNUM_TYPE ampi::getSeqNo(int destRank, MPI_Comm destcomm, int tag) noexcept {
3009   int seqIdx = (tag >= MPI_BCAST_TAG) ? COLL_SEQ_IDX : destRank;
3010   CMK_REFNUM_TYPE seq = 0;
3011   if (destcomm<=MPI_COMM_WORLD && tag<=MPI_BCAST_TAG) { //Not cross-module: set seqno
3012     seq = oorder.nextOutgoing(seqIdx);
3013   }
3014   return seq;
3015 }
3016
3017 MPI_Request ampi::sendRdmaMsg(int t, int sRank, const void* buf, int size, MPI_Datatype type, int destIdx,
3018                               int destRank, MPI_Comm destcomm, CProxy_ampi arrProxy, int ssendReq) noexcept
3019 {
3020   CMK_REFNUM_TYPE seq = getSeqNo(destRank, destcomm, t);
3021
3022   if (ssendReq) { // Using a SsendReq to track matching receive, so no need for SendReq here
3023     arrProxy[destIdx].genericRdma(CkSendBuffer(buf), size, seq, t, sRank, destcomm, ssendReq);
3024     return MPI_REQUEST_NULL;
3025   }
3026   else { // Set up a SendReq to track completion of the send buffer
3027     MPI_Request req = postReq(parent->reqPool.newReq<SendReq>(type, destcomm, getDDT()));
3028     CkCallback completedSendCB(CkIndex_ampi::completedRdmaSend(NULL), thisProxy[thisIndex], true/*inline*/);
3029     completedSendCB.setRefnum(req);
3030
3031     arrProxy[destIdx].genericRdma(CkSendBuffer(buf, completedSendCB), size, seq, t, sRank, destcomm, ssendReq);
3032     return req;
3033   }
3034 }
3035
3036 // Call genericRdma inline on the local destination object
3037 MPI_Request ampi::sendLocalMsg(int t, int sRank, const void* buf, int size, MPI_Datatype type, int destRank,
3038                                MPI_Comm destcomm, ampi* destPtr, int ssendReq, AmpiSendType sendType) noexcept
3039 {
3040   CMK_REFNUM_TYPE seq = getSeqNo(destRank, destcomm, t);
3041
3042   destPtr->genericRdma((char*)buf, size, seq, t, sRank, destcomm, ssendReq);
3043
3044   if (ssendReq || sendType == BLOCKING_SEND) {
3045     return MPI_REQUEST_NULL;
3046   }
3047   else { // SendReq is pre-completed since we directly copied the send buffer
3048     return postReq(parent->reqPool.newReq<SendReq>(type, destcomm, getDDT(), AMPI_REQ_COMPLETED));
3049   }
3050 }
3051
3052 MPI_Request ampi::delesend(int t, int sRank, const void* buf, int count, MPI_Datatype type,
3053                            int rank, MPI_Comm destcomm, CProxy_ampi arrProxy, int ssendReq,
3054                            AmpiSendType sendType) noexcept
3055 {
3056   if (rank==MPI_PROC_NULL) return MPI_REQUEST_NULL;
3057   const ampiCommStruct &dest=comm2CommStruct(destcomm);
3058   int destIdx;
3059   if(isInter()){
3060     sRank = thisIndex;
3061     destIdx = dest.getIndexForRemoteRank(rank);
3062     arrProxy = remoteProxy;
3063   } else {
3064     destIdx = dest.getIndexForRank(rank);
3065   }
3066
3067   MSG_ORDER_DEBUG(
3068     CkPrintf("AMPI vp %d send: tag=%d, src=%d, comm=%d (to %d)\n",thisIndex,t,sRank,destcomm,destIdx);
3069   )
3070
3071   ampi *destPtr = arrProxy[destIdx].ckLocal();
3072   CkDDT_DataType *ddt = getDDT()->getType(type);
3073   int size = ddt->getSize(count);
3074   if (ddt->isContig()) {
3075 #if AMPI_LOCAL_IMPL
3076     if (destPtr != NULL) {
3077       return sendLocalMsg(t, sRank, buf, size, type, rank, destcomm, destPtr, ssendReq, sendType);
3078     }
3079 #endif
3080 #if AMPI_RDMA_IMPL
3081     if (size >= AMPI_RDMA_THRESHOLD ||
3082        (size >= AMPI_SMP_RDMA_THRESHOLD && destLikelyWithinProcess(arrProxy, destIdx)))
3083     {
3084       return sendRdmaMsg(t, sRank, buf, size, type, destIdx, rank, destcomm, arrProxy, ssendReq);
3085     }
3086 #endif
3087   }
3088 #if AMPI_LOCAL_IMPL
3089   if (destPtr != NULL) {
3090     destPtr->generic(makeAmpiMsg(rank, t, sRank, buf, count, type, destcomm, ssendReq));
3091     return MPI_REQUEST_NULL;
3092   } else
3093 #endif
3094   {
3095     arrProxy[destIdx].generic(makeAmpiMsg(rank, t, sRank, buf, count, type, destcomm, ssendReq));
3096     return MPI_REQUEST_NULL;
3097   }
3098 }
3099
3100 void ampi::processAmpiMsg(AmpiMsg *msg, void* buf, MPI_Datatype type, int count) noexcept
3101 {
3102   int ssendReq = msg->getSsendReq();
3103   if (ssendReq > 0) { // send an ack to sender
3104     int srcRank = msg->getSrcRank();
3105     int srcIdx = getIndexForRank(srcRank);
3106     thisProxy[srcIdx].ssend_ack(ssendReq);
3107   }
3108
3109   CkDDT_DataType *ddt = getDDT()->getType(type);
3110
3111   ddt->serialize((char*)buf, msg->getData(), count, msg->getLength(), UNPACK);
3112 }
3113
3114 // RDMA version of ampi::processAmpiMsg
3115 void ampi::processRdmaMsg(const void *sbuf, int slength, int ssendReq, int srank, void* rbuf,
3116                           int rcount, MPI_Datatype rtype, MPI_Comm comm) noexcept
3117 {
3118   if (ssendReq > 0) { // send an ack to sender
3119     int srcIdx = getIndexForRank(srank);
3120     thisProxy[srcIdx].ssend_ack(ssendReq);
3121   }
3122
3123   CkDDT_DataType *ddt = getDDT()->getType(rtype);
3124
3125   ddt->serialize((char*)rbuf, (char*)sbuf, rcount, slength, UNPACK);
3126 }
3127
3128 void ampi::processRednMsg(CkReductionMsg *msg, void* buf, MPI_Datatype type, int count) noexcept
3129 {
3130   // The first sizeof(AmpiOpHeader) bytes in the redn msg data are reserved
3131   // for an AmpiOpHeader if our custom AmpiReducer type was used.
3132   int szhdr = (msg->getReducer() == AmpiReducer) ? sizeof(AmpiOpHeader) : 0;
3133   getDDT()->getType(type)->serialize((char*)buf, (char*)msg->getData()+szhdr, count, msg->getLength()-szhdr, UNPACK);
3134 }
3135
3136 void ampi::processNoncommutativeRednMsg(CkReductionMsg *msg, void* buf, MPI_Datatype type, int count, MPI_User_function* func) noexcept
3137 {
3138   CkReduction::tupleElement* results = NULL;
3139   int numReductions = 0;
3140   msg->toTuple(&results, &numReductions);
3141
3142   // Contributions are unordered and consist of a (srcRank, data) tuple
3143   char *data           = (char*)(results[1].data);
3144   CkDDT_DataType *ddt  = getDDT()->getType(type);
3145   int contributionSize = ddt->getSize(count);
3146   int commSize         = getSize();
3147
3148   // Store pointers to each contribution's data at index 'srcRank' in contributionData
3149   // If the max rank value fits into an unsigned short int, srcRanks are those, otherwise int's
3150   vector<void *> contributionData(commSize);
3151   if (commSize < std::numeric_limits<unsigned short int>::max()) {
3152     unsigned short int *srcRank = (unsigned short int*)(results[0].data);
3153     for (int i=0; i<commSize; i++) {
3154       contributionData[srcRank[i]] = &data[i * contributionSize];
3155     }
3156   }
3157   else {
3158     int *srcRank = (int*)(results[0].data);
3159     for (int i=0; i<commSize; i++) {
3160       contributionData[srcRank[i]] = &data[i * contributionSize];
3161     }
3162   }
3163
3164   if (ddt->isContig()) {
3165     // Copy rank 0's contribution into buf first
3166     memcpy(buf, contributionData[0], contributionSize);
3167
3168     // Invoke the MPI_User_function on the contributions in 'rank' order
3169     for (int i=1; i<commSize; i++) {
3170       (*func)(contributionData[i], buf, &count, &type);
3171     }
3172   }
3173   else {
3174     int contributionExtent = ddt->getExtent() * count;
3175
3176     // Deserialize rank 0's contribution into buf first
3177     ddt->serialize((char*)contributionData[0], (char*)buf, count, contributionExtent, UNPACK);
3178
3179     // Invoke the MPI_User_function on the deserialized contributions in 'rank' order
3180     vector<char> deserializedBuf(contributionExtent);
3181     for (int i=1; i<commSize; i++) {
3182       ddt->serialize((char*)contributionData[i], deserializedBuf.data(), count, contributionExtent, UNPACK);
3183       (*func)(deserializedBuf.data(), buf, &count, &type);
3184     }
3185   }
3186   delete [] results;
3187 }
3188
3189 void ampi::processGatherMsg(CkReductionMsg *msg, void* buf, MPI_Datatype type, int recvCount) noexcept
3190 {
3191   CkReduction::tupleElement* results = NULL;
3192   int numReductions = 0;
3193   msg->toTuple(&results, &numReductions);
3194   CkAssert(numReductions == 2);
3195
3196   // Re-order the gather data based on the rank of the contributor
3197   char *data             = (char*)(results[1].data);
3198   CkDDT_DataType *ddt    = getDDT()->getType(type);
3199   int contributionSize   = ddt->getSize(recvCount);
3200   int contributionExtent = ddt->getExtent()*recvCount;
3201   int commSize           = getSize();
3202
3203   // If the max rank value fits into an unsigned short int, srcRanks are those, otherwise int's
3204   if (commSize < std::numeric_limits<unsigned short int>::max()) {
3205     unsigned short int *srcRank = (unsigned short int*)(results[0].data);
3206     for (int i=0; i<commSize; i++) {
3207       ddt->serialize(&(((char*)buf)[srcRank[i] * contributionExtent]),
3208                      &data[i * contributionSize],
3209                      recvCount,
3210                      contributionSize,
3211                      UNPACK);
3212     }
3213   }
3214   else {
3215     int *srcRank = (int*)(results[0].data);
3216     for (int i=0; i<commSize; i++) {
3217       ddt->serialize(&(((char*)buf)[srcRank[i] * contributionExtent]),
3218                      &data[i * contributionSize],
3219                      recvCount,
3220                      contributionSize,
3221                      UNPACK);
3222     }
3223   }
3224   delete [] results;
3225 }
3226
3227 void ampi::processGathervMsg(CkReductionMsg *msg, void* buf, MPI_Datatype type,
3228                              int* recvCounts, int* displs) noexcept
3229 {
3230   CkReduction::tupleElement* results = NULL;
3231   int numReductions = 0;
3232   msg->toTuple(&results, &numReductions);
3233   CkAssert(numReductions == 3);
3234
3235   // Re-order the gather data based on the rank of the contributor
3236   int *dataSize          = (int*)(results[1].data);
3237   char *data             = (char*)(results[2].data);
3238   CkDDT_DataType *ddt    = getDDT()->getType(type);
3239   int contributionSize   = ddt->getSize();
3240   int contributionExtent = ddt->getExtent();
3241   int commSize           = getSize();
3242   int currDataOffset     = 0;
3243
3244   // If the max rank value fits into an unsigned short int, srcRanks are those, otherwise int's
3245   if (commSize < std::numeric_limits<unsigned short int>::max()) {
3246     unsigned short int *srcRank = (unsigned short int*)(results[0].data);
3247     for (int i=0; i<commSize; i++) {
3248       ddt->serialize(&((char*)buf)[displs[srcRank[i]] * contributionExtent],
3249                      &data[currDataOffset],
3250                      recvCounts[srcRank[i]],
3251                      contributionSize * recvCounts[srcRank[i]],
3252                      UNPACK);
3253       currDataOffset += dataSize[i];
3254     }
3255   }
3256   else {
3257     int *srcRank = (int*)(results[0].data);
3258     for (int i=0; i<commSize; i++) {
3259       ddt->serialize(&((char*)buf)[displs[srcRank[i]] * contributionExtent],
3260                      &data[currDataOffset],
3261                      recvCounts[srcRank[i]],
3262                      contributionSize * recvCounts[srcRank[i]],
3263                      UNPACK);
3264       currDataOffset += dataSize[i];
3265     }
3266   }
3267   delete [] results;
3268 }
3269
3270 static inline void clearStatus(MPI_Status *sts) noexcept {
3271   if (sts != MPI_STATUS_IGNORE) {
3272     sts->MPI_TAG    = MPI_ANY_TAG;
3273     sts->MPI_SOURCE = MPI_ANY_SOURCE;
3274     sts->MPI_COMM   = MPI_COMM_NULL;
3275     sts->MPI_LENGTH = 0;
3276     sts->MPI_ERROR  = MPI_SUCCESS;
3277     sts->MPI_CANCEL = 0;
3278   }
3279 }
3280
3281 static inline void clearStatus(MPI_Status sts[], int idx) noexcept {
3282   if (sts != MPI_STATUSES_IGNORE) {
3283     clearStatus(&sts[idx]);
3284   }
3285 }
3286
3287 static inline bool handle_MPI_PROC_NULL(int src, MPI_Comm comm, MPI_Status* sts) noexcept
3288 {
3289   if (src == MPI_PROC_NULL) {
3290     clearStatus(sts);
3291     if (sts != MPI_STATUS_IGNORE) sts->MPI_SOURCE = MPI_PROC_NULL;
3292     return true;
3293   }
3294   return false;
3295 }
3296
3297 int ampi::recv(int t, int s, void* buf, int count, MPI_Datatype type, MPI_Comm comm, MPI_Status *sts) noexcept
3298 {
3299   MPI_Comm disComm = myComm.getComm();
3300   if (handle_MPI_PROC_NULL(s, disComm, sts)) return 0;
3301
3302 #if CMK_BIGSIM_CHARM
3303    void *curLog; // store current log in timeline
3304   _TRACE_BG_TLINE_END(&curLog);
3305 #if CMK_TRACE_IN_CHARM
3306   if(CpvAccess(traceOn)) traceSuspend();
3307 #endif
3308 #endif
3309
3310   if (isInter()) {
3311     s = myComm.getIndexForRemoteRank(s);
3312   }
3313
3314   MSG_ORDER_DEBUG(
3315     CkPrintf("AMPI vp %d blocking recv: tag=%d, src=%d, comm=%d\n",thisIndex,t,s,comm);
3316   )
3317
3318   ampi *dis = getAmpiInstance(disComm);
3319   MPI_Status tmpStatus;
3320   AmpiMsg* msg = unexpectedMsgs.get(t, s, (sts == MPI_STATUS_IGNORE) ? (int*)&tmpStatus : (int*)sts);
3321   if (msg) { // the matching message has already arrived
3322     if (sts != MPI_STATUS_IGNORE) {
3323       sts->MPI_SOURCE = msg->getSrcRank();
3324       sts->MPI_TAG    = msg->getTag();
3325       sts->MPI_COMM   = comm;
3326       sts->MPI_LENGTH = msg->getLength();
3327       sts->MPI_CANCEL = 0;
3328     }
3329     processAmpiMsg(msg, buf, type, count);
3330 #if CMK_BIGSIM_CHARM
3331     TRACE_BG_AMPI_BREAK(thread->getThread(), "RECV_RESUME", NULL, 0, 0);
3332     if (msg->eventPe == CkMyPe()) _TRACE_BG_ADD_BACKWARD_DEP(msg->event);
3333 #endif
3334     CkpvAccess(msgPool).deleteAmpiMsg(msg);
3335   }
3336   else { // post a request and block until the matching message arrives
3337     int request = postReq(dis->parent->reqPool.newReq<IReq>(buf, count, type, s, t, comm, getDDT(), AMPI_REQ_BLOCKED));
3338     CkAssert(parent->numBlockedReqs == 0);
3339     parent->numBlockedReqs = 1;
3340     dis = dis->blockOnRecv(); // "dis" is updated in case an ampi thread is migrated while waiting for a message
3341     parent = dis->parent;
3342     AmpiRequestList& reqs = parent->getReqs();
3343     if (sts != MPI_STATUS_IGNORE) {
3344       AmpiRequest& req = *reqs[request];
3345       sts->MPI_SOURCE = req.src;
3346       sts->MPI_TAG    = req.tag;
3347       sts->MPI_COMM   = req.comm;
3348       sts->MPI_LENGTH = req.getNumReceivedBytes(getDDT());
3349       sts->MPI_CANCEL = 0;
3350     }
3351     reqs.freeNonPersReq(request);
3352   }
3353
3354 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3355   CpvAccess(_currentObj) = dis;
3356   MSG_ORDER_DEBUG( printf("[%d] AMPI thread rescheduled  to Index %d buf %p src %d\n",CkMyPe(),dis->thisIndex,buf,s); )
3357 #endif
3358 #if CMK_BIGSIM_CHARM && CMK_TRACE_IN_CHARM
3359   //Due to the reason mentioned the in the else-statement above, we need to
3360   //use "dis" as "this" in the case of migration (or out-of-core execution in BigSim)
3361   if(CpvAccess(traceOn)) CthTraceResume(dis->thread->getThread());
3362 #endif
3363
3364   return 0;
3365 }
3366
3367 void ampi::probe(int t, int s, MPI_Comm comm, MPI_Status *sts) noexcept
3368 {
3369   if (handle_MPI_PROC_NULL(s, comm, sts)) return;
3370
3371 #if CMK_BIGSIM_CHARM
3372   void *curLog; // store current log in timeline
3373   _TRACE_BG_TLINE_END(&curLog);
3374 #endif
3375
3376   ampi *dis = getAmpiInstance(comm);
3377   AmpiMsg *msg = NULL;
3378   while(1) {
3379     MPI_Status tmpStatus;
3380     msg = unexpectedMsgs.probe(t, s, (sts == MPI_STATUS_IGNORE) ? (int*)&tmpStatus : (int*)sts);
3381     if (msg) break;
3382     // "dis" is updated in case an ampi thread is migrated while waiting for a message
3383     dis = dis->blockOnRecv();
3384   }
3385
3386   if (sts != MPI_STATUS_IGNORE) {
3387     sts->MPI_SOURCE = msg->getSrcRank();
3388     sts->MPI_TAG    = msg->getTag();
3389     sts->MPI_COMM   = comm;
3390     sts->MPI_LENGTH = msg->getLength();
3391     sts->MPI_CANCEL = 0;
3392   }
3393
3394 #if CMK_BIGSIM_CHARM
3395   _TRACE_BG_SET_INFO((char *)msg, "PROBE_RESUME",  &curLog, 1);
3396 #endif
3397 }
3398
3399 void ampi::mprobe(int t, int s, MPI_Comm comm, MPI_Status *sts, MPI_Message *message) noexcept
3400 {
3401   if (handle_MPI_PROC_NULL(s, comm, sts)) {
3402     *message = MPI_MESSAGE_NO_PROC;
3403     return;
3404   }
3405
3406 #if CMK_BIGSIM_CHARM
3407   void *curLog; // store current log in timeline
3408   _TRACE_BG_TLINE_END(&curLog);
3409 #endif
3410
3411   ampi *dis = this;
3412   AmpiMsg *msg = NULL;
3413   while(1) {
3414     MPI_Status tmpStatus;
3415     // We call get() rather than probe() here because we want to remove this msg
3416     // from ampi::unexpectedMsgs and then insert it into ampiParent::matchedMsgs
3417     msg = unexpectedMsgs.get(t, s, (sts == MPI_STATUS_IGNORE) ? (int*)&tmpStatus : (int*)sts);
3418     if (msg)
3419       break;
3420     // "dis" is updated in case an ampi thread is migrated while waiting for a message
3421     dis = dis->blockOnRecv();
3422   }
3423
3424   msg->setComm(comm);
3425   *message = parent->putMatchedMsg(msg);
3426
3427   if (sts != MPI_STATUS_IGNORE) {
3428     sts->MPI_SOURCE = msg->getSrcRank();
3429     sts->MPI_TAG    = msg->getTag();
3430     sts->MPI_COMM   = msg->getComm();
3431     sts->MPI_LENGTH = msg->getLength();
3432     sts->MPI_CANCEL = 0;
3433   }
3434
3435 #if CMK_BIGSIM_CHARM
3436   _TRACE_BG_SET_INFO((char *)msg, "MPROBE_RESUME",  &curLog, 1);
3437 #endif
3438 }
3439
3440 int ampi::iprobe(int t, int s, MPI_Comm comm, MPI_Status *sts) noexcept
3441 {
3442   if (handle_MPI_PROC_NULL(s, comm, sts)) return 1;
3443
3444   MPI_Status tmpStatus;
3445   AmpiMsg* msg = unexpectedMsgs.probe(t, s, (sts == MPI_STATUS_IGNORE) ? (int*)&tmpStatus : (int*)sts);
3446   if (msg) {
3447     msg->setComm(comm);
3448     if (sts != MPI_STATUS_IGNORE) {
3449       sts->MPI_SOURCE = msg->getSrcRank();
3450       sts->MPI_TAG    = msg->getTag();
3451       sts->MPI_COMM   = msg->getComm();
3452       sts->MPI_LENGTH = msg->getLength();
3453       sts->MPI_CANCEL = 0;
3454     }
3455     return 1;
3456   }
3457 #if CMK_BIGSIM_CHARM
3458   void *curLog; // store current log in timeline
3459   _TRACE_BG_TLINE_END(&curLog);
3460 #endif
3461   thread->schedule();
3462 #if CMK_BIGSIM_CHARM
3463   _TRACE_BG_SET_INFO(NULL, "IPROBE_RESUME",  &curLog, 1);
3464 #endif
3465   return 0;
3466 }
3467
3468 int ampi::improbe(int tag, int source, MPI_Comm comm, MPI_Status *sts,
3469                   MPI_Message *message) noexcept
3470 {
3471   if (handle_MPI_PROC_NULL(source, comm, sts)) {
3472     *message = MPI_MESSAGE_NO_PROC;
3473     return 1;
3474   }
3475
3476   MPI_Status tmpStatus;
3477   // We call get() rather than probe() here because we want to remove this msg
3478   // from ampi::unexpectedMsgs and then insert it into ampiParent::matchedMsgs
3479   AmpiMsg* msg = unexpectedMsgs.get(tag, source, (sts == MPI_STATUS_IGNORE) ? (int*)&tmpStatus : (int*)sts);
3480   if (msg) {
3481     msg->setComm(comm);
3482     *message = parent->putMatchedMsg(msg);
3483     if (sts != MPI_STATUS_IGNORE) {
3484       sts->MPI_SOURCE = msg->getSrcRank();
3485       sts->MPI_TAG    = msg->getTag();
3486       sts->MPI_COMM   = comm;
3487       sts->MPI_LENGTH = msg->getLength();
3488       sts->MPI_CANCEL = 0;
3489     }
3490     return 1;
3491   }
3492
3493 #if CMK_BIGSIM_CHARM
3494   void *curLog; // store current log in timeline
3495   _TRACE_BG_TLINE_END(&curLog);
3496 #endif
3497   thread->schedule();
3498 #if CMK_BIGSIM_CHARM
3499   _TRACE_BG_SET_INFO(NULL, "IMPROBE_RESUME",  &curLog, 1);
3500 #endif
3501   return 0;
3502 }
3503
3504 void ampi::bcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm destcomm) noexcept
3505 {
3506   MPI_Request req;
3507
3508   if (root==getRank()) {
3509 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3510     CpvAccess(_currentObj) = this;
3511 #endif
3512     irecvBcast(buf, count, type, root, destcomm, &req);
3513     thisProxy.bcastResult(makeBcastMsg(buf, count, type, root, destcomm));
3514   }
3515   else { // Non-root ranks need to increment the outgoing sequence number for collectives
3516     oorder.incCollSeqOutgoing();
3517     irecvBcast(buf, count, type, root, destcomm, &req);
3518   }
3519
3520   MPI_Wait(&req, MPI_STATUS_IGNORE);
3521 }
3522
3523 int ampi::intercomm_bcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm intercomm) noexcept
3524 {
3525   if (root==MPI_ROOT) {
3526 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3527     CpvAccess(_currentObj) = this;
3528 #endif
3529     remoteProxy.bcastResult(makeBcastMsg(buf, count, type, getRank(), intercomm));
3530   }
3531   else { // Non-root ranks need to increment the outgoing sequence number for collectives
3532     oorder.incCollSeqOutgoing();
3533   }
3534
3535   if (root!=MPI_PROC_NULL && root!=MPI_ROOT) {
3536     // remote group ranks
3537     MPI_Request req;
3538     irecvBcast(buf, count, type, root, intercomm, &req);
3539     MPI_Wait(&req, MPI_STATUS_IGNORE);
3540   }
3541   return MPI_SUCCESS;
3542 }
3543
3544 void ampi::ibcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm destcomm, MPI_Request* request) noexcept
3545 {
3546   if (root==getRank()) {
3547 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3548     CpvAccess(_currentObj) = this;
3549 #endif
3550     thisProxy.bcastResult(makeBcastMsg(buf, count, type, getRank(), destcomm));
3551   }
3552   else { // Non-root ranks need to increment the outgoing sequence number for collectives
3553     oorder.incCollSeqOutgoing();
3554   }
3555
3556   // call irecv to post an IReq and check for any pending messages
3557   irecvBcast(buf, count, type, root, destcomm, request);
3558 }
3559
3560 int ampi::intercomm_ibcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm intercomm, MPI_Request *request) noexcept
3561 {
3562   if (root==MPI_ROOT) {
3563 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
3564     CpvAccess(_currentObj) = this;
3565 #endif
3566     remoteProxy.bcastResult(makeBcastMsg(buf, count, type, getRank(), intercomm));
3567   }
3568   else { // Non-root ranks need to increment the outgoing sequence number for collectives
3569     oorder.incCollSeqOutgoing();
3570   }
3571
3572   if (root!=MPI_PROC_NULL && root!=MPI_ROOT) {
3573     // call irecv to post IReq and process pending messages
3574     irecvBcast(buf, count, type, root, intercomm, request);
3575   }
3576   return MPI_SUCCESS;
3577 }
3578
3579 void ampi::bcastraw(void* buf, int len, CkArrayID aid) noexcept
3580 {
3581   AmpiMsg *msg = new (len, 0) AmpiMsg(0, 0, MPI_BCAST_TAG, 0, len);
3582   memcpy(msg->getData(), buf, len);
3583   CProxy_ampi pa(aid);
3584   pa.generic(msg);
3585 }
3586
3587 int ampi::intercomm_scatter(int root, const void *sendbuf, int sendcount, MPI_Datatype sendtype,
3588                             void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm intercomm) noexcept
3589 {
3590   if (root == MPI_ROOT) {
3591     int remote_size = getRemoteIndices().size();
3592
3593     CkDDT_DataType* dttype = getDDT()->getType(sendtype) ;
3594     int itemsize = dttype->getSize(sendcount) ;
3595     for(int i = 0; i < remote_size; i++) {
3596         send(MPI_SCATTER_TAG, getRank(), ((char*)sendbuf)+(itemsize*i),
3597              sendcount, sendtype, i, intercomm);
3598     }
3599   }
3600
3601   if (root!=MPI_PROC_NULL && root!=MPI_ROOT) { //remote group ranks
3602     if(-1==recv(MPI_SCATTER_TAG, root, recvbuf, recvcount, recvtype, intercomm))
3603       CkAbort("AMPI> Error in intercomm MPI_Scatter recv");
3604   }
3605
3606   return MPI_SUCCESS;
3607 }
3608
3609 int ampi::intercomm_iscatter(int root, const void *sendbuf, int sendcount, MPI_Datatype sendtype,
3610                              void *recvbuf, int recvcount, MPI_Datatype recvtype,
3611                              MPI_Comm intercomm, MPI_Request *request) noexcept
3612 {
3613   if (root == MPI_ROOT) {
3614     int remote_size = getRemoteIndices().size();
3615
3616     CkDDT_DataType* dttype = getDDT()->getType(sendtype) ;
3617     int itemsize = dttype->getSize(sendcount) ;
3618     // use an ATAReq to non-block the caller and get a request ptr
3619     ATAReq *newreq = new ATAReq(remote_size);
3620     for(int i = 0; i < remote_size; i++) {
3621       newreq->reqs[i] = send(MPI_SCATTER_TAG, getRank(), ((char*)sendbuf)+(itemsize*i),
3622                              sendcount, sendtype, i, intercomm, 0, I_SEND);
3623     }
3624     *request = postReq(newreq);
3625   }
3626
3627   if (root!=MPI_PROC_NULL && root!=MPI_ROOT) { //remote group ranks
3628     // call irecv to post an IReq and process any pending messages
3629     irecv(recvbuf,recvcount,recvtype,root,MPI_SCATTER_TAG,intercomm,request);
3630   }
3631
3632   return MPI_SUCCESS;
3633 }
3634
3635 int ampi::intercomm_scatterv(int root, const void* sendbuf, const int* sendcounts, const int* displs,
3636                              MPI_Datatype sendtype, void* recvbuf, int recvcount,
3637                              MPI_Datatype recvtype, MPI_Comm intercomm) noexcept
3638 {
3639   if (root == MPI_ROOT) {
3640     int remote_size = getRemoteIndices().size();
3641