fixed a bug in pxshm
[charm.git] / src / arch / util / machine-pxshm.c
/** @file
 * pxshm --> posix shared memory based network layer for communication
4  * between processes on the same node
5  * This is not going to be the primary mode of communication 
6  * but only for messages below a certain size between
7  * processes on the same node
8  * for non-smp version only
 * @ingroup NET
10  * contains only pxshm code for 
11  * - CmiInitPxshm()
12  * - DeliverViaPxShm()
13  * - CommunicationServerPxshm()
14  * - CmiMachineExitPxshm()
15
16
17 There are three options here for synchronization:
18       PXSHM_FENCE is the default. It uses memory fences
19       PXSHM_OSSPINLOCK will cause OSSpinLock's to be used (available on OSX)
20       PXSHM_LOCK will cause POSIX semaphores to be used
21
22   created by 
23         Sayantan Chakravorty, sayantan@gmail.com ,21st March 2007
24 */
25
26 /**
27  * @addtogroup NET
28  * @{
29  */
30
31 #include <sys/types.h>
32 #include <sys/mman.h>
33 #include <unistd.h>
34 #include <sys/stat.h>
35 #include <fcntl.h>
36 #include <errno.h>
37
38
39 /************** 
40    Determine which type of synchronization to use 
41 */
42 #if PXSHM_OSSPINLOCK
43 #include <libkern/OSAtomic.h>
44 #elif PXSHM_LOCK
45 #include <semaphore.h>
46 #else
47 /* Default to using fences */
48 #define PXSHM_FENCE 1
49 #endif
50
51
52 #define MEMDEBUG(x) //x
53
54 #define PXSHM_STATS 0
55
56
57 /*** The following code was copied verbatim from pcqueue.h file ***/
58 #undef CmiMemoryWriteFence
59 #if PXSHM_FENCE
60 #ifdef POWER_PC
61 #define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
62 #else
63 #define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
64 //#define CmiMemoryWriteFence(startPtr,nBytes) 
65 #endif
66 #else
67 #undef CmiMemoryWriteFence
68 #define CmiMemoryWriteFence(startPtr,nBytes)  
69 #endif
70
71 #undef CmiMemoryReadFence
72 #if PXSHM_FENCE
73 #ifdef POWER_PC
74 #define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
75 #else
76 #define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
77 //#define CmiMemoryReadFence(startPtr,nBytes) 
78 #endif
79 #else
80 #define CmiMemoryReadFence(startPtr,nBytes) 
81 #endif
82
83 /***************************************************************************************/
84
85 enum entities {SENDER,RECEIVER};
86
87 /************************
88  *      Implementation currently assumes that
89  *      1) all nodes have the same number of processors
90  *  2) in the nodelist all processors in a node are listed in sequence
91  *   0 1 2 3      4 5 6 7 
92  *   -------      -------
93  *    node 1       node 2 
94  ************************/
95
96 #define NAMESTRLEN 60
97 #define PREFIXSTRLEN 50 
98
99 #define SHMBUFLEN (1024*1024*1)
100
101 #define SENDQSTARTSIZE 128
102
103
/// This struct is used as the first portion of a shared memory region, followed by data
/// Header of one shared-memory channel: occupancy counters plus the
/// synchronization state for whichever locking scheme was compiled in.
typedef struct {
	int count; //number of messages currently stored in the data area
	int bytes; //number of payload bytes currently stored in the data area

#if PXSHM_OSSPINLOCK
	OSSpinLock lock; // 0 means unlocked, per the OSSpinLock convention
#endif

#if PXSHM_FENCE
	/* Flag-based mutual exclusion: each side raises its flag before
	 * touching the buffer and backs off if the other side's flag is up.
	 * The pads keep the flags on separate cache lines to avoid false
	 * sharing.  NOTE(review): the use sites currently ignore 'turn'
	 * (the turn checks are commented out there) -- confirm intent. */
	volatile int flagSender;
	CmiMemorySMPSeparation_t pad1;
	volatile int flagReceiver;
	CmiMemorySMPSeparation_t pad2;
	volatile int turn;
#endif

} sharedBufHeader;
122
123
/* One direction of a shared-memory channel: the mapped header (followed
 * in the same mapping by the data area) and, under PXSHM_LOCK, the named
 * semaphore guarding it. */
