minor cleanup and removing some prints
[charm.git] / src / arch / util / machine-pxshm.c
/** @file
 * pxshm --> posix shared memory based network layer for communication
 * between processes on the same node
 * This is not going to be the primary mode of communication
 * but only for messages below a certain size between
 * processes on the same node
 * for non-smp version only
 * @ingroup NET
 * contains only pxshm code for
 * - CmiInitPxshm()
 * - DeliverViaPxShm()
 * - CommunicationServerPxshm()
 * - CmiMachineExitPxshm()
 *
 * There are three options here for synchronization:
 *   PXSHM_FENCE is the default. It uses memory fences
 *   PXSHM_OSSPINLOCK will cause OSSpinLock's to be used (available on OSX)
 *   PXSHM_LOCK will cause POSIX semaphores to be used
 *
 * created by
 *   Sayantan Chakravorty, sayantan@gmail.com, 21st March 2007
 */
25
26 /**
27  * @addtogroup NET
28  * @{
29  */
30
31 #include <sys/types.h>
32 #include <sys/mman.h>
33 #include <unistd.h>
34 #include <sys/stat.h>
35 #include <fcntl.h>
36 #include <errno.h>
37
38
/************** 
   Determine which type of synchronization to use 
*/
#if PXSHM_OSSPINLOCK
#include <libkern/OSAtomic.h>   /* OS X spinlocks */
#elif PXSHM_LOCK
#include <semaphore.h>          /* POSIX named semaphores */
#else
/* Default to using fences */
#define PXSHM_FENCE 1
#endif


/* MEMDEBUG(x) expands to nothing; change the macro body to `x` to enable
   the CmiMemoryCheck() calls sprinkled through this file */
#define MEMDEBUG(x) //x

#define PXSHM_STATS 0   /* set to 1 to collect and print timing counters */


/*** The following code was copied verbatim from pcqueue.h file ***/
/* store fence: eieio on PowerPC, sfence elsewhere; no-op unless PXSHM_FENCE */
#undef CmiMemoryWriteFence
#if PXSHM_FENCE
#ifdef POWER_PC
#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#else
#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
//#define CmiMemoryWriteFence(startPtr,nBytes) 
#endif
#else
#undef CmiMemoryWriteFence
#define CmiMemoryWriteFence(startPtr,nBytes)  
#endif

/* load fence: eieio on PowerPC, lfence elsewhere; no-op unless PXSHM_FENCE */
#undef CmiMemoryReadFence
#if PXSHM_FENCE
#ifdef POWER_PC
#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#else
#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
//#define CmiMemoryReadFence(startPtr,nBytes) 
#endif
#else
#define CmiMemoryReadFence(startPtr,nBytes) 
#endif

#if CMK_SMP
#error  "PXSHM can only be used in non-smp build of Charm++"
#endif
86
/***************************************************************************************/

enum entities {SENDER,RECEIVER}; /* values stored in sharedBufHeader.turn */

/************************
 *      Implementation currently assumes that
 *      1) all nodes have the same number of processors
 *  2) in the nodelist all processors in a node are listed in sequence
 *   0 1 2 3      4 5 6 7 
 *   -------      -------
 *    node 1       node 2 
 ************************/

#define NAMESTRLEN 60   /* max length of a shm object / semaphore name */
#define PREFIXSTRLEN 50 /* max length of the per-run unique name prefix */

#define SHMBUFLEN (1024*1024*1) /* data bytes in each shared buffer (1 MB) */

#define SENDQSTARTSIZE    128   /* initial capacity of each PxshmSendQ */
107
/// This struct is used as the first portion of a shared memory region, followed by data
typedef struct {
        int count; //number of messages
        int bytes; //number of bytes

#if PXSHM_OSSPINLOCK
        OSSpinLock lock;        // 0 means unlocked (OS X convention)
#endif

#if PXSHM_FENCE
        /* sender/receiver handshake flags for the fence-based protocol;
           the pads presumably keep the flags on separate cache lines —
           confirm against CmiMemorySMPSeparation_t's definition */
        volatile int flagSender;
        CmiMemorySMPSeparation_t pad1;
        volatile int flagReceiver;
        CmiMemorySMPSeparation_t pad2;
        volatile int turn;      // which side backs off on conflict: SENDER or RECEIVER
#endif  

} sharedBufHeader;
126
127
/** Per-peer view of one mapped shared buffer: its header, the message
 *  bytes that follow the header, and (under PXSHM_LOCK) the named
 *  semaphore guarding it. */
