Adding communication library in src/ck-com and src/conv-com
[charm.git] / src / conv-com / petable.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /*********************************************
9  * File : petable.C
10  *
11  * Author: Krishnan V
12  *
13  * The message buffer
14  *********************************************/
15 #include <string.h>
16 #include <stdlib.h>
17 #include <converse.h>
18 #include "comlib.h"
19 #include "petable.h"
20 #include "converse.h"
21
22 #define BIGBUFFERSIZE 65536
23 #define PTPREALLOC    100
24
25 struct AllToAllHdr{
26     char dummy[CmiReservedHeaderSize];
27     int refno;
28     comID id;
29     int ufield;
30     int nmsgs;
31 };
32
33 #define ALIGN8(x)       (int)((~7)&((x)+7))
34
35 /* Reduce the no. of mallocs by allocating from
36  * a free list */
37 #define PTALLOC(ktmp) {\
38   if (PTFreeList) {\
39         ktmp=PTFreeList;\
40         PTFreeList=ktmp->next;\
41   }\
42   else {\
43         ktmp=(PTinfo *)CmiAlloc(sizeof(PTinfo));\
44         }\
45 }
46
47 #define PTFREE(ktmp) {\
48   ktmp->next=PTFreeList;\
49   PTFreeList=ktmp;\
50 }
51
52 #define REALLOC(ktmp, ksize) {\
53    PTinfo **junkptr=(PTinfo **)CmiAlloc(2*ksize*sizeof(void *));\
54    for (int ki=0; ki<ksize;ki++) junkptr[ki]=ktmp[ki];\
55    CmiFree(ktmp);\
56    ktmp=junkptr;\
57 }
58
59 /**************************************************************
60  * Preallocated memory=P*MSGQLEN ptr + 2P ints + 1000 ptrs
61  **************************************************************/
62 PeTable :: PeTable(int n)
63 {
64   NumPes=n;
65   magic=0;
66   PeList = (PTinfo ***)CmiAlloc(sizeof(PTinfo *)*NumPes);
67   //  ComlibPrintf("Pelist[%d][%d]\n", NumPes, MSGQLEN);
68   msgnum=new int[NumPes];
69   MaxSize=new int[NumPes];
70   for (int i=0;i<NumPes;i++) {
71         msgnum[i]=0;
72         MaxSize[i]=MSGQLEN;
73         PeList[i]=(PTinfo **)CmiAlloc(sizeof(PTinfo *)*MSGQLEN);
74         for (int j=0;j<MSGQLEN;j++) PeList[i][j]=0;
75   }
76
77   //ptrlist=(PTinfo **)CmiAlloc(1000*sizeof(PTinfo *));
78   //  FreeList= new GList;
79   //CombBuffer=(char *)CmiAlloc(BIGBUFFERSIZE);
80
81   PTFreeList=NULL;
82 }
83
84 PeTable :: ~PeTable()
85 {
86   int i;
87   for (i=0;i<NumPes;i++) CmiFree(PeList[i]);
88   CmiFree(PeList);
89   delete msgnum;
90   delete MaxSize;
91   GarbageCollect();
92   //CmiFree(ptrlist);
93   PTinfo *tmp;
94   while (PTFreeList) {
95         tmp=PTFreeList;
96         PTFreeList=tmp->next;
97         CmiFree(tmp);
98   }
99  // delete FreeList;
100
101 }
102
103 void PeTable:: Purge()
104 {
105   for (int i=0; i<NumPes;i++) {
106         if (msgnum[i]) {
107             // ComlibPrintf("%d Warning: %d Undelivered Messages for %d\n", CkMyPe(), msgnum[i], i);
108           //msgnum[i]=0;
109         }
110   }
111   GarbageCollect();
112   //  ComlibPrintf("combcount = %d\n", combcount);
113   //combcount = 0;
114 }
115
116 void PeTable :: InsertMsgs(int npe, int *pelist, int size, void *msg)
117 {
118   PTinfo *tmp;
119   PTALLOC(tmp);
120   tmp->refCount=0;
121   tmp->magic=0;
122   tmp->offset=0;
123   tmp->freelistindex=-1;
124   tmp->msgsize=size;
125   tmp->msg=msg;
126
127   for (int j=0;j<npe;j++) {
128     tmp->refCount++;
129     int index=pelist[j];
130     
131     ComlibPrintf("[%d] Inserting %d %d %d\n", CkMyPe(), msgnum[index], index, size);
132     
133     if (msgnum[index] >= MaxSize[index]) {
134         REALLOC(PeList[index], MaxSize[index]);
135         MaxSize[index] *= 2;
136     }
137     PeList[index][msgnum[index]]=tmp;
138     msgnum[index]++;
139   }
140 }
141
142 void PeTable :: InsertMsgs(int npe, int *pelist, int nmsgs, void **msglist)
143 {
144   msgstruct **m=(msgstruct **)msglist;
145   for (int i=0;i<nmsgs;i++)
146       InsertMsgs(npe, pelist, m[i]->msgsize, m[i]->msg);
147 }
148
149 void PeTable :: ExtractAndDeliverLocalMsgs(int index)
150 {
151   int j;
152   msgstruct m;
153
154   ComlibPrintf("%d:Delivering %d local messages\n", CkMyPe(), msgnum[index]);
155   for (j=msgnum[index]-1;(j>=0);j--) {
156
157         m.msgsize=PeList[index][j]->msgsize;
158         m.msg=PeList[index][j]->msg;
159
160         if (--(PeList[index][j]->refCount) <=0) {
161             CmiSyncSendAndFree(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
162             PTFREE(PeList[index][j]);
163         }
164         else {
165             CmiSyncSend(CkMyPe()/*index*/, m.msgsize, (char*)m.msg);
166         }
167         PeList[index][j]=NULL;
168   }
169   msgnum[index]=j+1;
170
171   return;
172 }
173
174 int PeTable :: TotalMsgSize(int npe, int *pelist, int *nm, int *nd)
175 {
176     register int totsize=0;
177     magic++;
178     *nm=0;
179     *nd=0;        
180
181     for (int i=0;i<npe;i++) {
182         
183         int index = pelist[i];
184         
185         *nm += msgnum[index];
186
187         ComlibPrintf("%d: NUM MSGS %d, %d\n", CmiMyPe(), index, 
188                      msgnum[index]);
189
190         for (int j=0;j<msgnum[index];j++) {
191             if (PeList[index][j]->magic != magic) {
192
193                 int tmp_size = PeList[index][j]->msgsize;
194                 if(tmp_size % 8 != 0)
195                     tmp_size += 8 - tmp_size % 8;
196                 
197                 totsize += tmp_size;                
198                 totsize += sizeof(int)+sizeof(int);
199                 
200                 PeList[index][j]->magic=magic;
201                 (*nd)++;
202             }
203         }
204     }
205     return(totsize);
206 }
207
208
209 #undef PACK
210 #undef PACKMSG
211 #define PACKINT(data) {((int*)t)[0] = data; t+=sizeof(int);}
212 #define PACK(type,data) {junk=(char *)&(data); memcpy(t, junk, sizeof(type)); t+=sizeof(type);}
213 #define PACKMSG(data, size) { memcpy(p+msg_offset, (data), size); msg_offset += size; }
214
215 /*Used for all to all multicast operations.  Assumes that each message
216   is destined to all the processors, to speed up all to all
217   substantially --Sameer 09/03/03 
218   
219   Protocol:
220   |ref|comid|ufield|nmsgs|size|ref|msg1|size2|ref2|msg2|....
221 */
222
223 char * PeTable ::ExtractAndPackAll(comID id, int ufield, int *length)
224 {
225     int nmsgs = 0, i, j;
226     int index = 0;
227
228     ComlibPrintf("[%d] In Extract And Pack All\n", CkMyPe());
229
230     //Increment magic to detect duplicate messages
231     magic++;
232
233     register int total_msg_size = 0;
234
235     //first compute size
236     for (i=0;i<NumPes;i++) {
237         index = i;
238         for (j=msgnum[index]-1; (j>=0); j--) {
239             if (PeList[index][j]->magic != magic) {                
240                 total_msg_size += ALIGN8(PeList[index][j]->msgsize);
241                 total_msg_size += 2 * sizeof(int);
242                 PeList[index][j]->magic=magic;
243
244                 nmsgs ++;
245             }            
246         }
247     }
248     
249     total_msg_size += ALIGN8(sizeof(AllToAllHdr));
250
251     ComlibPrintf("[%d] Message Size %d, nmsgs %d **%d**\n", CkMyPe(), total_msg_size, nmsgs, sizeof(AllToAllHdr));
252     
253     //poiter to the combined message, UGLY NAME
254     char *p = (char *) CmiAlloc(total_msg_size * sizeof(char));    
255
256     ComlibPrintf("After cmialloc\n");
257
258     //buffer to copy stuff into
259     char *t = p; 
260     char *junk = NULL;
261     
262     int dummy = 0;
263     
264     int refno = 0;
265
266     AllToAllHdr ahdr;
267     ahdr.refno = refno;
268     ahdr.id = id;
269     ahdr.ufield = ufield;
270     ahdr.nmsgs = nmsgs;
271
272     /*
273       PACKINT(refno);    
274       PACK(comID, id);
275       
276       PACKINT(ufield);
277       PACKINT(nmsgs);
278       //    PACKINT(dummy); //Aligning to 8 bytes
279     */
280
281     PACK(AllToAllHdr, ahdr);   
282
283     int msg_offset = ALIGN8(sizeof(AllToAllHdr));
284     
285     //Increment magic again for creating the message
286     magic++;
287     for (i=0;i<NumPes;i++) {
288         index=i;
289         int ref = 0;
290         int size;
291
292         for (j=msgnum[index]-1; (j>=0); j--) {
293             //Check if it is a duplicate
294             if (PeList[index][j]->magic != magic) {                
295                 size = PeList[index][j]->msgsize;
296                 PACKMSG(&size, sizeof(int));
297                 PACKMSG(&ref, sizeof(int));
298                 PeList[index][j]->magic=magic;
299                 PACKMSG(PeList[index][j]->msg, size);
300                 if(msg_offset % 8 != 0)
301                     msg_offset += 8 - msg_offset%8;
302             }
303
304             //Free it when all the processors have gotten rid of it
305             if (--(PeList[index][j]->refCount) <=0) {
306                 ComlibPrintf("before cmifree \n");
307                 CmiFree(PeList[index][j]->msg);   
308                 ComlibPrintf("after cmifree \n");
309
310                 PTFREE(PeList[index][j]);
311             }
312             //Assign the current processors message pointer to NULL
313             PeList[index][j] = NULL;
314         }
315         msgnum[index] = 0;
316     }
317     
318     *length = total_msg_size;
319     return p;
320 }
321
322 char * PeTable ::ExtractAndPack(comID id, int ufield, int npe, 
323                                 int *pelist, int *length)
324 {
325     char *junk;
326     int mask=~7;
327     int nummsgs, offset, actual_msgsize=0, num_distinctmsgs;
328     
329     ComlibPrintf("In ExtractAndPack %d\n", npe); 
330     
331     int tot_msgsize=TotalMsgSize(npe, pelist, &nummsgs, &num_distinctmsgs);
332     if (tot_msgsize ==0) {
333         *length=0;
334         
335         ComlibPrintf("Returning NULL\n");
336
337         return(NULL);
338     }
339     
340     //int ave_msgsize=(tot_msgsize>MSGSIZETHRESHOLD) ? 
341     //  tot_msgsize/(num_distinctmsgs):tot_msgsize;
342     
343     int msg_offset = CmiReservedHeaderSize + sizeof(comID) 
344         + (npe + 4 + nummsgs) * sizeof(int);  
345
346     if(msg_offset % 8 != 0)
347         msg_offset += 8 - msg_offset % 8;
348     
349     int headersize=msg_offset;
350     
351     *length = tot_msgsize;
352     *length += msg_offset;
353     char *p;
354     p=(char *)CmiAlloc(*length);
355     
356     int l1 = *length;
357     
358     char *t = p + CmiReservedHeaderSize;
359     int i, j;
360     if (!p) CmiAbort("Big time problem\n");
361     magic++;
362
363     int refno = id.refno;    
364
365     PACKINT(refno);
366     PACK(comID, id);
367     PACKINT(ufield);
368     PACKINT(npe);
369     
370     int lesspe=0;
371     int npacked = 0;
372     for (i=0;i<npe;i++) {
373         int index=pelist[i];
374         if (msgnum[index]<=0) {
375             lesspe++;
376             continue;
377         }
378         
379         //ComlibPrintf("%d Packing pelist[%d]\n", CkMyPe(), i);
380         register int newval=-1*pelist[i];
381         PACKINT(newval); 
382         for (j=0;j<msgnum[index];j++) {
383             if (PeList[index][j]->magic != magic) {
384                 int tmpms=actual_msgsize+PeList[index][j]->msgsize;
385                 if (tmpms >= MSGSIZETHRESHOLD 
386                     /*|| (PeList[index][j]->msgsize>=ave_msgsize)*/ ) {
387                     
388                     CmiPrintf("%d sending directly\n", CkMyPe());
389                     if (--(PeList[index][j]->refCount) <=0) {
390                         CmiSyncSendAndFree(index, PeList[index][j]->msgsize, 
391                                            (char*)PeList[index][j]->msg);
392                         //ComlibPrintf("%d Freeing msg\n", CkMyPe());
393                         PTFREE(PeList[index][j]);
394                         }
395                     else
396                         CmiSyncSend(index, PeList[index][j]->msgsize, 
397                                     (char*)PeList[index][j]->msg);
398                     PeList[index][j]=NULL;
399                     continue;
400                 }
401                 
402                 npacked ++;
403                 
404                 offset=msg_offset;
405                 PeList[index][j]->magic=magic;
406                 PeList[index][j]->offset=msg_offset;
407                 PTinfo *tempmsg=PeList[index][j];
408                 PACKMSG((&(tempmsg->msgsize)), sizeof(int));
409                 actual_msgsize += tempmsg->msgsize;
410                 int nullptr=-1;
411                 PACKMSG(&nullptr, sizeof(int));
412
413                 PACKMSG(tempmsg->msg, tempmsg->msgsize);
414                 if(msg_offset % 8 != 0)
415                     msg_offset += 8 - msg_offset%8;
416
417                 if(actual_msgsize % 8 != 0)                    
418                     actual_msgsize += 8 - actual_msgsize % 8;
419                 
420                 actual_msgsize += 2*sizeof(int);
421             }
422             else {
423                 offset=(PeList[index][j]->offset);
424             }
425             
426             //ComlibPrintf("%d Packing msg_offset=%d\n", CkMyPe(), offset);
427             PACKINT(offset); 
428             if (--(PeList[index][j]->refCount) <=0) {
429                 CmiFree(PeList[index][j]->msg);
430                 
431                 PTFREE(PeList[index][j]);
432             }
433             PeList[index][j]=NULL;
434         }
435         msgnum[index]=0;
436     }
437     offset=-1;
438     PACKINT(offset);
439     
440     if (lesspe) {
441         t=p+CmiReservedHeaderSize+2*sizeof(int) + sizeof(comID);
442         npe=npe-lesspe;
443         PACK(int, npe);
444     }
445     if (!actual_msgsize) {
446         if (l1 >= MAXBUFSIZE) 
447             CmiFree(p);
448         *length=0;
449         return(NULL);
450     }
451     
452     *length=actual_msgsize+headersize;
453     return(p);
454
455
456 #undef UNPACK
457 #define UNPACK(type,data) {junk=(char *)&(data); memcpy(junk, t, sizeof(type));t+=sizeof(type);}
458 #undef UNPACKMSG
459 #define UNPACKMSG(dest,src, size) { memcpy(dest, src, size); offset += size;}
460
461 int PeTable :: UnpackAndInsert(void *in)
462 {
463   char *junk;
464   char *t =(char *)in + CmiReservedHeaderSize;
465   int i, ufield, npe, pe, tot_msgsize, ptrlistindex=0;
466   comID id;
467   int refno = 0;
468
469   UNPACK(int, refno);
470   
471   //ComlibPrintf("%d UnPacking id\n", CkMyPe());
472   UNPACK(comID, id);
473   UNPACK(int, ufield);
474   UNPACK(int, npe);
475   
476   register int offset;
477   for (i=0;i<npe;i++) {
478         UNPACK(int, pe);
479         pe *= -1;
480
481         UNPACK(int, offset);
482         while (offset > 0) {
483             int tempmsgsize;
484             UNPACKMSG(&(tempmsgsize), (char *)in+offset, sizeof(int));
485             int ptr;
486             UNPACKMSG(&ptr, (char *)in+offset, sizeof(int));
487
488             if (ptr >=0 )  {
489                 if (msgnum[pe] >= MaxSize[pe]) {
490                     REALLOC(PeList[pe], MaxSize[pe]);
491                     MaxSize[pe] *= 2;
492                 }
493                 PeList[pe][msgnum[pe]]=ptrvec[ptr];
494                 (ptrvec[ptr])->refCount++;
495                 msgnum[pe]++;
496
497                 UNPACK(int, offset);
498                 continue;
499             }
500             PTinfo *temp;
501             PTALLOC(temp);
502             temp->msgsize=tempmsgsize;
503             temp->refCount=1;
504             temp->magic=0;
505             temp->offset=0;
506
507             ptrvec.insert(ptrlistindex, temp);
508             memcpy((char *)in+offset-sizeof(int), &ptrlistindex, sizeof(int));
509
510             ptrlistindex++;
511             temp->msg=(void *)((char *)in+offset);
512             if (msgnum[pe] >= MaxSize[pe]) {
513
514                 REALLOC(PeList[pe], MaxSize[pe]);
515                 MaxSize[pe] *= 2;
516             }
517             PeList[pe][msgnum[pe]]=temp;
518             msgnum[pe]++;
519             UNPACK(int, offset);
520         }
521         t -=sizeof(int);
522   }
523   *(int *)((char *)in -sizeof(int))=ptrlistindex; 
524   
525   if (ptrlistindex==0)
526       CmiFree(in);
527   
528   for (i=0;i<ptrlistindex;i++) {
529       char * actualmsg=(char *)(ptrvec[i]->msg);
530       int *rc=(int *)(actualmsg-sizeof(int));
531       *rc=(int)((char *)in-actualmsg);
532       //ComlibPrintf("I am inserting %d\n", *rc);
533   }
534   
535   return(ufield);
536 }
537
538 /* Unpack and insert an all to all message, the router provides the
539    list of processors to insert into.
540    Same protocol as mentioned earlier.
541 */
542
543 int PeTable :: UnpackAndInsertAll(void *in, int npes, int *pelist){
544   char *junk;
545   char *t =(char *)in /*+CmiReservedHeaderSize*/;
546   int i,  
547       ufield,   // user field or ths stage of the iteration 
548       nmsgs,    // number of messages in combo message
549       refno,    // reference number
550       dummy;    // alignment dummy
551   
552   comID id;
553
554   /*
555     UNPACK(int, refno);      
556     UNPACK(comID, id);
557     
558     UNPACK(int, ufield);
559     UNPACK(int, nmsgs);
560     //UNPACK(int, dummy);
561     int header_size = sizeof(comID) + CmiReservedHeaderSize + 3 *sizeof(int);
562     if(header_size % 8 != 0)
563     t+= 8 - header_size % 8;
564   */
565
566   AllToAllHdr ahdr;
567   UNPACK(AllToAllHdr, ahdr);
568
569   if(sizeof(AllToAllHdr) % 8 != 0)
570       t += 8 - sizeof(AllToAllHdr) % 8;
571
572   refno = ahdr.refno;
573   id = ahdr.id;
574   nmsgs = ahdr.nmsgs;
575   ufield = ahdr.ufield;
576
577   ComlibPrintf("[%d] unpack and insert all %d, %d\n", CkMyPe(), ufield, nmsgs);
578   
579   //Inserting a memory foot print may, change later
580   CmiChunkHeader *chdr= (CmiChunkHeader*)((char*)in - sizeof(CmiChunkHeader));
581
582   for(int count = 0; count < nmsgs; count++){
583       int *ref = 0;
584       int size = 0;
585       char *msg = 0;
586
587       UNPACK(int, size);
588       ref = (int *)t;
589       t += sizeof(int);
590
591       *ref = (int)((char *)(&chdr->ref) - (char *)ref);
592       chdr->ref ++;
593
594       ComlibPrintf("ref = %d, global_ref = %d\n", *ref, chdr->ref);
595
596       msg = t;
597       t += ALIGN8(size);
598       
599       InsertMsgs(npes, pelist, size, msg);
600   }  
601
602   CmiFree(in);
603   return ufield;
604 }
605
606 void PeTable :: GarbageCollect()
607 {
608 }
609
610 GList :: GList()
611 {
612   InList=(InNode *)CmiAlloc(10*sizeof(InNode));
613   InListIndex=0;
614 }
615
616 GList :: ~GList()
617 {
618   CmiFree(InList);
619 }
620
621 int GList :: AddWholeMsg(void *ptr)
622 {
623   InList[InListIndex].flag=0;
624   InList[InListIndex++].ptr=ptr;
625   return(InListIndex-1);
626 }
627
628 void GList :: setRefcount(int indx, int ref)
629 {
630   //ComlibPrintf("setting InList[%d]=%d\n", indx, ref);
631   InList[indx].refCount=ref;
632 }
633
634 void GList :: DeleteWholeMsg(int indx)
635 {
636   //ComlibPrintf("DeleteWholeMsg indx=%d\n", indx);
637   InList[indx].refCount--;
638   if ((InList[indx].refCount <=0) && (InList[indx].flag==0)) {
639         //ComlibPrintf("Deleting msgwhole\n");
640         CmiFree(InList[indx].ptr);
641   }
642 }
643 void GList :: DeleteWholeMsg(int indx, int flag)
644 {
645   InList[indx].refCount--;
646   InList[indx].flag=flag;
647
648 /*
649 void GList :: DeleteWholeMsg(int indx, void *p)
650 {
651   int *rc=(int *)((char *)p-sizeof(int));
652   *rc=(int)((char *)p-(char *)(InList[indx].ptr)+2*sizeof(int));
653 }
654 */
655
656 void GList :: GarbageCollect()
657 {
658   for (int i=0;i<InListIndex;i++) {
659         if ((InList[i].refCount <= 0) && (InList[i].flag >0))
660                 CmiFree(InList[i].ptr);
661   }
662 }
663
664 void GList :: Add(void *ptr )
665 {
666   InList[InListIndex].ptr=ptr;
667   InList[InListIndex++].refCount=0;
668 }
669
670 void GList :: Delete()
671 {
672   InListIndex=0;
673   /******
674   int counter=0;
675   for (int i=0;i< InListIndex;i++) {
676         if (InList[i].refCount <=0)  {
677                 counter++;
678                 CmiFree(InList[i].ptr);
679         }
680   }
681   if (counter == InListIndex) InListIndex=0;
682 ****/
683 }
684