typedef struct {
#if PXSHM_LOCK
	sem_t *mutex;
#endif
	sharedBufHeader *header;	// start of the mmap'd region
	char *data;	// header + sizeof(sharedBufHeader), inside the same mapping
} sharedBufData;
131
#if ! CMK_NET_VERSION
/* Record for a message parked in a send queue: payload pointer, the
 * caller's reference-count cell, and the payload size in bytes.
 * (net builds get this type from the machine layer instead.) */
typedef struct OutgoingMsgRec
{
  char *data;
  int  *refcount;
  int   size;
}
OutgoingMsgRec;
#endif
141
/* Circular overflow queue of messages that could not be copied into a
 * peer's shared buffer immediately. */
typedef struct {
	int size; //total size of data array
	int begin; //position of first element
	int end;	//position of next element
	int numEntries; //number of entries

	OutgoingMsgRec *data;	// backing array, grown by doubling in pushSendQ

} PxshmSendQ;
151
/* Per-process state of the pxshm layer; one global instance (pxshmContext). */
typedef struct {
	int nodesize;	// number of processes sharing this physical node
	int noderank;	// this process's rank within the node
	int nodestart,nodeend;//proc numbers for the start and end of this node
	char prefixStr[PREFIXSTRLEN];	// common prefix for all shm object names
	char **recvBufNames;	// shm object names indexed by peer rank (NULL for self)
	char **sendBufNames;

	sharedBufData *recvBufs;	// buffers this process reads from, one per peer
	sharedBufData *sendBufs;	// buffers this process writes to, one per peer

	PxshmSendQ **sendQs;	// overflow queues, one per peer (NULL for self)


#if PXSHM_STATS
	int sendCount;
	int validCheckCount;
	int lockRecvCount;
	double validCheckTime;
	double sendTime;
	double commServerTime;
#endif

} PxshmContext;
176
177
178
PxshmContext *pxshmContext=NULL; //global context, allocated in CmiInitPxshm

/* forward declarations for the initialization helpers defined below */
void calculateNodeSizeAndRank(char **);
void setupSharedBuffers();
void initAllSendQs();
185
/******************
 *	Initialization routine
 *	Determines node topology, creates the shared buffers and the
 *	per-peer send queues.  A no-op beyond topology setup when this
 *	node has only one process.
 * ****************/