typedef struct {
#if PXSHM_LOCK
        sem_t *mutex;
#endif
        sharedBufHeader *header;        
        char *data;     // points just past the header inside the same mapping
} sharedBufData;
135
/** One queued outgoing message: payload pointer, the caller's refcount
 *  cell, and the payload size in bytes. */
typedef struct OutgoingMsgRec
{
  char *data;
  int  *refcount;
  int   size;
}
OutgoingMsgRec;
143
/** Circular queue of messages waiting for space in a peer's shared buffer. */
typedef struct {
        int size; //total size of data array
        int begin; //position of first element
        int end;        //position of next element
        int numEntries; //number of entries

        OutgoingMsgRec *data;

} PxshmSendQ;
153
/** All state of the pxshm layer for this process. */
typedef struct {
        int nodesize;   // number of processes on this physical node
        int noderank;   // this process's rank within the node
        int nodestart,nodeend;//proc numbers for the start and end of this node
        char prefixStr[PREFIXSTRLEN];   // per-run unique shm name prefix
        char **recvBufNames;    // shm object names, indexed by node-local rank (self slot NULL)
        char **sendBufNames;

        sharedBufData *recvBufs;        // buffers peers write to us
        sharedBufData *sendBufs;        // buffers we write to peers

        PxshmSendQ **sendQs;    // per-peer overflow queues (self slot NULL)


#if PXSHM_STATS
        int sendCount;
        int validCheckCount;
        int lockRecvCount;
        double validCheckTime;
        double sendTime;
        double commServerTime;
#endif

} PxshmContext;
178
179
180
PxshmContext *pxshmContext=NULL; //global context


/* setup helpers, defined later in this file */
void calculateNodeSizeAndRank(char **);
void setupSharedBuffers();
void initAllSendQs();
187
/******************
 * Initialization routine for the pxshm layer.
 *
 * Allocates the global PxshmContext, computes this process's node-local
 * rank, builds a per-run unique shm name prefix, creates and maps the
 * shared buffers against every other process on the node, and sets up the
 * per-peer send queues. Returns early (layer effectively disabled) when
 * the node has only one process.
 * ****************/
