Zcpy Bcast Send API: Guard ZC API macros for non-RDMA layers
[charm.git] / src / ck-core / envelope.h
1 /**
2  @defgroup CkEnvelope
3  \brief  Charm++ message header.
4 */
5 #ifndef _ENVELOPE_H
6 #define _ENVELOPE_H
7
8 #include <pup.h>
9 #include <charm.h>
10 #include <middle.h>
11 #include <cklists.h>
12 #include <objid.h>
13
14 #ifndef CkIntbits
15 #define CkIntbits (sizeof(int)*8)
16 #endif
17
18 #if CMK_ERROR_CHECKING
19 #define _SET_USED(env, x) (env)->setUsed((x))
20 #define _CHECK_USED(env) do { if(env->isUsed()) \
21                            CmiAbort("Message being re-sent. Aborting...\n"); \
22                          } while(0)
23 #else
24 #define _SET_USED(env, x) do{}while(0)
25 #define _CHECK_USED(env) do{}while(0)
26 #endif
27
28 #define CkMsgAlignLength(x)     ALIGN_DEFAULT(x)
29 #define CkMsgAlignOffset(x)     (CkMsgAlignLength(x)-(x))
30 #define CkPriobitsToInts(nBits)    ((nBits+CkIntbits-1)/CkIntbits)
31
32 #if CMK_MESSAGE_LOGGING
33 #define CK_FREE_MSG_MLOG        0x1
34 #define CK_BYPASS_DET_MLOG      0x2
35 #define CK_MULTICAST_MSG_MLOG   0x4
36 #define CK_REDUCTION_MSG_MLOG   0x8
37 #endif
38
39 /**
40     \addtogroup CriticalPathFramework 
41     @{
42 */
43
44 /** A class that is used to track the entry points and other information 
45     about a critical path as a charm++ program executes.
46
47     This class won't do useful things unless USE_CRITICAL_PATH_HEADER_ARRAY is defined
48 */
49 class PathHistoryEnvelope {
50  protected:
51   // When passing paths forward, store information on PEs, in backward pass, lookup necessary information
52   int sender_history_table_idx;
53   double totalTime;
54  public:
55   double getTotalTime() const{ return totalTime; }
56   int get_sender_history_table_idx() const{ return sender_history_table_idx; }
57   void set_sender_history_table_idx(int i) { sender_history_table_idx = i; }
58   PathHistoryEnvelope(){ 
59     reset(); 
60   }
61   double getTime() const{ return totalTime; }
62   void setTime(double t){ totalTime = t; }
63   void pup(PUP::er &p) {
64     p | sender_history_table_idx;
65     p | totalTime;
66   } 
67   void reset();
68   void print() const;
69   /// Write a description of the path into the beginning of the provided buffer. The buffer ought to be large enough.
70   void printHTMLToString(char* buf) const{
71     buf[0] = '\0';
72     sprintf(buf+strlen(buf), "Path Time=%lf<br> Sender idx=%d", (double)totalTime, (int)sender_history_table_idx);
73   }
74   /// The number of available EP counts 
75   int getNumUsed() const;
76   /// Return the count value for the idx'th available EP  
77   int getUsedCount(int idx) const;
78   /// Return the idx'th available EP 
79   int getUsedEp(int idx) const;
80   int getEpCount(int ep) const;
81   void incrementTotalTime(double time);
82   //  void createPath(envelope *originatingMsg);
83   void setDebug100();
84 };
85 /** @} */
86
87
88
89 typedef unsigned int   UInt;
90 typedef unsigned short UShort;
91 typedef unsigned char  UChar;
92
93 #include "charm.h" // for CkGroupID, and CkEnvelopeType
94 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
95 #include "ckobjid.h" //for the ckobjId
96 #endif
97
98 /**
99 @addtogroup CkEnvelope
100 */
101
102 CkpvExtern(int, envelopeEventID);
103
104 /**
105 @{
106 The class envelope defines a Charm++ message's header. The first
107 'CmiReservedHeaderSize' bytes of memory is exclusively reserved for Converse
108 header, which is defined in converse.h and platform specific config files.
109
110 After Charm++ envelope comes the payload, i.e. a variable-length amount of user
111 data. Following the user data, optionally, a variable number of bits of
112 priority data can be stored at the end. Function
113 envelope::alloc(msgtype,size,prio) is always used to allocate the whole
114 message. Note that this memory layout must be observed.
115
116 The following are a few terms that are used often:
117
118 <pre>
119  Envelope pointer        \
120  Converse message pointer -> [ [ Converse envelope ]       ]
121                              [       Charm envelope        ] 
122  User message pointer     -> [ User data/payload ... ]
123  Priority pointer         -> [ Priority ints ... ]
124 </pre>
125
126 The "message pointers" passed to and from users bypass the envelope and point
127 *directly* to the user data--the routine "EnvToUsr" below adjusts an envelope
128 (or converse message) pointer into this user message pointer.  There is a
129 corresponding routine "UsrToEnv" which takes the user data pointer and returns
130 a pointer to the envelope/converse message.
131
132 Unfortunately, in the guts of Charm++ it's not always clear whether you've been
133 given a converse or user message pointer, as both tend to be passed as void *.
134 Confusing the two will invariably result in data corruption and bizarre
135 crashes.
136
137 FIXME: Make CkMessage inherit from envelope,
138 which would unify converse, envelope, and 
139 user message pointers.
140 */
141
142 namespace ck {
143
144   namespace impl {
145     /**
146        These structures store the type-specific message information.
147     */
148     union u_type {
149       struct s_chare {  // NewChareMsg, NewVChareMsg, ForChareMsg, ForVidMsg, FillVidMsg
150         void *ptr;      ///< object pointer
151         UInt forAnyPe;  ///< Used only by newChare
152         int  bype;      ///< created by this pe
153       } chare;
154       struct s_group {         // NodeBocInitMsg, BocInitMsg, ForNodeBocMsg, ForBocMsg
155         CkGroupID g;           ///< GroupID
156         CkNodeGroupID rednMgr; ///< Reduction manager for this group (constructor only!)
157         int epoch;             ///< "epoch" this group was created during (0--mainchare, 1--later)
158         UShort arrayEp;        ///< Used only for array broadcasts
159       } group;
160       struct s_array{             ///< For arrays only (ArrayEltInitMsg, ForArrayEltMsg)
161         CmiUInt8 id;              /// <ck::ObjID if it could be in a union
162         CkGroupID arr;            ///< Array manager GID
163 #if CMK_SMP_TRACE_COMMTHREAD
164         UInt srcpe;
165 #endif
166         UChar hopCount;           ///< number of times message has been routed
167         UChar ifNotThere;         ///< what to do if array element is missing
168       } array;
169       struct s_roData {    ///< RODataMsg for readonly data type
170         UInt count;
171       } roData;
172       struct s_roMsg {     ///< ROMsgMsg for readonlys defined in ci files
173         UInt roIdx;
174       } roMsg;
175     };
176
177     struct s_attribs {  // Packed bitwise struct
178       UChar msgIdx;     ///< Usertype of message (determines pack routine)
179       UChar mtype;      ///< e.g., ForBocMsg
180       UChar queueing:4; ///< Queueing strategy (FIFO, LIFO, PFIFO, ...)
181       UChar isPacked:1; ///< If true, message must be unpacked before use
182       UChar isUsed:1;   ///< Marker bit to prevent message re-send.
183       UChar isVarSysMsg:1; ///< True if msg is a variable sized sys message that doesn't use a pool
184     };
185
186   }
187 }
188
189 #if (defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
190 #define CMK_ENVELOPE_FT_FIELDS                           \
191   CkObjID sender;                                        \
192   CkObjID recver;                                        \
193   MCount SN;                                             \
194   int incarnation;                                       \
195   int flags;                                             \
196   UInt piggyBcastIdx;
197 #elif defined(_FAULT_CAUSAL_)
198 #define CMK_ENVELOPE_FT_FIELDS                           \
199   CkObjID sender;                                        \
200   CkObjID recver;                                        \
201   MCount SN;                                             \
202   MCount TN;                                             \
203   int incarnation;                                       \
204   int flags;                                             \
205   UInt piggyBcastIdx;
206 #else
207 #define CMK_ENVELOPE_FT_FIELDS
208 #endif
209
210 #if CMK_REPLAYSYSTEM || CMK_TRACE_ENABLED
211 #define CMK_ENVELOPE_OPTIONAL_FIELDS                                           \
212   UInt   event;        /* used by projections and record-replay */
213 #else
214 #define CMK_ENVELOPE_OPTIONAL_FIELDS
215 #endif
216
217 #define CMK_ENVELOPE_FIELDS                                                    \
218   /* Converse message envelope, Must be first field in this class */           \
219   char   core[CmiReservedHeaderSize];                                          \
220   UInt   pe;           /* source processor */                                  \
221   ck::impl::u_type type; /* Depends on message type (attribs.mtype) */         \
222   UInt   totalsize;    /* Byte count from envelope start to end of group dependencies */ \
223   CMK_ENVELOPE_OPTIONAL_FIELDS                                                 \
224   CMK_REFNUM_TYPE ref; /* Used by futures and SDAG */                          \
225   UShort priobits;     /* Number of bits of priority data after user data */   \
226   UShort groupDepNum;  /* Number of group dependencies */                      \
227   UShort epIdx;        /* Entry point to call */                               \
228   ck::impl::s_attribs attribs;
229
230 class envelope {
231 private:
232
233     class envelopeSizeHelper {
234       CMK_ENVELOPE_FIELDS
235       CMK_ENVELOPE_FT_FIELDS
236     };
237
238     #ifdef __clang__
239     #pragma GCC diagnostic push
240     #pragma GCC diagnostic ignored "-Wunused-private-field"
241     #endif
242     CMK_ENVELOPE_FIELDS
243     #ifdef __clang__
244     #pragma GCC diagnostic pop
245     #endif
246
247 public:
248
249     CMK_ENVELOPE_FT_FIELDS
250
251     #if defined(__GNUC__) || defined(__clang__)
252     #pragma GCC diagnostic push
253     #pragma GCC diagnostic ignored "-Wpedantic"
254     #if defined(__clang__)
255     #pragma GCC diagnostic ignored "-Wunused-private-field"
256     #endif
257     #endif
258     // padding to ensure ALIGN_BYTES alignment
259     UChar align[CkMsgAlignOffset(sizeof(envelopeSizeHelper))];
260     #if defined(__GNUC__) || defined(__clang__)
261     #pragma GCC diagnostic pop
262     #endif
263
264     void pup(PUP::er &p);
265 #if CMK_REPLAYSYSTEM || CMK_TRACE_ENABLED
266     UInt   getEvent(void) const { return event; }
267     void   setEvent(const UInt e) { event = e; }
268 #endif
269     CMK_REFNUM_TYPE   getRef(void) const { return ref; }
270     void   setRef(const CMK_REFNUM_TYPE r) { ref = r; }
271     UChar  getQueueing(void) const { return attribs.queueing; }
272     void   setQueueing(const UChar q) { attribs.queueing=q; }
273     UChar  getMsgtype(void) const { return attribs.mtype; }
274     void   setMsgtype(const UChar m) { attribs.mtype = m; }
275 #if CMK_ERROR_CHECKING
276     UChar  isUsed(void) const { return attribs.isUsed; }
277     void   setUsed(const UChar u) { attribs.isUsed=u; }
278 #else /* CMK_ERROR_CHECKING */
279     inline void setUsed(const UChar u) {}
280 #endif
281     UChar  getMsgIdx(void) const { return attribs.msgIdx; }
282     void   setMsgIdx(const UChar idx) { attribs.msgIdx = idx; }
283     UInt   getTotalsize(void) const { return totalsize; }
284     void   setTotalsize(const UInt s) { totalsize = s; }
285     UInt   getUsersize(void) const { 
286       return totalsize - getGroupDepSize() - getPrioBytes() - sizeof(envelope); 
287     }
288     void   setUsersize(const UInt s) {
289       if (s == getUsersize()) {
290         return;
291       }
292       CkAssert(s < getUsersize());
293       UInt newPrioOffset = sizeof(envelope) + CkMsgAlignLength(s);
294       UInt newTotalsize = newPrioOffset + getPrioBytes() + getGroupDepSize();
295       void *newPrioPtr = (void *) ((char *) this + newPrioOffset); 
296       // use memmove instead of memcpy in case memory areas overlap
297       memmove(newPrioPtr, getPrioPtr(), getPrioBytes()); 
298       setTotalsize(newTotalsize); 
299     }
300
301     // s specifies number of bytes to remove from user portion of message
302     void shrinkUsersize(const UInt s) {
303       CkAssert(s <= getUsersize());
304       setUsersize(getUsersize() - s);
305     }
306
307     UChar  isPacked(void) const { return attribs.isPacked; }
308     void   setPacked(const UChar p) { attribs.isPacked = p; }
309     UChar  isVarSysMsg(void) const { return attribs.isVarSysMsg; }
310     void   setIsVarSysMsg(const UChar d) { attribs.isVarSysMsg = d; }
311     UShort getPriobits(void) const { return priobits; }
312     void   setPriobits(const UShort p) { priobits = p; }
313     UShort getPrioWords(void) const { return CkPriobitsToInts(priobits); }
314     UShort getPrioBytes(void) const { return getPrioWords()*sizeof(int); }
315     void*  getPrioPtr(void) const { 
316       return (void *)((char *)this + totalsize - getGroupDepSize() - getPrioBytes());
317     }
318     void* getGroupDepPtr(void) const {
319       return (void *)((char *)this + totalsize - getGroupDepSize());
320     }
321     static envelope *alloc(const UChar type, const UInt size=0, const UShort prio=0, const GroupDepNum groupDepNumRequest=GroupDepNum{})
322     {
323 #if CMK_LOCKLESS_QUEUE
324       CkAssert(type>=NewChareMsg && type<LAST_CK_ENVELOPE_TYPE);
325 #else
326       CkAssert(type>=NewChareMsg && type<=ForArrayEltMsg);
327 #endif
328 #if CMK_USE_STL_MSGQ
329       // Ideally, this should be a static compile-time assert. However we need API changes for that
330       CkAssert(sizeof(CMK_MSG_PRIO_TYPE) >= sizeof(int)*CkPriobitsToInts(prio));
331 #endif
332
333       UInt tsize = sizeof(envelope)+ 
334                    CkMsgAlignLength(size)+
335                    sizeof(int)*CkPriobitsToInts(prio) +
336                    sizeof(CkGroupID)*(int)groupDepNumRequest;
337
338       //CkPrintf("[%d] inside envelope alloc groupDepNum:%d\n", CkMyPe(), (int)groupDepNumRequest);
339
340       envelope *env = (envelope *)CmiAlloc(tsize);
341 #if CMK_REPLAYSYSTEM
342       //for record-replay
343       memset(env, 0, sizeof(envelope));
344       env->setEvent(++CkpvAccess(envelopeEventID));
345 #endif
346       env->setMsgtype(type);
347       env->totalsize = tsize;
348       env->priobits = prio;
349       env->setPacked(0);
350       env->setGroupDepNum((int)groupDepNumRequest);
351       _SET_USED(env, 0);
352       env->setRef(0);
353       env->setEpIdx(0);
354       env->setIsVarSysMsg(0);
355
356 #if CMK_ONESIDED_IMPL
357       CMI_ZC_MSGTYPE(env) = CMK_REG_NO_ZC_MSG; // Set the default as CMK_REG_NO_ZC_MSG
358 #endif
359
360 #if USE_CRITICAL_PATH_HEADER_ARRAY
361       env->pathHistory.reset();
362 #endif
363
364 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
365       env->sender.type = TypeInvalid;
366       env->recver.type = TypeInvalid;
367       env->SN = 0;
368       env->flags = 0;
369 #if defined(_FAULT_CAUSAL_)
370       env->TN = 0;
371 #endif
372           env->incarnation = -1;
373 #endif
374
375       return env;
376     }
377     void reset() {
378 #if CMK_REPLAYSYSTEM
379       setEvent(++CkpvAccess(envelopeEventID));
380 #endif
381       memset(getGroupDepPtr(), 0, getGroupDepSize());
382     }
383     UShort getEpIdx(void) const { return epIdx; }
384     void   setEpIdx(const UShort idx) { epIdx = idx; }
385     UInt   getSrcPe(void) const { return pe; }
386     void   setSrcPe(const UInt s) { pe = s; }
387     static void setSrcPe(char *env, const UInt s) { ((envelope*)env)->setSrcPe(s); }
388
389 // Readonly-specific fields
390     UInt   getCount(void) const { 
391       CkAssert(getMsgtype()==RODataMsg); return type.roData.count; 
392     }
393     void   setCount(const UInt c) { 
394       CkAssert(getMsgtype()==RODataMsg); type.roData.count = c; 
395     }
396     UInt   getRoIdx(void) const { 
397       CkAssert(getMsgtype()==ROMsgMsg); return type.roMsg.roIdx; 
398     }
399     void   setRoIdx(const UInt r) { 
400       CkAssert(getMsgtype()==ROMsgMsg); type.roMsg.roIdx = r; 
401     }
402     
403  // Chare-specific fields
404     UInt isForAnyPE(void) const {
405       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
406       return type.chare.forAnyPe; 
407     }
408     void setForAnyPE(UInt f) { 
409       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
410       type.chare.forAnyPe = f; 
411     }
412     void*  getVidPtr(void) const {
413       CkAssert(getMsgtype()==NewVChareMsg || getMsgtype()==ForVidMsg
414           || getMsgtype()==FillVidMsg ||  getMsgtype()==DeleteVidMsg);
415       return type.chare.ptr;
416     }
417     void   setVidPtr(void *p) {
418       CkAssert(getMsgtype()==NewVChareMsg || getMsgtype()==ForVidMsg
419           || getMsgtype()==FillVidMsg ||  getMsgtype()==DeleteVidMsg);
420       type.chare.ptr = p;
421     }
422     void*  getObjPtr(void) const { 
423       CkAssert(getMsgtype()==ForChareMsg); return type.chare.ptr; 
424     }
425     void   setObjPtr(void *p) { 
426       CkAssert(getMsgtype()==ForChareMsg); type.chare.ptr = p; 
427     }
428     UInt getByPe(void) const {
429       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
430       return type.chare.bype; 
431     }
432     void setByPe(UInt pe) { 
433       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
434       type.chare.bype = pe; 
435     }
436
437 // Group-specific fields
438     CkGroupID   getGroupNum(void) const {
439       CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==ForBocMsg
440           || getMsgtype()==NodeBocInitMsg || getMsgtype()==ForNodeBocMsg);
441       return type.group.g;
442     }
443     void   setGroupNum(const CkGroupID g) {
444       CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==ForBocMsg
445           || getMsgtype()==NodeBocInitMsg || getMsgtype()==ForNodeBocMsg);
446       type.group.g = g;
447     }
448     void setGroupEpoch(int epoch) { CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg); type.group.epoch=epoch; }
449     int getGroupEpoch(void) const { CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg); return type.group.epoch; }
450     void setRednMgr(CkNodeGroupID r){ CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==ForBocMsg
451           || getMsgtype()==NodeBocInitMsg || getMsgtype()==ForNodeBocMsg);
452  type.group.rednMgr = r; }
453     CkNodeGroupID getRednMgr(){       CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==ForBocMsg
454           || getMsgtype()==NodeBocInitMsg || getMsgtype()==ForNodeBocMsg);
455  return type.group.rednMgr; }
456     UShort getGroupDepSize() const { return groupDepNum*sizeof(CkGroupID); }
457     UShort getGroupDepNum() const { return groupDepNum; }
458     void setGroupDepNum(const UShort &r) { groupDepNum = r; }
459     CkGroupID getGroupDep(int index=0) { return *((CkGroupID *)getGroupDepPtr() + index); }
460     void setGroupDep(const CkGroupID &r, int index=0) {
461       *(((CkGroupID *)getGroupDepPtr())+ index) = r;
462     }
463
464 // Array-specific fields
465     CkGroupID getArrayMgr(void) const { 
466       if (getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg)
467         return type.array.arr;
468       else
469             CkAbort("Cannot return ArrayID from msg for non-array entity");
470         /* compiler appeasement, even though this will never be executed */
471       return type.array.arr;
472     }
473     void setArrayMgr(const CkGroupID gid) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg);  type.array.arr = gid; }
474     int getArrayMgrIdx(void) const {CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg);  return type.array.arr.idx;}
475     UShort &getsetArrayEp(void) {return epIdx;}
476     UShort &getsetArrayBcastEp(void) {return type.group.arrayEp;}
477 #if CMK_SMP_TRACE_COMMTHREAD
478     UInt &getsetArraySrcPe(void) {return type.array.srcpe;}
479 #else
480     UInt &getsetArraySrcPe(void) {return pe;}
481 #endif
482     UChar &getsetArrayHops(void) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); return type.array.hopCount;}
483     int getArrayIfNotThere(void) const { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); return type.array.ifNotThere;}
484     void setArrayIfNotThere(int nt) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); type.array.ifNotThere=nt;}
485
486     void setRecipientID(ck::ObjID objid)
487     {
488       CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg);
489       type.array.id = objid.getID();
490     }
491
492     CmiUInt8 getRecipientID() const
493     {
494       CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg);
495       return type.array.id;
496     }
497
498 #if USE_CRITICAL_PATH_HEADER_ARRAY
499  public:
500     /** The information regarding the entry methods that executed along the path to this one.
501         \addtogroup CriticalPathFramework
502     */
503     PathHistoryEnvelope pathHistory;
504 #endif
505
506 };
507
508
509 inline envelope *UsrToEnv(const void *const msg) {
510   return (envelope *)((intptr_t)msg - sizeof(envelope));
511 }
512
513 inline void *EnvToUsr(const envelope *const env) {
514   return (void *)((intptr_t)env + sizeof(envelope));
515 }
516
517 inline envelope *_allocEnv(const int msgtype, const int size=0, const int prio=0, const GroupDepNum groupDepNum=GroupDepNum{}) {
518   return envelope::alloc(msgtype,size,prio,groupDepNum);
519 }
520
521 inline void *_allocMsg(const int msgtype, const int size, const int prio=0, const GroupDepNum groupDepNum=GroupDepNum{}) {
522   return EnvToUsr(envelope::alloc(msgtype,size,prio,groupDepNum));
523 }
524
525 inline void _resetEnv(envelope *env) {
526   env->reset();
527 }
528
529 #if CMK_REPLAYSYSTEM
530 inline void setEventID(envelope *env){
531   env->setEvent(++CkpvAccess(envelopeEventID));
532 }
533 #endif
534
535 /** @} */
536
537 extern UChar   _defaultQueueing;
538
539 extern void CkPackMessage(envelope **pEnv);
540 extern void CkUnpackMessage(envelope **pEnv);
541
542 class MsgPool: public SafePool<void *> {
543 private:
544     static void *_alloc(void) {
545       /* CkAllocSysMsg() called in .def.h is not thread of sigio safe */
546       envelope *env = _allocEnv(ForChareMsg,0,0,GroupDepNum{});
547       env->setQueueing(_defaultQueueing);
548       env->setMsgIdx(0);
549       return EnvToUsr(env);
550     }
551     static void _reset(void* m) {
552       envelope *env = UsrToEnv(m);
553       _resetEnv(env);
554     }
555 public:
556     MsgPool():SafePool<void*>(_alloc, CkFreeMsg, _reset) {}
557 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
558         void *get(void){
559             return allocfn();
560         }
561         void put(void *m){
562         }
563 #endif
564 };
565
566 CkpvExtern(MsgPool*, _msgPool);
567
568 #endif