updated syntax of the AllToAll message, in order to prepare for the two new
[charm.git] / src / conv-com / petable.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /*********************************************
9  * File : petable.C
10  *
11  * Author: Krishnan V
12  *
13  * The message buffer
14  *********************************************/
15 #include <string.h>
16 #include <stdlib.h>
17 #include <converse.h>
18 #include "comlib.h"
19 #include "petable.h"
20 #include "converse.h"
21
22 #define BIGBUFFERSIZE 65536
23 #define PTPREALLOC    100
24
25 struct AllToAllHdr{
26     char dummy[CmiReservedHeaderSize];
27     int refno;
28     comID id;
29     int ufield;
30     int nmsgs;
31 };
32
33
34 /**************************************************************
35  * Preallocated memory=P*MSGQLEN ptr + 2P ints + 1000 ptrs
36  **************************************************************/
37 PeTable :: PeTable(int n)
38 {
39   NumPes=n;
40   magic=0;
41   PeList = (PTinfo ***)CmiAlloc(sizeof(PTinfo *)*NumPes);
42   //  ComlibPrintf("Pelist[%d][%d]\n", NumPes, MSGQLEN);
43   msgnum=new int[NumPes];
44   MaxSize=new int[NumPes];
45   for (int i=0;i<NumPes;i++) {
46         msgnum[i]=0;
47         MaxSize[i]=MSGQLEN;
48         PeList[i]=(PTinfo **)CmiAlloc(sizeof(PTinfo *)*MSGQLEN);
49         for (int j=0;j<MSGQLEN;j++) PeList[i][j]=0;
50   }
51
52   //ptrlist=(PTinfo **)CmiAlloc(1000*sizeof(PTinfo *));
53   //  FreeList= new GList;
54   //CombBuffer=(char *)CmiAlloc(BIGBUFFERSIZE);
55
56   PTFreeList=NULL;
57   PTFreeChunks=NULL;
58 }
59
60 PeTable :: ~PeTable()
61 {
62   int i;
63   for (i=0;i<NumPes;i++) CmiFree(PeList[i]);
64   CmiFree(PeList);
65   delete msgnum;
66   delete MaxSize;
67   GarbageCollect();
68   //CmiFree(ptrlist);
69   PTinfo *tmp;
70   while (PTFreeChunks) {
71         tmp=PTFreeChunks;
72         PTFreeChunks=PTNEXTCHUNK(tmp);
73         CmiFree(tmp);
74   }
75  // delete FreeList;
76
77 }
78
79 void PeTable:: Purge()
80 {
81   for (int i=0; i<NumPes;i++) {
82         if (msgnum[i]) {
83             // ComlibPrintf("%d Warning: %d Undelivered Messages for %d\n", CkMyPe(), msgnum[i], i);
84           //msgnum[i]=0;
85         }
86   }
87   GarbageCollect();
88   //  ComlibPrintf("combcount = %d\n", combcount);
89   //combcount = 0;
90 }
91
92 void PeTable :: ExtractAndDeliverLocalMsgs(int index)
93 {
94   int j;
95   msgstruct m;
96
97   ComlibPrintf("%d:Delivering %d local messages\n", CkMyPe(), msgnum[index]);
98   for (j=msgnum[index]-1;(j>=0);j--) {
99
100         m.msgsize=PeList[index][j]->msgsize;
101         m.msg=PeList[index][j]->msg;
102
103         if (--(PeList[index][j]->refCount) <=0) {
104             CmiSyncSendAndFree(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
105             PTFREE(PeList[index][j]);
106         }
107         else {
108             CmiSyncSend(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
109         }
110         PeList[index][j]=NULL;
111   }
112   msgnum[index]=j+1;
113
114   return;
115 }
116
117
118 #undef PACK
119 #undef PACKMSG
120 //#define PACKINT(data) {((int*)t)[0] = data; t+=sizeof(int);}
121 #define PACK(type,data) {junk=(char *)&(data); memcpy(t, junk, sizeof(type)); t+=sizeof(type);}
122 #define PACKMSG(data, size) { memcpy(p+msg_offset, (data), size); msg_offset += size; }
123
124 /*Used for all to all multicast operations.  Assumes that each message
125   is destined to all the processors, to speed up all to all
126   substantially --Sameer 09/03/03 
127   
128   Protocol:
129   |ref|comid|ufield|nmsgs|size|ref|msg1|size2|ref2|msg2|....
130 */
131
132 char * PeTable ::ExtractAndPackAll(comID id, int ufield, int *length)
133 {
134     int nmsgs = 0, i, j;
135     int index = 0;
136
137     ComlibPrintf("[%d] In Extract And Pack All\n", CkMyPe());
138
139     //Increment magic to detect duplicate messages
140     magic++;
141
142     register int total_msg_size = 0;
143
144     //first compute size
145     for (i=0;i<NumPes;i++) {
146         index = i;
147         for (j=msgnum[index]-1; (j>=0); j--) {
148             if (PeList[index][j]->magic != magic) {                
149                 total_msg_size += ALIGN8(PeList[index][j]->msgsize);
150                 total_msg_size += 2 * sizeof(int);
151                 PeList[index][j]->magic=magic;
152
153                 nmsgs ++;
154             }            
155         }
156     }
157     
158     total_msg_size += ALIGN8(sizeof(AllToAllHdr));
159
160     ComlibPrintf("[%d] Message Size %d, nmsgs %d **%d**\n", CkMyPe(), total_msg_size, nmsgs, sizeof(AllToAllHdr));
161     
162     //poiter to the combined message, UGLY NAME
163     char *p = (char *) CmiAlloc(total_msg_size * sizeof(char));    
164
165     ComlibPrintf("After cmialloc\n");
166
167     //buffer to copy stuff into
168     char *t = p; 
169     char *junk = NULL;
170     
171     int dummy = 0;
172     
173     int refno = 0;
174
175     AllToAllHdr ahdr;
176     ahdr.refno = refno;
177     ahdr.id = id;
178     ahdr.ufield = ufield;
179     ahdr.nmsgs = nmsgs;
180
181     /*
182       PACKINT(refno);    
183       PACK(comID, id);
184       
185       PACKINT(ufield);
186       PACKINT(nmsgs);
187       //    PACKINT(dummy); //Aligning to 8 bytes
188     */
189
190     PACK(AllToAllHdr, ahdr);   
191
192     int msg_offset = ALIGN8(sizeof(AllToAllHdr));
193     
194     //Increment magic again for creating the message
195     magic++;
196     for (i=0;i<NumPes;i++) {
197         index=i;
198         int ref = 0;
199         int size;
200
201         for (j=msgnum[index]-1; (j>=0); j--) {
202             //Check if it is a duplicate
203             if (PeList[index][j]->magic != magic) {                
204                 size = PeList[index][j]->msgsize;
205                 PACKMSG(&size, sizeof(int));
206                 PACKMSG(&ref, sizeof(int));
207                 PeList[index][j]->magic=magic;
208                 PACKMSG(PeList[index][j]->msg, size);
209
210                 msg_offset = ALIGN8(msg_offset);
211             }
212
213             //Free it when all the processors have gotten rid of it
214             if (--(PeList[index][j]->refCount) <=0) {
215                 ComlibPrintf("before cmifree \n");
216                 CmiFree(PeList[index][j]->msg);   
217                 ComlibPrintf("after cmifree \n");
218
219                 PTFREE(PeList[index][j]);
220             }
221             //Assign the current processors message pointer to NULL
222             PeList[index][j] = NULL;
223         }
224         msgnum[index] = 0;
225     }
226     
227     *length = total_msg_size;
228     return p;
229 }
230
231 char * PeTable ::ExtractAndPack(comID id, int ufield, int npe, 
232                                 int *pelist, int *length)
233 {
234     char *junk;
235     int nummsgs, offset, num_distinctmsgs;
236         
237     int tot_msgsize=TotalMsgSize(npe, pelist, &nummsgs, &num_distinctmsgs);
238
239     ComlibPrintf("%d In ExtractAndPack %d, %d\n", CmiMyPe(), npe, nummsgs); 
240
241     if (tot_msgsize ==0) {
242         *length=0;
243         
244         ComlibPrintf("Returning NULL\n");
245         return(NULL);
246     }
247     
248     int msg_offset = sizeof(struct AllToAllHdr) + (npe + nummsgs + 1) * sizeof(int);
249     //int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
250     //    + (npe + 4 + nummsgs) * sizeof(int);  
251
252     msg_offset = ALIGN8(msg_offset);
253     
254     *length = tot_msgsize;
255     *length += msg_offset;
256     char *p;
257     p=(char *)CmiAlloc(*length);
258
259     char *t = p + CmiReservedHeaderSize;
260     int i, j;
261     if (!p) CmiAbort("Big time problem\n");
262     magic++;
263
264     int refno = id.refno;    
265
266     PACK(int, refno);
267     PACK(comID, id);
268     PACK(int, ufield);
269     PACK(int, nummsgs);
270     PACK(int, npe);
271     
272     int lesspe=0;
273     int npacked = 0;
274     for (i=0;i<npe;i++) {
275         int index=pelist[i];
276
277         if (msgnum[index]<=0) {
278             lesspe++;
279             
280             ComlibPrintf("[%d] msgnum[index]<=0 !!!!!\n", CkMyPe());
281             continue;
282         }
283         
284         ComlibPrintf("%d Packing pelist[%d]\n", CkMyPe(), index);
285         register int newval=-1*pelist[i];
286         PACK(int, newval); 
287
288         for (j=0;j<msgnum[index];j++) {
289             if (PeList[index][j]->magic == magic) {
290                 offset=(PeList[index][j]->offset);
291             }
292             else {
293                 npacked ++;
294                 
295                 //offset=msg_offset;
296                 offset=npacked;
297                 PeList[index][j]->magic=magic;
298                 PeList[index][j]->offset=offset;
299                 PTinfo *tempmsg=PeList[index][j];
300                 
301                 CmiChunkHeader hdr;
302                 hdr.size = tempmsg->msgsize;
303                 hdr.ref = -1;
304                 PACKMSG(&hdr, sizeof(CmiChunkHeader));
305                 PACKMSG(tempmsg->msg, tempmsg->msgsize);
306
307                 msg_offset = ALIGN8(msg_offset);
308             }
309             
310             //ComlibPrintf("%d Packing msg_offset=%d\n", CkMyPe(), offset);
311             PACK(int, offset); 
312
313             if (--(PeList[index][j]->refCount) <=0) {
314                 CmiFree(PeList[index][j]->msg);
315                 
316                 PTFREE(PeList[index][j]);
317             }
318             PeList[index][j]=NULL;
319         }
320         msgnum[index]=0;
321     }
322     //offset=-1;
323     //PACK(int, offset);
324
325     /*    
326     if (lesspe) {
327         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
328         npe=npe-lesspe;
329         PACK(int, npe);
330     }
331     */
332
333     return(p);
334
335
336 #undef UNPACK
337 #define UNPACK(type,data) {junk=(char *)&(data); memcpy(junk, t, sizeof(type));t+=sizeof(type);}
338 #undef UNPACKMSG
339 #define UNPACKMSG(dest,src, size) { memcpy(dest, src, size); offset += size;}
340
341 int PeTable :: UnpackAndInsert(void *in)
342 {
343   char *junk;
344   char *t =(char *)in + CmiReservedHeaderSize;
345   int i, ufield, npe, pe, nummsgs, ptrlistindex=0;
346   char *msgend, *msgcur;
347   comID id;
348   int refno = 0;
349
350   register int offset;
351
352   UNPACK(int, refno);
353   //ComlibPrintf("%d UnPacking id\n", CkMyPe());
354   UNPACK(comID, id);
355   UNPACK(int, ufield);
356   UNPACK(int, nummsgs);
357   UNPACK(int, npe);
358
359   // unpack all messages into an array
360   msgend = (char*)in + CmiSize(in);
361   msgcur = (char*)in + ALIGN8(sizeof(struct AllToAllHdr) + (npe+nummsgs+1)*sizeof(int));
362   while (msgcur < msgend) {
363     CmiChunkHeader *ch = (CmiChunkHeader *)msgcur;
364
365     PTinfo *temp;
366     PTALLOC(temp);
367     temp->msgsize=ch->size;
368     temp->msg=(void*)&ch[1];
369     temp->refCount=0;
370     temp->magic=0;
371     temp->offset=0;
372
373     ptrvec.insert(++ptrlistindex, temp);
374
375     // fix the ref field of the message
376     ch->ref = (int)((char*)in - (char*)temp->msg);
377     msgcur += ALIGN8(ch->size) + sizeof(CmiChunkHeader);
378   }
379
380   pe = -1;
381   //for (i=0;i<npe;i++) {
382   for (i=0; i<nummsgs; ++i) {
383     //UNPACK(int, pe);
384     //pe *= -1;
385
386     UNPACK(int, offset);
387     if (offset <= 0) {
388       pe = -1 * offset;
389       --i;
390       continue;
391     }
392
393     if (msgnum[pe] >= MaxSize[pe]) {
394       REALLOC(PeList[pe], MaxSize[pe]);
395       MaxSize[pe] *= 2;
396     }
397     PeList[pe][msgnum[pe]] = ptrvec[offset];
398     (ptrvec[offset])->refCount++;
399     msgnum[pe]++;
400
401     /*
402     while (offset > 0) {
403       int tempmsgsize;
404       UNPACKMSG(&(tempmsgsize), (char *)in+offset, sizeof(int));
405       int ptr;
406       UNPACKMSG(&ptr, (char *)in+offset, sizeof(int));
407
408       if (ptr >=0 )  {
409         if (msgnum[pe] >= MaxSize[pe]) {
410           REALLOC(PeList[pe], MaxSize[pe]);
411           MaxSize[pe] *= 2;
412         }
413         PeList[pe][msgnum[pe]]=ptrvec[ptr];
414         (ptrvec[ptr])->refCount++;
415         msgnum[pe]++;
416
417         UNPACK(int, offset);
418         continue;
419       }
420             
421       PTinfo *temp;
422       PTALLOC(temp);
423       temp->msgsize=tempmsgsize;
424       temp->refCount=1;
425       temp->magic=0;
426       temp->offset=0;
427
428       ptrvec.insert(ptrlistindex, temp);
429       memcpy((char *)in+offset-sizeof(int), &ptrlistindex, sizeof(int));
430
431       ptrlistindex++;
432       temp->msg=(void *)((char *)in+offset);
433       if (msgnum[pe] >= MaxSize[pe]) {
434
435         REALLOC(PeList[pe], MaxSize[pe]);
436         MaxSize[pe] *= 2;
437       }
438       PeList[pe][msgnum[pe]]=temp;
439       msgnum[pe]++;
440       UNPACK(int, offset);
441       //}
442     //t -=sizeof(int);
443   }
444   *(int *)((char *)in -sizeof(int))=ptrlistindex; 
445   */
446   }
447
448   REFFIELD(in) = ptrlistindex;
449   if (ptrlistindex==0) {
450     REFFIELD(in) = 1;
451     CmiFree(in);
452   }
453
454   /*  
455   for (i=0;i<ptrlistindex;i++) {
456     char * actualmsg=(char *)(ptrvec[i]->msg);
457     int *rc=(int *)(actualmsg-sizeof(int));
458     *rc=(int)((char *)in-actualmsg);
459     //ComlibPrintf("I am inserting %d\n", *rc);
460   }
461   */
462
463   ptrvec.removeAll();
464   
465   return(ufield);
466 }
467
468 /* Unpack and insert an all to all message, the router provides the
469    list of processors to insert into.
470    Same protocol as mentioned earlier.
471 */
472
473 int PeTable :: UnpackAndInsertAll(void *in, int npes, int *pelist){
474   char *junk;
475   char *t =(char *)in /*+CmiReservedHeaderSize*/;
476   int i,  
477       ufield,   // user field or ths stage of the iteration 
478       nmsgs,    // number of messages in combo message
479       refno,    // reference number
480       dummy;    // alignment dummy
481   
482   comID id;
483
484   /*
485     UNPACK(int, refno);      
486     UNPACK(comID, id);
487     
488     UNPACK(int, ufield);
489     UNPACK(int, nmsgs);
490     //UNPACK(int, dummy);
491     int header_size = sizeof(comID) + CmiReservedHeaderSize + 3 *sizeof(int);
492     if(header_size % 8 != 0)
493     t+= 8 - header_size % 8;
494   */
495
496   AllToAllHdr ahdr;
497   UNPACK(AllToAllHdr, ahdr);
498
499   if(sizeof(AllToAllHdr) & 7 != 0)
500       t += 8 - sizeof(AllToAllHdr) & 7;
501
502   refno = ahdr.refno;
503   id = ahdr.id;
504   nmsgs = ahdr.nmsgs;
505   ufield = ahdr.ufield;
506
507   ComlibPrintf("[%d] unpack and insert all %d, %d\n", CkMyPe(), ufield, nmsgs);
508   
509   //Inserting a memory foot print may, change later
510   CmiChunkHeader *chdr= (CmiChunkHeader*)((char*)in - sizeof(CmiChunkHeader));
511
512   int *ref;
513   int size;
514   char *msg;
515   for(int count = 0; count < nmsgs; count++){
516
517     t += sizeof(CmiChunkHeader);
518     msg = t;
519
520     // Get the size of the message, and set the ref field correctly for CmiFree
521     size = SIZEFIELD(msg);
522     REFFIELD(msg) = (int)((char *)in - (char *)msg);
523
524     t += ALIGN8(size);
525
526     // Do CmiReference(msg), this is done bypassing converse!
527     chdr->ref++;
528
529     /*
530     UNPACK(int, size);
531     ref = (int *)t;
532     t += sizeof(int);
533     
534     *ref = (int)((char *)(&chdr->ref) - (char *)ref);
535     chdr->ref ++;
536
537     ComlibPrintf("ref = %d, global_ref = %d\n", *ref, chdr->ref);
538     
539     msg = t;
540     t += ALIGN8(size);
541     */
542     InsertMsgs(npes, pelist, size, msg);
543   }  
544
545   CmiFree(in);
546   return ufield;
547 }
548
549 PTvectorlist PeTable :: ExtractAndVectorize(comID id, int ufield, int npe, int *pelist) {
550   return NULL;
551 }
552
553 PTvectorlist PeTable :: ExtractAndVectorizeAll(comID id, int ufield) {
554   return NULL;
555 }
556
557 void PeTable :: GarbageCollect()
558 {
559 }
560