/** @file
 * pxshm --> POSIX shared memory based network layer for communication
 * between processes on the same node.
 * This is not the primary mode of communication: it is used only for
 * messages below a certain size between processes on the same node,
 * and only in the non-smp version.
 * @ingroup NET
 * contains only pxshm code for
 * - CmiInitPxshm()
 * - DeliverViaPxShm()
 * - CommunicationServerPxshm()
 * - CmiMachineExitPxshm()
 *
 * There are three options here for synchronization:
 *	PXSHM_FENCE is the default. It uses memory fences.
 *	PXSHM_OSSPINLOCK causes OSSpinLocks to be used (available on OS X).
 *	PXSHM_LOCK causes POSIX semaphores to be used.
 *
 * created by
 *	Sayantan Chakravorty, sayantan@gmail.com, 21st March 2007
 */

/**
 * @addtogroup NET
 * @{
 */

#include <sys/types.h>
#include <sys/mman.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>


/**************
   Determine which type of synchronization to use
*/
#if PXSHM_OSSPINLOCK
#include <libkern/OSAtomic.h>
#elif PXSHM_LOCK
#include <semaphore.h>
#else
/* Default to using fences */
#define PXSHM_FENCE 1
#endif
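
/* A note on selecting the synchronization mode: the PXSHM_* macros above are
 * expected to come from the build line rather than from this file. A sketch
 * (the exact invocation depends on the machine layer's build scripts, so
 * treat these flags as illustrative only):
 *
 *   cc -DPXSHM_OSSPINLOCK=1 ... machine-pxshm.c   # OS X spinlocks
 *   cc -DPXSHM_LOCK=1       ... machine-pxshm.c   # POSIX semaphores
 *   cc                      ... machine-pxshm.c   # default: memory fences
 */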


/* As written this macro expands to nothing (the trailing comment is stripped
   before macro expansion); change "//x" to "x" to enable the memory checks. */
#define MEMDEBUG(x) //x

#define PXSHM_STATS 0

#define SENDQ_LIST     0

/*** The following code was copied verbatim from the pcqueue.h file ***/
#if ! CMK_SMP
#undef CmiMemoryWriteFence
#if PXSHM_FENCE
#ifdef POWER_PC
#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#else
#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
//#define CmiMemoryWriteFence(startPtr,nBytes)
#endif
#else
#undef CmiMemoryWriteFence
#define CmiMemoryWriteFence(startPtr,nBytes)
#endif

#undef CmiMemoryReadFence
#if PXSHM_FENCE
#ifdef POWER_PC
#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#else
#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
//#define CmiMemoryReadFence(startPtr,nBytes)
#endif
#else
#define CmiMemoryReadFence(startPtr,nBytes)
#endif

#endif

/*
#if CMK_SMP
#error  "PXSHM can only be used in non-smp build of Charm++"
#endif
*/

/***************************************************************************************/

enum entities {SENDER,RECEIVER};

/************************
 *	Implementation currently assumes that
 *	1) all nodes have the same number of processors
 *	2) in the nodelist all processors in a node are listed in sequence
 *	   0 1 2 3      4 5 6 7
 *	   -------      -------
 *	    node 1       node 2
 ************************/
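
/* A worked example of the layout above (illustrative numbers only): with
 * nodesize = 4, the process with _Cmi_mynode = 5 computes, in
 * calculateNodeSizeAndRank below,
 *   noderank  = 5 % 4 = 1
 *   nodestart = 5 - 1 = 4
 *   nodeend   = 4 + 4 - 1 = 7
 * i.e. it is the second process of the node spanning global ranks 4..7.
 */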

#define NAMESTRLEN 60
#define PREFIXSTRLEN 50

static int SHMBUFLEN   = (1024*1024*4);
static int SHMMAXSIZE  = (1024*1024);

static int SENDQSTARTSIZE  =  256;
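
/* Both limits can be overridden at startup through environment variables
 * (see CmiInitPxshm below). A sketch of typical usage, assuming CmiReadSize
 * accepts the usual size suffixes:
 *
 *   export CHARM_PXSHM_POOL_SIZE=16m          # per-pair shared pool
 *   export CHARM_PXSHM_MESSAGE_MAX_SIZE=512k  # largest message sent via pxshm
 */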


/// This struct is used as the first portion of a shared memory region, followed by data
typedef struct {
	int count; //number of messages
	int bytes; //number of bytes

#if PXSHM_OSSPINLOCK
	OSSpinLock lock;
#endif

#if PXSHM_FENCE
	volatile int flagSender;
	CmiMemorySMPSeparation_t pad1;
	volatile int flagReceiver;
	CmiMemorySMPSeparation_t pad2;
	volatile int turn;
#endif

} sharedBufHeader;
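
/* Layout of one shared region as created in createShmObjectsAndSems below
 * (a sketch; sizes are the defaults above):
 *
 *   +-----------------+---------------------------------------+
 *   | sharedBufHeader |  data: up to SHMBUFLEN message bytes  |
 *   +-----------------+---------------------------------------+
 *
 * sharedBufData.data points just past the header; messages are packed
 * back to back and drained in order by emptyRecvBuf.
 */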


typedef struct {
#if PXSHM_LOCK
	sem_t *mutex;
#endif
	sharedBufHeader *header;
	char *data;
} sharedBufData;

typedef struct OutgoingMsgRec
{
  char *data;
  int  *refcount;
  int   size;
}
OutgoingMsgRec;

typedef struct {
	int size; //total size of data array
	int begin; //position of first element
	int end;	//position of next element
	int numEntries; //number of entries
	int rank;	// for dest rank
#if SENDQ_LIST
	int next;	// next dstrank of non-empty queue
#endif
	OutgoingMsgRec *data;

} PxshmSendQ;

typedef struct {
	int nodesize;
	int noderank;
	int nodestart,nodeend;//proc numbers for the start and end of this node
	char prefixStr[PREFIXSTRLEN];
	char **recvBufNames;
	char **sendBufNames;

	sharedBufData *recvBufs;
	sharedBufData *sendBufs;

	PxshmSendQ **sendQs;


#if PXSHM_STATS
	int sendCount;
	int validCheckCount;
	int lockRecvCount;
	double validCheckTime;
	double sendTime;
	double commServerTime;
#endif

} PxshmContext;

#if SENDQ_LIST
static int sendQ_head_index = -1;
#endif

PxshmContext *pxshmContext=NULL; //global context


void calculateNodeSizeAndRank(char **);
void setupSharedBuffers();
void initAllSendQs();

void CmiExitPxshm();

static void cleanupOnAllSigs(int signo)
{
    CmiExitPxshm();
}

/******************
 *	Initialization routine
 *	currently just testing start up
 * ****************/
void CmiInitPxshm(char **argv){
	char *env;
	MACHSTATE(3,"CmiInitPxshm start");

	pxshmContext = (PxshmContext *)calloc(1,sizeof(PxshmContext));

#if CMK_NET_VERSION
	if(Cmi_charmrun_pid <= 0){
		CmiAbort("pxshm must be run with charmrun");
	}
#endif
	calculateNodeSizeAndRank(argv);
	if(pxshmContext->nodesize == 1) return;

	MACHSTATE1(3,"CmiInitPxshm %d calculateNodeSizeAndRank",pxshmContext->nodesize);

	env = getenv("CHARM_PXSHM_POOL_SIZE");
	if (env) {
	    SHMBUFLEN = CmiReadSize(env);
	}
	env = getenv("CHARM_PXSHM_MESSAGE_MAX_SIZE");
	if (env) {
	    SHMMAXSIZE = CmiReadSize(env);
	}
	if (SHMMAXSIZE > SHMBUFLEN)
	    CmiAbort("Error> Pxshm pool size (CHARM_PXSHM_POOL_SIZE) must be at least the maximum message size (CHARM_PXSHM_MESSAGE_MAX_SIZE)");

	SENDQSTARTSIZE = 32 * pxshmContext->nodesize;

	if (_Cmi_mynode == 0)
	    CmiPrintf("Charm++> pxshm enabled: %d cores per node, buffer size: %.1fMB\n", pxshmContext->nodesize, SHMBUFLEN/1024.0/1024.0);

#if CMK_CRAYXE
	srand(getpid());
	int Cmi_charmrun_pid = rand();
	PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
#elif !CMK_NET_VERSION
	#error "need a unique number"
#endif
	snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);

	MACHSTATE2(3,"CmiInitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

	setupSharedBuffers();

	MACHSTATE2(3,"CmiInitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

	initAllSendQs();

	MACHSTATE2(3,"CmiInitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);

	MACHSTATE2(3,"CmiInitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);

#if PXSHM_STATS
	pxshmContext->sendCount=0;
	pxshmContext->sendTime=0.0;
	pxshmContext->validCheckCount=0;
	pxshmContext->validCheckTime=0.0;
	pxshmContext->commServerTime = 0;
	pxshmContext->lockRecvCount = 0;
#endif

	signal(SIGSEGV, cleanupOnAllSigs);
	signal(SIGFPE, cleanupOnAllSigs);
	signal(SIGILL, cleanupOnAllSigs);
	signal(SIGTERM, cleanupOnAllSigs);
	signal(SIGABRT, cleanupOnAllSigs);
	signal(SIGQUIT, cleanupOnAllSigs);
	signal(SIGBUS, cleanupOnAllSigs);
	signal(SIGINT, cleanupOnAllSigs);
	signal(SIGTRAP, cleanupOnAllSigs);

#if 0
	char name[64];
	gethostname(name,64);
	printf("[%d] name: %s\n", myrank, name);
#endif
}

/**************
 * shutdown shmem objects and semaphores
 *
 * *******************/
void tearDownSharedBuffers();

void CmiExitPxshm(){
	if (pxshmContext == NULL) return;
	if(pxshmContext->nodesize != 1){
		int i;
		tearDownSharedBuffers();

		/* all name strings share one allocation (see allocBufNameStrings);
		   the first slot belonging to another rank points at its start */
		for(i=0;i<pxshmContext->nodesize;i++){
			if(i != pxshmContext->noderank){
				break;
			}
		}
		free(pxshmContext->recvBufNames[i]);
		free(pxshmContext->sendBufNames[i]);

		free(pxshmContext->recvBufNames);
		free(pxshmContext->sendBufNames);

		free(pxshmContext->recvBufs);
		free(pxshmContext->sendBufs);

	}
#if PXSHM_STATS
	CmiPrintf("[%d] sendCount %d sendTime %.6lf validCheckCount %d validCheckTime %.6lf commServerTime %.6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
#endif
	free(pxshmContext);
	pxshmContext = NULL;
}

/******************
 * Should this message be sent using PxShm or not?
 * ***********************/

/* node is the destination node number */
inline static int CmiValidPxshm(int node, int size){
#if PXSHM_STATS
	pxshmContext->validCheckCount++;
#endif

/*	if(pxshmContext->nodesize == 1){
		return 0;
	}*/
	//replace by bitmap later
	return (node >= pxshmContext->nodestart && node <= pxshmContext->nodeend && size <= SHMMAXSIZE )? 1: 0;
}


inline int PxshmRank(int dstnode){
	return dstnode - pxshmContext->nodestart;
}

inline void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount);
inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
inline int flushSendQ(PxshmSendQ *q);

inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
  return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
}

/***************
 *
 * Send this message through shared memory.
 * If you cannot get the lock, put the message in the sendQ;
 * before sending new messages, drain the sendQ first.
 *
 * ****************************/
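
/* A note on the PXSHM_FENCE branches used below and in emptyAllRecvBufs /
 * flushAllSendQs: flagSender, flagReceiver and turn in sharedBufHeader form
 * a Peterson-style two-party lock (a reading of the code, not a claim from
 * the original comments). The shape on each side is roughly:
 *
 *   myFlag = 1; turn = OTHER;          // announce intent
 *   fence();                           // publish the store, reload theirs
 *   if (otherFlag) { myFlag = 0; ... } // contended: back off and retry later
 *   else { ...use the buffer...; myFlag = 0; }
 *
 * Because each party raises its own flag before testing the other's, both
 * sides may back off in the same round, but they can never both enter, so
 * the shared buffer is never accessed from both sides at once. (The
 * commented-out conditionals show the full Peterson test including turn.)
 */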

void CmiSendMessagePxshm(char *msg, int size, int dstnode, int *refcount)
{
#if PXSHM_STATS
	double _startSendTime = CmiWallTimer();
#endif

	LrtsPrepareEnvelope(msg, size);

	int dstRank = PxshmRank(dstnode);
	MEMDEBUG(CmiMemoryCheck());

/*
	MACHSTATE3(3,"Send Msg Pxshm msg %p size %d dstRank %d",msg,size,dstRank);
*/

	CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);

	sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
	PxshmSendQ *sendQ = pxshmContext->sendQs[dstRank];

#if PXSHM_OSSPINLOCK
	if(! OSSpinLockTry(&dstBuf->header->lock)){
#elif PXSHM_LOCK
	if(sem_trywait(dstBuf->mutex) < 0){
#elif PXSHM_FENCE
	dstBuf->header->flagSender = 1;
	dstBuf->header->turn = RECEIVER;
	CmiMemoryReadFence(0,0);
	CmiMemoryWriteFence(0,0);
	//if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
	if(dstBuf->header->flagReceiver){
		dstBuf->header->flagSender = 0;
#endif
		/* failed to get the lock:
		   insert into the sendQ and retain the message */
#if SENDQ_LIST
		if (sendQ->numEntries == 0 && sendQ->next == -2) {
		    sendQ->next = sendQ_head_index;
		    sendQ_head_index = dstRank;
		}
#endif
		pushSendQ(pxshmContext->sendQs[dstRank], msg, size, refcount);
		(*refcount)++;
		MEMDEBUG(CmiMemoryCheck());
		return;
	}else{

		/***
		 * We got the lock for this buffer:
		 * first write all the messages in the sendQ, then write this one.
		 * */
		 if(pxshmContext->sendQs[dstRank]->numEntries == 0){
			// send message user event
			sendMessage(msg,size,refcount,dstBuf,pxshmContext->sendQs[dstRank]);
#if SENDQ_LIST
			if (sendQ->numEntries > 0 && sendQ->next == -2)
			{
				sendQ->next = sendQ_head_index;
				sendQ_head_index = dstRank;
			}
#endif
			MACHSTATE(3,"Pxshm Send succeeded immediately");
		 }else{
			(*refcount)+=2;/*this message should not get deleted when the queue is flushed*/
			pushSendQ(pxshmContext->sendQs[dstRank],msg,size,refcount);
			int sent = flushSendQ(sendQ);
			(*refcount)--; /*if it has been sent it can be deleted by the caller; if not, it will be deleted when the queue is flushed*/
			MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
		 }
		 /* unlock the recv buffer */

#if PXSHM_OSSPINLOCK
		 OSSpinLockUnlock(&dstBuf->header->lock);
#elif PXSHM_LOCK
		 sem_post(dstBuf->mutex);
#elif PXSHM_FENCE
		 CmiMemoryReadFence(0,0);
		 CmiMemoryWriteFence(0,0);
		 dstBuf->header->flagSender = 0;
#endif
	}
#if PXSHM_STATS
	pxshmContext->sendCount ++;
	pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
#endif
	MEMDEBUG(CmiMemoryCheck());

}

inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/**********
 * Extract all the messages from the recvBuffers you can.
 * Flush all sendQs.
 * ***/
inline void CommunicationServerPxshm(){

#if PXSHM_STATS
	double _startCommServerTime =CmiWallTimer();
#endif

	MEMDEBUG(CmiMemoryCheck());
	emptyAllRecvBufs();
	flushAllSendQs();

#if PXSHM_STATS
	pxshmContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
#endif

	MEMDEBUG(CmiMemoryCheck());
}

static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
	CommunicationServerPxshm();
}


static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
{
	CmiNotifyStillIdle(s);
}


void calculateNodeSizeAndRank(char **argv){
	pxshmContext->nodesize=1;
	MACHSTATE(3,"calculateNodeSizeAndRank start");
	CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for the non-smp case); used by the shared memory communication layer");
	MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);

	pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);

	MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);

	pxshmContext->nodestart = _Cmi_mynode - pxshmContext->noderank;

	MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");

	pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;

	if(pxshmContext->nodeend >= _Cmi_numnodes){
		pxshmContext->nodeend = _Cmi_numnodes-1;
		pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
	}

	MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
}

void allocBufNameStrings(char ***bufName);
void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
/***************
 *	calculate the names of the shared objects and semaphores
 *
 *	name scheme:
 *	shared memory: charm_pxshm_<receivernoderank>_<sendernoderank>
 *	semaphore    : charm_pxshm_<receivernoderank>_<sendernoderank>.sem
 *	               we pass the same name as for the shared memory object;
 *	               the POSIX library appends the semaphore tag itself
 *	               (on Linux at least; other systems might need more portable code)
 *
 *	open these shared objects and semaphores
 * *********/
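
/* A concrete example (hypothetical pid in the prefix): with prefix
 * "charm_pxshm_1234" and node-local processes at global ranks 4..7, the
 * buffer that rank 5 receives on from rank 6 is named
 *
 *   charm_pxshm_1234_5_6
 *
 * and rank 6 opens the very same name as its send buffer towards rank 5,
 * so both sides map one shared region per ordered pair of processes.
 */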
void setupSharedBuffers(){
	int i=0;

	allocBufNameStrings(&(pxshmContext->recvBufNames));

	MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
	MEMDEBUG(CmiMemoryCheck());

	allocBufNameStrings(&(pxshmContext->sendBufNames));

	MACHSTATE(3,"allocBufNameStrings for sendBufNames done");

	for(i=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
			MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i);
			snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
			MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
		}
	}

	createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
	createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);

	for(i=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			//CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
			pxshmContext->sendBufs[i].header->count = 0;
			pxshmContext->sendBufs[i].header->bytes = 0;
		}
	}
}

void allocBufNameStrings(char ***bufName){
	int i,count;

	/* one contiguous block backs all the name strings; the slot for
	   this process itself stays NULL */
	int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
	char *tmp = malloc(totalAlloc);

	MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);

	*bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);

	for(i=0,count=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			(*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
			count++;
		}else{
			(*bufName)[i] = NULL;
		}
	}
}

void createShmObject(char *name,int size,char **pPtr);

void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
	int i=0;

	*bufs = (sharedBufData *)calloc(pxshmContext->nodesize, sizeof(sharedBufData));

	for(i=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
			memset(((*bufs)[i].header), 0, SHMBUFLEN+sizeof(sharedBufHeader));
			(*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
#if PXSHM_OSSPINLOCK
			(*bufs)[i].header->lock = 0; // by convention (see man page) 0 means unlocked
#elif PXSHM_LOCK
			(*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
			//sem_unlink(bufNames[i]);
#endif
		}else{
			(*bufs)[i].header = NULL;
			(*bufs)[i].data = NULL;
#if PXSHM_LOCK
			(*bufs)[i].mutex = NULL;
#endif
		}
	}
}


void createShmObject(char *name,int size,char **pPtr){
	int fd=-1;
	int flags;	// opening flags for shared object
	int open_repeat_count = 0;

	flags= O_RDWR | O_CREAT; // open in read-write mode and create the object if it is not there

	while(fd<0 && open_repeat_count < 100){
	  open_repeat_count++;
	  fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // create the shared object with read and write permissions for the user only

	  if(fd < 0 && open_repeat_count > 10){
	    fprintf(stderr,"Error(attempt=%d) from shm_open %s while opening %s \n",open_repeat_count, strerror(errno),name);
	    fflush(stderr);
	  }
	}

	CmiAssert(fd >= 0);

	ftruncate(fd,size); //set the size of the shared memory object

	*pPtr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	CmiAssert(*pPtr != MAP_FAILED); // mmap reports failure with MAP_FAILED, not NULL

	close(fd); // the mapping stays valid after the descriptor is closed
	unlink(name); // note: the shm object itself is removed later via shm_unlink in tearDownSharedBuffers
}
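
/* For reference, a minimal standalone sketch of the shm_open/ftruncate/mmap
 * pattern used above, with cooperating processes agreeing on a name (the
 * name and size here are made up for illustration). Link with -lrt on older
 * Linux systems. Kept under #if 0 so it never builds as part of this file.
 */
#if 0
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

int main(void){
	const char *name = "example_pxshm_region";   /* hypothetical name */
	size_t size = 4096;

	int fd = shm_open(name, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
	if(fd < 0){ perror("shm_open"); return 1; }
	if(ftruncate(fd, size) < 0){ perror("ftruncate"); return 1; }

	char *region = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	if(region == MAP_FAILED){ perror("mmap"); return 1; }
	close(fd);                       /* mapping survives the close */

	strcpy(region, "hello");         /* visible to every process mapping this name */

	munmap(region, size);
	shm_unlink(name);                /* remove the name once all parties are done */
	return 0;
}
#endif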

void tearDownSharedBuffers(){
	int i;
	for(i= 0;i<pxshmContext->nodesize;i++){
	    if(i != pxshmContext->noderank){
		if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
		    fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
		}
#if PXSHM_LOCK
		sem_close(pxshmContext->recvBufs[i].mutex);
		sem_close(pxshmContext->sendBufs[i].mutex);
		sem_unlink(pxshmContext->sendBufNames[i]);
		sem_unlink(pxshmContext->recvBufNames[i]);
		pxshmContext->recvBufs[i].mutex = NULL;
		pxshmContext->sendBufs[i].mutex = NULL;
#endif
	    }
	}
}


void initSendQ(PxshmSendQ *q,int size,int rank);

void initAllSendQs(){
	int i=0;
	pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
	for(i=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			(pxshmContext->sendQs)[i] = (PxshmSendQ *)calloc(1, sizeof(PxshmSendQ));
			initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE,i);
		}else{
			(pxshmContext->sendQs)[i] = NULL;
		}
	}
}


/****************
 * Copy this message into the shared buffer.
 * If that does not succeed, put it into the sendQ.
 * NOTE: This method is called only after obtaining the corresponding mutex.
 * ********/
int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){

	if(dstBuf->header->bytes+size <= SHMBUFLEN){
		/**copy this message to the shared buffer **/
		dstBuf->header->count++;
		memcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
		dstBuf->header->bytes += size;
		CmiFree(msg);
		return 1;
	}
	/***
	 * Shared buffer is too full for this message
	 * **/
	//printf("[%d] send buffer is too full\n", CmiMyPe());
	pushSendQ(dstSendQ,msg,size,refcount);
	(*refcount)++;
	return 0;
}

inline OutgoingMsgRec* popSendQ(PxshmSendQ *q);

/****
 * Try to send all the messages in the sendQ to this destination rank.
 * NOTE: This method is called only after obtaining the corresponding mutex.
 * ************/

inline int flushSendQ(PxshmSendQ *dstSendQ){
	sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstSendQ->rank]);
	int count=dstSendQ->numEntries;
	int sent=0;
	while(count > 0){
		OutgoingMsgRec *ogm = popSendQ(dstSendQ);
		(*ogm->refcount)--;
		MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstSendQ->rank,*ogm->refcount);
		int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
		if(ret==1){
			sent++;
#if CMK_NET_VERSION
			GarbageCollectMsg(ogm);
#endif
		}
		count--;
	}
	return sent;
}

inline void emptyRecvBuf(sharedBufData *recvBuf);

inline void emptyAllRecvBufs(){
	int i;
	for(i=0;i<pxshmContext->nodesize;i++){
		if(i != pxshmContext->noderank){
			sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
			if(recvBuf->header->count > 0){

#if PXSHM_STATS
				pxshmContext->lockRecvCount++;
#endif

#if PXSHM_OSSPINLOCK
				if(! OSSpinLockTry(&recvBuf->header->lock)){
#elif PXSHM_LOCK
				if(sem_trywait(recvBuf->mutex) < 0){
#elif PXSHM_FENCE
				recvBuf->header->flagReceiver = 1;
				recvBuf->header->turn = SENDER;
				CmiMemoryReadFence(0,0);
				CmiMemoryWriteFence(0,0);
				//if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
				if((recvBuf->header->flagSender)){
					recvBuf->header->flagReceiver = 0;
#endif
				}else{

					MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
					emptyRecvBuf(recvBuf);

#if PXSHM_OSSPINLOCK
					OSSpinLockUnlock(&recvBuf->header->lock);
#elif PXSHM_LOCK
					sem_post(recvBuf->mutex);
#elif PXSHM_FENCE
					CmiMemoryReadFence(0,0);
					CmiMemoryWriteFence(0,0);
					recvBuf->header->flagReceiver = 0;
#endif

				}

			}
		}
	}
}

inline void flushAllSendQs(){
	int i;
#if SENDQ_LIST
	int index_prev = -1;

	i = sendQ_head_index;
	while (i!= -1) {
		PxshmSendQ *sendQ = pxshmContext->sendQs[i];
		CmiAssert(i != pxshmContext->noderank);
		if(sendQ->numEntries > 0){
#else
	for(i=0;i<pxshmContext->nodesize;i++) {
		if (i == pxshmContext->noderank) continue;
		PxshmSendQ *sendQ = pxshmContext->sendQs[i];
		if(sendQ->numEntries > 0) {
#endif

#if PXSHM_OSSPINLOCK
			if(OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)){
#elif PXSHM_LOCK
			if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
#elif PXSHM_FENCE
			pxshmContext->sendBufs[i].header->flagSender = 1;
			pxshmContext->sendBufs[i].header->turn = RECEIVER;
			CmiMemoryReadFence(0,0);
			CmiMemoryWriteFence(0,0);
			if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
#endif

				MACHSTATE1(3,"flushSendQ %d",i);
				flushSendQ(sendQ);

#if PXSHM_OSSPINLOCK
				OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
#elif PXSHM_LOCK
				sem_post(pxshmContext->sendBufs[i].mutex);
#elif PXSHM_FENCE
				CmiMemoryReadFence(0,0);
				CmiMemoryWriteFence(0,0);
				pxshmContext->sendBufs[i].header->flagSender = 0;
#endif
			}else{

#if PXSHM_FENCE
			  pxshmContext->sendBufs[i].header->flagSender = 0;
#endif

			}

		}
#if SENDQ_LIST
		if (sendQ->numEntries == 0) {
		    if (index_prev != -1)
			pxshmContext->sendQs[index_prev]->next = sendQ->next;
		    else
			sendQ_head_index = sendQ->next;
		    i = sendQ->next;
		    sendQ->next = -2;
		}
		else {
		    index_prev = i;
		    i = sendQ->next;
		}
#endif
	}
}

static inline void handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);

void emptyRecvBuf(sharedBufData *recvBuf){
	int numMessages = recvBuf->header->count;
	int i=0;

	char *ptr=recvBuf->data;

	for(i=0;i<numMessages;i++){
		int size;
		int rank, srcpe, seqno, magic;
		unsigned int broot;
		char *msg = ptr;
		char *newMsg;

#if CMK_NET_VERSION
		DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
		size = CmiMsgHeaderGetLength(msg);
#else
		size = CmiGetMsgSize(msg);
#endif

		newMsg = (char *)CmiAlloc(size);
		memcpy(newMsg,msg,size);

#if CMK_NET_VERSION
		handoverPxshmMessage(newMsg,size,rank,broot);
#else
		handleOneRecvedMsg(size, newMsg);
#endif

		ptr += size;

		MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d",size,(int)(ptr-recvBuf->data),recvBuf->header->bytes);
	}
#if 1
	if(ptr - recvBuf->data != recvBuf->header->bytes){
		CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, (int)(ptr - recvBuf->data), recvBuf->header->bytes,numMessages);
	}
#endif
	CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
	recvBuf->header->count=0;
	recvBuf->header->bytes=0;
}


#if CMK_NET_VERSION
static inline void handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
	CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
	if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
	  || rank == DGRAM_NODEBROADCAST
#endif
	 ){
		SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
	}
#elif CMK_BROADCAST_HYPERCUBE
	if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
	  || rank == DGRAM_NODEBROADCAST
#endif
	 ){
		SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
	}
#endif

	switch (rank) {
	case DGRAM_BROADCAST:
		CmiPushPE(0, newmsg);
		break;
	default:
		CmiPushPE(rank, newmsg);
	}    /* end of switch */
}
#endif


/**************************
 * sendQ helper functions
 * ****************/

void initSendQ(PxshmSendQ *q,int size, int rank){
	q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));

	q->size = size;
	q->numEntries = 0;

	q->begin = 0;
	q->end = 0;

	q->rank = rank;
#if SENDQ_LIST
	q->next = -2;
#endif
}

void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount){
	if(q->numEntries == q->size){
		//need to resize
		OutgoingMsgRec *oldData = q->data;
		int newSize = q->size<<1;
		q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
		//copy the head of the circular buffer to the beginning of the new array
		CmiAssert(q->begin == q->end);

		CmiAssert(q->begin < q->size);
		memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));

		if(q->end!=0){
			memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
		}
		free(oldData);
		q->begin = 0;
		q->end = q->size;
		q->size = newSize;
	}
	OutgoingMsgRec *omg = &q->data[q->end];
	omg->size = size;
	omg->data = msg;
	omg->refcount = refcount;
	(q->end)++;
	if(q->end >= q->size){
		q->end -= q->size;
	}
	q->numEntries++;
}
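
/* A worked example of the resize above (illustrative values): with size = 4,
 * begin = end = 2 and numEntries = 4, the live entries sit at old indices
 * 2,3,0,1. The first memcpy moves old[2..3] to new[0..1] and the second
 * moves old[0..1] to new[2..3]; afterwards begin = 0, end = 4 and size = 8,
 * so the queue is contiguous again with room to grow.
 */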

OutgoingMsgRec * popSendQ(PxshmSendQ *q){
	OutgoingMsgRec * ret;
	if(0 == q->numEntries){
		return NULL;
	}

	ret = &q->data[q->begin];
	(q->begin)++;
	if(q->begin >= q->size){
		q->begin -= q->size;
	}

	q->numEntries--;
	return ret;
}