/** @file
 * pxshm --> posix shared memory based network layer for communication
 * between processes on the same node
 * This is not going to be the primary mode of communication,
 * but is used only for messages below a certain size between
 * processes on the same node.
 * Originally written for the non-smp version only; since tweaked to work
 * for the SMP build as well.
 * @ingroup NET
 * contains only pxshm code for
 * - CmiInitPxshm()
 * - DeliverViaPxShm()
 * - CommunicationServerPxshm()
 * - CmiMachineExitPxshm()


 There are three options here for synchronization:
      PXSHM_FENCE is the default. It uses memory fences.
      PXSHM_OSSPINLOCK will cause OSSpinLocks to be used (available on OSX).
      PXSHM_LOCK will cause POSIX semaphores to be used.
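      (A note on selection, assuming the usual build setup: each option
      above is a preprocessor flag, so compiling with e.g.
      -DPXSHM_OSSPINLOCK=1 or -DPXSHM_LOCK=1 picks that scheme; if
      neither is defined, PXSHM_FENCE is defined to 1 below.)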

  created by
        Sayantan Chakravorty, sayantan@gmail.com, 21st March 2007
*/

/**
 * @addtogroup NET
 * @{
 */

#include <sys/types.h>
#include <sys/mman.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>


/**************
   Determine which type of synchronization to use
*/
#if PXSHM_OSSPINLOCK
#include <libkern/OSAtomic.h>
#elif PXSHM_LOCK
#include <semaphore.h>
#else
/* Default to using fences */
#define PXSHM_FENCE 1
#endif


#define MEMDEBUG(x) //x

#define PXSHM_STATS 0

/*** The following code was copied verbatim from pcqueue.h file ***/
#if ! CMK_SMP
#undef CmiMemoryWriteFence
#if PXSHM_FENCE
#ifdef POWER_PC
#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#else
#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
//#define CmiMemoryWriteFence(startPtr,nBytes)
#endif
#else
#undef CmiMemoryWriteFence
#define CmiMemoryWriteFence(startPtr,nBytes)
#endif

#undef CmiMemoryReadFence
#if PXSHM_FENCE
#ifdef POWER_PC
#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#else
#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
//#define CmiMemoryReadFence(startPtr,nBytes)
#endif
#else
#define CmiMemoryReadFence(startPtr,nBytes)
#endif

#endif

/*
#if CMK_SMP
#error  "PXSHM can only be used in non-smp build of Charm++"
#endif
*/

/***************************************************************************************/

enum entities {SENDER,RECEIVER};

/************************
 *      Implementation currently assumes that
 *      1) all nodes have the same number of processors
 *      2) in the nodelist all processors in a node are listed in sequence
 *         0 1 2 3      4 5 6 7
 *         -------      -------
 *          node 1       node 2
 ************************/

#define NAMESTRLEN 60
#define PREFIXSTRLEN 50

#define SHMBUFLEN (1024*1024*4)
#define SHMMAXSIZE     (1024*1024)

#define SENDQSTARTSIZE    256

/// This struct is used as the first portion of a shared memory region, followed by data
typedef struct {
        int count; //number of messages
        int bytes; //number of bytes

#if PXSHM_OSSPINLOCK
        OSSpinLock lock;
#endif

#if PXSHM_FENCE
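        /* These flags implement a Peterson-style try-lock between the single
         * sender and single receiver of this buffer: each side raises its own
         * flag and yields the turn before testing the other's flag; the pads
         * keep the two flags on separate cache lines to avoid false sharing. */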
        volatile int flagSender;
        CmiMemorySMPSeparation_t pad1;
        volatile int flagReceiver;
        CmiMemorySMPSeparation_t pad2;
        volatile int turn;
#endif

} sharedBufHeader;


typedef struct {
#if PXSHM_LOCK
        sem_t *mutex;
#endif
        sharedBufHeader *header;
        char *data;
} sharedBufData;

typedef struct OutgoingMsgRec
{
  char *data;
  int  *refcount;
  int   size;
}
OutgoingMsgRec;

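/* Growable circular queue of outgoing messages waiting for a destination
 * buffer that was locked or full; begin/end indices wrap modulo size. */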
typedef struct {
        int size; //total size of data array
        int begin; //position of first element
        int end;        //position of next element
        int numEntries; //number of entries

        OutgoingMsgRec *data;

} PxshmSendQ;

typedef struct {
        int nodesize;
        int noderank;
        int nodestart,nodeend;//proc numbers for the start and end of this node
        char prefixStr[PREFIXSTRLEN];
        char **recvBufNames;
        char **sendBufNames;

        sharedBufData *recvBufs;
        sharedBufData *sendBufs;

        PxshmSendQ **sendQs;


#if PXSHM_STATS
        int sendCount;
        int validCheckCount;
        int lockRecvCount;
        double validCheckTime;
        double sendTime;
        double commServerTime;
#endif

} PxshmContext;



PxshmContext *pxshmContext=NULL; //global context


void calculateNodeSizeAndRank(char **);
void setupSharedBuffers();
void initAllSendQs();

/******************
 *      Initialization routine
 *      currently just testing start up
 * ****************/
void CmiInitPxshm(char **argv){
        MACHSTATE(3,"CmiInitPxshm start");

        pxshmContext = (PxshmContext *)calloc(1,sizeof(PxshmContext));

#if CMK_NET_VERSION
        if(Cmi_charmrun_pid <= 0){
                CmiAbort("pxshm must be run with charmrun");
        }
#endif
        calculateNodeSizeAndRank(argv);
        if(pxshmContext->nodesize == 1) return;

        MACHSTATE1(3,"CmiInitPxshm %d calculateNodeSizeAndRank",pxshmContext->nodesize);

#if CMK_CRAYXE
        srand(getpid());
        int Cmi_charmrun_pid = rand();
        PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
#elif !CMK_NET_VERSION
        #error "need a unique number"
#endif
        snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);

        MACHSTATE2(3,"CmiInitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

        setupSharedBuffers();

        MACHSTATE2(3,"CmiInitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

        initAllSendQs();

        MACHSTATE2(3,"CmiInitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);

        MACHSTATE2(3,"CmiInitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);

#if PXSHM_STATS
        pxshmContext->sendCount=0;
        pxshmContext->sendTime=0.0;
        pxshmContext->validCheckCount=0;
        pxshmContext->validCheckTime=0.0;
        pxshmContext->commServerTime = 0;
        pxshmContext->lockRecvCount = 0;
#endif

#if 0
        char name[64];
        gethostname(name,64);
        printf("[%d] name: %s\n", myrank, name);
#endif
}

/**************
 * shutdown shmem objects and semaphores
 *
 * *******************/
void tearDownSharedBuffers();

void CmiExitPxshm(){
        if(pxshmContext->nodesize != 1){
                int i;
                tearDownSharedBuffers();

                /* all the name strings live in one contiguous block (see
                 * allocBufNameStrings); the first entry that is not our own
                 * rank points at its start, so freeing that entry frees them all */
                for(i=0;i<pxshmContext->nodesize;i++){
                        if(i != pxshmContext->noderank){
                                break;
                        }
                }
                free(pxshmContext->recvBufNames[i]);
                free(pxshmContext->sendBufNames[i]);

                free(pxshmContext->recvBufNames);
                free(pxshmContext->sendBufNames);

                free(pxshmContext->recvBufs);
                free(pxshmContext->sendBufs);

        }
#if PXSHM_STATS
        CmiPrintf("[%d] sendCount %d sendTime %.6lf validCheckCount %d validCheckTime %.6lf commServerTime %.6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
#endif
        free(pxshmContext);
}

/******************
 *Should this message be sent using PxShm or not ?
 * ***********************/

inline int CmiValidPxshm(int dst, int size){
#if PXSHM_STATS
        pxshmContext->validCheckCount++;
#endif

/*      if(pxshmContext->nodesize == 1){
                return 0;
        }*/
        //replace by bitmap later
        //if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN ){
        int node = CmiNodeOf(dst);
        if(node >= pxshmContext->nodestart && node <= pxshmContext->nodeend && size < SHMMAXSIZE ){
                return 1;
        }else{
                return 0;
        }
}


inline int PxshmRank(int dst){
        return CmiNodeOf(dst) - pxshmContext->nodestart;
}

inline void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount);
inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
inline int flushSendQ(int dstRank);

inline int sendMessageRec(OutgoingMsgRec *ogm, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
  return sendMessage(ogm->data, ogm->size, ogm->refcount, dstBuf, dstSendQ);
}

/***************
 *
 *Send this message through shared memory
 *if you cannot get lock, put it in the sendQ
 *Before sending messages pick them from sendQ
 *
 * ****************************/

void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot)
{

#if PXSHM_STATS
        double _startSendTime = CmiWallTimer();
#endif

        LrtsPrepareEnvelope(msg, size);

        int dstRank = PxshmRank(dstpe);
        MEMDEBUG(CmiMemoryCheck());

        MACHSTATE4(3,"Send Msg Pxshm msg %p size %d dst %d dstRank %d",msg,size,dstpe,dstRank);

        CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);

        sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);

#if PXSHM_OSSPINLOCK
        if(! OSSpinLockTry(&dstBuf->header->lock)){
#elif PXSHM_LOCK
        if(sem_trywait(dstBuf->mutex) < 0){
#elif PXSHM_FENCE
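        /* fence-based try-lock (sender side): raise our flag, give the turn
         * away, fence, and back off if the receiver's flag is up; the
         * stricter turn check is currently commented out below */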
        dstBuf->header->flagSender = 1;
        dstBuf->header->turn = RECEIVER;
        CmiMemoryReadFence(0,0);
        CmiMemoryWriteFence(0,0);
        //if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
        if(dstBuf->header->flagReceiver){
                dstBuf->header->flagSender = 0;
#endif
                /**failed to get the lock
                insert into q and retain the message*/

                pushSendQ(pxshmContext->sendQs[dstRank], msg, size, refcount);
                (*refcount)++;
                MEMDEBUG(CmiMemoryCheck());
                return;
        }else{

                /***
                 * We got the lock for this buffer
                 * first write all the messages in the sendQ and then write this guy
                 * */
                 if(pxshmContext->sendQs[dstRank]->numEntries == 0){
                                // send message user event
                                int ret = sendMessage(msg,size,refcount,dstBuf,pxshmContext->sendQs[dstRank]);
                                MACHSTATE(3,"Pxshm Send succeeded immediately");
                 }else{
                                (*refcount)+=2;/*this message should not get deleted when the queue is flushed*/
                                pushSendQ(pxshmContext->sendQs[dstRank],msg,size,refcount);
                                MACHSTATE3(3,"Pxshm msg %p pushed to sendQ length %d refcount %d",msg,pxshmContext->sendQs[dstRank]->numEntries,*refcount);
                                int sent = flushSendQ(dstRank);
                                (*refcount)--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
                                MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
                 }
                 /* unlock the recvbuffer*/

#if PXSHM_OSSPINLOCK
                 OSSpinLockUnlock(&dstBuf->header->lock);
#elif PXSHM_LOCK
                 sem_post(dstBuf->mutex);
#elif PXSHM_FENCE
                 CmiMemoryReadFence(0,0);
                 CmiMemoryWriteFence(0,0);
                 dstBuf->header->flagSender = 0;
#endif
        }
#if PXSHM_STATS
        pxshmContext->sendCount ++;
        pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
#endif
        MEMDEBUG(CmiMemoryCheck());

}

inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/**********
 * Extract all the messages from the recvBuffers you can
 * Flush all sendQs
 * ***/
inline void CommunicationServerPxshm(){

#if PXSHM_STATS
        double _startCommServerTime =CmiWallTimer();
#endif

        MEMDEBUG(CmiMemoryCheck());
        emptyAllRecvBufs();
        flushAllSendQs();

#if PXSHM_STATS
        pxshmContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
#endif

        MEMDEBUG(CmiMemoryCheck());
}

static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
        CommunicationServerPxshm();
}


static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
{
        CmiNotifyStillIdle(s);
}


void calculateNodeSizeAndRank(char **argv){
        pxshmContext->nodesize=1;
        MACHSTATE(3,"calculateNodeSizeAndRank start");
        //CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
        CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
        MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);

        pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);

        MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);

        pxshmContext->nodestart = _Cmi_mynode - pxshmContext->noderank;

        MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");

        pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;

        if(pxshmContext->nodeend >= _Cmi_numnodes){
                pxshmContext->nodeend = _Cmi_numnodes-1;
                pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
        }

        MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
}

void allocBufNameStrings(char ***bufName);
void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
/***************
 *      calculate the names of the shared objects and semaphores
 *
 *      name scheme
 *      shared memory: charm_pxshm_<receivernoderank>_<sendernoderank>
 *      semaphore    : charm_pxshm_<receivernoderank>_<sendernoderank>.sem
 *                     the semaphore name we use is the same as the shared
 *                     memory object name; the posix library adds the .sem
 *                     tag itself (on Linux at least; other machines might
 *                     need more portable code)
 *
 *      open these shared objects and semaphores
 * *********/
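/* Example (hypothetical values): with prefix "charm_pxshm_1234" and
 * nodestart 0, the buffer on which node-rank 2 receives from node-rank 0
 * is named "charm_pxshm_1234_2_0"; under PXSHM_LOCK its semaphore is
 * opened with the same name. */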
void setupSharedBuffers(){
        int i=0;

        allocBufNameStrings(&(pxshmContext->recvBufNames));

        MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
        MEMDEBUG(CmiMemoryCheck());

        allocBufNameStrings(&(pxshmContext->sendBufNames));

        MACHSTATE(3,"allocBufNameStrings for sendBufNames done");

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
                        MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i);
                        snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
                        MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
                }
        }

        createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
        createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        //CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
                        pxshmContext->sendBufs[i].header->count = 0;
                        pxshmContext->sendBufs[i].header->bytes = 0;
                }
        }
}

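/* All the name strings are carved out of one contiguous malloc'd block;
 * CmiExitPxshm relies on this when it frees the first non-self entry. */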
void allocBufNameStrings(char ***bufName){
        int i,count;

        int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
        char *tmp = malloc(totalAlloc);

        MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);

        *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);

        for(i=0,count=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
                        count++;
                }else{
                        (*bufName)[i] = NULL;
                }
        }
}

void createShmObject(char *name,int size,char **pPtr);

void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
        int i=0;

        *bufs = (sharedBufData *)calloc(pxshmContext->nodesize, sizeof(sharedBufData));

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
                        memset(((*bufs)[i].header), 0, SHMBUFLEN+sizeof(sharedBufHeader));
                        (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
#if PXSHM_OSSPINLOCK
                        (*bufs)[i].header->lock = 0; // by convention (see man page) 0 means unlocked
#elif PXSHM_LOCK
                        (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
#endif
                }else{
                        (*bufs)[i].header = NULL;
                        (*bufs)[i].data = NULL;
#if PXSHM_LOCK
                        (*bufs)[i].mutex = NULL;
#endif
                }
        }
}


void createShmObject(char *name,int size,char **pPtr){
        int fd=-1;
        int flags;      // opening flags for shared object
        int open_repeat_count = 0;

        flags= O_RDWR | O_CREAT; // open file in read-write mode and create it if it's not there

        while(fd<0 && open_repeat_count < 100){
          open_repeat_count++;
          fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // create the shared object with permissions for only the user to read and write

          if(fd < 0 && open_repeat_count > 10){
            fprintf(stderr,"Error(attempt=%d) from shm_open %s while opening %s \n",open_repeat_count, strerror(errno),name);
            fflush(stderr);
          }
        }

        CmiAssert(fd >= 0);

        ftruncate(fd,size); //set the size of the shared memory object

        *pPtr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        CmiAssert(*pPtr != MAP_FAILED); //mmap returns MAP_FAILED, not NULL, on error

        close(fd);
}

void tearDownSharedBuffers(){
        int i;
        for(i= 0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        // each process unlinks only its recv buffers; our send
                        // buffers are some other process's recv buffers
                        if( shm_unlink(pxshmContext->recvBufNames[i]) < 0){
                                fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
                        }

#if PXSHM_LOCK
                        sem_close(pxshmContext->recvBufs[i].mutex);
                        //sem_unlink(pxshmContext->recvBufNames[i]);
                        sem_close(pxshmContext->sendBufs[i].mutex);
#endif
                }
        }
}


void initSendQ(PxshmSendQ *q,int size);

void initAllSendQs(){
        int i=0;
        pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        (pxshmContext->sendQs)[i] = (PxshmSendQ *)calloc(1, sizeof(PxshmSendQ));
                        initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE);
                }else{
                        (pxshmContext->sendQs)[i] = NULL;
                }
        }
}


/****************
 * copy this message into the sharedBuf;
 * if it does not succeed, put it into the sendQ
 * NOTE: This method is called only after obtaining the corresponding mutex
 * ********/
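/* Messages are appended back-to-back in the data area at offset
 * header->bytes; the receiver later recovers the message boundaries from
 * each message's own length header while draining the buffer. */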
int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){

        if(dstBuf->header->bytes+size <= SHMBUFLEN){
                /**copy this message to sharedBuf **/
                dstBuf->header->count++;
                memcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
                dstBuf->header->bytes += size;
                MACHSTATE4(3,"Pxshm send done msg %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",msg,size,dstBuf->header->count,dstBuf->header->bytes);
                CmiFree(msg);
                return 1;
        }
        /***
         * Shared Buffer is too full for this message
         * **/
        //printf("[%d] send buffer is too full\n", CmiMyPe());
        pushSendQ(dstSendQ,msg,size,refcount);
        (*refcount)++;
        MACHSTATE3(3,"Pxshm send msg %p size %d queued refcount %d",msg,size,*refcount);
        return 0;
}

inline OutgoingMsgRec* popSendQ(PxshmSendQ *q);

/****
 *Try to send all the messages in the sendq to this destination rank
 *NOTE: This method is called only after obtaining the corresponding mutex
 * ************/

inline int flushSendQ(int dstRank){
        sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
        PxshmSendQ *dstSendQ = pxshmContext->sendQs[dstRank];
        int count=dstSendQ->numEntries;
        int sent=0;
        while(count > 0){
                OutgoingMsgRec *ogm = popSendQ(dstSendQ);
                (*ogm->refcount)--;
                MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,*ogm->refcount);
                int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
                if(ret==1){
                        sent++;
#if CMK_NET_VERSION
                        GarbageCollectMsg(ogm);
#endif
                }
                count--;
        }
        return sent;
}

inline void emptyRecvBuf(sharedBufData *recvBuf);

inline void emptyAllRecvBufs(){
        int i;
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
                        if(recvBuf->header->count > 0){

#if PXSHM_STATS
                                pxshmContext->lockRecvCount++;
#endif


#if PXSHM_OSSPINLOCK
                                if(! OSSpinLockTry(&recvBuf->header->lock)){
#elif PXSHM_LOCK
                                if(sem_trywait(recvBuf->mutex) < 0){
#elif PXSHM_FENCE
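                                /* fence-based try-lock (receiver side),
                                 * mirroring the sender: raise our flag, yield
                                 * the turn, back off if the sender's flag is up */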
                                recvBuf->header->flagReceiver = 1;
                                recvBuf->header->turn = SENDER;
                                CmiMemoryReadFence(0,0);
                                CmiMemoryWriteFence(0,0);
                                //if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
                                if((recvBuf->header->flagSender)){
                                        recvBuf->header->flagReceiver = 0;
#endif
                                }else{


                                        MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
                                        emptyRecvBuf(recvBuf);

#if PXSHM_OSSPINLOCK
                                        OSSpinLockUnlock(&recvBuf->header->lock);
#elif PXSHM_LOCK
                                        sem_post(recvBuf->mutex);
#elif PXSHM_FENCE
                                        CmiMemoryReadFence(0,0);
                                        CmiMemoryWriteFence(0,0);
                                        recvBuf->header->flagReceiver = 0;
#endif

                                }

                        }
                }
        }
}

inline void flushAllSendQs(){
        int i=0;

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank && pxshmContext->sendQs[i]->numEntries > 0){

#if PXSHM_OSSPINLOCK
                        if(OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)){
#elif PXSHM_LOCK
                        if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
#elif PXSHM_FENCE
                        pxshmContext->sendBufs[i].header->flagSender = 1;
                        pxshmContext->sendBufs[i].header->turn = RECEIVER;
                        CmiMemoryReadFence(0,0);
                        CmiMemoryWriteFence(0,0);
                        if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
#endif

                                MACHSTATE1(3,"flushSendQ %d",i);
                                flushSendQ(i);


#if PXSHM_OSSPINLOCK
                                OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
#elif PXSHM_LOCK
                                sem_post(pxshmContext->sendBufs[i].mutex);
#elif PXSHM_FENCE
                                CmiMemoryReadFence(0,0);
                                CmiMemoryWriteFence(0,0);
                                pxshmContext->sendBufs[i].header->flagSender = 0;
#endif
                        }else{

#if PXSHM_FENCE
                          pxshmContext->sendBufs[i].header->flagSender = 0;
#endif

                        }

                }
        }
}

void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);

void emptyRecvBuf(sharedBufData *recvBuf){
        int numMessages = recvBuf->header->count;
        int i=0;

        char *ptr=recvBuf->data;

        for(i=0;i<numMessages;i++){
                int size;
                int rank, srcpe, seqno, magic;
                unsigned int broot;
                char *msg = ptr;
                char *newMsg;

#if CMK_NET_VERSION
                DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
                size = CmiMsgHeaderGetLength(msg);
#else
                size = CmiGetMsgSize(msg);
#endif

                newMsg = (char *)CmiAlloc(size);
                memcpy(newMsg,msg,size);

#if CMK_NET_VERSION
                handoverPxshmMessage(newMsg,size,rank,broot);
#else
                handleOneRecvedMsg(size, newMsg);
#endif

                ptr += size;

                MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
        }
#if 1
        if(ptr - recvBuf->data != recvBuf->header->bytes){
                CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
        }
#endif
        CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
        recvBuf->header->count=0;
        recvBuf->header->bytes=0;
}


#if CMK_NET_VERSION
void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
        CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || rank == DGRAM_NODEBROADCAST
#endif
         ){
                SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
        }
#elif CMK_BROADCAST_HYPERCUBE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || rank == DGRAM_NODEBROADCAST
#endif
         ){
                SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
        }
#endif

        switch (rank) {
        case DGRAM_BROADCAST: {
                CmiPushPE(0, newmsg);
                break;
        }
        default:
        {
                CmiPushPE(rank, newmsg);
        }
        }    /* end of switch */
}
#endif


/**************************
 *sendQ helper functions
 * ****************/

void initSendQ(PxshmSendQ *q,int size){
        q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));

        q->size = size;
        q->numEntries = 0;

        q->begin = 0;
        q->end = 0;
}

void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount){
        if(q->numEntries == q->size){
                //need to resize
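                /* double the array and unwrap the circular contents into the
                 * front of the new array: first the segment [begin, size),
                 * then the wrapped segment [0, end) */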
                OutgoingMsgRec *oldData = q->data;
                int newSize = q->size<<1;
                q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
                //copy head to the beginning of the new array
                CmiAssert(q->begin == q->end);

                CmiAssert(q->begin < q->size);
                memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));

                if(q->end!=0){
                        memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
                }
                free(oldData);
                q->begin = 0;
                q->end = q->size;
                q->size = newSize;
        }
        OutgoingMsgRec *ogm = &q->data[q->end];
        ogm->size = size;
        ogm->data = msg;
        ogm->refcount = refcount;
        (q->end)++;
        if(q->end >= q->size){
                q->end -= q->size;
        }
        q->numEntries++;
}

OutgoingMsgRec * popSendQ(PxshmSendQ *q){
        OutgoingMsgRec * ret;
        if(0 == q->numEntries){
                return NULL;
        }

        ret = &q->data[q->begin];
        (q->begin)++;
        if(q->begin >= q->size){
                q->begin -= q->size;
        }

        q->numEntries--;
        return ret;
}