void CmiInitPxshm(char **argv){
        MACHSTATE(3,"CminitPxshm start");
        pxshmContext = (PxshmContext *)calloc(1,sizeof(PxshmContext));

#if CMK_NET_VERSION
        /* charmrun's pid seeds the shm name prefix, so charmrun is required */
        if(Cmi_charmrun_pid <= 0){
                CmiAbort("pxshm must be run with charmrun");
        }
#endif
        calculateNodeSizeAndRank(argv);
        if(pxshmContext->nodesize == 1) return; /* no on-node peers */
        
        MACHSTATE1(3,"CminitPxshm  %d calculateNodeSizeAndRank",pxshmContext->nodesize);

#if CMK_CRAYXE
        /* no charmrun here: draw a random id and broadcast it via PMI so all
           processes agree on the same shm name prefix */
        srand(getpid());
        int Cmi_charmrun_pid = rand();
        PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
#elif !CMK_NET_VERSION
        #error "need a unique number"
#endif
        snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);

        MACHSTATE2(3,"CminitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

        setupSharedBuffers();

        MACHSTATE2(3,"CminitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

        initAllSendQs();
        
        MACHSTATE2(3,"CminitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);

        MACHSTATE2(3,"CminitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);

#if PXSHM_STATS
        /* the context was calloc'd, so these are already zero; kept explicit */
        pxshmContext->sendCount=0;
        pxshmContext->sendTime=0.0;
        pxshmContext->validCheckCount=0;
        pxshmContext->validCheckTime=0.0;
        pxshmContext->commServerTime = 0;
        pxshmContext->lockRecvCount = 0;
#endif

#if 0
        char name[64];
        gethostname(name,64);
        printf("[%d] name: %s\n", myrank, name);
#endif
};
242
243 /**************
244  * shutdown shmem objects and semaphores
245  *
246  * *******************/
247 void tearDownSharedBuffers();
248
249 void CmiExitPxshm(){
250         int i=0;
251         
252         if(pxshmContext->nodesize != 1){
253                 tearDownSharedBuffers();
254         
255                 for(i=0;i<pxshmContext->nodesize;i++){
256                         if(i != pxshmContext->noderank){
257                                 break;
258                         }
259                 }
260                 free(pxshmContext->recvBufNames[i]);
261                 free(pxshmContext->sendBufNames[i]);
262
263                 free(pxshmContext->recvBufNames);
264                 free(pxshmContext->sendBufNames);
265
266                 free(pxshmContext->recvBufs);
267                 free(pxshmContext->sendBufs);
268
269         }
270 #if PXSHM_STATS
271 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
272 #endif
273         free(pxshmContext);
274 }
275
276 /******************
277  *Should this message be sent using PxShm or not ?
278  * ***********************/
279
280 inline int CmiValidPxshm(int dst, int size){
281 #if PXSHM_STATS
282         pxshmContext->validCheckCount++;
283 #endif
284
285 /*      if(pxshmContext->nodesize == 1){
286                 return 0;
287         }*/
288         //replace by bitmap later
289         //if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN ){
290         if(dst >= pxshmContext->nodestart && dst <= pxshmContext->nodeend && size < SHMBUFLEN ){
291                 return 1;
292         }else{
293                 return 0;
294         }
295 };
296
297
/** Node-local rank (0..nodesize-1) of a destination pe on this node. */
inline int PxshmRank(int dst){
        return dst - pxshmContext->nodestart;
}
inline void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount);
inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
inline int flushSendQ(int dstRank);

/** Queue-entry wrapper around sendMessage. */
inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
  return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
}
308
/***************
 *
 *Send this message through shared memory
 *if you cannot get lock, put it in the sendQ
 *Before sending messages pick them from sendQ
 *
 * ****************************/

void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot)
{

#if PXSHM_STATS
        double _startSendTime = CmiWallTimer();
#endif

        LrtsPrepareEnvelope(msg, size);
        
        int dstRank = PxshmRank(dstpe);
        MEMDEBUG(CmiMemoryCheck());
  
        /* NOTE(review): these two identical MACHSTATE4 lines reference `ogm`,
           which is not declared in this function — stale debug code from an
           older signature; harmless while MACHSTATE logging is compiled out */
        MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
        MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);

        CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);
        
        sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);


        /* try to acquire exclusive access to the destination buffer; note
           that each synchronization flavor leaves an open `if(<failed>)`
           that is closed in the shared code below */
#if PXSHM_OSSPINLOCK
        if(! OSSpinLockTry(&dstBuf->header->lock)){
#elif PXSHM_LOCK
        if(sem_trywait(dstBuf->mutex) < 0){
#elif PXSHM_FENCE
        /* flag/turn handshake with the receiver (Dekker/Peterson-style) */
        dstBuf->header->flagSender = 1;
        dstBuf->header->turn = RECEIVER;
        CmiMemoryReadFence(0,0);
        CmiMemoryWriteFence(0,0);
        //if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
        if(dstBuf->header->flagReceiver){
                dstBuf->header->flagSender = 0;
#endif
                /**failed to get the lock 
                insert into q and retain the message*/

                pushSendQ(pxshmContext->sendQs[dstRank], msg, size, refcount);
                (*refcount)++;
                MEMDEBUG(CmiMemoryCheck());
                return;
        }else{

                /***
                 * We got the lock for this buffer
                 * first write all the messages in the sendQ and then write this guy
                 * */
                 if(pxshmContext->sendQs[dstRank]->numEntries == 0){
                                // send message user event
                                int ret = sendMessage(msg,size,refcount,dstBuf,pxshmContext->sendQs[dstRank]);
                                MACHSTATE(3,"Pxshm Send succeeded immediately");
                 }else{
                                (*refcount)+=2;/*this message should not get deleted when the queue is flushed*/
                                pushSendQ(pxshmContext->sendQs[dstRank],msg,size,refcount);
                                MACHSTATE3(3,"Pxshm ogm %p pushed to sendQ length %d refcount %d",ogm,pxshmContext->sendQs[dstRank]->numEntries,ogm->refcount);
                                int sent = flushSendQ(dstRank);
                                (*refcount)--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
                                MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
                 }
                 /* unlock the recvbuffer*/

#if PXSHM_OSSPINLOCK
                 OSSpinLockUnlock(&dstBuf->header->lock);
#elif PXSHM_LOCK
                 sem_post(dstBuf->mutex);
#elif PXSHM_FENCE
                 CmiMemoryReadFence(0,0);                       
                 CmiMemoryWriteFence(0,0);
                 dstBuf->header->flagSender = 0;
#endif
        }
#if PXSHM_STATS
                pxshmContext->sendCount ++;
                pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
#endif
        MEMDEBUG(CmiMemoryCheck());

};
394
inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/**********
 * One communication-server pass: drain every peer's receive buffer, then
 * flush any messages waiting in the per-peer send queues.
 * ***/
inline void CommunicationServerPxshm(){
#if PXSHM_STATS
        double passStart = CmiWallTimer();
#endif
        MEMDEBUG(CmiMemoryCheck());

        emptyAllRecvBufs();
        flushAllSendQs();

#if PXSHM_STATS
        pxshmContext->commServerTime += (CmiWallTimer()-passStart);
#endif
        MEMDEBUG(CmiMemoryCheck());
};
418
/* Idle-state hooks: keep draining/flushing shm buffers while the pe idles. */
static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
        CommunicationServerPxshm();
}