void CmiInitPxshm(char **argv){
	MACHSTATE(3,"CminitPxshm start");
	/* NOTE(review): calloc result is not checked -- assumed to succeed */
	pxshmContext = (PxshmContext *)calloc(1,sizeof(PxshmContext));

#if CMK_NET_VERSION
	/* charmrun's pid is used as the node-unique shm name prefix */
	if(Cmi_charmrun_pid <= 0){
		CmiAbort("pxshm must be run with charmrun");
	}
#endif
	calculateNodeSizeAndRank(argv);
	/* single-process node: no shared-memory peers, nothing to set up */
	if(pxshmContext->nodesize == 1) return;

	MACHSTATE1(3,"CminitPxshm  %d calculateNodeSizeAndRank",pxshmContext->nodesize);

#if ! CMK_NET_VERSION
	/* no charmrun: invent a random id and broadcast it so every process
	 * builds identical shm object names */
	srand(getpid());
	int Cmi_charmrun_pid = rand();
	PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
#endif
	snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);

	MACHSTATE2(3,"CminitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

	setupSharedBuffers();

	MACHSTATE2(3,"CminitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

	initAllSendQs();

	MACHSTATE2(3,"CminitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);

	MACHSTATE2(3,"CminitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);

#if PXSHM_STATS
	pxshmContext->sendCount=0;
	pxshmContext->sendTime=0.0;
	pxshmContext->validCheckCount=0;
	pxshmContext->validCheckTime=0.0;
	pxshmContext->commServerTime = 0;
	pxshmContext->lockRecvCount = 0;
#endif

#if 0
	char name[64];
	gethostname(name,64);
	printf("[%d] name: %s\n", myrank, name);
#endif
};
238
239 /**************
240  * shutdown shmem objects and semaphores
241  *
242  * *******************/
243 void tearDownSharedBuffers();
244
245 void CmiExitPxshm(){
246         int i=0;
247         
248         if(pxshmContext->nodesize != 1){
249                 tearDownSharedBuffers();
250         
251                 for(i=0;i<pxshmContext->nodesize;i++){
252                         if(i != pxshmContext->noderank){
253                                 break;
254                         }
255                 }
256                 free(pxshmContext->recvBufNames[i]);
257                 free(pxshmContext->sendBufNames[i]);
258
259                 free(pxshmContext->recvBufNames);
260                 free(pxshmContext->sendBufNames);
261
262                 free(pxshmContext->recvBufs);
263                 free(pxshmContext->sendBufs);
264
265         }
266 #if PXSHM_STATS
267 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
268 #endif
269         free(pxshmContext);
270 }
271
272 /******************
273  *Should this message be sent using PxShm or not ?
274  * ***********************/
275
276 inline int CmiValidPxshm(int dst, int size){
277 #if PXSHM_STATS
278         pxshmContext->validCheckCount++;
279 #endif
280
281 /*      if(pxshmContext->nodesize == 1){
282                 return 0;
283         }*/
284         //replace by bitmap later
285         //if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN ){
286         if(dst >= pxshmContext->nodestart && dst <= pxshmContext->nodeend && size < SHMBUFLEN ){
287                 return 1;
288         }else{
289                 return 0;
290         }
291 };
292
293
/* Translate a global processor number into this node's local rank. */
inline int PxshmRank(int dst){
	return dst - pxshmContext->nodestart;
}
/* forward declarations of the send-path helpers defined further below */
inline void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount);
inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
inline int flushSendQ(int dstRank);
300
301 inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
302   return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
303 }
304
305 /***************
306  *
307  *Send this message through shared memory
308  *if you cannot get lock, put it in the sendQ
309  *Before sending messages pick them from sendQ
310  *
311  * ****************************/
312
313 void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot)
314 {
315
316 #if PXSHM_STATS
317         double _startSendTime = CmiWallTimer();
318 #endif
319
320         LrtsPrepareEnvelope(msg, size);
321         
322         int dstRank = PxshmRank(dstpe);
323         MEMDEBUG(CmiMemoryCheck());
324   
325         MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
326         MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
327
328         CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);
329         
330         sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
331
332
333 #if PXSHM_OSSPINLOCK
334         if(! OSSpinLockTry(&dstBuf->header->lock)){
335 #elif PXSHM_LOCK
336         if(sem_trywait(dstBuf->mutex) < 0){
337 #elif PXSHM_FENCE
338         dstBuf->header->flagSender = 1;
339         dstBuf->header->turn = RECEIVER;
340         CmiMemoryReadFence(0,0);
341         CmiMemoryWriteFence(0,0);
342         //if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
343         if(dstBuf->header->flagReceiver){
344                 dstBuf->header->flagSender = 0;
345 #endif
346                 /**failed to get the lock 
347                 insert into q and retain the message*/
348
349                 pushSendQ(pxshmContext->sendQs[dstRank], msg, size, refcount);
350                 (*refcount)++;
351                 MEMDEBUG(CmiMemoryCheck());
352                 return;
353         }else{
354
355                 /***
356                  * We got the lock for this buffer
357                  * first write all the messages in the sendQ and then write this guy
358                  * */
359                  if(pxshmContext->sendQs[dstRank]->numEntries == 0){
360                                 // send message user event
361                                 int ret = sendMessage(msg,size,refcount,dstBuf,pxshmContext->sendQs[dstRank]);
362                                 MACHSTATE(3,"Pxshm Send succeeded immediately");
363                  }else{
364                                 (*refcount)+=2;/*this message should not get deleted when the queue is flushed*/
365                                 pushSendQ(pxshmContext->sendQs[dstRank],msg,size,refcount);
366                                 MACHSTATE3(3,"Pxshm ogm %p pushed to sendQ length %d refcount %d",ogm,pxshmContext->sendQs[dstRank]->numEntries,ogm->refcount);
367                                 int sent = flushSendQ(dstRank);
368                                 (*refcount)--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
369                                 MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
370                  }
371                  /* unlock the recvbuffer*/
372
373 #if PXSHM_OSSPINLOCK
374                  OSSpinLockUnlock(&dstBuf->header->lock);
375 #elif PXSHM_LOCK
376                  sem_post(dstBuf->mutex);
377 #elif PXSHM_FENCE
378                  CmiMemoryReadFence(0,0);                       
379                  CmiMemoryWriteFence(0,0);
380                  dstBuf->header->flagSender = 0;
381 #endif
382         }
383 #if PXSHM_STATS
384                 pxshmContext->sendCount ++;
385                 pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
386 #endif
387         MEMDEBUG(CmiMemoryCheck());
388
389 };
390
inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/**********
 * Extract all the messages from the recvBuffers you can
 * Flush all sendQs
 * ***/
inline void CommunicationServerPxshm(){

#if PXSHM_STATS
	double commStart = CmiWallTimer();
#endif

	MEMDEBUG(CmiMemoryCheck());

	/* drain whatever peers have written to us, then push out anything
	 * that is waiting in our overflow queues */
	emptyAllRecvBufs();
	flushAllSendQs();

#if PXSHM_STATS
	pxshmContext->commServerTime += (CmiWallTimer()-commStart);
#endif

	MEMDEBUG(CmiMemoryCheck());
};
414
/* Scheduler hook: poll the shared buffers while the PE is idle. */
static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
	CommunicationServerPxshm();
}


