50a9df3fadeeafafb31a59c601c58b0cbc627364
[charm.git] / src / ck-core / envelope.h
1 /**
2  @defgroup CkEnvelope
3  \brief  Charm++ message header.
4 */
5 #ifndef _ENVELOPE_H
6 #define _ENVELOPE_H
7
8 #include <pup.h>
9 #include <middle.h>
10 #include <ckarrayindex.h>
11 #include <cklists.h>
12 #include <objid.h>
13
14 #ifndef CkIntbits
15 #define CkIntbits (sizeof(int)*8)
16 #endif
17
18 #if CMK_ERROR_CHECKING
19 #define _SET_USED(env, x) (env)->setUsed((x))
20 #define _CHECK_USED(env) do { if(env->isUsed()) \
21                            CmiAbort("Message being re-sent. Aborting...\n"); \
22                          } while(0)
23 #else
24 #define _SET_USED(env, x) do{}while(0)
25 #define _CHECK_USED(env) do{}while(0)
26 #endif
27
28 #define CkMsgAlignmentMask     (sizeof(double)-1)
29 #define CkMsgAlignLength(x) (((x)+CkMsgAlignmentMask)&(~(CkMsgAlignmentMask)))
30 #define CkMsgAlignOffset(x)     (CkMsgAlignLength(x)-(x))
31 #define CkPriobitsToInts(nBits)    ((nBits+CkIntbits-1)/CkIntbits)
32
33 #if CMK_MESSAGE_LOGGING
34 #define CK_FREE_MSG_MLOG        0x1
35 #define CK_BYPASS_DET_MLOG      0x2
36 #define CK_MULTICAST_MSG_MLOG   0x4
37 #define CK_REDUCTION_MSG_MLOG   0x8
38 #endif
39
40 //#define USE_CRITICAL_PATH_HEADER_ARRAY
41
42 /**
43     \addtogroup CriticalPathFramework 
44     @{
45 */
46
47 /** A class that is used to track the entry points and other information 
48     about a critical path as a charm++ program executes.
49
50     This class won't do useful things unless USE_CRITICAL_PATH_HEADER_ARRAY is defined
51 */
52 class PathHistoryEnvelope {
53  protected:
54   // When passing paths forward, store information on PEs, in backward pass, lookup necessary information
55   int sender_history_table_idx;
56   double totalTime;
57  public:
58   double getTotalTime() const{ return totalTime; }
59   int get_sender_history_table_idx() const{ return sender_history_table_idx; }
60   void set_sender_history_table_idx(int i) { sender_history_table_idx = i; }
61   PathHistoryEnvelope(){ 
62 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
63     reset(); 
64 #endif
65   }
66   double getTime() const{ return totalTime; }
67   void setTime(double t){ totalTime = t; }
68   void pup(PUP::er &p) {
69     p | sender_history_table_idx;
70     p | totalTime;
71   } 
72   void reset();
73   void print() const;
74   /// Write a description of the path into the beginning of the provided buffer. The buffer ought to be large enough.
75   void printHTMLToString(char* buf) const{
76     buf[0] = '\0';
77     sprintf(buf+strlen(buf), "Path Time=%lf<br> Sender idx=%d", (double)totalTime, (int)sender_history_table_idx);
78   }
79   /// The number of available EP counts 
80   int getNumUsed() const;
81   /// Return the count value for the idx'th available EP  
82   int getUsedCount(int idx) const;
83   /// Return the idx'th available EP 
84   int getUsedEp(int idx) const;
85   int getEpCount(int ep) const;
86   void incrementTotalTime(double time);
87   //  void createPath(envelope *originatingMsg);
88   void setDebug100();
89 };
90 /** @} */
91
92
93
94 typedef unsigned int   UInt;
95 typedef unsigned short UShort;
96 typedef unsigned char  UChar;
97
98 #include "charm.h" // for CkGroupID, and CkEnvelopeType
99 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
100 #include "ckobjid.h" //for the ckobjId
101 #endif
102
103 /**
104 @addtogroup CkEnvelope
105 */
106
107 CkpvExtern(int, envelopeEventID);
108
109 /**
110 @{
111 The class envelope defines a Charm++ message's header. The first
112 'CmiReservedHeaderSize' bytes of memory is exclusively reserved for Converse
113 header, which is defined in converse.h and platform specific config files.
114
115 After Charm++ envelope comes the payload, i.e. a variable-length amount of user
116 data. Following the user data, optionally, a variable number of bits of
117 priority data can be stored at the end. Function
118 envelope::alloc(msgtype,size,prio) is always used to allocate the whole
119 message. Note that this memory layout must be observed.
120
121 The following are a few terms that are used often:
122
123 <pre>
124  Envelope pointer        \
125  Converse message pointer -> [ [ Converse envelope ]       ]
126                              [       Charm envelope        ] 
127  User message pointer     -> [ User data/payload ... ]
128  Priority pointer         -> [ Priority ints ... ]
129  Extra data pointer       -> [ data specific to this message type ]
130 </pre>
131
132 The "message pointers" passed to and from users bypass the envelope and point
133 *directly* to the user data--the routine "EnvToUsr" below adjusts an envelope
134 (or converse message) pointer into this user message pointer.  There is a
135 corresponding routine "UsrToEnv" which takes the user data pointer and returns
136 a pointer to the envelope/converse message.
137
138 Unfortunately, in the guts of Charm++ it's not always clear whether you've been
139 given a converse or user message pointer, as both tend to be passed as void *.
140 Confusing the two will invariably result in data corruption and bizarre
141 crashes.
142
143 FIXME: Make CkMessage inherit from envelope,
144 which would unify converse, envelope, and 
145 user message pointers.
146 */
147
148  /**
149    These structures store the type-specific message information.
150  */
151 struct s_chare {  // NewChareMsg, NewVChareMsg, ForChareMsg, ForVidMsg, FillVidMsg
152         void *ptr;      ///< object pointer
153         UInt forAnyPe;  ///< Used only by newChare
154         int  bype;      ///< created by this pe
155 };
156
157 struct s_groupinit {         // NodeBocInitMsg, BocInitMsg
158         CkGroupID g;           ///< GroupID
159         CkNodeGroupID rednMgr; ///< Reduction manager for this group (constructor only!)
160         CkGroupID dep;         ///< create after dep is created (constructor only!)
161         int epoch;             ///< "epoch" this group was created during (0--mainchare, 1--later)
162 };
163
164 struct s_group {         // ForNodeBocMsg, ForBocMsg
165         CkGroupID g;           ///< GroupID
166         UShort arrayEp;        ///< Used only for array broadcasts
167 };
168
169 struct s_array{             ///< ForArrayEltMsg
170         CkArrayIndexBase index; ///< Array element index
171         CkGroupID arr;            ///< Array manager GID
172 #if CMK_SMP_TRACE_COMMTHREAD
173         UInt srcpe; 
174 #endif
175         UChar hopCount;           ///< number of times message has been routed
176         UChar ifNotThere;         ///< what to do if array element is missing
177 };
178
179 struct s_objid {
180         ck::ObjID id;
181 #if CMK_SMP_TRACE_COMMTHREAD
182         UInt srcpe;
183 #endif
184         UChar hopCount;           ///< number of times message has been routed
185         UChar ifNotThere;         ///< what to do if array element is missing
186 };
187
188 struct s_arrayinit{         ///< ArrayEltInitMsg
189         CkArrayIndexBase index; ///< Array element index
190         CkGroupID arr;            ///< Array manager GID
191 #if CMK_SMP_TRACE_COMMTHREAD
192         UInt srcpe; 
193 #endif
194         UChar hopCount;           ///< number of times message has been routed
195         UChar ifNotThere;         ///< what to do if array element is missing
196         int listenerData[CK_ARRAYLISTENER_MAXLEN]; ///< For creation
197 };
198
199 struct s_roData {    ///< RODataMsg for readonly data type
200         UInt count;
201 };
202
203 struct s_roMsg {     ///< ROMsgMsg for readonlys defined in ci files
204         UInt roIdx;
205 };
206
207
208 inline int getMaxExtrasize()
209 {
210
211     int ret=0;
212     ret = sizeof(struct s_chare);
213     if(ret <  sizeof(struct s_groupinit))
214         ret = sizeof(struct s_groupinit);
215     if(ret < sizeof(struct s_group))
216         ret = sizeof(struct s_group);
217     if(ret < sizeof(struct s_arrayinit))
218         ret = sizeof(struct s_arrayinit);
219     if(ret < sizeof(struct s_array))
220         ret = sizeof(struct s_array);
221     if ( ret < sizeof(struct s_roData))
222         ret = sizeof(struct s_roData);
223     if(ret < sizeof(struct s_roMsg))
224         ret = sizeof(struct s_roMsg);
225     return ret;
226
227 }
228
229 inline UShort extraSize(CkEnvelopeType type)
230 {
231   int ret = 0;
232   switch (type) {
233   case NewChareMsg:
234   case NewVChareMsg:
235   case ForChareMsg:
236   case ForVidMsg:
237   case FillVidMsg:
238   case DeleteVidMsg:
239     ret = sizeof(struct s_chare);
240     break;
241   case BocInitMsg:
242   case NodeBocInitMsg:
243     ret = sizeof(struct s_groupinit);
244     break;
245   case ForBocMsg:
246   case ForNodeBocMsg:
247     ret = sizeof(struct s_group);
248     break;
249   case ArrayEltInitMsg:
250     ret = sizeof(struct s_arrayinit);
251     break;
252   case ForArrayEltMsg:
253     ret = sizeof(struct s_array);
254     break;
255   case ForIDedObjMsg:
256     ret = sizeof(struct s_objid);
257     break;
258   case RODataMsg:
259     ret = sizeof(struct s_roData);
260     break;
261   case ROMsgMsg:
262     ret = sizeof(struct s_roMsg);
263     break;
264   case StartExitMsg:
265   case ExitMsg:
266   case ReqStatMsg:
267   case StatMsg:
268     break;
269   default:
270     CmiAbort("piggysize: unknown message type.");
271   }
272   return ret;
273 }
274
275 extern UInt  envMaxExtraSize;
276
277 class envelope {
278   private:
279     /// Converse message envelope, Must be first field in this class
280     char   core[CmiReservedHeaderSize];
281 public:
282     struct s_attribs {  // Packed bitwise struct
283       UChar msgIdx;     ///< Usertype of message (determines pack routine)
284       UChar mtype;      ///< e.g., ForBocMsg
285       UChar queueing:4; ///< Queueing strategy (FIFO, LIFO, PFIFO, ...)
286       UChar isPacked:1; ///< If true, message must be unpacked before use
287       UChar isUsed:1;   ///< Marker bit to prevent message re-send.
288     };
289 private:
290     //u_type type;           ///< Depends on message type (attribs.mtype)
291     
292     CMK_REFNUM_TYPE ref;            ///< Used by futures
293     UShort   extrasize;  ///< Byte count specific for message types
294     s_attribs attribs;
295     UChar align[CkMsgAlignOffset(CmiReservedHeaderSize+sizeof(CMK_REFNUM_TYPE)+sizeof(UShort)+sizeof(s_attribs))];    ///< padding to make sure sizeof(double) alignment
296     
297     //This struct should now be sizeof(void*) aligned.
298     UShort priobits;   ///< Number of bits of priority data after user data
299     UShort epIdx;      ///< Entry point to call
300     UInt   pe;         ///< source processor
301     UInt   event;      ///< used by projections
302     UInt   totalsize;  ///< Byte count from envelope start to end of priobits
303     
304   public:
305 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
306     CkObjID sender;
307     CkObjID recver;
308     MCount SN;
309 #if defined(_FAULT_CAUSAL_)
310     MCount TN;
311     MCount mlogPadding;         //HACK: aligns envelope to double size (to make xlc work)
312 #endif
313     int incarnation;
314     int flags;
315     UInt piggyBcastIdx;
316 #endif
317     void pup(PUP::er &p);
318     UInt   getEvent(void) const { return event; }
319     void   setEvent(const UInt e) { event = e; }
320     CMK_REFNUM_TYPE   getRef(void) const { return ref; }
321     void   setRef(const CMK_REFNUM_TYPE r) { ref = r; }
322     UChar  getQueueing(void) const { return attribs.queueing; }
323     void   setQueueing(const UChar q) { attribs.queueing=q; }
324     UChar  getMsgtype(void) const { return attribs.mtype; }
325     void   setMsgtype(const UChar m) { if (attribs.mtype!=m) { int old = extrasize; extrasize = extraSize((CkEnvelopeType)m); totalsize += extrasize - old; } attribs.mtype = m; }
326 #if CMK_ERROR_CHECKING
327     UChar  isUsed(void) { return attribs.isUsed; }
328     void   setUsed(const UChar u) { attribs.isUsed=u; }
329 #else /* CMK_ERROR_CHECKING */
330     inline void setUsed(const UChar u) {}
331 #endif
332     UChar  getMsgIdx(void) const { return attribs.msgIdx; }
333     void   setMsgIdx(const UChar idx) { attribs.msgIdx = idx; }
334     UInt   getTotalsize(void) const { return totalsize; }
335     void   setTotalsize(const UInt s) { totalsize = s; }
336     UInt   getUsersize(void) const { 
337       return totalsize - getPrioBytes() - sizeof(envelope) - extrasize; 
338     }
339     void   setUsersize(const UInt s) {
340       if (s == getUsersize()) {
341         return;
342       }
343       CkAssert(s < getUsersize());
344       UInt newPrioOffset = sizeof(envelope) + CkMsgAlignLength(s);
345       UInt newExtraDataOffset = newPrioOffset + getPrioBytes();
346       UInt newTotalsize = newExtraDataOffset + getExtrasize();
347       void *newPrioPtr = (void *) ((char *) this + newPrioOffset); 
348       void *newExtraPtr = (void *) ((char *) this + newExtraDataOffset);
349       // use memmove instead of memcpy in case memory areas overlap
350       memmove(newPrioPtr, getPrioPtr(), getPrioBytes()); 
351       memmove(newExtraPtr, (void *) extraData(), getExtrasize());
352       setTotalsize(newTotalsize); 
353     }
354
355     // s specifies number of bytes to remove from user portion of message
356     void shrinkUsersize(const UInt s) {
357       CkAssert(s <= getUsersize());
358       setUsersize(getUsersize() - s);
359     }
360
361     UShort getExtrasize(void) const { return extrasize; }
362     void   setExtrasize(const UShort s) { extrasize = s; }
363     UChar  isPacked(void) const { return attribs.isPacked; }
364     void   setPacked(const UChar p) { attribs.isPacked = p; }
365     UShort getPriobits(void) const { return priobits; }
366     void   setPriobits(const UShort p) { priobits = p; }
367     UShort getPrioWords(void) const { return CkPriobitsToInts(priobits); }
368     UShort getPrioBytes(void) const { return getPrioWords()*sizeof(int); }
369     void*  getPrioPtr(void) const { 
370       return (void *)((char *)this + totalsize - extrasize - getPrioBytes());
371     }
372     static envelope *alloc(const UChar type, const UInt size=0, const UShort prio=0)
373     {
374       CkAssert(type >= NewChareMsg && type < LAST_CK_ENVELOPE_TYPE);
375
376 #if CMK_USE_STL_MSGQ
377       // Ideally, this should be a static compile-time assert. However we need API changes for that
378       CkAssert(sizeof(CMK_MSG_PRIO_TYPE) >= sizeof(int)*CkPriobitsToInts(prio));
379 #endif
380
381       register UShort extrasize = extraSize((CkEnvelopeType)type);
382       register UInt tsize0 = sizeof(envelope)+ 
383             CkMsgAlignLength(size)+
384             sizeof(int)*CkPriobitsToInts(prio);
385       register UInt tsize = tsize0 + extrasize;
386       register envelope *env = (envelope *)CmiAlloc(tsize0);
387 #if CMK_REPLAYSYSTEM
388       //for record-replay
389       memset(env, 0, sizeof(envelope));
390       env->setEvent(++CkpvAccess(envelopeEventID));
391 #endif
392       env->setMsgtype(type);
393       env->totalsize = tsize;
394       env->extrasize = extrasize;
395       env->priobits = prio;
396       env->setPacked(0);
397       //env->type.group.dep.setZero();
398       ((struct s_groupinit *)env->extraData())->dep.setZero();
399       _SET_USED(env, 0);
400       env->setRef(0);
401       env->setEpIdx(0);
402
403 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
404       env->pathHistory.reset();
405 #endif
406
407 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
408       env->sender.type = TypeInvalid;
409       env->recver.type = TypeInvalid;
410       env->SN = 0;
411 #if defined(_FAULT_CAUSAL_)
412       env->TN = 0;
413 #endif
414           env->incarnation = -1;
415 #endif
416
417       return env;
418     }
419     void reset() {
420 #if CMK_REPLAYSYSTEM
421       setEvent(++CkpvAccess(envelopeEventID));
422 #endif
423       //type.group.dep.setZero();
424       ((struct s_groupinit *)extraData())->dep.setZero();
425     }
426     UShort getEpIdx(void) const { return epIdx; }
427     void   setEpIdx(const UShort idx) { epIdx = idx; }
428     UInt   getSrcPe(void) const { return pe; }
429     void   setSrcPe(const UInt s) { pe = s; }
430     static void setSrcPe(char *env, const UInt s) { ((envelope*)env)->setSrcPe(s); }
431
432 // Readonly-specific fields
433     inline char * extraData() const { return (char*)this+totalsize-extrasize; }
434
435     UInt   getCount(void) const { 
436       CkAssert(getMsgtype()==RODataMsg); return ((struct s_roData *)extraData())->count; 
437     }
438     void   setCount(const UInt c) { 
439       CkAssert(getMsgtype()==RODataMsg); ((struct s_roData *)extraData())->count = c; 
440     }
441     UInt   getRoIdx(void) const { 
442       CkAssert(getMsgtype()==ROMsgMsg); return ((struct s_roMsg*)extraData())->roIdx; 
443     }
444     void   setRoIdx(const UInt r) { 
445       CkAssert(getMsgtype()==ROMsgMsg); ((struct s_roMsg*)extraData())->roIdx = r; 
446     }
447     
448  // Chare-specific fields
449     UInt isForAnyPE(void) { 
450       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
451       return ((struct s_chare*)extraData())->forAnyPe; 
452     }
453     void setForAnyPE(UInt f) { 
454       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
455       ((struct s_chare*)extraData())->forAnyPe = f; 
456     }
457     void*  getVidPtr(void) const {
458       CkAssert(getMsgtype()==NewVChareMsg || getMsgtype()==ForVidMsg
459           || getMsgtype()==FillVidMsg ||  getMsgtype()==DeleteVidMsg);
460       return ((struct s_chare*)extraData())->ptr;
461     }
462     void   setVidPtr(void *p) {
463       CkAssert(getMsgtype()==NewVChareMsg || getMsgtype()==ForVidMsg
464           || getMsgtype()==FillVidMsg ||  getMsgtype()==DeleteVidMsg);
465       ((struct s_chare*)extraData())->ptr = p;
466     }
467     void*  getObjPtr(void) const { 
468       CkAssert(getMsgtype()==ForChareMsg); return ((struct s_chare*)extraData())->ptr; 
469     }
470     void   setObjPtr(void *p) { 
471       CkAssert(getMsgtype()==ForChareMsg); ((struct s_chare*)extraData())->ptr = p; 
472     }
473     UInt getByPe(void) { 
474       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
475       return ((struct s_chare*)extraData())->bype; 
476     }
477     void setByPe(UInt pe) { 
478       CkAssert(getMsgtype()==NewChareMsg || getMsgtype()==NewVChareMsg); 
479       ((struct s_chare*)extraData())->bype = pe; 
480     }
481
482 // Group-specific fields
483     CkGroupID   getGroupNum(void) const {
484       CkAssert(getMsgtype()==ForBocMsg || getMsgtype()==ForNodeBocMsg);
485       return ((struct s_group*)extraData())->g;
486     }
487     void   setGroupNum(const CkGroupID g) {
488       CkAssert(getMsgtype()==ForBocMsg || getMsgtype()==ForNodeBocMsg);
489       ((struct s_group*)extraData())->g = g;
490     }
491
492     CkGroupID getInitGroupNum(void) const {
493       CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg);
494       return ((struct s_groupinit*)extraData())->g;
495     }
496     void   setInitGroupNum(const CkGroupID g) {
497       CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg);
498       ((struct s_groupinit*)extraData())->g = g;
499     }
500     void setGroupEpoch(int epoch) { CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg); ((struct s_groupinit*)extraData())->epoch=epoch; }
501     int getGroupEpoch(void) { CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg); return ((struct s_groupinit*)extraData())->epoch; }
502     void setRednMgr(CkNodeGroupID r){CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg);  ((struct s_groupinit*)extraData())->rednMgr = r; }
503     CkNodeGroupID getRednMgr(){ CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg); return ((struct s_groupinit*)extraData())->rednMgr; }
504     CkGroupID getGroupDep(){ CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg); return ((struct s_groupinit*)extraData())->dep; }
505     void setGroupDep(const CkGroupID &r){ CkAssert(getMsgtype()==BocInitMsg || getMsgtype()==NodeBocInitMsg ); ((struct s_groupinit*)extraData())->dep = r; }
506
507 // Array-specific fields
508     CkGroupID getArrayMgr(void) const {
509         if (getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg)
510             return ((struct s_array*)extraData())->arr;
511         else if (getMsgtype() == ForIDedObjMsg)
512             return ((struct s_objid*)extraData())->id.getCollectionID();
513         else
514             CkAbort("Cannot return ArrayID from msg for non-array entity");
515         /* compiler appeasement, even though this will never be executed */
516         return ((struct s_array*)extraData())->arr;
517     }
518
519     void setArrayMgr(const CkGroupID gid) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); ((struct s_array*)extraData())->arr = gid; }
520     int getArrayMgrIdx(void) const { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); return ((struct s_array*)extraData())->arr.idx;}
521     UShort &getsetArrayEp(void) {return epIdx;}
522     UShort &getsetArrayBcastEp(void) {return ((struct s_group*)extraData())->arrayEp;}
523     UChar &getsetArrayHops(void) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); return ((struct s_array*)extraData())->hopCount;}
524     int getArrayIfNotThere(void) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); return ((struct s_array*)extraData())->ifNotThere;}
525     void setArrayIfNotThere(int nt) { CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg); ((struct s_array*)extraData())->ifNotThere=nt;}
526     int *getsetArrayListenerData(void) { CkAssert(getMsgtype() == ArrayEltInitMsg); return ((struct s_arrayinit*)extraData())->listenerData;}
527 #if CMK_SMP_TRACE_COMMTHREAD
528     UInt &getsetArraySrcPe(void) {return ((struct s_array*)extraData())->srcpe;}
529 #else
530     UInt &getsetArraySrcPe(void) {return pe;}
531 #endif
532     CkArrayIndex &getsetArrayIndex(void) 
533     {
534       CkAssert(getMsgtype() == ForArrayEltMsg || getMsgtype() == ArrayEltInitMsg);
535       return *(CkArrayIndex *)&((struct s_array*)extraData())->index;
536     }
537
538 #ifdef USE_CRITICAL_PATH_HEADER_ARRAY
539  public:
540     /** The information regarding the entry methods that executed along the path to this one.
541         \addtogroup CriticalPathFramework
542     */
543     PathHistoryEnvelope pathHistory;
544 #endif
545
546 };
547
548 inline int getEnvelopesize()
549 {
550   return sizeof(envelope)+ getMaxExtrasize();
551 }
552
553
554 inline envelope *UsrToEnv(const void *const msg) {
555   return (((envelope *) msg)-1);
556 }
557
558 inline void *EnvToUsr(const envelope *const env) {
559   return ((void *)(env+1));
560 }
561
562 inline envelope *_allocEnv(const int msgtype, const int size=0, const int prio=0) {
563   return envelope::alloc(msgtype,size,prio);
564 }
565
566 inline void *_allocMsg(const int msgtype, const int size, const int prio=0) {
567   return EnvToUsr(envelope::alloc(msgtype,size,prio));
568 }
569
570 inline void _resetEnv(envelope *env) {
571   env->reset();
572 }
573
574 inline void setEventID(envelope *env){
575   env->setEvent(++CkpvAccess(envelopeEventID));
576 }
577
578 /** @} */
579
580 extern UChar   _defaultQueueing;
581
582 extern void CkPackMessage(envelope **pEnv);
583 extern void CkUnpackMessage(envelope **pEnv);
584
585 class MsgPool: public SafePool<void *> {
586 private:
587     static void *_alloc(void) {
588       /* CkAllocSysMsg() called in .def.h is not thread of sigio safe */
589       register envelope *env = _allocEnv(ForChareMsg,0,0);
590       env->setQueueing(_defaultQueueing);
591       env->setMsgIdx(0);
592       return EnvToUsr(env);
593     }
594     static void _reset(void* m) {
595       register envelope *env = UsrToEnv(m);
596       _resetEnv(env);
597     }
598 public:
599     MsgPool():SafePool<void*>(_alloc, CkFreeMsg, _reset) {}
600 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
601         void *get(void){
602             return allocfn();
603         }
604         void put(void *m){
605         }
606 #endif
607 };
608
609 CkpvExtern(MsgPool*, _msgPool);
610
611 #endif