static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
{
        /* NOTE(review): calls the generic CmiNotifyStillIdle, not the Pxshm
           variant above — presumably so the base machine layer also runs its
           idle work; confirm against the enclosing machine layer */
        CmiNotifyStillIdle(s);
}
428
429
430 void calculateNodeSizeAndRank(char **argv){
431         pxshmContext->nodesize=1;
432         MACHSTATE(3,"calculateNodeSizeAndRank start");
433         //CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
434         CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
435         MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);
436
437         pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);
438         
439         MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);
440         
441         pxshmContext->nodestart = _Cmi_mynode -pxshmContext->noderank;
442         
443         MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
444
445         pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;
446
447         if(pxshmContext->nodeend >= _Cmi_numnodes){
448                 pxshmContext->nodeend = _Cmi_numnodes-1;
449                 pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
450         }
451         
452         MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
453 }
454
void allocBufNameStrings(char ***bufName);
void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
/***************
 *      calculate the name of the shared objects and semaphores
 *      
 *      name scheme
 *      shared memory: charm_pxshm_<recvernoderank>_<sendernoderank>  
 *  semaphore    : charm_pxshm_<recvernoderank>_<sendernoderank>.sem for semaphore for that shared object
 *                the semaphore name used by us is the same as the shared memory object name
 *                the posix library adds the semaphore tag // in linux at least . other machines might need more portable code
 *
 *      open these shared objects and semaphores
 * *********/
void setupSharedBuffers(){
        int i=0;

        allocBufNameStrings(&(pxshmContext->recvBufNames));
        
        MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
        MEMDEBUG(CmiMemoryCheck());

        allocBufNameStrings((&pxshmContext->sendBufNames));
        
        MACHSTATE(3,"allocBufNameStrings for sendBufNames done");

        /* names pair <receiver>_<sender>: my recv buffer from peer i has my
           global rank first; my send buffer to peer i has the peer's first */
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
                        MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i)
                        snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
                        MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
                }
        }
        
        createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
        createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);
        
        /* both sides map each object; reset the send-side view's counters */
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        //CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
                        pxshmContext->sendBufs[i].header->count = 0;
                        pxshmContext->sendBufs[i].header->bytes = 0;
                }
        }
}
500
501 void allocBufNameStrings(char ***bufName){
502         int i,count;
503         
504         int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
505         char *tmp = malloc(totalAlloc);
506         
507         MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
508
509         *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);
510         
511         for(i=0,count=0;i<pxshmContext->nodesize;i++){
512                 if(i != pxshmContext->noderank){
513                         (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
514                         count++;
515                 }else{
516                         (*bufName)[i] = NULL;
517                 }
518         }
519 }
520
void createShmObject(char *name,int size,char **pPtr);