/* Begin-idle hook.  NOTE(review): this forwards to the generic
 * CmiNotifyStillIdle(), not the Pxshm-specific handler above --
 * looks intentional but worth confirming. */
static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
{
	CmiNotifyStillIdle(s);
}
424
425
/* Parse +nodesize and derive this process's node-local rank plus the
 * global proc-number range [nodestart, nodeend] of its node.  Relies on
 * the assumptions noted at the top of the file: equal node sizes and
 * consecutive numbering of a node's processes. */
void calculateNodeSizeAndRank(char **argv){
	pxshmContext->nodesize=1;
	MACHSTATE(3,"calculateNodeSizeAndRank start");
	//CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
	CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
	MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);

	pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);

	MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);

	pxshmContext->nodestart = _Cmi_mynode -pxshmContext->noderank;

	MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");

	pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;

	/* the last node may be only partially filled: clamp the range and
	 * shrink nodesize accordingly */
	if(pxshmContext->nodeend >= _Cmi_numnodes){
		pxshmContext->nodeend = _Cmi_numnodes-1;
		pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
	}

	MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
}
450
void allocBufNameStrings(char ***bufName);
void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
/***************
 *	calculate the name of the shared objects and semaphores
 *
 *	name scheme
 *	shared memory: charm_pxshm_<recvernoderank>_<sendernoderank>
 *  semaphore    : charm_pxshm_<recvernoderank>_<sendernoderank>.sem for semaphore for that shared object
 *                the semaphore name used by us is the same as the shared memory object name
 *                the posix library adds the semaphore tag // in linux at least . other machines might need more portable code
 *
 *	open these shared objects and semaphores
 * *********/
void setupSharedBuffers(){
	int i=0;

	allocBufNameStrings(&(pxshmContext->recvBufNames));

	MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
	MEMDEBUG(CmiMemoryCheck());

	allocBufNameStrings((&pxshmContext->sendBufNames));

	MACHSTATE(3,"allocBufNameStrings for sendBufNames done");

	/* build the name pairs: our recv buffer from rank i is rank i's send
	 * buffer to us, so the two name tables mirror each other */
	for(i=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
			MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i)
			snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
			MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
		}
	}

	createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
	createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);

	/* start every outgoing buffer empty */
	for(i=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			//CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
			pxshmContext->sendBufs[i].header->count = 0;
			pxshmContext->sendBufs[i].header->bytes = 0;
		}
	}
}
496
/* Allocate an array of name strings indexed by node-local rank.  All
 * strings live in ONE malloc'd slab ('tmp'); the entry for our own rank
 * is NULL, and the first non-self entry points at the slab's base
 * (CmiExitPxshm frees the slab through that entry).
 * NOTE(review): both malloc results are unchecked. */
void allocBufNameStrings(char ***bufName){
	int i,count;

	/* one NAMESTRLEN-byte slot per peer: nodesize-1 of them */
	int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
	char *tmp = malloc(totalAlloc);

	MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);

	*bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);

	for(i=0,count=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			(*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
			count++;
		}else{
			(*bufName)[i] = NULL;
		}
	}
}
516
void createShmObject(char *name,int size,char **pPtr);

