Make housekeeping methods "expedited", to allow housekeeping messages
[charm.git] / src / conv-com / petable.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /*********************************************
9  * File : petable.C
10  *
11  * Author: Krishnan V
12  *
13  * The message buffer
14  *********************************************/
15 #include <string.h>
16 #include <stdlib.h>
17 #include <converse.h>
18 #include "comlib.h"
19 #include "petable.h"
20 #include "converse.h"
21
22 #define BIGBUFFERSIZE 65536
23 #define PTPREALLOC    100
24
25 struct AllToAllHdr{
26     char dummy[CmiReservedHeaderSize];
27     int refno;
28     comID id;
29     int ufield;
30     int nmsgs;
31 };
32
33
34 /**************************************************************
35  * Preallocated memory=P*MSGQLEN ptr + 2P ints + 1000 ptrs
36  **************************************************************/
37 PeTable :: PeTable(int n)
38 {
39   NumPes=n;
40   magic=0;
41   PeList = (PTinfo ***)CmiAlloc(sizeof(PTinfo *)*NumPes);
42   //  ComlibPrintf("Pelist[%d][%d]\n", NumPes, MSGQLEN);
43   msgnum=new int[NumPes];
44   MaxSize=new int[NumPes];
45   for (int i=0;i<NumPes;i++) {
46         msgnum[i]=0;
47         MaxSize[i]=MSGQLEN;
48         PeList[i]=(PTinfo **)CmiAlloc(sizeof(PTinfo *)*MSGQLEN);
49         for (int j=0;j<MSGQLEN;j++) PeList[i][j]=0;
50   }
51
52   //ptrlist=(PTinfo **)CmiAlloc(1000*sizeof(PTinfo *));
53   //  FreeList= new GList;
54   //CombBuffer=(char *)CmiAlloc(BIGBUFFERSIZE);
55
56   PTFreeList=NULL;
57 }
58
59 PeTable :: ~PeTable()
60 {
61   int i;
62   for (i=0;i<NumPes;i++) CmiFree(PeList[i]);
63   CmiFree(PeList);
64   delete msgnum;
65   delete MaxSize;
66   GarbageCollect();
67   //CmiFree(ptrlist);
68   PTinfo *tmp;
69   while (PTFreeList) {
70         tmp=PTFreeList;
71         PTFreeList=tmp->next;
72         CmiFree(tmp);
73   }
74  // delete FreeList;
75
76 }
77
78 void PeTable:: Purge()
79 {
80   for (int i=0; i<NumPes;i++) {
81         if (msgnum[i]) {
82             // ComlibPrintf("%d Warning: %d Undelivered Messages for %d\n", CkMyPe(), msgnum[i], i);
83           //msgnum[i]=0;
84         }
85   }
86   GarbageCollect();
87   //  ComlibPrintf("combcount = %d\n", combcount);
88   //combcount = 0;
89 }
90
91 void PeTable :: ExtractAndDeliverLocalMsgs(int index)
92 {
93   int j;
94   msgstruct m;
95
96   ComlibPrintf("%d:Delivering %d local messages\n", CkMyPe(), msgnum[index]);
97   for (j=msgnum[index]-1;(j>=0);j--) {
98
99         m.msgsize=PeList[index][j]->msgsize;
100         m.msg=PeList[index][j]->msg;
101
102         if (--(PeList[index][j]->refCount) <=0) {
103             CmiSyncSendAndFree(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
104             PTFREE(PeList[index][j]);
105         }
106         else {
107             CmiSyncSend(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
108         }
109         PeList[index][j]=NULL;
110   }
111   msgnum[index]=j+1;
112
113   return;
114 }
115
116
117 #undef PACK
118 #undef PACKMSG
119 #define PACKINT(data) {((int*)t)[0] = data; t+=sizeof(int);}
120 #define PACK(type,data) {junk=(char *)&(data); memcpy(t, junk, sizeof(type)); t+=sizeof(type);}
121 #define PACKMSG(data, size) { memcpy(p+msg_offset, (data), size); msg_offset += size; }
122
123 /*Used for all to all multicast operations.  Assumes that each message
124   is destined to all the processors, to speed up all to all
125   substantially --Sameer 09/03/03 
126   
127   Protocol:
128   |ref|comid|ufield|nmsgs|size|ref|msg1|size2|ref2|msg2|....
129 */
130
131 char * PeTable ::ExtractAndPackAll(comID id, int ufield, int *length)
132 {
133     int nmsgs = 0, i, j;
134     int index = 0;
135
136     ComlibPrintf("[%d] In Extract And Pack All\n", CkMyPe());
137
138     //Increment magic to detect duplicate messages
139     magic++;
140
141     register int total_msg_size = 0;
142
143     //first compute size
144     for (i=0;i<NumPes;i++) {
145         index = i;
146         for (j=msgnum[index]-1; (j>=0); j--) {
147             if (PeList[index][j]->magic != magic) {                
148                 total_msg_size += ALIGN8(PeList[index][j]->msgsize);
149                 total_msg_size += 2 * sizeof(int);
150                 PeList[index][j]->magic=magic;
151
152                 nmsgs ++;
153             }            
154         }
155     }
156     
157     total_msg_size += ALIGN8(sizeof(AllToAllHdr));
158
159     ComlibPrintf("[%d] Message Size %d, nmsgs %d **%d**\n", CkMyPe(), total_msg_size, nmsgs, sizeof(AllToAllHdr));
160     
161     //poiter to the combined message, UGLY NAME
162     char *p = (char *) CmiAlloc(total_msg_size * sizeof(char));    
163
164     ComlibPrintf("After cmialloc\n");
165
166     //buffer to copy stuff into
167     char *t = p; 
168     char *junk = NULL;
169     
170     int dummy = 0;
171     
172     int refno = 0;
173
174     AllToAllHdr ahdr;
175     ahdr.refno = refno;
176     ahdr.id = id;
177     ahdr.ufield = ufield;
178     ahdr.nmsgs = nmsgs;
179
180     /*
181       PACKINT(refno);    
182       PACK(comID, id);
183       
184       PACKINT(ufield);
185       PACKINT(nmsgs);
186       //    PACKINT(dummy); //Aligning to 8 bytes
187     */
188
189     PACK(AllToAllHdr, ahdr);   
190
191     int msg_offset = ALIGN8(sizeof(AllToAllHdr));
192     
193     //Increment magic again for creating the message
194     magic++;
195     for (i=0;i<NumPes;i++) {
196         index=i;
197         int ref = 0;
198         int size;
199
200         for (j=msgnum[index]-1; (j>=0); j--) {
201             //Check if it is a duplicate
202             if (PeList[index][j]->magic != magic) {                
203                 size = PeList[index][j]->msgsize;
204                 PACKMSG(&size, sizeof(int));
205                 PACKMSG(&ref, sizeof(int));
206                 PeList[index][j]->magic=magic;
207                 PACKMSG(PeList[index][j]->msg, size);
208
209                 msg_offset = ALIGN8(msg_offset);
210             }
211
212             //Free it when all the processors have gotten rid of it
213             if (--(PeList[index][j]->refCount) <=0) {
214                 ComlibPrintf("before cmifree \n");
215                 CmiFree(PeList[index][j]->msg);   
216                 ComlibPrintf("after cmifree \n");
217
218                 PTFREE(PeList[index][j]);
219             }
220             //Assign the current processors message pointer to NULL
221             PeList[index][j] = NULL;
222         }
223         msgnum[index] = 0;
224     }
225     
226     *length = total_msg_size;
227     return p;
228 }
229
230 char * PeTable ::ExtractAndPack(comID id, int ufield, int npe, 
231                                 int *pelist, int *length)
232 {
233     char *junk;
234     int mask=~7;
235     int nummsgs, offset, actual_msgsize=0, num_distinctmsgs;
236     
237     ComlibPrintf("In ExtractAndPack %d\n", npe); 
238     
239     int tot_msgsize=TotalMsgSize(npe, pelist, &nummsgs, &num_distinctmsgs);
240     if (tot_msgsize ==0) {
241         *length=0;
242         
243         ComlibPrintf("Returning NULL\n");
244
245         return(NULL);
246     }
247     
248     //int ave_msgsize=(tot_msgsize>MSGSIZETHRESHOLD) ? 
249     //  tot_msgsize/(num_distinctmsgs):tot_msgsize;
250     
251     int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
252         + (npe + 4 + nummsgs) * sizeof(int);  
253
254     msg_offset = ALIGN8(msg_offset);
255     
256     int headersize=msg_offset;
257     
258     *length = tot_msgsize;
259     *length += msg_offset;
260     char *p;
261     p=(char *)CmiAlloc(*length);
262     
263     int l1 = *length;
264     
265     char *t = p + CmiReservedHeaderSize;
266     int i, j;
267     if (!p) CmiAbort("Big time problem\n");
268     magic++;
269
270     int refno = id.refno;    
271
272     PACKINT(refno);
273     PACK(comID, id);
274     PACKINT(ufield);
275     PACKINT(npe);
276     
277     int lesspe=0;
278     int npacked = 0;
279     for (i=0;i<npe;i++) {
280         int index=pelist[i];
281
282         if (msgnum[index]<=0) {
283             lesspe++;
284             continue;
285         }
286         
287         //ComlibPrintf("%d Packing pelist[%d]\n", CkMyPe(), i);
288         register int newval=-1*pelist[i];
289         PACKINT(newval); 
290         for (j=0;j<msgnum[index];j++) {
291             if (PeList[index][j]->magic != magic) {
292                 int tmpms=actual_msgsize+PeList[index][j]->msgsize;
293                 if (tmpms >= MSGSIZETHRESHOLD 
294                     /*|| (PeList[index][j]->msgsize>=ave_msgsize)*/ ) {
295                     
296                     CmiPrintf("%d sending directly\n", CkMyPe());
297                     if (--(PeList[index][j]->refCount) <=0) {
298                         CmiSyncSendAndFree(index, PeList[index][j]->msgsize, 
299                                            (char*)PeList[index][j]->msg);
300                         //ComlibPrintf("%d Freeing msg\n", CkMyPe());
301                         PTFREE(PeList[index][j]);
302                         }
303                     else
304                         CmiSyncSend(index, PeList[index][j]->msgsize, 
305                                     (char*)PeList[index][j]->msg);
306                     PeList[index][j]=NULL;
307                     continue;
308                 }
309                 
310                 npacked ++;
311                 
312                 offset=msg_offset;
313                 PeList[index][j]->magic=magic;
314                 PeList[index][j]->offset=msg_offset;
315                 PTinfo *tempmsg=PeList[index][j];
316                 PACKMSG((&(tempmsg->msgsize)), sizeof(int));
317                 actual_msgsize += tempmsg->msgsize;
318                 int nullptr=-1;
319                 PACKMSG(&nullptr, sizeof(int));
320
321                 PACKMSG(tempmsg->msg, tempmsg->msgsize);
322
323                 msg_offset = ALIGN8(msg_offset);
324                 actual_msgsize = ALIGN8(actual_msgsize);                
325                 actual_msgsize += 2*sizeof(int);
326             }
327             else {
328                 offset=(PeList[index][j]->offset);
329             }
330             
331             //ComlibPrintf("%d Packing msg_offset=%d\n", CkMyPe(), offset);
332             PACKINT(offset); 
333             if (--(PeList[index][j]->refCount) <=0) {
334                 CmiFree(PeList[index][j]->msg);
335                 
336                 PTFREE(PeList[index][j]);
337             }
338             PeList[index][j]=NULL;
339         }
340         msgnum[index]=0;
341     }
342     offset=-1;
343     PACKINT(offset);
344     
345     if (lesspe) {
346         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
347         npe=npe-lesspe;
348         PACK(int, npe);
349     }
350     if (!actual_msgsize) {
351         if (l1 >= MAXBUFSIZE) 
352             CmiFree(p);
353         *length=0;
354         return(NULL);
355     }
356     
357     *length=actual_msgsize+headersize;
358     return(p);
359
360
361 #undef UNPACK
362 #define UNPACK(type,data) {junk=(char *)&(data); memcpy(junk, t, sizeof(type));t+=sizeof(type);}
363 #undef UNPACKMSG
364 #define UNPACKMSG(dest,src, size) { memcpy(dest, src, size); offset += size;}
365
366 int PeTable :: UnpackAndInsert(void *in)
367 {
368   char *junk;
369   char *t =(char *)in + CmiReservedHeaderSize;
370   int i, ufield, npe, pe, tot_msgsize, ptrlistindex=0;
371   comID id;
372   int refno = 0;
373
374   UNPACK(int, refno);
375   
376   //ComlibPrintf("%d UnPacking id\n", CkMyPe());
377   UNPACK(comID, id);
378   UNPACK(int, ufield);
379   UNPACK(int, npe);
380   
381   register int offset;
382   for (i=0;i<npe;i++) {
383         UNPACK(int, pe);
384         pe *= -1;
385
386         UNPACK(int, offset);
387         while (offset > 0) {
388             int tempmsgsize;
389             UNPACKMSG(&(tempmsgsize), (char *)in+offset, sizeof(int));
390             int ptr;
391             UNPACKMSG(&ptr, (char *)in+offset, sizeof(int));
392
393             if (ptr >=0 )  {
394                 if (msgnum[pe] >= MaxSize[pe]) {
395                     REALLOC(PeList[pe], MaxSize[pe]);
396                     MaxSize[pe] *= 2;
397                 }
398                 PeList[pe][msgnum[pe]]=ptrvec[ptr];
399                 (ptrvec[ptr])->refCount++;
400                 msgnum[pe]++;
401
402                 UNPACK(int, offset);
403                 continue;
404             }
405             PTinfo *temp;
406             PTALLOC(temp);
407             temp->msgsize=tempmsgsize;
408             temp->refCount=1;
409             temp->magic=0;
410             temp->offset=0;
411
412             ptrvec.insert(ptrlistindex, temp);
413             memcpy((char *)in+offset-sizeof(int), &ptrlistindex, sizeof(int));
414
415             ptrlistindex++;
416             temp->msg=(void *)((char *)in+offset);
417             if (msgnum[pe] >= MaxSize[pe]) {
418
419                 REALLOC(PeList[pe], MaxSize[pe]);
420                 MaxSize[pe] *= 2;
421             }
422             PeList[pe][msgnum[pe]]=temp;
423             msgnum[pe]++;
424             UNPACK(int, offset);
425         }
426         t -=sizeof(int);
427   }
428   *(int *)((char *)in -sizeof(int))=ptrlistindex; 
429   
430   if (ptrlistindex==0)
431       CmiFree(in);
432   
433   for (i=0;i<ptrlistindex;i++) {
434       char * actualmsg=(char *)(ptrvec[i]->msg);
435       int *rc=(int *)(actualmsg-sizeof(int));
436       *rc=(int)((char *)in-actualmsg);
437       //ComlibPrintf("I am inserting %d\n", *rc);
438   }
439   
440   return(ufield);
441 }
442
443 /* Unpack and insert an all to all message, the router provides the
444    list of processors to insert into.
445    Same protocol as mentioned earlier.
446 */
447
448 int PeTable :: UnpackAndInsertAll(void *in, int npes, int *pelist){
449   char *junk;
450   char *t =(char *)in /*+CmiReservedHeaderSize*/;
451   int i,  
452       ufield,   // user field or ths stage of the iteration 
453       nmsgs,    // number of messages in combo message
454       refno,    // reference number
455       dummy;    // alignment dummy
456   
457   comID id;
458
459   /*
460     UNPACK(int, refno);      
461     UNPACK(comID, id);
462     
463     UNPACK(int, ufield);
464     UNPACK(int, nmsgs);
465     //UNPACK(int, dummy);
466     int header_size = sizeof(comID) + CmiReservedHeaderSize + 3 *sizeof(int);
467     if(header_size % 8 != 0)
468     t+= 8 - header_size % 8;
469   */
470
471   AllToAllHdr ahdr;
472   UNPACK(AllToAllHdr, ahdr);
473
474   if(sizeof(AllToAllHdr) % 8 != 0)
475       t += 8 - sizeof(AllToAllHdr) % 8;
476
477   refno = ahdr.refno;
478   id = ahdr.id;
479   nmsgs = ahdr.nmsgs;
480   ufield = ahdr.ufield;
481
482   ComlibPrintf("[%d] unpack and insert all %d, %d\n", CkMyPe(), ufield, nmsgs);
483   
484   //Inserting a memory foot print may, change later
485   CmiChunkHeader *chdr= (CmiChunkHeader*)((char*)in - sizeof(CmiChunkHeader));
486
487   for(int count = 0; count < nmsgs; count++){
488       int *ref = 0;
489       int size = 0;
490       char *msg = 0;
491
492       UNPACK(int, size);
493       ref = (int *)t;
494       t += sizeof(int);
495
496       *ref = (int)((char *)(&chdr->ref) - (char *)ref);
497       chdr->ref ++;
498
499       ComlibPrintf("ref = %d, global_ref = %d\n", *ref, chdr->ref);
500
501       msg = t;
502       t += ALIGN8(size);
503       
504       InsertMsgs(npes, pelist, size, msg);
505   }  
506
507   CmiFree(in);
508   return ufield;
509 }
510
511 void PeTable :: GarbageCollect()
512 {
513 }
514
515 GList :: GList()
516 {
517   InList=(InNode *)CmiAlloc(10*sizeof(InNode));
518   InListIndex=0;
519 }
520
521 GList :: ~GList()
522 {
523   CmiFree(InList);
524 }
525
526 int GList :: AddWholeMsg(void *ptr)
527 {
528   InList[InListIndex].flag=0;
529   InList[InListIndex++].ptr=ptr;
530   return(InListIndex-1);
531 }
532
533 void GList :: setRefcount(int indx, int ref)
534 {
535   //ComlibPrintf("setting InList[%d]=%d\n", indx, ref);
536   InList[indx].refCount=ref;
537 }
538
539 void GList :: DeleteWholeMsg(int indx)
540 {
541   //ComlibPrintf("DeleteWholeMsg indx=%d\n", indx);
542   InList[indx].refCount--;
543   if ((InList[indx].refCount <=0) && (InList[indx].flag==0)) {
544         //ComlibPrintf("Deleting msgwhole\n");
545         CmiFree(InList[indx].ptr);
546   }
547 }
548 void GList :: DeleteWholeMsg(int indx, int flag)
549 {
550   InList[indx].refCount--;
551   InList[indx].flag=flag;
552
553 /*
554 void GList :: DeleteWholeMsg(int indx, void *p)
555 {
556   int *rc=(int *)((char *)p-sizeof(int));
557   *rc=(int)((char *)p-(char *)(InList[indx].ptr)+2*sizeof(int));
558 }
559 */
560
561 void GList :: GarbageCollect()
562 {
563   for (int i=0;i<InListIndex;i++) {
564         if ((InList[i].refCount <= 0) && (InList[i].flag >0))
565                 CmiFree(InList[i].ptr);
566   }
567 }
568
569 void GList :: Add(void *ptr )
570 {
571   InList[InListIndex].ptr=ptr;
572   InList[InListIndex++].refCount=0;
573 }
574
575 void GList :: Delete()
576 {
577   InListIndex=0;
578   /******
579   int counter=0;
580   for (int i=0;i< InListIndex;i++) {
581         if (InList[i].refCount <=0)  {
582                 counter++;
583                 CmiFree(InList[i].ptr);
584         }
585   }
586   if (counter == InListIndex) InListIndex=0;
587 ****/
588 }
589