fixing for ft
[charm.git] / src / conv-com / petable.C
1 /**
2    @addtogroup ConvComlib
3    @{
4    @file 
5    @brief Tables of messages for PEs 
6 */
7
8 #include <string.h>
9 #include <stdlib.h>
10 //#include <converse.h>
11 //#include "convcomlib.h"
12 #include "petable.h"
13 //#include "converse.h"
14
15 #define BIGBUFFERSIZE 65536
16 #define PTPREALLOC    100
17
18 struct AllToAllHdr {
19   char dummy[CmiReservedHeaderSize];
20   int refno;
21   comID id;
22   int size;
23   int ufield;
24   int nmsgs;
25 };
26
27
28 /**************************************************************
29  * Preallocated memory=P*MSGQLEN ptr + 2P ints + 1000 ptrs
30  **************************************************************/
31 PeTable :: PeTable(int n) {
32   NumPes=n;
33   magic=0;
34   PeList = (PTinfo ***)CmiAlloc(sizeof(PTinfo *)*NumPes);
35   //  ComlibPrintf("Pelist[%d][%d]\n", NumPes, MSGQLEN);
36   msgnum=new int[NumPes];
37   MaxSize=new int[NumPes];
38   for (int i=0;i<NumPes;i++) {
39     msgnum[i]=0;
40     MaxSize[i]=MSGQLEN;
41     PeList[i]=(PTinfo **)CmiAlloc(sizeof(PTinfo *)*MSGQLEN);
42     for (int j=0;j<MSGQLEN;j++) PeList[i][j]=0;
43   }
44
45   //ptrlist=(PTinfo **)CmiAlloc(1000*sizeof(PTinfo *));
46   //  FreeList= new GList;
47   //CombBuffer=(char *)CmiAlloc(BIGBUFFERSIZE);
48
49   PTFreeList=NULL;
50   PTFreeChunks=NULL;
51 }
52
53 PeTable :: ~PeTable() {
54   int i;
55   for (i=0;i<NumPes;i++) CmiFree(PeList[i]);
56   CmiFree(PeList);
57   delete msgnum;
58   delete MaxSize;
59   GarbageCollect();
60   //CmiFree(ptrlist);
61   PTinfo *tmp;
62   while (PTFreeChunks) {
63     tmp=PTFreeChunks;
64     PTFreeChunks=PTNEXTCHUNK(tmp);
65     CmiFree(tmp);
66   }
67   // delete FreeList;
68
69 }
70
71 void PeTable:: Purge() {
72   for (int i=0; i<NumPes;i++) {
73     if (msgnum[i]) {
74       // ComlibPrintf("%d Warning: %d Undelivered Messages for %d\n", CkMyPe(), msgnum[i], i);
75       //msgnum[i]=0;
76     }
77   }
78   GarbageCollect();
79   //  ComlibPrintf("combcount = %d\n", combcount);
80   //combcount = 0;
81 }
82
83 void PeTable :: ExtractAndDeliverLocalMsgs(int index, Strategy *myStrat) {
84   int j;
85   msgstruct m;
86
87   ComlibPrintf("%d:Delivering %d local messages\n", CkMyPe(), msgnum[index]);
88   for (j=msgnum[index]-1;(j>=0);j--) {
89
90     m.msgsize=PeList[index][j]->msgsize;
91     m.msg=PeList[index][j]->msg;
92
93     if (--(PeList[index][j]->refCount) <=0) {
94       //CmiSyncSendAndFree(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
95       myStrat->deliver((char*)m.msg, m.msgsize);
96       PTFREE(PeList[index][j]);
97     }
98     else {
99       char *dupmsg = (char*)CmiAlloc(m.msgsize);
100       memcpy(dupmsg, m.msg, m.msgsize);
101       myStrat->deliver(dupmsg, m.msgsize);
102       //CmiSyncSend(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
103     }
104     PeList[index][j]=NULL;
105   }
106   msgnum[index]=j+1;
107
108   return;
109 }
110
111
112 #undef PACK
113 #undef PACKMSG
114 //#define PACKINT(data) {((int*)t)[0] = data; t+=sizeof(int);}
115 #define PACK(type,data) {junk=(char *)&(data); memcpy(t, junk, sizeof(type)); t+=sizeof(type);}
116 #define PACKMSG(data, size) { memcpy(p+msg_offset, (data), size); msg_offset += size; }
117
118 /*
119   Protocol:
120   |     AllToAllHdr      |npe|ChunkHdr1|msg1|ChunkHdr2|msg2|...
121   |ref|comid|ufield|nmsgs|
122 */
123 char * PeTable ::ExtractAndPack(comID id, int ufield, int npe, 
124                                 int *pelist, int *length) {
125   char *junk;
126   int nummsgs, offset, num_distinctmsgs;
127         
128   int tot_msgsize=TotalMsgSize(npe, pelist, &nummsgs, &num_distinctmsgs);
129
130   ComlibPrintf("%d In ExtractAndPack %d, %d\n", CmiMyPe(), npe, nummsgs); 
131
132   if (tot_msgsize ==0) {
133     *length=0;
134         
135     ComlibPrintf("Returning NULL\n");
136     return(NULL);
137   }
138     
139   int msg_offset = sizeof(struct AllToAllHdr) + (npe + nummsgs + 1) * sizeof(int);
140   //int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
141   //    + (npe + 4 + nummsgs) * sizeof(int);  
142
143   msg_offset = ALIGN8(msg_offset);
144     
145   *length = tot_msgsize;
146   *length += msg_offset;
147   char *p;
148   p=(char *)CmiAlloc(*length);
149
150   char *t = p + CmiReservedHeaderSize;
151   int i, j;
152   if (!p) CmiAbort("Big time problem\n");
153   magic++;
154
155   int refno = id.refno;    
156
157   PACK(int, refno);
158   PACK(comID, id);
159   PACK(int, *length);
160   PACK(int, ufield);
161   PACK(int, nummsgs);
162   PACK(int, npe);
163     
164   int lesspe=0;
165   int npacked = 0;
166   for (i=0;i<npe;i++) {
167     int index=pelist[i];
168
169     if (msgnum[index]<=0) {
170       lesspe++;
171             
172       ComlibPrintf("[%d] msgnum[index]<=0 !!!!!\n", CkMyPe());
173       continue;
174     }
175         
176     ComlibPrintf("%d Packing pelist[%d]\n", CkMyPe(), index);
177     register int newval=-1*pelist[i];
178     PACK(int, newval); 
179
180     for (j=0;j<msgnum[index];j++) {
181       if (PeList[index][j]->magic == magic) {
182         offset=(PeList[index][j]->offset);
183       }
184       else {
185         npacked ++;
186                 
187         //offset=msg_offset;
188         offset=npacked;
189         PeList[index][j]->magic=magic;
190         PeList[index][j]->offset=offset;
191         PTinfo *tempmsg=PeList[index][j];
192                 
193         CmiChunkHeader hdr;
194         hdr.size = tempmsg->msgsize;
195         hdr.ref = -1;
196         PACKMSG(&hdr, sizeof(CmiChunkHeader));
197         PACKMSG(tempmsg->msg, tempmsg->msgsize);
198
199         msg_offset = ALIGN8(msg_offset);
200       }
201             
202       //ComlibPrintf("%d Packing msg_offset=%d\n", CkMyPe(), offset);
203       PACK(int, offset); 
204
205       if (--(PeList[index][j]->refCount) <=0) {
206         CmiFree(PeList[index][j]->msg);
207                 
208         PTFREE(PeList[index][j]);
209       }
210       PeList[index][j]=NULL;
211     }
212     msgnum[index]=0;
213   }
214   //offset=-1;
215   //PACK(int, offset);
216
217   /*    
218         if (lesspe) {
219         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
220         npe=npe-lesspe;
221         PACK(int, npe);
222         }
223   */
224
225   return(p);
226
227
228 /*Used for all to all multicast operations.  Assumes that each message
229   is destined to all the processors, to speed up all to all
230   substantially --Sameer 09/03/03 
231   
232   Protocol:
233   |     AllToAllHdr      |ChunkHdr1|msg1|ChunkHdr2|msg2|...
234   |ref|comid|ufield|nmsgs|
235 */
236
237 char * PeTable ::ExtractAndPackAll(comID id, int ufield, int *length) {
238   int nmsgs = 0;
239   int i, j;
240   int index = 0;
241
242   ComlibPrintf("[%d] In Extract And Pack All\n", CkMyPe());
243
244   //Increment magic to detect duplicate messages
245   magic++;
246
247   register int total_msg_size = 0;
248
249   //first compute size
250   for (i=0;i<NumPes;i++) {
251     index = i;
252     for (j=msgnum[index]-1; (j>=0); j--) {
253       if (PeList[index][j]->magic != magic) {                
254         total_msg_size += ALIGN8(PeList[index][j]->msgsize);
255         total_msg_size += 2 * sizeof(int);
256         PeList[index][j]->magic=magic;
257
258         nmsgs ++;
259       }            
260     }
261   }
262     
263   total_msg_size += ALIGN8(sizeof(AllToAllHdr));
264
265   ComlibPrintf("[%d] Message Size %d, nmsgs %d **%d**\n", CkMyPe(), total_msg_size, nmsgs, sizeof(AllToAllHdr));
266     
267   //poiter to the combined message, UGLY NAME
268   char *p = (char *) CmiAlloc(total_msg_size * sizeof(char));    
269
270   ComlibPrintf("After cmialloc\n");
271
272   //buffer to copy stuff into
273   char *t = p; 
274   char *junk = NULL;
275     
276   int refno = 0;
277
278   AllToAllHdr ahdr;
279   ahdr.refno = refno;
280   ahdr.id = id;
281   ahdr.size = *length;
282   ahdr.ufield = ufield;
283   ahdr.nmsgs = nmsgs;
284
285   /*
286     PACKINT(refno);    
287     PACK(comID, id);
288       
289     PACKINT(ufield);
290     PACKINT(nmsgs);
291     //    PACKINT(dummy); //Aligning to 8 bytes
292     */
293
294   PACK(AllToAllHdr, ahdr);   
295
296   int msg_offset = ALIGN8(sizeof(AllToAllHdr));
297     
298   //Increment magic again for creating the message
299   magic++;
300   for (i=0;i<NumPes;i++) {
301     index=i;
302     int ref = 0;
303     int size;
304
305     for (j=msgnum[index]-1; (j>=0); j--) {
306       //Check if it is a duplicate
307       if (PeList[index][j]->magic != magic) {                
308         size = PeList[index][j]->msgsize;
309         PACKMSG(&size, sizeof(int));
310         PACKMSG(&ref, sizeof(int));
311         PeList[index][j]->magic=magic;
312         PACKMSG(PeList[index][j]->msg, size);
313
314         msg_offset = ALIGN8(msg_offset);
315       }
316
317       //Free it when all the processors have gotten rid of it
318       if (--(PeList[index][j]->refCount) <=0) {
319         ComlibPrintf("before cmifree \n");
320         CmiFree(PeList[index][j]->msg);   
321         ComlibPrintf("after cmifree \n");
322
323         PTFREE(PeList[index][j]);
324       }
325       //Assign the current processors message pointer to NULL
326       PeList[index][j] = NULL;
327     }
328     msgnum[index] = 0;
329   }
330     
331   *length = total_msg_size;
332   return p;
333 }
334
335 #undef UNPACK
336 #define UNPACK(type,data) {junk=(char *)&(data); memcpy(junk, t, sizeof(type));t+=sizeof(type);}
337 #undef UNPACKMSG
338 #define UNPACKMSG(dest,src, size) { memcpy(dest, src, size); offset += size;}
339
340 int PeTable :: UnpackAndInsert(void *in) {
341   char *junk;
342   char *t =(char *)in + CmiReservedHeaderSize;
343   int i, ufield, npe, pe, nummsgs, ptrlistindex=0;
344   char *msgend, *msgcur;
345   comID id;
346   int refno = 0;
347   int msgsize;
348
349   register int offset;
350
351   UNPACK(int, refno);
352   ComlibPrintf("%d UnPackAndInsert\n", CkMyPe());
353   UNPACK(comID, id);
354   UNPACK(int, msgsize);
355   UNPACK(int, ufield);
356   UNPACK(int, nummsgs);
357   UNPACK(int, npe);
358
359   // unpack all messages into an array
360   msgend = (char*)in + msgsize;
361   msgcur = (char*)in + ALIGN8(sizeof(struct AllToAllHdr) + (npe+nummsgs+1)*sizeof(int));
362   while (msgcur < msgend) {
363     CmiChunkHeader *ch = (CmiChunkHeader *)msgcur;
364
365     PTinfo *temp;
366     PTALLOC(temp);
367     temp->msgsize=ch->size;
368     temp->msg=(void*)&ch[1];
369     temp->refCount=0;
370     temp->magic=0;
371     temp->offset=0;
372
373     ptrvec.insert(++ptrlistindex, temp);
374
375     // fix the ref field of the message
376     ch->ref = (int)((char*)in - (char*)temp->msg);
377     msgcur += ALIGN8(ch->size) + sizeof(CmiChunkHeader);
378   }
379
380   pe = -1;
381   //for (i=0;i<npe;i++) {
382   for (i=0; i<nummsgs; ++i) {
383     //UNPACK(int, pe);
384     //pe *= -1;
385
386     UNPACK(int, offset);
387     if (offset <= 0) {
388       pe = -1 * offset;
389       --i;
390       continue;
391     }
392
393     if (msgnum[pe] >= MaxSize[pe]) {
394       REALLOC(PeList[pe], MaxSize[pe]);
395       MaxSize[pe] *= 2;
396     }
397     PeList[pe][msgnum[pe]] = ptrvec[offset];
398     (ptrvec[offset])->refCount++;
399     msgnum[pe]++;
400
401     /*
402       while (offset > 0) {
403       int tempmsgsize;
404       UNPACKMSG(&(tempmsgsize), (char *)in+offset, sizeof(int));
405       int ptr;
406       UNPACKMSG(&ptr, (char *)in+offset, sizeof(int));
407
408       if (ptr >=0 )  {
409       if (msgnum[pe] >= MaxSize[pe]) {
410       REALLOC(PeList[pe], MaxSize[pe]);
411       MaxSize[pe] *= 2;
412       }
413       PeList[pe][msgnum[pe]]=ptrvec[ptr];
414       (ptrvec[ptr])->refCount++;
415       msgnum[pe]++;
416
417       UNPACK(int, offset);
418       continue;
419       }
420             
421       PTinfo *temp;
422       PTALLOC(temp);
423       temp->msgsize=tempmsgsize;
424       temp->refCount=1;
425       temp->magic=0;
426       temp->offset=0;
427
428       ptrvec.insert(ptrlistindex, temp);
429       memcpy((char *)in+offset-sizeof(int), &ptrlistindex, sizeof(int));
430
431       ptrlistindex++;
432       temp->msg=(void *)((char *)in+offset);
433       if (msgnum[pe] >= MaxSize[pe]) {
434
435       REALLOC(PeList[pe], MaxSize[pe]);
436       MaxSize[pe] *= 2;
437       }
438       PeList[pe][msgnum[pe]]=temp;
439       msgnum[pe]++;
440       UNPACK(int, offset);
441       //}
442       //t -=sizeof(int);
443       }
444       *(int *)((char *)in -sizeof(int))=ptrlistindex; 
445       */
446   }
447
448   REFFIELD(in) = ptrlistindex;
449   if (ptrlistindex==0) {
450     REFFIELD(in) = 1;
451     CmiFree(in);
452   }
453
454   /*  
455       for (i=0;i<ptrlistindex;i++) {
456       char * actualmsg=(char *)(ptrvec[i]->msg);
457       int *rc=(int *)(actualmsg-sizeof(int));
458       *rc=(int)((char *)in-actualmsg);
459       //ComlibPrintf("I am inserting %d\n", *rc);
460       }
461   */
462
463   ptrvec.removeAll();
464   
465   return(ufield);
466 }
467
468 /* Unpack and insert an all to all message, the router provides the
469    list of processors to insert into.
470    Same protocol as mentioned earlier.
471 */
472
473 int PeTable :: UnpackAndInsertAll(void *in, int npes, int *pelist) {
474   char *junk;
475   char *t =(char *)in /*+CmiReservedHeaderSize*/;
476   int i,  
477     ufield,   // user field or ths stage of the iteration 
478     nmsgs,    // number of messages in combo message
479     refno,    // reference number
480     dummy;    // alignment dummy
481   
482   comID id;
483
484   /*
485     UNPACK(int, refno);      
486     UNPACK(comID, id);
487     
488     UNPACK(int, ufield);
489     UNPACK(int, nmsgs);
490     //UNPACK(int, dummy);
491     int header_size = sizeof(comID) + CmiReservedHeaderSize + 3 *sizeof(int);
492     if(header_size % 8 != 0)
493     t+= 8 - header_size % 8;
494   */
495
496   AllToAllHdr ahdr;
497   UNPACK(AllToAllHdr, ahdr);
498
499   if((sizeof(AllToAllHdr) & 7) != 0)
500     t += 8 - (sizeof(AllToAllHdr) & 7);
501
502   refno = ahdr.refno;
503   id = ahdr.id;
504   nmsgs = ahdr.nmsgs;
505   ufield = ahdr.ufield;
506
507   ComlibPrintf("[%d] unpack and insert all %d, %d\n", CkMyPe(), ufield, nmsgs);
508   
509   //Inserting a memory foot print may, change later
510   CmiChunkHeader *chdr= (CmiChunkHeader*)((char*)in - sizeof(CmiChunkHeader));
511
512   int *ref;
513   int size;
514   char *msg;
515   for(int count = 0; count < nmsgs; count++){
516
517     t += sizeof(CmiChunkHeader);
518     msg = t;
519
520     // Get the size of the message, and set the ref field correctly for CmiFree
521     size = SIZEFIELD(msg);
522     REFFIELD(msg) = (int)((char *)in - (char *)msg);
523
524     t += ALIGN8(size);
525
526     // Do CmiReference(msg), this is done bypassing converse!
527     chdr->ref++;
528
529     /*
530       UNPACK(int, size);
531       ref = (int *)t;
532       t += sizeof(int);
533     
534       *ref = (int)((char *)(&chdr->ref) - (char *)ref);
535       chdr->ref ++;
536
537       ComlibPrintf("ref = %d, global_ref = %d\n", *ref, chdr->ref);
538     
539       msg = t;
540       t += ALIGN8(size);
541     */
542     InsertMsgs(npes, pelist, size, msg);
543   }  
544
545   CmiFree(in);
546   return ufield;
547 }
548
549 /*
550  * Added by Filippo, 2005/04/18 to allow a zero-copy sending, through the
551  * CmiVectorSend functions.
552  */
553 PTvectorlist PeTable :: ExtractAndVectorize(comID id, int ufield, int npe, int *pelist) {
554   char *junk;
555   int nummsgs = 0;
556   int offset;
557   int i, j;
558
559   for (i=0; i<npe; ++i) nummsgs += msgnum[pelist[i]];
560
561   ComlibPrintf("%d In ExtractAndVectorize %d, %d\n", CmiMyPe(), npe, nummsgs); 
562
563   if (nummsgs ==0) {
564     ComlibPrintf("Returning NULL\n");
565     return(NULL);
566   }
567     
568   int headersize = sizeof(struct AllToAllHdr) + (npe + nummsgs + 1) * sizeof(int);
569   //int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
570   //    + (npe + 4 + nummsgs) * sizeof(int);  
571
572   char *p;
573   p=(char *)CmiAlloc(headersize);
574
575   char *t = p + CmiReservedHeaderSize;
576   if (!p) CmiAbort("Big time problem\n");
577   magic++;
578
579   int refno = id.refno;    
580
581   // SHOULD PACK THE SIZE, which is not available: fix elan and undo this cvs update
582   PACK(int, refno);
583   PACK(comID, id);
584   PACK(int, ufield);
585   PACK(int, nummsgs);
586   PACK(int, npe);
587     
588   int lesspe=0;
589   int npacked = 0;
590   for (i=0;i<npe;i++) {
591     int index=pelist[i];
592
593     if (msgnum[index]<=0) {
594       lesspe++;
595             
596       ComlibPrintf("[%d] msgnum[index]<=0 !!!!!\n", CkMyPe());
597       continue;
598     }
599         
600     ComlibPrintf("%d Packing pelist[%d]\n", CkMyPe(), index);
601     register int newval=-1*pelist[i];
602     PACK(int, newval); 
603
604     for (j=0;j<msgnum[index];j++) {
605       if (PeList[index][j]->magic == magic) {
606         offset=(PeList[index][j]->offset);
607       }
608       else {
609         npacked ++;
610                 
611         //offset=msg_offset;
612         offset=npacked;
613         PeList[index][j]->magic=magic;
614         PeList[index][j]->offset=offset;
615         PTinfo *tempmsg=PeList[index][j];
616
617         ptrvec.insert(npacked, tempmsg);
618       }
619       
620       //ComlibPrintf("%d Packing msg_offset=%d\n", CkMyPe(), offset);
621       PACK(int, offset); 
622
623       --(PeList[index][j]->refCount);
624       /*
625         if (--(PeList[index][j]->refCount) <=0) {
626         CmiFree(PeList[index][j]->msg);
627                 
628         PTFREE(PeList[index][j]);
629         }
630       */
631       PeList[index][j]=NULL;
632     }
633     msgnum[index]=0;
634   }
635   //offset=-1;
636   //PACK(int, offset);
637
638   // See petable.h for a description of this structure
639   PTvectorlist result = (PTvectorlist)CmiAlloc(sizeof(struct ptvectorlist) +
640                                                (npacked+1)*2*sizeof(char*) +
641                                                2*sizeof(CmiChunkHeader));
642   result->count = npacked + 1;
643   result->sizes = (int*)((char*)result + sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
644   result->msgs  = (char**)((char*)result->sizes + (npacked+1)*sizeof(int) + sizeof(CmiChunkHeader));
645
646   SIZEFIELD(result->sizes) = (npacked+1)*sizeof(int);
647   REFFIELD(result->sizes) = - (int)(sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
648   SIZEFIELD(result->msgs) = (npacked+1)*sizeof(int);
649   REFFIELD(result->msgs) = - (sizeof(struct ptvectorlist) + (npacked+1)*sizeof(int) + 2*sizeof(CmiChunkHeader));
650   CmiReference(result);
651
652   result->sizes[0] = headersize;
653   result->msgs[0] = p;
654   PTinfo *temp;
655   for (i=1; i<=npacked; ++i) {
656     temp = ptrvec[i];
657     result->sizes[i] = temp->msgsize;
658     result->msgs[i] = (char*)temp->msg;
659     // if there is still reference we CmiReference the message so it does not get deleted
660     // otherwise we free also the PTinfo struct use to hold it
661     if (temp->refCount > 0) CmiReference(result->msgs[i]);
662     else PTFREE(temp);
663   }
664
665   ptrvec.removeAll();
666
667   /*    
668         if (lesspe) {
669         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
670         npe=npe-lesspe;
671         PACK(int, npe);
672         }
673   */
674
675   //return(p);
676
677   return result;
678 }
679
680 /*
681  * Added by Filippo, 2005/04/18 to allow a zero-copy sending, through the
682  * CmiVectorSend functions.
683  */
684 PTvectorlist PeTable :: ExtractAndVectorizeAll(comID id, int ufield) {
685   int nmsgs = 0, i, j;
686   int index = 0;
687
688   ComlibPrintf("[%d] In Extract And Vectorize All\n", CkMyPe());
689
690   //Increment magic to detect duplicate messages
691   magic++;
692
693   //register int total_msg_size = 0;
694
695   //first compute size
696   for (i=0;i<NumPes;i++) {
697     index = i;
698     for (j=msgnum[index]-1; (j>=0); j--) {
699       if (PeList[index][j]->magic != magic) {                
700         //total_msg_size += ALIGN8(PeList[index][j]->msgsize);
701         //total_msg_size += 2 * sizeof(int);
702         PeList[index][j]->magic=magic;
703         ptrvec.insert(nmsgs, PeList[index][j]);
704         PeList[index][j] = NULL;
705         nmsgs ++;
706       }
707       --(PeList[index][j]->refCount);
708     }
709   }
710
711   //total_msg_size += ALIGN8(sizeof(AllToAllHdr));
712
713   ComlibPrintf("[%d] nmsgs %d **%d**\n", CkMyPe(), nmsgs, sizeof(AllToAllHdr));
714     
715   //poiter to the message header
716   AllToAllHdr *ahdr = (AllToAllHdr *) CmiAlloc(sizeof(struct AllToAllHdr));
717
718   ComlibPrintf("After cmialloc\n");
719
720   /*
721   //buffer to copy stuff into
722   char *t = p; 
723   char *junk = NULL;
724     
725   int dummy = 0;
726   */
727
728   int refno = 0;
729
730   ahdr->refno = refno;
731   ahdr->id = id;
732   ahdr->ufield = ufield;
733   ahdr->nmsgs = nmsgs;
734
735   /*
736     PACKINT(refno);    
737     PACK(comID, id);
738       
739     PACKINT(ufield);
740     PACKINT(nmsgs);
741     //    PACKINT(dummy); //Aligning to 8 bytes
742     */
743
744   // See petable.h for a description of this structure
745   PTvectorlist result = (PTvectorlist)CmiAlloc(sizeof(struct ptvectorlist) +
746                                                (nmsgs+1)*2*sizeof(char*) +
747                                                2*sizeof(CmiChunkHeader));
748   result->count = nmsgs + 1;
749   result->sizes = (int*)((char*)result + sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
750   result->msgs  = (char**)((char*)result->sizes + (nmsgs+1)*sizeof(int) + sizeof(CmiChunkHeader));
751
752   SIZEFIELD(result->sizes) = (nmsgs+1)*sizeof(int);
753   REFFIELD(result->sizes) = - (int)(sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
754   SIZEFIELD(result->msgs) = (nmsgs+1)*sizeof(int);
755   REFFIELD(result->msgs) = - (sizeof(struct ptvectorlist) + (nmsgs+1)*sizeof(int) + 2*sizeof(CmiChunkHeader));
756   CmiReference(result);
757
758   result->sizes[0] = sizeof(ahdr);
759   result->msgs[0] = (char*)ahdr;
760   PTinfo *temp;
761   for (i=1; i<nmsgs; ++i) {
762     temp = ptrvec[i];
763     result->sizes[i] = temp->msgsize;
764     result->msgs[i] = (char*)temp->msg;
765     // if there is still reference we CmiReference the message so it does not get deleted
766     // otherwise we free also the PTinfo struct use to hold it
767     if (temp->refCount > 0) CmiReference(result->msgs[i]);
768     else PTFREE(temp);
769   }
770
771   ptrvec.removeAll();
772
773   return result;
774
775   /*
776   PACK(AllToAllHdr, ahdr);   
777
778   int msg_offset = ALIGN8(sizeof(AllToAllHdr));
779     
780   //Increment magic again for creating the message
781   magic++;
782   for (i=0;i<NumPes;i++) {
783     index=i;
784     int ref = 0;
785     int size;
786
787     for (j=msgnum[index]-1; (j>=0); j--) {
788       //Check if it is a duplicate
789       if (PeList[index][j]->magic != magic) {                
790         size = PeList[index][j]->msgsize;
791         PACKMSG(&size, sizeof(int));
792         PACKMSG(&ref, sizeof(int));
793         PeList[index][j]->magic=magic;
794         PACKMSG(PeList[index][j]->msg, size);
795
796         msg_offset = ALIGN8(msg_offset);
797       }
798
799       //Free it when all the processors have gotten rid of it
800       if (--(PeList[index][j]->refCount) <=0) {
801         ComlibPrintf("before cmifree \n");
802         CmiFree(PeList[index][j]->msg);   
803         ComlibPrintf("after cmifree \n");
804
805         PTFREE(PeList[index][j]);
806       }
807       //Assign the current processors message pointer to NULL
808       PeList[index][j] = NULL;
809     }
810     msgnum[index] = 0;
811   }
812     
813   *length = total_msg_size;
814   return p;
815   */
816 }
817
818 void PeTable :: GarbageCollect() {
819 }
820
821 /*@}*/