/* For each peer rank, create+map the shared buffer named bufNames[i],
 * zero it, and set up its header/data pointers (plus the named
 * semaphore under PXSHM_LOCK).  Self entries stay NULL. */
void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
	int i=0;

	*bufs = (sharedBufData *)calloc(pxshmContext->nodesize, sizeof(sharedBufData));

	for(i=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
			/* NOTE(review): both endpoints of a buffer memset the whole
			 * region at startup; safe only if no messages can be in
			 * flight yet -- confirm the init-time ordering */
			memset(((*bufs)[i].header), 0, SHMBUFLEN+sizeof(sharedBufHeader));
			(*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
#if PXSHM_OSSPINLOCK
			(*bufs)[i].header->lock = 0; // by convention(see man page) 0 means unlocked
#elif PXSHM_LOCK
			/* NOTE(review): sem_open result is not checked against SEM_FAILED */
			(*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
#endif
		}else{
			(*bufs)[i].header = NULL;
			(*bufs)[i].data = NULL;
#if PXSHM_LOCK
			(*bufs)[i].mutex = NULL;
#endif
		}
	}
}
543
544
/* Create (or open) the POSIX shared-memory object 'name', size it to
 * 'size' bytes, map it read-write, and return the mapping in *pPtr.
 * The descriptor is closed after mapping; the mapping stays valid. */
void createShmObject(char *name,int size,char **pPtr){
	int fd=-1;
	int flags;	// opening flags for shared object
	int open_repeat_count = 0;
	int truncStatus;

	flags= O_RDWR | O_CREAT; // open file in read-write mode and create it if its not there

	/* shm_open may fail transiently while peers race to create the same
	 * object, so retry a bounded number of times */
	while(fd<0 && open_repeat_count < 100){
	  open_repeat_count++;
	  fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // create the shared object with permissions for only the user to read and write

	  if(fd < 0 && open_repeat_count > 10){
	    fprintf(stderr,"Error(attempt=%d) from shm_open %s while opening %s \n",open_repeat_count, strerror(errno),name);
	    fflush(stderr);
	  }
	}

	CmiAssert(fd >= 0);

	/* set the size of the shared memory object; fixed: the return value
	 * was previously ignored, hiding e.g. ENOSPC */
	truncStatus = ftruncate(fd,size);
	if(truncStatus < 0){
		fprintf(stderr,"Error from ftruncate %s while sizing %s \n",strerror(errno),name);
		fflush(stderr);
	}
	CmiAssert(truncStatus == 0);

	*pPtr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	/* fixed: mmap signals failure with MAP_FAILED ((void *)-1), not NULL,
	 * so the old NULL check could never fire */
	CmiAssert(*pPtr != MAP_FAILED);

	close(fd);
}
571
/* Unlink the shared-memory objects this process receives on.  Per the
 * naming scheme in setupSharedBuffers, each object is the recv buffer
 * of exactly one rank, so every rank unlinking its recvBufNames covers
 * all objects.  Under PXSHM_LOCK both directions' semaphores are closed
 * (sem_unlink is intentionally left commented out). */
