2481a0b72d28851cf6cff31cc4043451c344b433
[charm.git] / src / conv-com / petable.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /*********************************************
9  * File : petable.C
10  *
11  * Author: Krishnan V
12  *
13  * The message buffer
14  *********************************************/
15 #include <string.h>
16 #include <stdlib.h>
17 #include <converse.h>
18 #include "comlib.h"
19 #include "petable.h"
20 #include "converse.h"
21
22 #define BIGBUFFERSIZE 65536
23 #define PTPREALLOC    100
24
25 struct AllToAllHdr {
26   char dummy[CmiReservedHeaderSize];
27   int size;
28   int refno;
29   comID id;
30   int ufield;
31   int nmsgs;
32 };
33
34
35 /**************************************************************
36  * Preallocated memory=P*MSGQLEN ptr + 2P ints + 1000 ptrs
37  **************************************************************/
38 PeTable :: PeTable(int n) {
39   NumPes=n;
40   magic=0;
41   PeList = (PTinfo ***)CmiAlloc(sizeof(PTinfo *)*NumPes);
42   //  ComlibPrintf("Pelist[%d][%d]\n", NumPes, MSGQLEN);
43   msgnum=new int[NumPes];
44   MaxSize=new int[NumPes];
45   for (int i=0;i<NumPes;i++) {
46     msgnum[i]=0;
47     MaxSize[i]=MSGQLEN;
48     PeList[i]=(PTinfo **)CmiAlloc(sizeof(PTinfo *)*MSGQLEN);
49     for (int j=0;j<MSGQLEN;j++) PeList[i][j]=0;
50   }
51
52   //ptrlist=(PTinfo **)CmiAlloc(1000*sizeof(PTinfo *));
53   //  FreeList= new GList;
54   //CombBuffer=(char *)CmiAlloc(BIGBUFFERSIZE);
55
56   PTFreeList=NULL;
57   PTFreeChunks=NULL;
58 }
59
60 PeTable :: ~PeTable() {
61   int i;
62   for (i=0;i<NumPes;i++) CmiFree(PeList[i]);
63   CmiFree(PeList);
64   delete msgnum;
65   delete MaxSize;
66   GarbageCollect();
67   //CmiFree(ptrlist);
68   PTinfo *tmp;
69   while (PTFreeChunks) {
70     tmp=PTFreeChunks;
71     PTFreeChunks=PTNEXTCHUNK(tmp);
72     CmiFree(tmp);
73   }
74   // delete FreeList;
75
76 }
77
78 void PeTable:: Purge() {
79   for (int i=0; i<NumPes;i++) {
80     if (msgnum[i]) {
81       // ComlibPrintf("%d Warning: %d Undelivered Messages for %d\n", CkMyPe(), msgnum[i], i);
82       //msgnum[i]=0;
83     }
84   }
85   GarbageCollect();
86   //  ComlibPrintf("combcount = %d\n", combcount);
87   //combcount = 0;
88 }
89
90 void PeTable :: ExtractAndDeliverLocalMsgs(int index) {
91   int j;
92   msgstruct m;
93
94   ComlibPrintf("%d:Delivering %d local messages\n", CkMyPe(), msgnum[index]);
95   for (j=msgnum[index]-1;(j>=0);j--) {
96
97     m.msgsize=PeList[index][j]->msgsize;
98     m.msg=PeList[index][j]->msg;
99
100     if (--(PeList[index][j]->refCount) <=0) {
101       CmiSyncSendAndFree(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
102       PTFREE(PeList[index][j]);
103     }
104     else {
105       CmiSyncSend(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
106     }
107     PeList[index][j]=NULL;
108   }
109   msgnum[index]=j+1;
110
111   return;
112 }
113
114
115 #undef PACK
116 #undef PACKMSG
117 //#define PACKINT(data) {((int*)t)[0] = data; t+=sizeof(int);}
118 #define PACK(type,data) {junk=(char *)&(data); memcpy(t, junk, sizeof(type)); t+=sizeof(type);}
119 #define PACKMSG(data, size) { memcpy(p+msg_offset, (data), size); msg_offset += size; }
120
121 /*
122   Protocol:
123   |     AllToAllHdr      |npe|ChunkHdr1|msg1|ChunkHdr2|msg2|...
124   |ref|comid|ufield|nmsgs|
125 */
126 char * PeTable ::ExtractAndPack(comID id, int ufield, int npe, 
127                                 int *pelist, int *length) {
128   char *junk;
129   int nummsgs, offset, num_distinctmsgs;
130         
131   int tot_msgsize=TotalMsgSize(npe, pelist, &nummsgs, &num_distinctmsgs);
132
133   ComlibPrintf("%d In ExtractAndPack %d, %d\n", CmiMyPe(), npe, nummsgs); 
134
135   if (tot_msgsize ==0) {
136     *length=0;
137         
138     ComlibPrintf("Returning NULL\n");
139     return(NULL);
140   }
141     
142   int msg_offset = sizeof(struct AllToAllHdr) + (npe + nummsgs + 1) * sizeof(int);
143   //int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
144   //    + (npe + 4 + nummsgs) * sizeof(int);  
145
146   msg_offset = ALIGN8(msg_offset);
147     
148   *length = tot_msgsize;
149   *length += msg_offset;
150   char *p;
151   p=(char *)CmiAlloc(*length);
152
153   char *t = p + CmiReservedHeaderSize;
154   int i, j;
155   if (!p) CmiAbort("Big time problem\n");
156   magic++;
157
158   int refno = id.refno;    
159
160   PACK(int, *length);
161   PACK(int, refno);
162   PACK(comID, id);
163   PACK(int, ufield);
164   PACK(int, nummsgs);
165   PACK(int, npe);
166     
167   int lesspe=0;
168   int npacked = 0;
169   for (i=0;i<npe;i++) {
170     int index=pelist[i];
171
172     if (msgnum[index]<=0) {
173       lesspe++;
174             
175       ComlibPrintf("[%d] msgnum[index]<=0 !!!!!\n", CkMyPe());
176       continue;
177     }
178         
179     ComlibPrintf("%d Packing pelist[%d]\n", CkMyPe(), index);
180     register int newval=-1*pelist[i];
181     PACK(int, newval); 
182
183     for (j=0;j<msgnum[index];j++) {
184       if (PeList[index][j]->magic == magic) {
185         offset=(PeList[index][j]->offset);
186       }
187       else {
188         npacked ++;
189                 
190         //offset=msg_offset;
191         offset=npacked;
192         PeList[index][j]->magic=magic;
193         PeList[index][j]->offset=offset;
194         PTinfo *tempmsg=PeList[index][j];
195                 
196         CmiChunkHeader hdr;
197         hdr.size = tempmsg->msgsize;
198         hdr.ref = -1;
199         PACKMSG(&hdr, sizeof(CmiChunkHeader));
200         PACKMSG(tempmsg->msg, tempmsg->msgsize);
201
202         msg_offset = ALIGN8(msg_offset);
203       }
204             
205       //ComlibPrintf("%d Packing msg_offset=%d\n", CkMyPe(), offset);
206       PACK(int, offset); 
207
208       if (--(PeList[index][j]->refCount) <=0) {
209         CmiFree(PeList[index][j]->msg);
210                 
211         PTFREE(PeList[index][j]);
212       }
213       PeList[index][j]=NULL;
214     }
215     msgnum[index]=0;
216   }
217   //offset=-1;
218   //PACK(int, offset);
219
220   /*    
221         if (lesspe) {
222         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
223         npe=npe-lesspe;
224         PACK(int, npe);
225         }
226   */
227
228   return(p);
229
230
231 /*Used for all to all multicast operations.  Assumes that each message
232   is destined to all the processors, to speed up all to all
233   substantially --Sameer 09/03/03 
234   
235   Protocol:
236   |     AllToAllHdr      |ChunkHdr1|msg1|ChunkHdr2|msg2|...
237   |ref|comid|ufield|nmsgs|
238 */
239
240 char * PeTable ::ExtractAndPackAll(comID id, int ufield, int *length) {
241   int nmsgs = 0;
242   int i, j;
243   int index = 0;
244
245   ComlibPrintf("[%d] In Extract And Pack All\n", CkMyPe());
246
247   //Increment magic to detect duplicate messages
248   magic++;
249
250   register int total_msg_size = 0;
251
252   //first compute size
253   for (i=0;i<NumPes;i++) {
254     index = i;
255     for (j=msgnum[index]-1; (j>=0); j--) {
256       if (PeList[index][j]->magic != magic) {                
257         total_msg_size += ALIGN8(PeList[index][j]->msgsize);
258         total_msg_size += 2 * sizeof(int);
259         PeList[index][j]->magic=magic;
260
261         nmsgs ++;
262       }            
263     }
264   }
265     
266   total_msg_size += ALIGN8(sizeof(AllToAllHdr));
267
268   ComlibPrintf("[%d] Message Size %d, nmsgs %d **%d**\n", CkMyPe(), total_msg_size, nmsgs, sizeof(AllToAllHdr));
269     
270   //poiter to the combined message, UGLY NAME
271   char *p = (char *) CmiAlloc(total_msg_size * sizeof(char));    
272
273   ComlibPrintf("After cmialloc\n");
274
275   //buffer to copy stuff into
276   char *t = p; 
277   char *junk = NULL;
278     
279   int dummy = 0;
280     
281   int refno = 0;
282
283   AllToAllHdr ahdr;
284   ahdr.size = *length;
285   ahdr.refno = refno;
286   ahdr.id = id;
287   ahdr.ufield = ufield;
288   ahdr.nmsgs = nmsgs;
289
290   /*
291     PACKINT(refno);    
292     PACK(comID, id);
293       
294     PACKINT(ufield);
295     PACKINT(nmsgs);
296     //    PACKINT(dummy); //Aligning to 8 bytes
297     */
298
299   PACK(AllToAllHdr, ahdr);   
300
301   int msg_offset = ALIGN8(sizeof(AllToAllHdr));
302     
303   //Increment magic again for creating the message
304   magic++;
305   for (i=0;i<NumPes;i++) {
306     index=i;
307     int ref = 0;
308     int size;
309
310     for (j=msgnum[index]-1; (j>=0); j--) {
311       //Check if it is a duplicate
312       if (PeList[index][j]->magic != magic) {                
313         size = PeList[index][j]->msgsize;
314         PACKMSG(&size, sizeof(int));
315         PACKMSG(&ref, sizeof(int));
316         PeList[index][j]->magic=magic;
317         PACKMSG(PeList[index][j]->msg, size);
318
319         msg_offset = ALIGN8(msg_offset);
320       }
321
322       //Free it when all the processors have gotten rid of it
323       if (--(PeList[index][j]->refCount) <=0) {
324         ComlibPrintf("before cmifree \n");
325         CmiFree(PeList[index][j]->msg);   
326         ComlibPrintf("after cmifree \n");
327
328         PTFREE(PeList[index][j]);
329       }
330       //Assign the current processors message pointer to NULL
331       PeList[index][j] = NULL;
332     }
333     msgnum[index] = 0;
334   }
335     
336   *length = total_msg_size;
337   return p;
338 }
339
340 #undef UNPACK
341 #define UNPACK(type,data) {junk=(char *)&(data); memcpy(junk, t, sizeof(type));t+=sizeof(type);}
342 #undef UNPACKMSG
343 #define UNPACKMSG(dest,src, size) { memcpy(dest, src, size); offset += size;}
344
345 int PeTable :: UnpackAndInsert(void *in) {
346   char *junk;
347   char *t =(char *)in + CmiReservedHeaderSize;
348   int i, ufield, npe, pe, nummsgs, ptrlistindex=0;
349   char *msgend, *msgcur;
350   comID id;
351   int refno = 0;
352   int msgsize;
353
354   register int offset;
355
356   UNPACK(int, msgsize);
357   UNPACK(int, refno);
358   ComlibPrintf("%d UnPackAndInsert\n", CkMyPe());
359   UNPACK(comID, id);
360   UNPACK(int, ufield);
361   UNPACK(int, nummsgs);
362   UNPACK(int, npe);
363
364   // unpack all messages into an array
365   msgend = (char*)in + msgsize;
366   msgcur = (char*)in + ALIGN8(sizeof(struct AllToAllHdr) + (npe+nummsgs+1)*sizeof(int));
367   while (msgcur < msgend) {
368     CmiChunkHeader *ch = (CmiChunkHeader *)msgcur;
369
370     PTinfo *temp;
371     PTALLOC(temp);
372     temp->msgsize=ch->size;
373     temp->msg=(void*)&ch[1];
374     temp->refCount=0;
375     temp->magic=0;
376     temp->offset=0;
377
378     ptrvec.insert(++ptrlistindex, temp);
379
380     // fix the ref field of the message
381     ch->ref = (int)((char*)in - (char*)temp->msg);
382     msgcur += ALIGN8(ch->size) + sizeof(CmiChunkHeader);
383   }
384
385   pe = -1;
386   //for (i=0;i<npe;i++) {
387   for (i=0; i<nummsgs; ++i) {
388     //UNPACK(int, pe);
389     //pe *= -1;
390
391     UNPACK(int, offset);
392     if (offset <= 0) {
393       pe = -1 * offset;
394       --i;
395       continue;
396     }
397
398     if (msgnum[pe] >= MaxSize[pe]) {
399       REALLOC(PeList[pe], MaxSize[pe]);
400       MaxSize[pe] *= 2;
401     }
402     PeList[pe][msgnum[pe]] = ptrvec[offset];
403     (ptrvec[offset])->refCount++;
404     msgnum[pe]++;
405
406     /*
407       while (offset > 0) {
408       int tempmsgsize;
409       UNPACKMSG(&(tempmsgsize), (char *)in+offset, sizeof(int));
410       int ptr;
411       UNPACKMSG(&ptr, (char *)in+offset, sizeof(int));
412
413       if (ptr >=0 )  {
414       if (msgnum[pe] >= MaxSize[pe]) {
415       REALLOC(PeList[pe], MaxSize[pe]);
416       MaxSize[pe] *= 2;
417       }
418       PeList[pe][msgnum[pe]]=ptrvec[ptr];
419       (ptrvec[ptr])->refCount++;
420       msgnum[pe]++;
421
422       UNPACK(int, offset);
423       continue;
424       }
425             
426       PTinfo *temp;
427       PTALLOC(temp);
428       temp->msgsize=tempmsgsize;
429       temp->refCount=1;
430       temp->magic=0;
431       temp->offset=0;
432
433       ptrvec.insert(ptrlistindex, temp);
434       memcpy((char *)in+offset-sizeof(int), &ptrlistindex, sizeof(int));
435
436       ptrlistindex++;
437       temp->msg=(void *)((char *)in+offset);
438       if (msgnum[pe] >= MaxSize[pe]) {
439
440       REALLOC(PeList[pe], MaxSize[pe]);
441       MaxSize[pe] *= 2;
442       }
443       PeList[pe][msgnum[pe]]=temp;
444       msgnum[pe]++;
445       UNPACK(int, offset);
446       //}
447       //t -=sizeof(int);
448       }
449       *(int *)((char *)in -sizeof(int))=ptrlistindex; 
450       */
451   }
452
453   REFFIELD(in) = ptrlistindex;
454   if (ptrlistindex==0) {
455     REFFIELD(in) = 1;
456     CmiFree(in);
457   }
458
459   /*  
460       for (i=0;i<ptrlistindex;i++) {
461       char * actualmsg=(char *)(ptrvec[i]->msg);
462       int *rc=(int *)(actualmsg-sizeof(int));
463       *rc=(int)((char *)in-actualmsg);
464       //ComlibPrintf("I am inserting %d\n", *rc);
465       }
466   */
467
468   ptrvec.removeAll();
469   
470   return(ufield);
471 }
472
473 /* Unpack and insert an all to all message, the router provides the
474    list of processors to insert into.
475    Same protocol as mentioned earlier.
476 */
477
478 int PeTable :: UnpackAndInsertAll(void *in, int npes, int *pelist) {
479   char *junk;
480   char *t =(char *)in /*+CmiReservedHeaderSize*/;
481   int i,  
482     ufield,   // user field or ths stage of the iteration 
483     nmsgs,    // number of messages in combo message
484     refno,    // reference number
485     dummy;    // alignment dummy
486   
487   comID id;
488
489   /*
490     UNPACK(int, refno);      
491     UNPACK(comID, id);
492     
493     UNPACK(int, ufield);
494     UNPACK(int, nmsgs);
495     //UNPACK(int, dummy);
496     int header_size = sizeof(comID) + CmiReservedHeaderSize + 3 *sizeof(int);
497     if(header_size % 8 != 0)
498     t+= 8 - header_size % 8;
499   */
500
501   AllToAllHdr ahdr;
502   UNPACK(AllToAllHdr, ahdr);
503
504   if(sizeof(AllToAllHdr) & 7 != 0)
505     t += 8 - sizeof(AllToAllHdr) & 7;
506
507   refno = ahdr.refno;
508   id = ahdr.id;
509   nmsgs = ahdr.nmsgs;
510   ufield = ahdr.ufield;
511
512   ComlibPrintf("[%d] unpack and insert all %d, %d\n", CkMyPe(), ufield, nmsgs);
513   
514   //Inserting a memory foot print may, change later
515   CmiChunkHeader *chdr= (CmiChunkHeader*)((char*)in - sizeof(CmiChunkHeader));
516
517   int *ref;
518   int size;
519   char *msg;
520   for(int count = 0; count < nmsgs; count++){
521
522     t += sizeof(CmiChunkHeader);
523     msg = t;
524
525     // Get the size of the message, and set the ref field correctly for CmiFree
526     size = SIZEFIELD(msg);
527     REFFIELD(msg) = (int)((char *)in - (char *)msg);
528
529     t += ALIGN8(size);
530
531     // Do CmiReference(msg), this is done bypassing converse!
532     chdr->ref++;
533
534     /*
535       UNPACK(int, size);
536       ref = (int *)t;
537       t += sizeof(int);
538     
539       *ref = (int)((char *)(&chdr->ref) - (char *)ref);
540       chdr->ref ++;
541
542       ComlibPrintf("ref = %d, global_ref = %d\n", *ref, chdr->ref);
543     
544       msg = t;
545       t += ALIGN8(size);
546     */
547     InsertMsgs(npes, pelist, size, msg);
548   }  
549
550   CmiFree(in);
551   return ufield;
552 }
553
554 /*
555  * Added by Filippo, 2005/04/18 to allow a zero-copy sending, through the
556  * CmiVectorSend functions.
557  */
558 PTvectorlist PeTable :: ExtractAndVectorize(comID id, int ufield, int npe, int *pelist) {
559   char *junk;
560   int nummsgs = 0;
561   int offset, num_distinctmsgs;
562   int i, j;
563
564   for (i=0; i<npe; ++i) nummsgs += msgnum[pelist[i]];
565
566   ComlibPrintf("%d In ExtractAndVectorize %d, %d\n", CmiMyPe(), npe, nummsgs); 
567
568   if (nummsgs ==0) {
569     ComlibPrintf("Returning NULL\n");
570     return(NULL);
571   }
572     
573   int headersize = sizeof(struct AllToAllHdr) + (npe + nummsgs + 1) * sizeof(int);
574   //int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
575   //    + (npe + 4 + nummsgs) * sizeof(int);  
576
577   char *p;
578   p=(char *)CmiAlloc(headersize);
579
580   char *t = p + CmiReservedHeaderSize;
581   if (!p) CmiAbort("Big time problem\n");
582   magic++;
583
584   int refno = id.refno;    
585
586   // SHOULD PACK THE SIZE, which is not available: fix elan and undo this cvs update
587   PACK(int, refno);
588   PACK(comID, id);
589   PACK(int, ufield);
590   PACK(int, nummsgs);
591   PACK(int, npe);
592     
593   int lesspe=0;
594   int npacked = 0;
595   for (i=0;i<npe;i++) {
596     int index=pelist[i];
597
598     if (msgnum[index]<=0) {
599       lesspe++;
600             
601       ComlibPrintf("[%d] msgnum[index]<=0 !!!!!\n", CkMyPe());
602       continue;
603     }
604         
605     ComlibPrintf("%d Packing pelist[%d]\n", CkMyPe(), index);
606     register int newval=-1*pelist[i];
607     PACK(int, newval); 
608
609     for (j=0;j<msgnum[index];j++) {
610       if (PeList[index][j]->magic == magic) {
611         offset=(PeList[index][j]->offset);
612       }
613       else {
614         npacked ++;
615                 
616         //offset=msg_offset;
617         offset=npacked;
618         PeList[index][j]->magic=magic;
619         PeList[index][j]->offset=offset;
620         PTinfo *tempmsg=PeList[index][j];
621
622         ptrvec.insert(npacked, tempmsg);
623       }
624       
625       //ComlibPrintf("%d Packing msg_offset=%d\n", CkMyPe(), offset);
626       PACK(int, offset); 
627
628       --(PeList[index][j]->refCount);
629       /*
630         if (--(PeList[index][j]->refCount) <=0) {
631         CmiFree(PeList[index][j]->msg);
632                 
633         PTFREE(PeList[index][j]);
634         }
635       */
636       PeList[index][j]=NULL;
637     }
638     msgnum[index]=0;
639   }
640   //offset=-1;
641   //PACK(int, offset);
642
643   // See petable.h for a description of this structure
644   PTvectorlist result = (PTvectorlist)CmiAlloc(sizeof(struct ptvectorlist) +
645                                                (npacked+1)*2*sizeof(char*) +
646                                                2*sizeof(CmiChunkHeader));
647   result->count = npacked + 1;
648   result->sizes = (int*)((char*)result + sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
649   result->msgs  = (char**)((char*)result->sizes + (npacked+1)*sizeof(int) + sizeof(CmiChunkHeader));
650
651   SIZEFIELD(result->sizes) = (npacked+1)*sizeof(int);
652   REFFIELD(result->sizes) = - (sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
653   SIZEFIELD(result->msgs) = (npacked+1)*sizeof(int);
654   REFFIELD(result->msgs) = - (sizeof(struct ptvectorlist) + (npacked+1)*sizeof(int) + 2*sizeof(CmiChunkHeader));
655   CmiReference(result);
656
657   result->sizes[0] = headersize;
658   result->msgs[0] = p;
659   PTinfo *temp;
660   for (i=1; i<=npacked; ++i) {
661     temp = ptrvec[i];
662     result->sizes[i] = temp->msgsize;
663     result->msgs[i] = (char*)temp->msg;
664     // if there is still reference we CmiReference the message so it does not get deleted
665     // otherwise we free also the PTinfo struct use to hold it
666     if (temp->refCount > 0) CmiReference(result->msgs[i]);
667     else PTFREE(temp);
668   }
669
670   ptrvec.removeAll();
671
672   /*    
673         if (lesspe) {
674         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
675         npe=npe-lesspe;
676         PACK(int, npe);
677         }
678   */
679
680   //return(p);
681
682   return result;
683 }
684
685 /*
686  * Added by Filippo, 2005/04/18 to allow a zero-copy sending, through the
687  * CmiVectorSend functions.
688  */
689 PTvectorlist PeTable :: ExtractAndVectorizeAll(comID id, int ufield) {
690   int nmsgs = 0, i, j;
691   int index = 0;
692
693   ComlibPrintf("[%d] In Extract And Vectorize All\n", CkMyPe());
694
695   //Increment magic to detect duplicate messages
696   magic++;
697
698   //register int total_msg_size = 0;
699
700   //first compute size
701   for (i=0;i<NumPes;i++) {
702     index = i;
703     for (j=msgnum[index]-1; (j>=0); j--) {
704       if (PeList[index][j]->magic != magic) {                
705         //total_msg_size += ALIGN8(PeList[index][j]->msgsize);
706         //total_msg_size += 2 * sizeof(int);
707         PeList[index][j]->magic=magic;
708         ptrvec.insert(nmsgs, PeList[index][j]);
709         PeList[index][j] = NULL;
710         nmsgs ++;
711       }
712       --(PeList[index][j]->refCount);
713     }
714   }
715
716   //total_msg_size += ALIGN8(sizeof(AllToAllHdr));
717
718   ComlibPrintf("[%d] nmsgs %d **%d**\n", CkMyPe(), nmsgs, sizeof(AllToAllHdr));
719     
720   //poiter to the message header
721   AllToAllHdr *ahdr = (AllToAllHdr *) CmiAlloc(sizeof(struct AllToAllHdr));
722
723   ComlibPrintf("After cmialloc\n");
724
725   /*
726   //buffer to copy stuff into
727   char *t = p; 
728   char *junk = NULL;
729     
730   int dummy = 0;
731   */
732
733   int refno = 0;
734
735   ahdr->refno = refno;
736   ahdr->id = id;
737   ahdr->ufield = ufield;
738   ahdr->nmsgs = nmsgs;
739
740   /*
741     PACKINT(refno);    
742     PACK(comID, id);
743       
744     PACKINT(ufield);
745     PACKINT(nmsgs);
746     //    PACKINT(dummy); //Aligning to 8 bytes
747     */
748
749   // See petable.h for a description of this structure
750   PTvectorlist result = (PTvectorlist)CmiAlloc(sizeof(struct ptvectorlist) +
751                                                (nmsgs+1)*2*sizeof(char*) +
752                                                2*sizeof(CmiChunkHeader));
753   result->count = nmsgs + 1;
754   result->sizes = (int*)((char*)result + sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
755   result->msgs  = (char**)((char*)result->sizes + (nmsgs+1)*sizeof(int) + sizeof(CmiChunkHeader));
756
757   SIZEFIELD(result->sizes) = (nmsgs+1)*sizeof(int);
758   REFFIELD(result->sizes) = - (sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
759   SIZEFIELD(result->msgs) = (nmsgs+1)*sizeof(int);
760   REFFIELD(result->msgs) = - (sizeof(struct ptvectorlist) + (nmsgs+1)*sizeof(int) + 2*sizeof(CmiChunkHeader));
761   CmiReference(result);
762
763   result->sizes[0] = sizeof(ahdr);
764   result->msgs[0] = (char*)ahdr;
765   PTinfo *temp;
766   for (i=1; i<nmsgs; ++i) {
767     temp = ptrvec[i];
768     result->sizes[i] = temp->msgsize;
769     result->msgs[i] = (char*)temp->msg;
770     // if there is still reference we CmiReference the message so it does not get deleted
771     // otherwise we free also the PTinfo struct use to hold it
772     if (temp->refCount > 0) CmiReference(result->msgs[i]);
773     else PTFREE(temp);
774   }
775
776   ptrvec.removeAll();
777
778   return result;
779
780   /*
781   PACK(AllToAllHdr, ahdr);   
782
783   int msg_offset = ALIGN8(sizeof(AllToAllHdr));
784     
785   //Increment magic again for creating the message
786   magic++;
787   for (i=0;i<NumPes;i++) {
788     index=i;
789     int ref = 0;
790     int size;
791
792     for (j=msgnum[index]-1; (j>=0); j--) {
793       //Check if it is a duplicate
794       if (PeList[index][j]->magic != magic) {                
795         size = PeList[index][j]->msgsize;
796         PACKMSG(&size, sizeof(int));
797         PACKMSG(&ref, sizeof(int));
798         PeList[index][j]->magic=magic;
799         PACKMSG(PeList[index][j]->msg, size);
800
801         msg_offset = ALIGN8(msg_offset);
802       }
803
804       //Free it when all the processors have gotten rid of it
805       if (--(PeList[index][j]->refCount) <=0) {
806         ComlibPrintf("before cmifree \n");
807         CmiFree(PeList[index][j]->msg);   
808         ComlibPrintf("after cmifree \n");
809
810         PTFREE(PeList[index][j]);
811       }
812       //Assign the current processors message pointer to NULL
813       PeList[index][j] = NULL;
814     }
815     msgnum[index] = 0;
816   }
817     
818   *length = total_msg_size;
819   return p;
820   */
821 }
822
823 void PeTable :: GarbageCollect() {
824 }
825