/** @file
 * pxshm --> POSIX shared memory based network layer for communication
 * between processes on the same node.
 * This is not going to be the primary mode of communication;
 * it is used only for messages below a certain size between
 * processes on the same node,
 * and only in the non-smp version.
 * @ingroup NET
 * Contains the pxshm code for:
 * - CmiInitPxshm()
 * - CmiSendMessagePxshm()
 * - CommunicationServerPxshm()
 * - CmiExitPxshm()
 *
 * Created by
 * Sayantan Chakravorty, sayantan@gmail.com, 21st March 2007
 */

/**
 * @addtogroup NET
 * @{
 */

#include <stdio.h>   /* fprintf, snprintf */
#include <stdlib.h>  /* malloc, free */
#include <string.h>  /* memcpy, strerror */
#include <sys/types.h>
#include <sys/mman.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <semaphore.h>

#define MEMDEBUG(x) //x

#define PXSHM_STATS 0

#define PXSHM_LOCK 1
#define PXSHM_FENCE !PXSHM_LOCK
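
/* Exactly one of the two synchronization modes is active: with PXSHM_LOCK,
 * each shared buffer is guarded by a named POSIX semaphore acquired with
 * sem_trywait(); with PXSHM_FENCE, the sender and receiver instead run a
 * two-flag, turn-based (Peterson-style) entry protocol over the flagSender,
 * flagReceiver and turn fields of the buffer header, using the memory
 * fences defined below to order the flag updates. */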

/*** The following code was copied verbatim from pcqueue.h file ***/
#define PCQUEUE_FENCE
#ifdef PCQUEUE_FENCE
#ifdef POWER_PC
#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#else
/* the "lock incl" variant would silently redefine the macro defined just
 * below it, so it is kept commented out like the mfence alternative */
//#define CmiMemoryWriteFence(startPtr,nBytes) {int i=0;asm("lock incl %0" :: "m" (i));}
#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
//#define CmiMemoryWriteFence(startPtr,nBytes)  asm volatile("mfence":::"memory")
#endif
#else
#define CmiMemoryWriteFence(startPtr,nBytes)
#endif

#ifdef PCQUEUE_FENCE
#ifdef POWER_PC
#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#else
//#define CmiMemoryReadFence(startPtr,nBytes) CmiMemoryWriteFence(startPtr,nBytes)
#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
#endif
#else
#define CmiMemoryReadFence(startPtr,nBytes)
#endif
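
/* On x86, "sfence" orders stores and "lfence" orders loads with respect to
 * the fence; "eieio" provides the corresponding ordering on PowerPC. The
 * (startPtr,nBytes) arguments are ignored; the macros fence all of memory. */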

/***************************************************************************************/

enum entities {SENDER,RECEIVER};

/************************
 *      Implementation currently assumes that
 *      1) all nodes have the same number of processors
 *      2) in the nodelist all processors in a node are listed in sequence
 *       0 1 2 3      4 5 6 7
 *       -------      -------
 *        node 1       node 2
 ************************/
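
/* For example, with +nodesize 4, processor 5 gets noderank 1, nodestart 4
 * and nodeend 7 in calculateNodeSizeAndRank() below. */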

#define NAMESTRLEN 50   /* length of a shared object / semaphore name */
#define PREFIXSTRLEN 30 /* length of the per-job name prefix */

#define SHMBUFLEN 1000000 /* bytes of message data per shared buffer; larger messages bypass pxshm (see CmiValidPxshm) */

#define SENDQSTARTSIZE 128 /* initial capacity of a PxshmSendQ */

typedef struct {
        int count; //number of messages
        int bytes; //number of bytes
#if PXSHM_FENCE
        int flagSender;
        int flagReceiver;
        int turn;
#endif
} sharedBufHeader;
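
/* Under PXSHM_FENCE, flagSender/flagReceiver advertise each side's interest
 * in the buffer and turn breaks ties, as in classic two-process mutual
 * exclusion; each side tries the lock once and backs off rather than spin. */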

typedef struct {
        sem_t *mutex;
        sharedBufHeader *header;
        char *data;
} sharedBufData;

typedef struct {
        int size; //total size of data array
        int begin; //position of first element
        int end;   //position of next element
        int numEntries; //number of entries

        OutgoingMsg *data;

} PxshmSendQ;
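
/* PxshmSendQ is a circular buffer of OutgoingMsg pointers: begin and end
 * wrap modulo size, and pushSendQ() doubles the array when it fills up. */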

typedef struct {
        int nodesize;
        int noderank;
        int nodestart,nodeend; //proc numbers for the start and end of this node
        char prefixStr[PREFIXSTRLEN];
        char **recvBufNames;
        char **sendBufNames;

        sharedBufData *recvBufs;
        sharedBufData *sendBufs;

        PxshmSendQ **sendQs;

#if PXSHM_STATS
        int sendCount;
        int validCheckCount;
        double validCheckTime;
        double sendTime;
        double commServerTime;
#endif

} PxshmContext;

PxshmContext *pxshmContext=NULL; //global context

void calculateNodeSizeAndRank(char **);
void setupSharedBuffers();
void initAllSendQs();

/******************
 *      Initialization routine
 *      currently just testing start up
 ******************/
void CmiInitPxshm(char **argv){
        MACHSTATE(3,"CminitPxshm start");
        pxshmContext = (PxshmContext *)malloc(sizeof(PxshmContext));

#if PXSHM_STATS
        /* initialize before the nodesize==1 early return below, so that
           CmiExitPxshm never prints uninitialized counters */
        pxshmContext->sendCount=0;
        pxshmContext->sendTime=0.0;
        pxshmContext->validCheckCount=0;
        pxshmContext->validCheckTime=0.0;
        pxshmContext->commServerTime = 0;
#endif

        if(Cmi_charmrun_pid <= 0){
                CmiAbort("pxshm must be run with charmrun");
        }
        calculateNodeSizeAndRank(argv);
        if(pxshmContext->nodesize == 1){
                return;
        }

        MACHSTATE1(3,"CminitPxshm  %d calculateNodeSizeAndRank",pxshmContext->nodesize);

        snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);

        MACHSTATE2(3,"CminitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

        setupSharedBuffers();

        MACHSTATE2(3,"CminitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

        initAllSendQs();

        MACHSTATE2(3,"CminitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);

        MACHSTATE2(3,"CminitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);
}

/**************
 * shutdown shmem objects and semaphores
 *
 * *******************/
void tearDownSharedBuffers();

void CmiExitPxshm(){
        int i=0;

        if(pxshmContext->nodesize != 1){
                tearDownSharedBuffers();

                /* all the name strings share one allocation (see
                   allocBufNameStrings), so freeing the first non-NULL
                   entry releases them all */
                for(i=0;i<pxshmContext->nodesize;i++){
                        if(i != pxshmContext->noderank){
                                free(pxshmContext->recvBufNames[i]);
                                free(pxshmContext->sendBufNames[i]);
                                break;
                        }
                }
                free(pxshmContext->recvBufNames);
                free(pxshmContext->sendBufNames);

                free(pxshmContext->recvBufs);
                free(pxshmContext->sendBufs);

        }
#if PXSHM_STATS
        CmiPrintf("[%d] sendCount %d sendTime %.6lf validCheckCount %d validCheckTime %.6lf commServerTime %.6lf \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime);
#endif
        free(pxshmContext);
}

/******************
 * Should this message be sent using PxShm or not ?
 * ***********************/

inline int CmiValidPxshm(OutgoingMsg ogm, OtherNode node){
#if PXSHM_STATS
        pxshmContext->validCheckCount++;
#endif

/*      if(pxshmContext->nodesize == 1){
                return 0;
        }*/
        //replace by bitmap later
        if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN){
                return 1;
        }else{
                return 0;
        }
}

inline int PxshmRank(int dst){
        return dst - pxshmContext->nodestart;
}

inline void pushSendQ(PxshmSendQ *q,OutgoingMsg msg);
inline int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
inline int flushSendQ(int dstRank);

/***************
 *
 * Send this message through shared memory.
 * If you cannot get the lock, put it in the sendQ.
 * Before sending messages pick them from the sendQ.
 *
 * ****************************/
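
/* Reference counting of an OutgoingMsg follows the queueing logic below: a
 * message holds one extra reference while it sits in a sendQ (popSendQ drops
 * it), plus one temporary reference around flushSendQ() so that flushing
 * cannot delete a message the caller still owns; GarbageCollectMsg()
 * releases a message once it has actually been copied out. */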

void CmiSendMessagePxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot){
#if PXSHM_STATS
        double _startSendTime = CmiWallTimer();
#endif

        int dstRank = PxshmRank(ogm->dst);
        MEMDEBUG(CmiMemoryCheck());

        DgramHeaderMake(ogm->data,rank,ogm->src,Cmi_charmrun_pid,1,broot);

        MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);

        CmiAssert(dstRank >= 0 && dstRank != pxshmContext->noderank);

        sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);

#if PXSHM_LOCK
        if(sem_trywait(dstBuf->mutex) < 0){
#else
        dstBuf->header->flagSender = 1;
        dstBuf->header->turn = RECEIVER;
        CmiMemoryReadFence(0,0);
        CmiMemoryWriteFence(0,0);
        if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
                /* the receiver holds the buffer: withdraw interest instead of spinning */
                dstBuf->header->flagSender = 0;
#endif
                /**failed to get the lock
                insert into q and retain the message*/

                pushSendQ(pxshmContext->sendQs[dstRank],ogm);
                ogm->refcount++;
                MEMDEBUG(CmiMemoryCheck());
                return;
        }else{

                /***
                 * We got the lock for this buffer
                 * first write all the messages in the sendQ and then write this one
                 * */
                if(pxshmContext->sendQs[dstRank]->numEntries == 0){
                        sendMessage(ogm,dstBuf,pxshmContext->sendQs[dstRank]);
                        MACHSTATE(3,"Pxshm Send succeeded immediately");
                }else{
                        ogm->refcount += 2;/*this message should not get deleted when the queue is flushed*/
                        pushSendQ(pxshmContext->sendQs[dstRank],ogm);
                        MACHSTATE3(3,"Pxshm ogm %p pushed to sendQ length %d refcount %d",ogm,pxshmContext->sendQs[dstRank]->numEntries,ogm->refcount);
                        int sent = flushSendQ(dstRank);
                        ogm->refcount--; /*if it has been sent it can be deleted by the caller; if not it will be deleted when the queue is flushed*/
                        MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
                }
                /* unlock the shared buffer */
#if PXSHM_LOCK
                sem_post(dstBuf->mutex);
#else
                dstBuf->header->flagSender = 0;
#endif
        }
#if PXSHM_STATS
        pxshmContext->sendCount++;
        pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
#endif
        MEMDEBUG(CmiMemoryCheck());
}

inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/**********
 * Extract all the messages from the recvBuffers you can
 * Flush all sendQs
 * ***/
inline void CommunicationServerPxshm(){
#if PXSHM_STATS
        double _startCommServerTime = CmiWallTimer();
#endif

        MEMDEBUG(CmiMemoryCheck());
        emptyAllRecvBufs();
        flushAllSendQs();

#if PXSHM_STATS
        pxshmContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
#endif
        MEMDEBUG(CmiMemoryCheck());
}

static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
        CommunicationServerPxshm();
}

static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
{
        CmiNotifyStillIdle(s);
}

void calculateNodeSizeAndRank(char **argv){
        pxshmContext->nodesize=1;
        MACHSTATE(3,"calculateNodeSizeAndRank start");
        //CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
        CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
        MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);

        pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);

        MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);

        pxshmContext->nodestart = _Cmi_mynode - pxshmContext->noderank;

        MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");

        pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize - 1;

        if(pxshmContext->nodeend >= _Cmi_numnodes){
                pxshmContext->nodeend = _Cmi_numnodes - 1;
                pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) + 1;
        }

        MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
}

void allocBufNameStrings(char ***bufName);
void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);

/***************
 *      calculate the names of the shared objects and semaphores
 *
 *      name scheme
 *      shared memory: charm_pxshm_<receivernoderank>_<sendernoderank>
 *      semaphore    : charm_pxshm_<receivernoderank>_<sendernoderank>.sem
 *                     the semaphore name we pass is the same as the shared
 *                     memory object name; the POSIX library adds the
 *                     semaphore tag (on Linux at least; other machines
 *                     might need more portable code)
 *
 *      open these shared objects and semaphores
 * *********/
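
/* For example, with a (hypothetical) charmrun pid of 1234 and nodestart 0,
 * the buffer that node rank 0 uses to receive from node rank 1 is named
 * "charm_pxshm_1234_0_1"; rank 1's matching send buffer gets the same name,
 * so both processes open the same shared object. */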
void setupSharedBuffers(){
        int i=0;

        allocBufNameStrings(&(pxshmContext->recvBufNames));

        MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
        MEMDEBUG(CmiMemoryCheck());

        allocBufNameStrings(&(pxshmContext->sendBufNames));

        MACHSTATE(3,"allocBufNameStrings for sendBufNames done");

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
                        MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i);
                        snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
                        MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
                }
        }

        createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
        createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
                        pxshmContext->sendBufs[i].header->count = 0;
                        pxshmContext->sendBufs[i].header->bytes = 0;
                }
        }
}

void allocBufNameStrings(char ***bufName){
        int i,count;

        int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
        char *tmp = malloc(totalAlloc);

        MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);

        *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);

        for(i=0,count=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
                        count++;
                }else{
                        (*bufName)[i] = NULL;
                }
        }
}

void createShmObject(char *name,int size,char **pPtr);

void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
        int i=0;

        *bufs = (sharedBufData *)malloc(sizeof(sharedBufData)*pxshmContext->nodesize);

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
                        (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
                        (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
                        CmiAssert((*bufs)[i].mutex != SEM_FAILED); /* sem_open reports failure as SEM_FAILED, not NULL */
                }else{
                        (*bufs)[i].header = NULL;
                        (*bufs)[i].data = NULL;
                        (*bufs)[i].mutex = NULL;
                }
        }
}

void createShmObject(char *name,int size,char **pPtr){
        int fd;
        int flags;      // opening flags for shared object

        flags = O_RDWR | O_CREAT; // open in read-write mode and create the object if it is not there

        fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // create the shared object with read and write permission for the user only

        if(fd < 0){
                fprintf(stderr,"Error from shm_open %s\n",strerror(errno));
        }
        CmiAssert(fd >= 0);

        ftruncate(fd,size); //set the size of the shared memory object

        *pPtr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        CmiAssert(*pPtr != MAP_FAILED); /* mmap reports failure as MAP_FAILED, not NULL */

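        /* the descriptor can be closed once the mapping exists;
           the mapping itself stays valid until munmap */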
        close(fd);
}

void tearDownSharedBuffers(){
        int i;
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
                                fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
                        }

                        sem_close(pxshmContext->recvBufs[i].mutex);
                        sem_unlink(pxshmContext->recvBufNames[i]);

                        sem_close(pxshmContext->sendBufs[i].mutex);
                }
        }
}


void initSendQ(PxshmSendQ *q,int size);

void initAllSendQs(){
        int i=0;
        pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        (pxshmContext->sendQs)[i] = (PxshmSendQ *)malloc(sizeof(PxshmSendQ));
                        initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE);
                }else{
                        (pxshmContext->sendQs)[i] = NULL;
                }
        }
}

/****************
 * Copy this message into the shared buffer.
 * If it does not succeed, put it into the sendQ.
 * NOTE: This method is called only after obtaining the corresponding mutex.
 * ********/
int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,PxshmSendQ *dstSendQ){

        if(dstBuf->header->bytes+ogm->size <= SHMBUFLEN){
                /**copy this message to sharedBuf **/
                dstBuf->header->count++;
                memcpy(dstBuf->data+dstBuf->header->bytes,ogm->data,ogm->size);
                dstBuf->header->bytes += ogm->size;
                MACHSTATE4(3,"Pxshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
                return 1;
        }
        /***
         * Shared Buffer is too full for this message
         * **/
        pushSendQ(dstSendQ,ogm);
        ogm->refcount++;
        MACHSTATE3(3,"Pxshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
        return 0;
}

inline OutgoingMsg popSendQ(PxshmSendQ *q);

/****
 * Try to send all the messages in the sendQ to this destination rank.
 * NOTE: This method is called only after obtaining the corresponding mutex.
 * ************/

inline int flushSendQ(int dstRank){
        sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
        PxshmSendQ *dstSendQ = pxshmContext->sendQs[dstRank];

        int count = dstSendQ->numEntries;
        int sent = 0;
        while(count > 0){
                OutgoingMsg ogm = popSendQ(dstSendQ);
                ogm->refcount--;
                MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
                int ret = sendMessage(ogm,dstBuf,dstSendQ);
                if(ret==1){
                        sent++;
                        GarbageCollectMsg(ogm);
                }
                count--;
        }
        return sent;
}

inline void emptyRecvBuf(sharedBufData *recvBuf);

inline void emptyAllRecvBufs(){
        int i;

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
                        if(recvBuf->header->count > 0){
#if PXSHM_LOCK
                                if(sem_trywait(recvBuf->mutex) < 0){
#else
                                recvBuf->header->flagReceiver = 1;
                                recvBuf->header->turn = SENDER;
                                CmiMemoryReadFence(0,0);
                                CmiMemoryWriteFence(0,0);
                                if(recvBuf->header->flagSender && recvBuf->header->turn == SENDER){
                                        /* the sender holds the buffer: withdraw interest and retry later */
                                        recvBuf->header->flagReceiver = 0;
#endif
                                }else{
                                        MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
                                        emptyRecvBuf(recvBuf);
#if PXSHM_LOCK
                                        sem_post(recvBuf->mutex);
#else
                                        recvBuf->header->flagReceiver = 0;
#endif
                                }
                        }
                }
        }
}

inline void flushAllSendQs(){
        int i=0;

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank && pxshmContext->sendQs[i]->numEntries > 0){
#if PXSHM_LOCK
                        if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
#else
                        pxshmContext->sendBufs[i].header->flagSender = 1;
                        pxshmContext->sendBufs[i].header->turn = RECEIVER;
                        CmiMemoryReadFence(0,0);
                        CmiMemoryWriteFence(0,0);
                        if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
#endif
                                MACHSTATE1(3,"flushSendQ %d",i);
                                flushSendQ(i);
#if PXSHM_LOCK
                                sem_post(pxshmContext->sendBufs[i].mutex);
#else
                                pxshmContext->sendBufs[i].header->flagSender = 0;
#endif
                        }
#if PXSHM_FENCE
                        else{
                                /* failed to get the buffer: withdraw interest so the receiver can proceed */
                                pxshmContext->sendBufs[i].header->flagSender = 0;
                        }
#endif
                }
        }
}

static inline void handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);

void emptyRecvBuf(sharedBufData *recvBuf){
        int numMessages = recvBuf->header->count;
        int i=0;

        char *ptr=recvBuf->data;

        for(i=0;i<numMessages;i++){
                int size;
                int rank, srcpe, seqno, magic;
                unsigned int broot;
                char *msg = ptr;
                char *newMsg;

                DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
                size = CmiMsgHeaderGetLength(msg);

                newMsg = (char *)CmiAlloc(size);
                memcpy(newMsg,msg,size);

                handoverPxshmMessage(newMsg,size,rank,broot);

                ptr += size;

                MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
        }

        CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);

        recvBuf->header->count=0;
        recvBuf->header->bytes=0;
}

static inline void handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
        CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
            || rank == DGRAM_NODEBROADCAST
#endif
           ){
                SendSpanningChildren(NULL, 0, total_size, newmsg, broot, rank);
        }
#elif CMK_BROADCAST_HYPERCUBE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
            || rank == DGRAM_NODEBROADCAST
#endif
           ){
                SendHypercube(NULL, 0, total_size, newmsg, broot, rank);
        }
#endif

        switch (rank) {
        case DGRAM_BROADCAST:
                CmiPushPE(0, newmsg);
                break;
        default:
                CmiPushPE(rank, newmsg);
        }    /* end of switch */
}

/**************************
 * sendQ helper functions
 * ****************/

void initSendQ(PxshmSendQ *q,int size){
        q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*size);

        q->size = size;
        q->numEntries = 0;

        q->begin = 0;
        q->end = 0;
}

void pushSendQ(PxshmSendQ *q,OutgoingMsg msg){
        if(q->numEntries == q->size){
                //need to resize
                OutgoingMsg *oldData = q->data;
                int newSize = q->size<<1;
                q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*newSize);
                //copy the head of the queue to the beginning of the new array
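                /* e.g. with size 4 and begin == end == 2, the old array
                 * holds [m2 m3 m0 m1] by index (oldest m0 at index begin);
                 * the two memcpy calls below lay the new array out as
                 * [m0 m1 m2 m3] */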

                CmiAssert(q->begin == q->end);
                CmiAssert(q->begin < q->size);
                memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsg)*(q->size - q->begin));

                if(q->end != 0){
                        memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsg)*(q->end));
                }
                free(oldData);
                q->begin = 0;
                q->end = q->size;
                q->size = newSize;
        }
        q->data[q->end] = msg;
        (q->end)++;
        if(q->end >= q->size){
                q->end -= q->size;
        }
        q->numEntries++;
}

OutgoingMsg popSendQ(PxshmSendQ *q){
        OutgoingMsg ret;
        if(0 == q->numEntries){
                return NULL;
        }

        ret = q->data[q->begin];
        (q->begin)++;
        if(q->begin >= q->size){
                q->begin -= q->size;
        }

        q->numEntries--;
        return ret;
}
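
/* close the @addtogroup NET group opened at the top of the file */
/*@}*/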