void tearDownSharedBuffers(){
	int i;
	for(i= 0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			if(	shm_unlink(pxshmContext->recvBufNames[i]) < 0){
				fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
			}

#if PXSHM_LOCK
			sem_close(pxshmContext->recvBufs[i].mutex);
			//sem_unlink(pxshmContext->recvBufNames[i]);
			sem_close(pxshmContext->sendBufs[i].mutex);
#endif
		}
	}
};
588
589
590 void initSendQ(PxshmSendQ *q,int size);
591
592 void initAllSendQs(){
593         int i=0;
594         pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
595         for(i=0;i<pxshmContext->nodesize;i++){
596                 if(i != pxshmContext->noderank){
597                         (pxshmContext->sendQs)[i] = (PxshmSendQ *)calloc(1, sizeof(PxshmSendQ));
598                         initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE);
599                 }else{
600                         (pxshmContext->sendQs)[i] = NULL;
601                 }
602         }
603 };
604
605
606 /****************
607  *copy this message into the sharedBuf
608  If it does not succeed
609  *put it into the sendQ 
610  *NOTE: This method is called only after obtaining the corresponding mutex
611  * ********/
612 int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
613
614         if(dstBuf->header->bytes+size <= SHMBUFLEN){
615                 /**copy  this message to sharedBuf **/
616                 dstBuf->header->count++;
617                 memcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
618                 dstBuf->header->bytes += size;
619                 MACHSTATE4(3,"Pxshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
620                 CmiFree(msg);
621                 return 1;
622         }
623         /***
624          * Shared Buffer is too full for this message
625          * **/
626         printf("[%d] send buffer is too full\n", CmiMyPe());
627         pushSendQ(dstSendQ,msg,size,refcount);
628         (*refcount)++;
629         MACHSTATE3(3,"Pxshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
630         return 0;
631 }
632
inline OutgoingMsgRec* popSendQ(PxshmSendQ *q);

/****
 *Try to send all the messages in the sendq to this destination rank
 *NOTE: This method is called only after obtaining the corresponding mutex
 * Returns the number of messages actually copied into the shared buffer.
 * ************/

inline int flushSendQ(int dstRank){
	sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
	PxshmSendQ *dstSendQ = pxshmContext->sendQs[dstRank];
	int count=dstSendQ->numEntries;
	int sent=0;
	while(count > 0){
		OutgoingMsgRec *ogm = popSendQ(dstSendQ);
		/* drop the reference the queue held; sendMessageRec re-queues
		 * (and re-references) the record if the buffer is still full */
		(*ogm->refcount)--;
		MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
		int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
		if(ret==1){
			sent++;
#if CMK_NET_VERSION
			GarbageCollectMsg(ogm);
#endif
		}
		count--;
	}
	return sent;
}
660
inline void emptyRecvBuf(sharedBufData *recvBuf);

/* Scan every peer's recv buffer; for each non-empty one, try to acquire
 * it (spinlock / semaphore / flag protocol depending on build) and
 * drain it with emptyRecvBuf.  If the sender currently holds the buffer
 * it is simply skipped until the next poll. */
inline void emptyAllRecvBufs(){
	int i;
	for(i=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
			if(recvBuf->header->count > 0){

#if PXSHM_STATS
				pxshmContext->lockRecvCount++;
#endif


#if PXSHM_OSSPINLOCK
				if(! OSSpinLockTry(&recvBuf->header->lock)){
#elif PXSHM_LOCK
				if(sem_trywait(recvBuf->mutex) < 0){
#elif PXSHM_FENCE
				/* raise our flag, then back off if the sender is active;
				 * note the 'turn' arbitration is currently commented out */
				recvBuf->header->flagReceiver = 1;
				recvBuf->header->turn = SENDER;
				CmiMemoryReadFence(0,0);
				CmiMemoryWriteFence(0,0);
				//if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
				if((recvBuf->header->flagSender)){
					recvBuf->header->flagReceiver = 0;
#endif
				}else{


					MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
					emptyRecvBuf(recvBuf);

#if PXSHM_OSSPINLOCK
					OSSpinLockUnlock(&recvBuf->header->lock);
#elif PXSHM_LOCK
					sem_post(recvBuf->mutex);
#elif PXSHM_FENCE
					CmiMemoryReadFence(0,0);
					CmiMemoryWriteFence(0,0);
					recvBuf->header->flagReceiver = 0;
#endif

				}

			}
		}
	}
};
710
/* For every peer with queued messages, try to acquire its send buffer
 * and flush the queue into it.  Under PXSHM_FENCE a failed acquisition
 * must still lower our sender flag in the else branch. */
inline void flushAllSendQs(){
	int i=0;

	for(i=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank && pxshmContext->sendQs[i]->numEntries > 0){

#if PXSHM_OSSPINLOCK
			if(OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)){
#elif PXSHM_LOCK
			if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
#elif PXSHM_FENCE
			/* raise our flag; proceed only if the receiver is not active */
			pxshmContext->sendBufs[i].header->flagSender = 1;
			pxshmContext->sendBufs[i].header->turn = RECEIVER;
			CmiMemoryReadFence(0,0);
			CmiMemoryWriteFence(0,0);
			if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
#endif

				MACHSTATE1(3,"flushSendQ %d",i);
				flushSendQ(i);



#if PXSHM_OSSPINLOCK
				OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
#elif PXSHM_LOCK
				sem_post(pxshmContext->sendBufs[i].mutex);
#elif PXSHM_FENCE
				CmiMemoryReadFence(0,0);
				CmiMemoryWriteFence(0,0);
				pxshmContext->sendBufs[i].header->flagSender = 0;
#endif
			}else{

#if PXSHM_FENCE
			  /* contention: withdraw our claim so the receiver can run */
			  pxshmContext->sendBufs[i].header->flagSender = 0;
#endif

			}

		}
	}
};
754
755 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);
756
757 void emptyRecvBuf(sharedBufData *recvBuf){
758         int numMessages = recvBuf->header->count;
759         int i=0;
760
761         char *ptr=recvBuf->data;
762
763         for(i=0;i<numMessages;i++){
764                 int size;
765                 int rank, srcpe, seqno, magic, i;
766                 unsigned int broot;
767                 char *msg = ptr;
768                 char *newMsg;
769
770 #if CMK_NET_VERSION
771                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
772                 size = CmiMsgHeaderGetLength(msg);
773 #else
774                 size = CmiGetMsgSize(msg);
775 #endif
776         
777                 newMsg = (char *)CmiAlloc(size);
778                 memcpy(newMsg,msg,size);
779
780 #if CMK_NET_VERSION
781                 handoverPxshmMessage(newMsg,size,rank,broot);
782 #else
783                 handleOneRecvedMsg(size, newMsg);
784 #endif
785                 
786                 ptr += size;
787
788                 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
789         }
790 #if 1
791   if(ptr - recvBuf->data != recvBuf->header->bytes){
792                 CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
793         }
794 #endif
795         CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
796         recvBuf->header->count=0;
797         recvBuf->header->bytes=0;
798 }
799
800
/* Deliver a message copied out of a shared buffer: forward broadcasts
 * along the spanning tree / hypercube if those are enabled, then push
 * the message to the destination PE's queue.  Entire body is active
 * only for net builds (CMK_NET_VERSION). */