/**
 * For every peer rank, create and map the shm object named bufNames[i]
 * (a sharedBufHeader followed by SHMBUFLEN data bytes) and, under
 * PXSHM_LOCK, open the like-named semaphore. The self slot is left NULL.
 */
void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
        int i=0;
        
        *bufs = (sharedBufData *)calloc(pxshmContext->nodesize, sizeof(sharedBufData));
        
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
                        /* NOTE(review): every process zeroes the whole shared
                           region at startup — relies on no peer having written
                           to it yet; confirm the startup ordering guarantees */
                        memset(((*bufs)[i].header), 0, SHMBUFLEN+sizeof(sharedBufHeader));
                        (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
#if PXSHM_OSSPINLOCK
                        (*bufs)[i].header->lock = 0; // by convention(see man page) 0 means unlocked
#elif PXSHM_LOCK
                        (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
#endif
                }else{
                        (*bufs)[i].header = NULL;
                        (*bufs)[i].data = NULL;
#if PXSHM_LOCK
                        (*bufs)[i].mutex = NULL;
#endif
                }
        }       
}
547
548
/**
 * Create (or open) a POSIX shared memory object of the given size and map
 * it read-write into this process.
 *
 * @param name  shm object name (shared with the peer process)
 * @param size  size in bytes the object is truncated to
 * @param pPtr  out: start of the mapped region
 *
 * shm_open is retried (up to 100 times) because peers race to create the
 * same object at startup; failures past attempt 10 are logged.
 */
