d0a3d4e256fb5e145f8d3edf77885fdb4132d545
[charm.git] / src / ck-core / envelope.h
1 /**
2  @defgroup CkEnvelope
3  \brief  Charm++ message header.
4 */
5 #ifndef _ENVELOPE_H
6 #define _ENVELOPE_H
7
8 #include <pup.h>
9 #include <middle.h>
10 #include <ckarrayindex.h>
11 #include <cklists.h>
12 #include <objid.h>
13
14 #ifndef CkIntbits
15 #define CkIntbits (sizeof(int)*8)
16 #endif
17
18 #if CMK_ERROR_CHECKING
19 #define _SET_USED(env, x) (env)->setUsed((x))
20 #define _CHECK_USED(env) do { if(env->isUsed()) \
21                            CmiAbort("Message being re-sent. Aborting...\n"); \
22                          } while(0)
23 #else
24 #define _SET_USED(env, x) do{}while(0)
25 #define _CHECK_USED(env) do{}while(0)
26 #endif
27
28 #define CkMsgAlignmentMask     (sizeof(double)-1)
29 #define CkMsgAlignLength(x) (((x)+CkMsgAlignmentMask)&(~(CkMsgAlignmentMask)))
30 #define CkMsgAlignOffset(x)     (CkMsgAlignLength(x)-(x))
31 #define CkPriobitsToInts(nBits)    ((nBits+CkIntbits-1)/CkIntbits)
32
33 #if CMK_MESSAGE_LOGGING
34 #define CK_FREE_MSG_MLOG        0x1
35 #define CK_BYPASS_DET_MLOG      0x2
36 #define CK_MULTICAST_MSG_MLOG   0x4
37 #define CK_REDUCTION_MSG_MLOG   0x8
38 #endif
39
40 //#define USE_CRITICAL_PATH_HEADER_ARRAY
41
42 /**
43     \addtogroup CriticalPathFramework 
44     @{
45 */
46
47 /** A class that is used to track the entry points and other information 
48     about a critical path as a charm++ program executes.
49
50     This class won't do useful things unless USE_CRITICAL_PATH_HEADER_ARRAY is defined
51 */
52 class PathHistoryEnvelope {
53  protected:
54   // When passing paths forward, store information on PEs, in backward pass, lookup necessary information
55   int sender_history_table_idx;
56   double totalTime;
57  public:
58   double getTotalTime() const{ return totalTime; }
59   int get_sender_history_table_idx() const{ return sender_history_table_idx; }
60   void set_sender_history_table_idx(int i) { sender_history_table_idx = i; }
61   PathHistoryEnvelope(){ 
62 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
63     reset(); 
64 #endif
65   }
66   double getTime() const{ return totalTime; }
67   void setTime(double t){ totalTime = t; }
68   void pup(PUP::er &p) {
69     p | sender_history_table_idx;
70     p | totalTime;
71   } 
72   void reset();
73   void print() const;
74   /// Write a description of the path into the beginning of the provided buffer. The buffer ought to be large enough.
75   void printHTMLToString(char* buf) const{
76     buf[0] = '\0';
77     sprintf(buf+strlen(buf), "Path Time=%lf<br> Sender idx=%d", (double)totalTime, (int)sender_history_table_idx);
78   }
79   /// The number of available EP counts 
80   int getNumUsed() const;
81   /// Return the count value for the idx'th available EP  
82   int getUsedCount(int idx) const;
83   /// Return the idx'th available EP 
84   int getUsedEp(int idx) const;
85   int getEpCount(int ep) const;
86   void incrementTotalTime(double time);
87   //  void createPath(envelope *originatingMsg);
88   void setDebug100();
89 };
90 /** @} */
91
92
93
94 typedef unsigned int   UInt;
95 typedef unsigned short UShort;
96 typedef unsigned char  UChar;
97
98 #include "charm.h" // for CkGroupID, and CkEnvelopeType
99 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
100 #include "ckobjid.h" //for the ckobjId
101 #endif
102
103 /**
104 @addtogroup CkEnvelope
105 */
106
107 CkpvExtern(int, envelopeEventID);
108
109 /**
110 @{
111 The class envelope defines a Charm++ message's header. The first
112 'CmiReservedHeaderSize' bytes of memory is exclusively reserved for Converse
113 header, which is defined in converse.h and platform specific config files.
114
115 After Charm++ envelope comes the payload, i.e. a variable-length amount of user
116 data. Following the user data, optionally, a variable number of bits of
117 priority data can be stored at the end. Function
118 envelope::alloc(msgtype,size,prio) is always used to allocate the whole
119 message. Note that this memory layout must be observed.
120
121 The following are a few terms that are used often:
122
123 <pre>
124  Envelope pointer        \
125  Converse message pointer -> [ [ Converse envelope ]       ]
126                              [       Charm envelope        ] 
127  User message pointer     -> [ User data/payload ... ]
128  Priority pointer         -> [ Priority ints ... ]
129  Extra data pointer       -> [ data specific to this message type ]
130 </pre>
131
132 The "message pointers" passed to and from users bypass the envelope and point
133 *directly* to the user data--the routine "EnvToUsr" below adjusts an envelope
134 (or converse message) pointer into this user message pointer.  There is a
135 corresponding routine "UsrToEnv" which takes the user data pointer and returns
136 a pointer to the envelope/converse message.
137
138 Unfortunately, in the guts of Charm++ it's not always clear whether you've been
139 given a converse or user message pointer, as both tend to be passed as void *.
140 Confusing the two will invariably result in data corruption and bizarre
141 crashes.
142
143 FIXME: Make CkMessage inherit from envelope,
144 which would unify converse, envelope, and 
145 user message pointers.
146 */
147
148  /**
149    These structures store the type-specific message information.
150  */
151 struct s_chare {  // NewChareMsg, NewVChareMsg, ForChareMsg, ForVidMsg, FillVidMsg
152         void *ptr;      ///< object pointer
153         UInt forAnyPe;  ///< Used only by newChare
154         int  bype;      ///< created by this pe
155 };
156
157 struct s_groupinit {         // NodeBocInitMsg, BocInitMsg
158         CkGroupID g;           ///< GroupID
159         CkNodeGroupID rednMgr; ///< Reduction manager for this group (constructor only!)
160         CkGroupID dep;         ///< create after dep is created (constructor only!)
161         int epoch;             ///< "epoch" this group was created during (0--mainchare, 1--later)
162 };
163
164 struct s_group {         // ForNodeBocMsg, ForBocMsg
165         CkGroupID g;           ///< GroupID
166         UShort arrayEp;        ///< Used only for array broadcasts
167 };
168
169 struct s_array{             ///< ForArrayEltMsg
170         CkArrayIndexBase index; ///< Array element index
171         CkGroupID arr;            ///< Array manager GID
172 #if CMK_SMP_TRACE_COMMTHREAD
173         UInt srcpe; 
174 #endif
175         UChar hopCount;           ///< number of times message has been routed
176         UChar ifNotThere;         ///< what to do if array element is missing
177 };
178
179 struct s_objid {
180         ck::ObjID id;
181 #if CMK_SMP_TRACE_COMMTHREAD
182         UInt srcpe;
183 #endif
184         UChar hopCount;           ///< number of times message has been routed
185         UChar ifNotThere;         ///< what to do if array element is missing
186 };
187
188 struct s_arrayinit{         ///< ArrayEltInitMsg
189         CkArrayIndexBase index; ///< Array element index
190         CkGroupID arr;            ///< Array manager GID
191 #if CMK_SMP_TRACE_COMMTHREAD
192         UInt srcpe; 
193 #endif
194         UChar hopCount;           ///< number of times message has been routed
195         UChar ifNotThere;         ///< what to do if array element is missing
196         int listenerData[CK_ARRAYLISTENER_MAXLEN]; ///< For creation
197 };
198
199 struct s_roData {    ///< RODataMsg for readonly data type
200         UInt count;
201 };
202
203 struct s_roMsg {     ///< ROMsgMsg for readonlys defined in ci files
204         UInt roIdx;
205 };
206
207 inline UShort extraSize(CkEnvelopeType type)
208 {
209   int ret = 0;
210   switch (type) {
211   case NewChareMsg:
212   case NewVChareMsg:
213   case ForChareMsg:
214   case ForVidMsg:
215   case FillVidMsg:
216   case DeleteVidMsg:
217     ret = sizeof(struct s_chare);
218     break;
219   case BocInitMsg:
220   case NodeBocInitMsg:
221     ret = sizeof(struct s_groupinit);
222     break;
223   case ForBocMsg:
224   case ForNodeBocMsg:
225     ret = sizeof(struct s_group);
226     break;
227   case ArrayEltInitMsg:
228     ret = sizeof(struct s_arrayinit);
229     break;
230   case ForArrayEltMsg:
231     ret = sizeof(struct s_array);
232     break;
233   case ForIDedObjMsg:
234     ret = sizeof(struct s_objid);
235     break;
236   case RODataMsg:
237     ret = sizeof(struct s_roData);
238     break;
239   case ROMsgMsg:
240     ret = sizeof(struct s_roMsg);
241     break;
242   case StartExitMsg:
243   case ExitMsg:
244   case ReqStatMsg:
245   case StatMsg:
246     break;
247   default:
248     CmiAbort("piggysize: unknown message type.");
249   }
250   return ret;
251 }
252
253 extern UInt  envMaxExtraSize;
254
255 class envelope {
256   private:
257     /// Converse message envelope, Must be first field in this class
258     char   core[CmiReservedHeaderSize];
259 public:
260     struct s_attribs {  // Packed bitwise struct
261       UChar msgIdx;     ///< Usertype of message (determines pack routine)
262       UChar mtype;      ///< e.g., ForBocMsg
263       UChar queueing:4; ///< Queueing strategy (FIFO, LIFO, PFIFO, ...)
264       UChar isPacked:1; ///< If true, message must be unpacked before use
265       UChar isUsed:1;   ///< Marker bit to prevent message re-send.
266     };
267 private:
268     //u_type type;           ///< Depends on message type (attribs.mtype)
269     
270     CMK_REFNUM_TYPE ref;            ///< Used by futures
271     UShort   extrasize;  ///< Byte count specific for message types
272     s_attribs attribs;
273     UChar align[CkMsgAlignOffset(CmiReservedHeaderSize+sizeof(CMK_REFNUM_TYPE)+sizeof(UShort)+sizeof(s_attribs))];    ///< padding to make sure sizeof(double) alignment
274     
275     //This struct should now be sizeof(void*) aligned.
276     UShort priobits;   ///< Number of bits of priority data after user data
277     UShort epIdx;      ///< Entry point to call
278     UInt   pe;         ///< source processor
279     UInt   event;      ///< used by projections
280     UInt   totalsize;  ///< Byte count from envelope start to end of priobits
281     
282   public:
283 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
284     CkObjID sender;
285     CkObjID recver;
286     MCount SN;
287 #if defined(_FAULT_CAUSAL_)
288     MCount TN;
289     MCount mlogPadding;         //HACK: aligns envelope to double size (to make xlc work)
290 #endif
291     int incarnation;
292     int flags;
293     UInt piggyBcastIdx;
294 #endif
295     void pup(PUP::er &p);
296     UInt   getEvent(void) const { return event; }
297     void   setEvent(const UInt e) { event = e; }
298     CMK_REFNUM_TYPE   getRef(void) const { return ref; }
299     void   setRef(const CMK_REFNUM_TYPE r) { ref = r; }
300     UChar  getQueueing(void) const { return attribs.queueing; }
301     void   setQueueing(const UChar q) { attribs.queueing=q; }
302     UChar  getMsgtype(void) const { return attribs.mtype; }
303     void   setMsgtype(const UChar m) { if (attribs.mtype!=m) { int old = extrasize; extrasize = extraSize((CkEnvelopeType)m); totalsize += extrasize - old; } attribs.mtype = m; }
304 #if CMK_ERROR_CHECKING
305     UChar  isUsed(void) { return attribs.isUsed; }
306     void   setUsed(const UChar u) { attribs.isUsed=u; }
307 #else /* CMK_ERROR_CHECKING */
308     inline void setUsed(const UChar u) {}
309 #endif
310     UChar  getMsgIdx(void) const { return attribs.msgIdx; }
311     void   setMsgIdx(const UChar idx) { attribs.msgIdx = idx; }
312     UInt   getTotalsize(void) const { return totalsize; }
313     void   setTotalsize(const UInt s) { totalsize = s; }
314     UInt   getUsersize(void) const { 
315       return totalsize - getPrioBytes() - sizeof(envelope) - extrasize; 
316     }
317     void   setUsersize(const UInt s) {
318       if (s == getUsersize()) {
319         return;
320       }
321       CkAssert(s < getUsersize());
322       UInt newPrioOffset = sizeof(envelope) + CkMsgAlignLength(s);
323       UInt newExtraDataOffset = newPrioOffset + getPrioBytes();
324       UInt newTotalsize = newExtraDataOffset + getExtrasize();
325       void *newPrioPtr = (void *) ((char *) this + newPrioOffset); 
326       void *newExtraPtr = (void *) ((char *) this + newExtraDataOffset);
327       // use memmove instead of memcpy in case memory areas overlap
328       memmove(newPrioPtr, getPrioPtr(), getPrioBytes()); 
329       memmove(newExtraPtr, (void *) extraData(), getExtrasize());
330       setTotalsize(newTotalsize); 
331     }
332
333     // s specifies number of bytes to remove from user portion of message
334     void shrinkUsersize(const UInt s) {
335       CkAssert(s <= getUsersize());
336       setUsersize(getUsersize() - s);
337     }
338
339     UShort getExtrasize(void) const { return extrasize; }
340     void   setExtrasize(const UShort s) { extrasize = s; }
341     UChar  isPacked(void) const { return attribs.isPacked; }
342     void   setPacked(const UChar p) { attribs.isPacked = p; }
343     UShort getPriobits(void) const { return priobits; }
344     void   setPriobits(const UShort p) { priobits = p; }
345     UShort getPrioWords(void) const { return CkPriobitsToInts(priobits); }
346     UShort getPrioBytes(void) const { return getPrioWords()*sizeof(int); }
347     void*  getPrioPtr(void) const { 
348       return (void *)((char *)this + totalsize - extrasize - getPrioBytes());
349     }
350     static envelope *alloc(const UChar type, const UInt size=0, const UShort prio=0)
351     {
352       CkAssert(type >= NewChareMsg && type < LAST_CK_ENVELOPE_TYPE);
353
354 #if CMK_USE_STL_MSGQ
355       // Ideally, this should be a static compile-time assert. However we need API changes for that
356       CkAssert(sizeof(CMK_MSG_PRIO_TYPE) >= sizeof(int)*CkPriobitsToInts(prio));
357 #endif
358
359       register UShort extrasize = extraSize((CkEnvelopeType)type);
360       register UInt tsize0 = sizeof(envelope)+ 
361             CkMsgAlignLength(size)+
362             sizeof(int)*CkPriobitsToInts(prio);
363       register UInt tsize = tsize0 + extrasize;
364       register envelope *env = (envelope *)CmiAlloc(tsize0);
365 #if CMK_REPLAYSYSTEM
366       //for record-replay
367       memset(env, 0, sizeof(envelope));
368       env->setEvent(++CkpvAccess(envelopeEventID));
369 #endif
370       env->setMsgtype(type);
371       env->totalsize = tsize;
372       env->extrasize = extrasize;
373       env->priobits = prio;
374       env->setPacked(0);
375       //env->type.group.dep.setZero();
376       ((struct s_groupinit *)env->extraData())->dep.setZero();
377       _SET_USED(env, 0);
378       env->setRef(0);
379       env->setEpIdx(0);
380
381 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
382       env->pathHistory.reset();
383 #endif
384
385 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
386       env->sender.type = TypeInvalid;
387       env->recver.type = TypeInvalid;
388       env->SN = 0;
389 #if defined(_FAULT_CAUSAL_)
390       env->TN = 0;
391 #endif
392           env->incarnation = -1;
393 #endif
394
395       return env;
396     }
397     void reset() {
398 #if CMK_REPLAYSYSTEM
399       setEvent(++CkpvAccess(envelopeEventID));
400 #endif
401       //type.group.dep.setZero();
402       ((struct s_groupinit *)extraData())->dep.setZero();
403     }
404     UShort getEpIdx(void) const { return epIdx; }
405     void   setEpIdx(const UShort idx) { epIdx = idx; }
406     UInt   getSrcPe(void) const { return pe; }
407     void   setSrcPe(const UInt s) { pe = s; }
408     static void setSrcPe(char *env, const UInt s) { ((envelope*)env)->setSrcPe(s); }
409
410 // Readonly-specific fields
411     inline char * extraData() const { return (char*)this+totalsize-extrasize; }
412
413     UInt   getCount(void) const { 
414       CkAssert(getMsgtype()==RODataMsg); return ((struct s_roData *)extraData())->count; 
415     }
416     void   setCount(const UInt c) { 
417       CkAssert(getMsgtype()==RODataMsg); ((struct s_roData *)extraData())->count = c; 
418     }
419     UInt   getRoIdx(void) const { 
420       CkAssert(getMsgtype()==ROMsgMsg); return ((struct s_roMsg*)extraData())->roIdx; 
421     }
422     void   setRoIdx(const UInt r) { 
423       CkAssert(getMsgtype()==ROMsgMsg); ((struct s_roMsg*)extraData())->roIdx = r; 
424     }
425     
426  // Chare-specific fields
427     UInt isForAnyPE(void) { 
428       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
429       return ((struct s_chare*)extraData())->forAnyPe; 
430     }
431     void setForAnyPE(UInt f) { 
432       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
433       ((struct s_chare*)extraData())->forAnyPe = f; 
434     }
435     void*  getVidPtr(void) const {
436       CkAssert(getMsgtype()==NewVChareMsg || getMsgtype()==ForVidMsg
437           || getMsgtype()==FillVidMsg ||  getMsgtype()==DeleteVidMsg);
438       return ((struct s_chare*)extraData())->ptr;
439     }
440     void   setVidPtr(void *p) {
441       CkAssert(getMsgtype()==NewVChareMsg || getMsgtype()==ForVidMsg
442           || getMsgtype()==FillVidMsg ||  getMsgtype()==DeleteVidMsg);
443       ((struct s_chare*)extraData())->ptr = p;
444     }
445     void*  getObjPtr(void) const { 
446       CkAssert(getMsgtype()==ForChareMsg); return ((struct s_chare*)extraData())->ptr; 
447     }
448     void   setObjPtr(void *p) { 
449       CkAssert(getMsgtype()==ForChareMsg); ((struct s_chare*)extraData())->ptr = p; 
450     }
451     UInt getByPe(void) { 
452       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
453       return ((struct s_chare*)extraData())->bype; 
454     }
455     void setByPe(UInt pe) { 
456       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
457       ((struct s_chare*)extraData())->bype = pe; 
458     }
459
460 // Group-specific fields
461     CkGroupID   getGroupNum(void) const {
462       CkAssert(getMsgtype()==ForBocMsg || getMsgtype()==ForNodeBocMsg);
463       return ((struct s_group*)extraData())->g;
464     }
465     void   setGroupNum(const CkGroupID g) {
466       CkAssert(getMsgtype()==ForBocMsg || getMsgtype()==ForNodeBocMsg);
467       ((struct s_group*)extraData())->g = g;
468     }
469
470     CkGroupID getInitGroupNum(void) const {
471       CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg);
472       return ((struct s_groupinit*)extraData())->g;
473     }
474     void   setInitGroupNum(const CkGroupID g) {
475       CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg);
476       ((struct s_groupinit*)extraData())->g = g;
477     }
478     void setGroupEpoch(int epoch) { CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg); ((struct s_groupinit*)extraData())->epoch=epoch; }
479     int getGroupEpoch(void) { CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg); return ((struct s_groupinit*)extraData())->epoch; }
480     void setRednMgr(CkNodeGroupID r){CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg);  ((struct s_groupinit*)extraData())->rednMgr = r; }
481     CkNodeGroupID getRednMgr(){ CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg); return ((struct s_groupinit*)extraData())->rednMgr; }
482     CkGroupID getGroupDep(){ CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg); return ((struct s_groupinit*)extraData())->dep; }
483     void setGroupDep(const CkGroupID &r){ CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg ); ((struct s_groupinit*)extraData())->dep = r; }
484
485 // Array-specific fields
486     CkGroupID getArrayMgr(void) const {
487         if (getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg)
488             return ((struct s_array*)extraData())->arr;
489         else if (getMsgtype() == ForIDedObjMsg)
490             return ((struct s_objid*)extraData())->id.getCollectionID();
491         else
492             CkAbort("Cannot return ArrayID from msg for non-array entity");
493         /* compiler appeasement, even though this will never be executed */
494         return ((struct s_array*)extraData())->arr;
495     }
496
497     void setArrayMgr(const CkGroupID gid) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); ((struct s_array*)extraData())->arr = gid; }
498     int getArrayMgrIdx(void) const { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); return ((struct s_array*)extraData())->arr.idx;}
499     UShort &getsetArrayEp(void) {return epIdx;}
500     UShort &getsetArrayBcastEp(void) {return ((struct s_group*)extraData())->arrayEp;}
501     UChar &getsetArrayHops(void) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); return ((struct s_array*)extraData())->hopCount;}
502     int getArrayIfNotThere(void) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); return ((struct s_array*)extraData())->ifNotThere;}
503     void setArrayIfNotThere(int nt) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); ((struct s_array*)extraData())->ifNotThere=nt;}
504     int *getsetArrayListenerData(void) { CkAssert(getMsgtype() == ArrayEltInitMsg); return ((struct s_arrayinit*)extraData())->listenerData;}
505 #if CMK_SMP_TRACE_COMMTHREAD
506     UInt &getsetArraySrcPe(void) {return ((struct s_array*)extraData())->srcpe;}
507 #else
508     UInt &getsetArraySrcPe(void) {return pe;}
509 #endif
510     CkArrayIndex &getsetArrayIndex(void) 
511     {
512       CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg);
513       return *(CkArrayIndex *)&((struct s_array*)extraData())->index;
514     }
515
516 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
517  public:
518     /** The information regarding the entry methods that executed along the path to this one.
519         \addtogroup CriticalPathFramework
520     */
521     PathHistoryEnvelope pathHistory;
522 #endif
523
524 };
525
526
527 inline envelope *UsrToEnv(const void *const msg) {
528   return (((envelope *) msg)-1);
529 }
530
531 inline void *EnvToUsr(const envelope *const env) {
532   return ((void *)(env+1));
533 }
534
535 inline envelope *_allocEnv(const int msgtype, const int size=0, const int prio=0) {
536   return envelope::alloc(msgtype,size,prio);
537 }
538
539 inline void *_allocMsg(const int msgtype, const int size, const int prio=0) {
540   return EnvToUsr(envelope::alloc(msgtype,size,prio));
541 }
542
543 inline void _resetEnv(envelope *env) {
544   env->reset();
545 }
546
547 inline void setEventID(envelope *env){
548   env->setEvent(++CkpvAccess(envelopeEventID));
549 }
550
551 /** @} */
552
553 extern UChar   _defaultQueueing;
554
555 extern void CkPackMessage(envelope **pEnv);
556 extern void CkUnpackMessage(envelope **pEnv);
557
558 class MsgPool: public SafePool<void *> {
559 private:
560     static void *_alloc(void) {
561       /* CkAllocSysMsg() called in .def.h is not thread of sigio safe */
562       register envelope *env = _allocEnv(ForChareMsg,0,0);
563       env->setQueueing(_defaultQueueing);
564       env->setMsgIdx(0);
565       return EnvToUsr(env);
566     }
567     static void _reset(void* m) {
568       register envelope *env = UsrToEnv(m);
569       _resetEnv(env);
570     }
571 public:
572     MsgPool():SafePool<void*>(_alloc, CkFreeMsg, _reset) {}
573 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
574         void *get(void){
575             return allocfn();
576         }
577         void put(void *m){
578         }
579 #endif
580 };
581
582 CkpvExtern(MsgPool*, _msgPool);
583
584 #endif