void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
#if CMK_NET_VERSION
	CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
	if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
	  || rank == DGRAM_NODEBROADCAST
#endif
	 ){
		SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
					}
#elif CMK_BROADCAST_HYPERCUBE
	if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
	  || rank == DGRAM_NODEBROADCAST
#endif
	 ){
			SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
					}
#endif

		/* local delivery: broadcasts land on PE 0, everything else on 'rank' */
		switch (rank) {
	case DGRAM_BROADCAST: {
	  CmiPushPE(0, newmsg);
	  break;
      }
	default:
				{

	  CmiPushPE(rank, newmsg);
				}
	}    /* end of switch */
#endif
}
835
836
837 /**************************
838  *sendQ helper functions
839  * ****************/
840
841 void initSendQ(PxshmSendQ *q,int size){
842         q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));
843
844         q->size = size;
845         q->numEntries = 0;
846
847         q->begin = 0;
848         q->end = 0;
849 }
850
/* Append one message record to the circular queue, doubling the backing
 * array (and linearizing the ring) when it is full. */
void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount){
	if(q->numEntries == q->size){
		//need to resize 
CmiPrintf("[%d] pushSendQ resize \n", CmiMyPe());
		OutgoingMsgRec *oldData = q->data;
		int newSize = q->size<<1;
		q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
		//copy head to the beginning of the new array
		/* when full, begin == end: the live entries run from begin to the
		 * end of the old array and then wrap from 0 to end */
		CmiAssert(q->begin == q->end);

		CmiAssert(q->begin < q->size);
		memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));

		if(q->end!=0){
			memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
		}
		free(oldData);
		q->begin = 0;
		q->end = q->size; // old size == number of live entries after linearizing
		q->size = newSize;
	}
	OutgoingMsgRec *omg = &q->data[q->end];
	omg->size = size;
	omg->data = msg;
	omg->refcount = refcount;
	(q->end)++;
	if(q->end >= q->size){
		q->end -= q->size;
	}
	q->numEntries++;
}
882
883 OutgoingMsgRec * popSendQ(PxshmSendQ *q){
884         OutgoingMsgRec * ret;
885         if(0 == q->numEntries){
886                 return NULL;
887         }
888
889         ret = &q->data[q->begin];
890         (q->begin)++;
891         if(q->begin >= q->size){
892                 q->begin -= q->size;
893         }
894         
895         q->numEntries--;
896         return ret;
897 }