void createShmObject(char *name,int size,char **pPtr){
        int fd=-1;
        int flags;      // opening flags for shared object
        int open_repeat_count = 0;

        flags= O_RDWR | O_CREAT; // open file in read-write mode and create it if its not there
        
        while(fd<0 && open_repeat_count < 100){
          open_repeat_count++;
          fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // create the shared object with permissions for only the user to read and write
          
          if(fd < 0 && open_repeat_count > 10){
            fprintf(stderr,"Error(attempt=%d) from shm_open %s while opening %s \n",open_repeat_count, strerror(errno),name);
            fflush(stderr);
          }
        }

        CmiAssert(fd >= 0);

        /* size the object; an unnoticed failure here would make the later
           memset/stores into the mapping fault, so abort early instead */
        if(ftruncate(fd,size) < 0){
                fprintf(stderr,"Error from ftruncate %s while sizing %s \n",strerror(errno),name);
                CmiAbort("createShmObject failed in ftruncate");
        }

        *pPtr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 
        /* mmap reports failure with MAP_FAILED ((void *)-1), not NULL */
        CmiAssert(*pPtr != MAP_FAILED);

        close(fd);
}
575
576 void tearDownSharedBuffers(){
577         int i;
578         for(i= 0;i<pxshmContext->nodesize;i++){
579                 if(i != pxshmContext->noderank){
580                         if(     shm_unlink(pxshmContext->recvBufNames[i]) < 0){
581                                 fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
582                         }
583
584 #if PXSHM_LOCK
585                         sem_close(pxshmContext->recvBufs[i].mutex);
586                         //sem_unlink(pxshmContext->recvBufNames[i]);
587                         sem_close(pxshmContext->sendBufs[i].mutex);
588 #endif
589                 }
590         }
591 };
592
593
594 void initSendQ(PxshmSendQ *q,int size);
595
596 void initAllSendQs(){
597         int i=0;
598         pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
599         for(i=0;i<pxshmContext->nodesize;i++){
600                 if(i != pxshmContext->noderank){
601                         (pxshmContext->sendQs)[i] = (PxshmSendQ *)calloc(1, sizeof(PxshmSendQ));
602                         initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE);
603                 }else{
604                         (pxshmContext->sendQs)[i] = NULL;
605                 }
606         }
607 };
608
609
610 /****************
611  *copy this message into the sharedBuf
612  If it does not succeed
613  *put it into the sendQ 
614  *NOTE: This method is called only after obtaining the corresponding mutex
615  * ********/
616 int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
617
618         if(dstBuf->header->bytes+size <= SHMBUFLEN){
619                 /**copy  this message to sharedBuf **/
620                 dstBuf->header->count++;
621                 memcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
622                 dstBuf->header->bytes += size;
623                 MACHSTATE4(3,"Pxshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
624                 CmiFree(msg);
625                 return 1;
626         }
627         /***
628          * Shared Buffer is too full for this message
629          * **/
630         //printf("[%d] send buffer is too full\n", CmiMyPe());
631         pushSendQ(dstSendQ,msg,size,refcount);
632         (*refcount)++;
633         MACHSTATE3(3,"Pxshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
634         return 0;
635 }
636
inline OutgoingMsgRec* popSendQ(PxshmSendQ *q);

/****
 *Try to send all the messages in the sendq to this destination rank
 *NOTE: This method is called only after obtaining the corresponding mutex
 * ************/

inline int flushSendQ(int dstRank){
        sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
        PxshmSendQ *dstSendQ = pxshmContext->sendQs[dstRank];
        int count=dstSendQ->numEntries;
        int sent=0;
        while(count > 0){
                OutgoingMsgRec *ogm = popSendQ(dstSendQ);
                /* drop the reference the queue held; sendMessageRec re-queues
                   (and re-references) the message if the buffer is still full */
                (*ogm->refcount)--;
                MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
                int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
                if(ret==1){
                        sent++;
#if CMK_NET_VERSION
                        GarbageCollectMsg(ogm);
#endif
                }
                count--;
        }
        return sent;
}
664
inline void emptyRecvBuf(sharedBufData *recvBuf);

/**
 * For every peer with pending messages, try to acquire that receive buffer
 * (spinlock / semaphore / fence handshake, depending on the build) and
 * drain it with emptyRecvBuf; skip the peer if the sender holds the buffer.
 */
inline void emptyAllRecvBufs(){
        int i;
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
                        if(recvBuf->header->count > 0){

#if PXSHM_STATS
                                pxshmContext->lockRecvCount++;
#endif


                                /* each sync flavor leaves an open
                                   `if(<sender holds the buffer>)` here */
#if PXSHM_OSSPINLOCK
                                if(! OSSpinLockTry(&recvBuf->header->lock)){
#elif PXSHM_LOCK
                                if(sem_trywait(recvBuf->mutex) < 0){
#elif PXSHM_FENCE
                                recvBuf->header->flagReceiver = 1;
                                recvBuf->header->turn = SENDER;
                                CmiMemoryReadFence(0,0);
                                CmiMemoryWriteFence(0,0);
                                //if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
                                if((recvBuf->header->flagSender)){
                                        recvBuf->header->flagReceiver = 0;
#endif
                                }else{


                                        MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);                        
                                        emptyRecvBuf(recvBuf);

                                        /* release the buffer */
#if PXSHM_OSSPINLOCK
                                        OSSpinLockUnlock(&recvBuf->header->lock);
#elif PXSHM_LOCK
                                        sem_post(recvBuf->mutex);
#elif PXSHM_FENCE
                                        CmiMemoryReadFence(0,0);
                                        CmiMemoryWriteFence(0,0);
                                        recvBuf->header->flagReceiver = 0;
#endif

                                }
                        
                        }
                }
        }
};
714
/**
 * For every peer whose overflow queue is non-empty, try to acquire that
 * send buffer and flush queued messages into it; under PXSHM_FENCE, back
 * off (clearing flagSender) when the receiver holds the buffer.
 */
