remove ssize_t to make it compile on windows
[charm.git] / src / conv-com / petable.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9    @addtogroup ConvComlib
10    @{
11    @file 
12    @brief Tables of messages for PEs 
13 */
14
15 #include <string.h>
16 #include <stdlib.h>
17 //#include <converse.h>
18 //#include "convcomlib.h"
19 #include "petable.h"
20 //#include "converse.h"
21
22 #define BIGBUFFERSIZE 65536
23 #define PTPREALLOC    100
24
25 struct AllToAllHdr {
26   char dummy[CmiReservedHeaderSize];
27   int refno;
28   comID id;
29   int size;
30   int ufield;
31   int nmsgs;
32 };
33
34
35 /**************************************************************
36  * Preallocated memory=P*MSGQLEN ptr + 2P ints + 1000 ptrs
37  **************************************************************/
38 PeTable :: PeTable(int n) {
39   NumPes=n;
40   magic=0;
41   PeList = (PTinfo ***)CmiAlloc(sizeof(PTinfo *)*NumPes);
42   //  ComlibPrintf("Pelist[%d][%d]\n", NumPes, MSGQLEN);
43   msgnum=new int[NumPes];
44   MaxSize=new int[NumPes];
45   for (int i=0;i<NumPes;i++) {
46     msgnum[i]=0;
47     MaxSize[i]=MSGQLEN;
48     PeList[i]=(PTinfo **)CmiAlloc(sizeof(PTinfo *)*MSGQLEN);
49     for (int j=0;j<MSGQLEN;j++) PeList[i][j]=0;
50   }
51
52   //ptrlist=(PTinfo **)CmiAlloc(1000*sizeof(PTinfo *));
53   //  FreeList= new GList;
54   //CombBuffer=(char *)CmiAlloc(BIGBUFFERSIZE);
55
56   PTFreeList=NULL;
57   PTFreeChunks=NULL;
58 }
59
60 PeTable :: ~PeTable() {
61   int i;
62   for (i=0;i<NumPes;i++) CmiFree(PeList[i]);
63   CmiFree(PeList);
64   delete msgnum;
65   delete MaxSize;
66   GarbageCollect();
67   //CmiFree(ptrlist);
68   PTinfo *tmp;
69   while (PTFreeChunks) {
70     tmp=PTFreeChunks;
71     PTFreeChunks=PTNEXTCHUNK(tmp);
72     CmiFree(tmp);
73   }
74   // delete FreeList;
75
76 }
77
78 void PeTable:: Purge() {
79   for (int i=0; i<NumPes;i++) {
80     if (msgnum[i]) {
81       // ComlibPrintf("%d Warning: %d Undelivered Messages for %d\n", CkMyPe(), msgnum[i], i);
82       //msgnum[i]=0;
83     }
84   }
85   GarbageCollect();
86   //  ComlibPrintf("combcount = %d\n", combcount);
87   //combcount = 0;
88 }
89
90 void PeTable :: ExtractAndDeliverLocalMsgs(int index, Strategy *myStrat) {
91   int j;
92   msgstruct m;
93
94   ComlibPrintf("%d:Delivering %d local messages\n", CkMyPe(), msgnum[index]);
95   for (j=msgnum[index]-1;(j>=0);j--) {
96
97     m.msgsize=PeList[index][j]->msgsize;
98     m.msg=PeList[index][j]->msg;
99
100     if (--(PeList[index][j]->refCount) <=0) {
101       //CmiSyncSendAndFree(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
102       myStrat->deliver((char*)m.msg, m.msgsize);
103       PTFREE(PeList[index][j]);
104     }
105     else {
106       char *dupmsg = (char*)CmiAlloc(m.msgsize);
107       memcpy(dupmsg, m.msg, m.msgsize);
108       myStrat->deliver(dupmsg, m.msgsize);
109       //CmiSyncSend(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
110     }
111     PeList[index][j]=NULL;
112   }
113   msgnum[index]=j+1;
114
115   return;
116 }
117
118
119 #undef PACK
120 #undef PACKMSG
121 //#define PACKINT(data) {((int*)t)[0] = data; t+=sizeof(int);}
122 #define PACK(type,data) {junk=(char *)&(data); memcpy(t, junk, sizeof(type)); t+=sizeof(type);}
123 #define PACKMSG(data, size) { memcpy(p+msg_offset, (data), size); msg_offset += size; }
124
125 /*
126   Protocol:
127   |     AllToAllHdr      |npe|ChunkHdr1|msg1|ChunkHdr2|msg2|...
128   |ref|comid|ufield|nmsgs|
129 */
130 char * PeTable ::ExtractAndPack(comID id, int ufield, int npe, 
131                                 int *pelist, int *length) {
132   char *junk;
133   int nummsgs, offset, num_distinctmsgs;
134         
135   int tot_msgsize=TotalMsgSize(npe, pelist, &nummsgs, &num_distinctmsgs);
136
137   ComlibPrintf("%d In ExtractAndPack %d, %d\n", CmiMyPe(), npe, nummsgs); 
138
139   if (tot_msgsize ==0) {
140     *length=0;
141         
142     ComlibPrintf("Returning NULL\n");
143     return(NULL);
144   }
145     
146   int msg_offset = sizeof(struct AllToAllHdr) + (npe + nummsgs + 1) * sizeof(int);
147   //int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
148   //    + (npe + 4 + nummsgs) * sizeof(int);  
149
150   msg_offset = ALIGN8(msg_offset);
151     
152   *length = tot_msgsize;
153   *length += msg_offset;
154   char *p;
155   p=(char *)CmiAlloc(*length);
156
157   char *t = p + CmiReservedHeaderSize;
158   int i, j;
159   if (!p) CmiAbort("Big time problem\n");
160   magic++;
161
162   int refno = id.refno;    
163
164   PACK(int, refno);
165   PACK(comID, id);
166   PACK(int, *length);
167   PACK(int, ufield);
168   PACK(int, nummsgs);
169   PACK(int, npe);
170     
171   int lesspe=0;
172   int npacked = 0;
173   for (i=0;i<npe;i++) {
174     int index=pelist[i];
175
176     if (msgnum[index]<=0) {
177       lesspe++;
178             
179       ComlibPrintf("[%d] msgnum[index]<=0 !!!!!\n", CkMyPe());
180       continue;
181     }
182         
183     ComlibPrintf("%d Packing pelist[%d]\n", CkMyPe(), index);
184     register int newval=-1*pelist[i];
185     PACK(int, newval); 
186
187     for (j=0;j<msgnum[index];j++) {
188       if (PeList[index][j]->magic == magic) {
189         offset=(PeList[index][j]->offset);
190       }
191       else {
192         npacked ++;
193                 
194         //offset=msg_offset;
195         offset=npacked;
196         PeList[index][j]->magic=magic;
197         PeList[index][j]->offset=offset;
198         PTinfo *tempmsg=PeList[index][j];
199                 
200         CmiChunkHeader hdr;
201         hdr.size = tempmsg->msgsize;
202         hdr.ref = -1;
203         PACKMSG(&hdr, sizeof(CmiChunkHeader));
204         PACKMSG(tempmsg->msg, tempmsg->msgsize);
205
206         msg_offset = ALIGN8(msg_offset);
207       }
208             
209       //ComlibPrintf("%d Packing msg_offset=%d\n", CkMyPe(), offset);
210       PACK(int, offset); 
211
212       if (--(PeList[index][j]->refCount) <=0) {
213         CmiFree(PeList[index][j]->msg);
214                 
215         PTFREE(PeList[index][j]);
216       }
217       PeList[index][j]=NULL;
218     }
219     msgnum[index]=0;
220   }
221   //offset=-1;
222   //PACK(int, offset);
223
224   /*    
225         if (lesspe) {
226         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
227         npe=npe-lesspe;
228         PACK(int, npe);
229         }
230   */
231
232   return(p);
233
234
235 /*Used for all to all multicast operations.  Assumes that each message
236   is destined to all the processors, to speed up all to all
237   substantially --Sameer 09/03/03 
238   
239   Protocol:
240   |     AllToAllHdr      |ChunkHdr1|msg1|ChunkHdr2|msg2|...
241   |ref|comid|ufield|nmsgs|
242 */
243
244 char * PeTable ::ExtractAndPackAll(comID id, int ufield, int *length) {
245   int nmsgs = 0;
246   int i, j;
247   int index = 0;
248
249   ComlibPrintf("[%d] In Extract And Pack All\n", CkMyPe());
250
251   //Increment magic to detect duplicate messages
252   magic++;
253
254   register int total_msg_size = 0;
255
256   //first compute size
257   for (i=0;i<NumPes;i++) {
258     index = i;
259     for (j=msgnum[index]-1; (j>=0); j--) {
260       if (PeList[index][j]->magic != magic) {                
261         total_msg_size += ALIGN8(PeList[index][j]->msgsize);
262         total_msg_size += 2 * sizeof(int);
263         PeList[index][j]->magic=magic;
264
265         nmsgs ++;
266       }            
267     }
268   }
269     
270   total_msg_size += ALIGN8(sizeof(AllToAllHdr));
271
272   ComlibPrintf("[%d] Message Size %d, nmsgs %d **%d**\n", CkMyPe(), total_msg_size, nmsgs, sizeof(AllToAllHdr));
273     
274   //poiter to the combined message, UGLY NAME
275   char *p = (char *) CmiAlloc(total_msg_size * sizeof(char));    
276
277   ComlibPrintf("After cmialloc\n");
278
279   //buffer to copy stuff into
280   char *t = p; 
281   char *junk = NULL;
282     
283   int dummy = 0;
284     
285   int refno = 0;
286
287   AllToAllHdr ahdr;
288   ahdr.refno = refno;
289   ahdr.id = id;
290   ahdr.size = *length;
291   ahdr.ufield = ufield;
292   ahdr.nmsgs = nmsgs;
293
294   /*
295     PACKINT(refno);    
296     PACK(comID, id);
297       
298     PACKINT(ufield);
299     PACKINT(nmsgs);
300     //    PACKINT(dummy); //Aligning to 8 bytes
301     */
302
303   PACK(AllToAllHdr, ahdr);   
304
305   int msg_offset = ALIGN8(sizeof(AllToAllHdr));
306     
307   //Increment magic again for creating the message
308   magic++;
309   for (i=0;i<NumPes;i++) {
310     index=i;
311     int ref = 0;
312     int size;
313
314     for (j=msgnum[index]-1; (j>=0); j--) {
315       //Check if it is a duplicate
316       if (PeList[index][j]->magic != magic) {                
317         size = PeList[index][j]->msgsize;
318         PACKMSG(&size, sizeof(int));
319         PACKMSG(&ref, sizeof(int));
320         PeList[index][j]->magic=magic;
321         PACKMSG(PeList[index][j]->msg, size);
322
323         msg_offset = ALIGN8(msg_offset);
324       }
325
326       //Free it when all the processors have gotten rid of it
327       if (--(PeList[index][j]->refCount) <=0) {
328         ComlibPrintf("before cmifree \n");
329         CmiFree(PeList[index][j]->msg);   
330         ComlibPrintf("after cmifree \n");
331
332         PTFREE(PeList[index][j]);
333       }
334       //Assign the current processors message pointer to NULL
335       PeList[index][j] = NULL;
336     }
337     msgnum[index] = 0;
338   }
339     
340   *length = total_msg_size;
341   return p;
342 }
343
344 #undef UNPACK
345 #define UNPACK(type,data) {junk=(char *)&(data); memcpy(junk, t, sizeof(type));t+=sizeof(type);}
346 #undef UNPACKMSG
347 #define UNPACKMSG(dest,src, size) { memcpy(dest, src, size); offset += size;}
348
349 int PeTable :: UnpackAndInsert(void *in) {
350   char *junk;
351   char *t =(char *)in + CmiReservedHeaderSize;
352   int i, ufield, npe, pe, nummsgs, ptrlistindex=0;
353   char *msgend, *msgcur;
354   comID id;
355   int refno = 0;
356   int msgsize;
357
358   register int offset;
359
360   UNPACK(int, refno);
361   ComlibPrintf("%d UnPackAndInsert\n", CkMyPe());
362   UNPACK(comID, id);
363   UNPACK(int, msgsize);
364   UNPACK(int, ufield);
365   UNPACK(int, nummsgs);
366   UNPACK(int, npe);
367
368   // unpack all messages into an array
369   msgend = (char*)in + msgsize;
370   msgcur = (char*)in + ALIGN8(sizeof(struct AllToAllHdr) + (npe+nummsgs+1)*sizeof(int));
371   while (msgcur < msgend) {
372     CmiChunkHeader *ch = (CmiChunkHeader *)msgcur;
373
374     PTinfo *temp;
375     PTALLOC(temp);
376     temp->msgsize=ch->size;
377     temp->msg=(void*)&ch[1];
378     temp->refCount=0;
379     temp->magic=0;
380     temp->offset=0;
381
382     ptrvec.insert(++ptrlistindex, temp);
383
384     // fix the ref field of the message
385     ch->ref = (int)((char*)in - (char*)temp->msg);
386     msgcur += ALIGN8(ch->size) + sizeof(CmiChunkHeader);
387   }
388
389   pe = -1;
390   //for (i=0;i<npe;i++) {
391   for (i=0; i<nummsgs; ++i) {
392     //UNPACK(int, pe);
393     //pe *= -1;
394
395     UNPACK(int, offset);
396     if (offset <= 0) {
397       pe = -1 * offset;
398       --i;
399       continue;
400     }
401
402     if (msgnum[pe] >= MaxSize[pe]) {
403       REALLOC(PeList[pe], MaxSize[pe]);
404       MaxSize[pe] *= 2;
405     }
406     PeList[pe][msgnum[pe]] = ptrvec[offset];
407     (ptrvec[offset])->refCount++;
408     msgnum[pe]++;
409
410     /*
411       while (offset > 0) {
412       int tempmsgsize;
413       UNPACKMSG(&(tempmsgsize), (char *)in+offset, sizeof(int));
414       int ptr;
415       UNPACKMSG(&ptr, (char *)in+offset, sizeof(int));
416
417       if (ptr >=0 )  {
418       if (msgnum[pe] >= MaxSize[pe]) {
419       REALLOC(PeList[pe], MaxSize[pe]);
420       MaxSize[pe] *= 2;
421       }
422       PeList[pe][msgnum[pe]]=ptrvec[ptr];
423       (ptrvec[ptr])->refCount++;
424       msgnum[pe]++;
425
426       UNPACK(int, offset);
427       continue;
428       }
429             
430       PTinfo *temp;
431       PTALLOC(temp);
432       temp->msgsize=tempmsgsize;
433       temp->refCount=1;
434       temp->magic=0;
435       temp->offset=0;
436
437       ptrvec.insert(ptrlistindex, temp);
438       memcpy((char *)in+offset-sizeof(int), &ptrlistindex, sizeof(int));
439
440       ptrlistindex++;
441       temp->msg=(void *)((char *)in+offset);
442       if (msgnum[pe] >= MaxSize[pe]) {
443
444       REALLOC(PeList[pe], MaxSize[pe]);
445       MaxSize[pe] *= 2;
446       }
447       PeList[pe][msgnum[pe]]=temp;
448       msgnum[pe]++;
449       UNPACK(int, offset);
450       //}
451       //t -=sizeof(int);
452       }
453       *(int *)((char *)in -sizeof(int))=ptrlistindex; 
454       */
455   }
456
457   REFFIELD(in) = ptrlistindex;
458   if (ptrlistindex==0) {
459     REFFIELD(in) = 1;
460     CmiFree(in);
461   }
462
463   /*  
464       for (i=0;i<ptrlistindex;i++) {
465       char * actualmsg=(char *)(ptrvec[i]->msg);
466       int *rc=(int *)(actualmsg-sizeof(int));
467       *rc=(int)((char *)in-actualmsg);
468       //ComlibPrintf("I am inserting %d\n", *rc);
469       }
470   */
471
472   ptrvec.removeAll();
473   
474   return(ufield);
475 }
476
477 /* Unpack and insert an all to all message, the router provides the
478    list of processors to insert into.
479    Same protocol as mentioned earlier.
480 */
481
482 int PeTable :: UnpackAndInsertAll(void *in, int npes, int *pelist) {
483   char *junk;
484   char *t =(char *)in /*+CmiReservedHeaderSize*/;
485   int i,  
486     ufield,   // user field or ths stage of the iteration 
487     nmsgs,    // number of messages in combo message
488     refno,    // reference number
489     dummy;    // alignment dummy
490   
491   comID id;
492
493   /*
494     UNPACK(int, refno);      
495     UNPACK(comID, id);
496     
497     UNPACK(int, ufield);
498     UNPACK(int, nmsgs);
499     //UNPACK(int, dummy);
500     int header_size = sizeof(comID) + CmiReservedHeaderSize + 3 *sizeof(int);
501     if(header_size % 8 != 0)
502     t+= 8 - header_size % 8;
503   */
504
505   AllToAllHdr ahdr;
506   UNPACK(AllToAllHdr, ahdr);
507
508   if((sizeof(AllToAllHdr) & 7) != 0)
509     t += 8 - (sizeof(AllToAllHdr) & 7);
510
511   refno = ahdr.refno;
512   id = ahdr.id;
513   nmsgs = ahdr.nmsgs;
514   ufield = ahdr.ufield;
515
516   ComlibPrintf("[%d] unpack and insert all %d, %d\n", CkMyPe(), ufield, nmsgs);
517   
518   //Inserting a memory foot print may, change later
519   CmiChunkHeader *chdr= (CmiChunkHeader*)((char*)in - sizeof(CmiChunkHeader));
520
521   int *ref;
522   int size;
523   char *msg;
524   for(int count = 0; count < nmsgs; count++){
525
526     t += sizeof(CmiChunkHeader);
527     msg = t;
528
529     // Get the size of the message, and set the ref field correctly for CmiFree
530     size = SIZEFIELD(msg);
531     REFFIELD(msg) = (int)((char *)in - (char *)msg);
532
533     t += ALIGN8(size);
534
535     // Do CmiReference(msg), this is done bypassing converse!
536     chdr->ref++;
537
538     /*
539       UNPACK(int, size);
540       ref = (int *)t;
541       t += sizeof(int);
542     
543       *ref = (int)((char *)(&chdr->ref) - (char *)ref);
544       chdr->ref ++;
545
546       ComlibPrintf("ref = %d, global_ref = %d\n", *ref, chdr->ref);
547     
548       msg = t;
549       t += ALIGN8(size);
550     */
551     InsertMsgs(npes, pelist, size, msg);
552   }  
553
554   CmiFree(in);
555   return ufield;
556 }
557
558 /*
559  * Added by Filippo, 2005/04/18 to allow a zero-copy sending, through the
560  * CmiVectorSend functions.
561  */
562 PTvectorlist PeTable :: ExtractAndVectorize(comID id, int ufield, int npe, int *pelist) {
563   char *junk;
564   int nummsgs = 0;
565   int offset, num_distinctmsgs;
566   int i, j;
567
568   for (i=0; i<npe; ++i) nummsgs += msgnum[pelist[i]];
569
570   ComlibPrintf("%d In ExtractAndVectorize %d, %d\n", CmiMyPe(), npe, nummsgs); 
571
572   if (nummsgs ==0) {
573     ComlibPrintf("Returning NULL\n");
574     return(NULL);
575   }
576     
577   int headersize = sizeof(struct AllToAllHdr) + (npe + nummsgs + 1) * sizeof(int);
578   //int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
579   //    + (npe + 4 + nummsgs) * sizeof(int);  
580
581   char *p;
582   p=(char *)CmiAlloc(headersize);
583
584   char *t = p + CmiReservedHeaderSize;
585   if (!p) CmiAbort("Big time problem\n");
586   magic++;
587
588   int refno = id.refno;    
589
590   // SHOULD PACK THE SIZE, which is not available: fix elan and undo this cvs update
591   PACK(int, refno);
592   PACK(comID, id);
593   PACK(int, ufield);
594   PACK(int, nummsgs);
595   PACK(int, npe);
596     
597   int lesspe=0;
598   int npacked = 0;
599   for (i=0;i<npe;i++) {
600     int index=pelist[i];
601
602     if (msgnum[index]<=0) {
603       lesspe++;
604             
605       ComlibPrintf("[%d] msgnum[index]<=0 !!!!!\n", CkMyPe());
606       continue;
607     }
608         
609     ComlibPrintf("%d Packing pelist[%d]\n", CkMyPe(), index);
610     register int newval=-1*pelist[i];
611     PACK(int, newval); 
612
613     for (j=0;j<msgnum[index];j++) {
614       if (PeList[index][j]->magic == magic) {
615         offset=(PeList[index][j]->offset);
616       }
617       else {
618         npacked ++;
619                 
620         //offset=msg_offset;
621         offset=npacked;
622         PeList[index][j]->magic=magic;
623         PeList[index][j]->offset=offset;
624         PTinfo *tempmsg=PeList[index][j];
625
626         ptrvec.insert(npacked, tempmsg);
627       }
628       
629       //ComlibPrintf("%d Packing msg_offset=%d\n", CkMyPe(), offset);
630       PACK(int, offset); 
631
632       --(PeList[index][j]->refCount);
633       /*
634         if (--(PeList[index][j]->refCount) <=0) {
635         CmiFree(PeList[index][j]->msg);
636                 
637         PTFREE(PeList[index][j]);
638         }
639       */
640       PeList[index][j]=NULL;
641     }
642     msgnum[index]=0;
643   }
644   //offset=-1;
645   //PACK(int, offset);
646
647   // See petable.h for a description of this structure
648   PTvectorlist result = (PTvectorlist)CmiAlloc(sizeof(struct ptvectorlist) +
649                                                (npacked+1)*2*sizeof(char*) +
650                                                2*sizeof(CmiChunkHeader));
651   result->count = npacked + 1;
652   result->sizes = (int*)((char*)result + sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
653   result->msgs  = (char**)((char*)result->sizes + (npacked+1)*sizeof(int) + sizeof(CmiChunkHeader));
654
655   SIZEFIELD(result->sizes) = (npacked+1)*sizeof(int);
656   REFFIELD(result->sizes) = - (sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
657   SIZEFIELD(result->msgs) = (npacked+1)*sizeof(int);
658   REFFIELD(result->msgs) = - (sizeof(struct ptvectorlist) + (npacked+1)*sizeof(int) + 2*sizeof(CmiChunkHeader));
659   CmiReference(result);
660
661   result->sizes[0] = headersize;
662   result->msgs[0] = p;
663   PTinfo *temp;
664   for (i=1; i<=npacked; ++i) {
665     temp = ptrvec[i];
666     result->sizes[i] = temp->msgsize;
667     result->msgs[i] = (char*)temp->msg;
668     // if there is still reference we CmiReference the message so it does not get deleted
669     // otherwise we free also the PTinfo struct use to hold it
670     if (temp->refCount > 0) CmiReference(result->msgs[i]);
671     else PTFREE(temp);
672   }
673
674   ptrvec.removeAll();
675
676   /*    
677         if (lesspe) {
678         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
679         npe=npe-lesspe;
680         PACK(int, npe);
681         }
682   */
683
684   //return(p);
685
686   return result;
687 }
688
689 /*
690  * Added by Filippo, 2005/04/18 to allow a zero-copy sending, through the
691  * CmiVectorSend functions.
692  */
693 PTvectorlist PeTable :: ExtractAndVectorizeAll(comID id, int ufield) {
694   int nmsgs = 0, i, j;
695   int index = 0;
696
697   ComlibPrintf("[%d] In Extract And Vectorize All\n", CkMyPe());
698
699   //Increment magic to detect duplicate messages
700   magic++;
701
702   //register int total_msg_size = 0;
703
704   //first compute size
705   for (i=0;i<NumPes;i++) {
706     index = i;
707     for (j=msgnum[index]-1; (j>=0); j--) {
708       if (PeList[index][j]->magic != magic) {                
709         //total_msg_size += ALIGN8(PeList[index][j]->msgsize);
710         //total_msg_size += 2 * sizeof(int);
711         PeList[index][j]->magic=magic;
712         ptrvec.insert(nmsgs, PeList[index][j]);
713         PeList[index][j] = NULL;
714         nmsgs ++;
715       }
716       --(PeList[index][j]->refCount);
717     }
718   }
719
720   //total_msg_size += ALIGN8(sizeof(AllToAllHdr));
721
722   ComlibPrintf("[%d] nmsgs %d **%d**\n", CkMyPe(), nmsgs, sizeof(AllToAllHdr));
723     
724   //poiter to the message header
725   AllToAllHdr *ahdr = (AllToAllHdr *) CmiAlloc(sizeof(struct AllToAllHdr));
726
727   ComlibPrintf("After cmialloc\n");
728
729   /*
730   //buffer to copy stuff into
731   char *t = p; 
732   char *junk = NULL;
733     
734   int dummy = 0;
735   */
736
737   int refno = 0;
738
739   ahdr->refno = refno;
740   ahdr->id = id;
741   ahdr->ufield = ufield;
742   ahdr->nmsgs = nmsgs;
743
744   /*
745     PACKINT(refno);    
746     PACK(comID, id);
747       
748     PACKINT(ufield);
749     PACKINT(nmsgs);
750     //    PACKINT(dummy); //Aligning to 8 bytes
751     */
752
753   // See petable.h for a description of this structure
754   PTvectorlist result = (PTvectorlist)CmiAlloc(sizeof(struct ptvectorlist) +
755                                                (nmsgs+1)*2*sizeof(char*) +
756                                                2*sizeof(CmiChunkHeader));
757   result->count = nmsgs + 1;
758   result->sizes = (int*)((char*)result + sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
759   result->msgs  = (char**)((char*)result->sizes + (nmsgs+1)*sizeof(int) + sizeof(CmiChunkHeader));
760
761   SIZEFIELD(result->sizes) = (nmsgs+1)*sizeof(int);
762   REFFIELD(result->sizes) = - (sizeof(struct ptvectorlist) + sizeof(CmiChunkHeader));
763   SIZEFIELD(result->msgs) = (nmsgs+1)*sizeof(int);
764   REFFIELD(result->msgs) = - (sizeof(struct ptvectorlist) + (nmsgs+1)*sizeof(int) + 2*sizeof(CmiChunkHeader));
765   CmiReference(result);
766
767   result->sizes[0] = sizeof(ahdr);
768   result->msgs[0] = (char*)ahdr;
769   PTinfo *temp;
770   for (i=1; i<nmsgs; ++i) {
771     temp = ptrvec[i];
772     result->sizes[i] = temp->msgsize;
773     result->msgs[i] = (char*)temp->msg;
774     // if there is still reference we CmiReference the message so it does not get deleted
775     // otherwise we free also the PTinfo struct use to hold it
776     if (temp->refCount > 0) CmiReference(result->msgs[i]);
777     else PTFREE(temp);
778   }
779
780   ptrvec.removeAll();
781
782   return result;
783
784   /*
785   PACK(AllToAllHdr, ahdr);   
786
787   int msg_offset = ALIGN8(sizeof(AllToAllHdr));
788     
789   //Increment magic again for creating the message
790   magic++;
791   for (i=0;i<NumPes;i++) {
792     index=i;
793     int ref = 0;
794     int size;
795
796     for (j=msgnum[index]-1; (j>=0); j--) {
797       //Check if it is a duplicate
798       if (PeList[index][j]->magic != magic) {                
799         size = PeList[index][j]->msgsize;
800         PACKMSG(&size, sizeof(int));
801         PACKMSG(&ref, sizeof(int));
802         PeList[index][j]->magic=magic;
803         PACKMSG(PeList[index][j]->msg, size);
804
805         msg_offset = ALIGN8(msg_offset);
806       }
807
808       //Free it when all the processors have gotten rid of it
809       if (--(PeList[index][j]->refCount) <=0) {
810         ComlibPrintf("before cmifree \n");
811         CmiFree(PeList[index][j]->msg);   
812         ComlibPrintf("after cmifree \n");
813
814         PTFREE(PeList[index][j]);
815       }
816       //Assign the current processors message pointer to NULL
817       PeList[index][j] = NULL;
818     }
819     msgnum[index] = 0;
820   }
821     
822   *length = total_msg_size;
823   return p;
824   */
825 }
826
827 void PeTable :: GarbageCollect() {
828 }
829
830 /*@}*/