inline void flushAllSendQs(){
        int i=0;
        
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank && pxshmContext->sendQs[i]->numEntries > 0){
        
                        /* each sync flavor leaves an open `if(<acquired>)` here */
#if PXSHM_OSSPINLOCK
                        if(OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)){
#elif PXSHM_LOCK
                        if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
#elif PXSHM_FENCE
                        pxshmContext->sendBufs[i].header->flagSender = 1;
                        pxshmContext->sendBufs[i].header->turn = RECEIVER;
                        CmiMemoryReadFence(0,0);                        
                        CmiMemoryWriteFence(0,0);
                        if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
#endif

                                MACHSTATE1(3,"flushSendQ %d",i);
                                flushSendQ(i);


                                /* release the buffer */
#if PXSHM_OSSPINLOCK    
                                OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
#elif PXSHM_LOCK
                                sem_post(pxshmContext->sendBufs[i].mutex);
#elif PXSHM_FENCE
                                CmiMemoryReadFence(0,0);                        
                                CmiMemoryWriteFence(0,0);
                                pxshmContext->sendBufs[i].header->flagSender = 0;
#endif
                        }else{

                                /* receiver won the handshake: withdraw our claim */
#if PXSHM_FENCE
                          pxshmContext->sendBufs[i].header->flagSender = 0;
#endif                          

                        }

                }        
        }       
};
758
759 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);
760
761 void emptyRecvBuf(sharedBufData *recvBuf){
762         int numMessages = recvBuf->header->count;
763         int i=0;
764
765         char *ptr=recvBuf->data;
766
767         for(i=0;i<numMessages;i++){
768                 int size;
769                 int rank, srcpe, seqno, magic, i;
770                 unsigned int broot;
771                 char *msg = ptr;
772                 char *newMsg;
773
774 #if CMK_NET_VERSION
775                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
776                 size = CmiMsgHeaderGetLength(msg);
777 #else
778                 size = CmiGetMsgSize(msg);
779 #endif
780         
781                 newMsg = (char *)CmiAlloc(size);
782                 memcpy(newMsg,msg,size);
783
784 #if CMK_NET_VERSION
785                 handoverPxshmMessage(newMsg,size,rank,broot);
786 #else
787                 handleOneRecvedMsg(size, newMsg);
788 #endif
789                 
790                 ptr += size;
791
792                 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
793         }
794 #if 1
795   if(ptr - recvBuf->data != recvBuf->header->bytes){
796                 CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
797         }
798 #endif
799         CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
800         recvBuf->header->count=0;
801         recvBuf->header->bytes=0;
802 }
803
804
/**
 * Deliver one received message to the local scheduler (net builds only).
 * Broadcast datagrams are first forwarded along the spanning tree or
 * hypercube before being enqueued locally.
 */
void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
#if CMK_NET_VERSION
        /* NOTE(review): this assert requires rank==0, yet the code below also
           handles DGRAM_BROADCAST ranks — one of the two looks stale; confirm
           which rank values actually reach here */
        CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || rank == DGRAM_NODEBROADCAST
#endif
         ){
                SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
                                        }
#elif CMK_BROADCAST_HYPERCUBE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || rank == DGRAM_NODEBROADCAST
#endif
         ){
                        SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
                                        }
#endif

                switch (rank) {
        case DGRAM_BROADCAST: {
          CmiPushPE(0, newmsg);
          break;
      }
        default:
                                {
                                        
          CmiPushPE(rank, newmsg);
                                }
        }    /* end of switch */
#endif
}
839
840
841 /**************************
842  *sendQ helper functions
843  * ****************/
844
845 void initSendQ(PxshmSendQ *q,int size){
846         q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));
847
848         q->size = size;
849         q->numEntries = 0;
850
851         q->begin = 0;
852         q->end = 0;
853 }
854
/**
 * Append one message record (data, size, refcount cell) at the tail of the
 * circular send queue, doubling the backing array first when the queue is
 * full. Reference counting is handled by the callers, not here.
 */
void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount){
        if(q->numEntries == q->size){
                //need to resize 
                OutgoingMsgRec *oldData = q->data;
                int newSize = q->size<<1;       // double the capacity
                q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
                //copy head to the beginning of the new array
                CmiAssert(q->begin == q->end);  // full queue: end has wrapped onto begin

                CmiAssert(q->begin < q->size);
                memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));

                /* if the old queue wrapped, append its leading segment */
                if(q->end!=0){
                        memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
                }
                free(oldData);
                q->begin = 0;
                q->end = q->size;       // old size == number of live entries
                q->size = newSize;
        }
        /* write the new record at the tail and advance it with wrap-around */
        OutgoingMsgRec *omg = &q->data[q->end];
        omg->size = size;
        omg->data = msg;
        omg->refcount = refcount;
        (q->end)++;
        if(q->end >= q->size){
                q->end -= q->size;
        }
        q->numEntries++;
}
885
886 OutgoingMsgRec * popSendQ(PxshmSendQ *q){
887         OutgoingMsgRec * ret;
888         if(0 == q->numEntries){
889                 return NULL;
890         }
891
892         ret = &q->data[q->begin];
893         (q->begin)++;
894         if(q->begin >= q->size){
895                 q->begin -= q->size;
896         }
897         
898         q->numEntries--;
899         return ret;
900 }