fix for smp
[charm.git] / src / arch / util / machine-pxshm.c
1 /** @file
 *
3  * pxshm --> posix shared memory based network layer for communication
4  * between processes on the same node
5  * This is not going to be the primary mode of communication 
6  * but only for messages below a certain size between
7  * processes on the same node
8  * for non-smp version only
9  * * @ingroup NET
10  * contains only pxshm code for 
11  * - CmiInitPxshm()
12  * - DeliverViaPxShm()
13  * - CommunicationServerPxshm()
14  * - CmiMachineExitPxshm()
15
16
17 There are three options here for synchronization:
18       PXSHM_FENCE is the default. It uses memory fences
19       PXSHM_OSSPINLOCK will cause OSSpinLock's to be used (available on OSX)
20       PXSHM_LOCK will cause POSIX semaphores to be used
21
22   created by 
23         Sayantan Chakravorty, sayantan@gmail.com ,21st March 2007
24 */
25
26 /**
27  * @addtogroup NET
28  * @{
29  */
30
31 #include <sys/types.h>
32 #include <sys/mman.h>
33 #include <unistd.h>
34 #include <sys/stat.h>
35 #include <fcntl.h>
36 #include <errno.h>
37 #include <signal.h>
38
39
40 /************** 
41    Determine which type of synchronization to use 
42 */
43 #if PXSHM_OSSPINLOCK
44 #include <libkern/OSAtomic.h>
45 #elif PXSHM_LOCK
46 #include <semaphore.h>
47 #else
48 /* Default to using fences */
49 #define PXSHM_FENCE 1
50 #endif
51
52
53 #define MEMDEBUG(x) //x
54
55 #define PXSHM_STATS 0
56
57 #define SENDQ_LIST     0
58
59 /*** The following code was copied verbatim from pcqueue.h file ***/
60 #if ! CMK_SMP
61 #undef CmiMemoryWriteFence
62 #if PXSHM_FENCE
63 #ifdef POWER_PC
64 #define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
65 #else
66 #define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
67 //#define CmiMemoryWriteFence(startPtr,nBytes) 
68 #endif
69 #else
70 #undef CmiMemoryWriteFence
71 #define CmiMemoryWriteFence(startPtr,nBytes)  
72 #endif
73
74 #undef CmiMemoryReadFence
75 #if PXSHM_FENCE
76 #ifdef POWER_PC
77 #define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
78 #else
79 #define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
80 //#define CmiMemoryReadFence(startPtr,nBytes) 
81 #endif
82 #else
83 #define CmiMemoryReadFence(startPtr,nBytes) 
84 #endif
85
86 #endif
87
88 /*
89 #if CMK_SMP
90 #error  "PXSHM can only be used in non-smp build of Charm++"
91 #endif
92 */
93
94 /***************************************************************************************/
95
96 enum entities {SENDER,RECEIVER};
97
98 /************************
99  *      Implementation currently assumes that
100  *      1) all nodes have the same number of processors
101  *  2) in the nodelist all processors in a node are listed in sequence
102  *   0 1 2 3      4 5 6 7 
103  *   -------      -------
104  *    node 1       node 2 
105  ************************/
106
107 #define NAMESTRLEN 60
108 #define PREFIXSTRLEN 50 
109
110 static int SHMBUFLEN   = (1024*1024*4);
111 static int SHMMAXSIZE  = (1024*1024);
112
113 static int SENDQSTARTSIZE  =  256;
114
115
/// Header at the start of each shared-memory region; the message data
/// area follows immediately after this struct in the same mapping.
typedef struct {
        int count; // number of messages currently packed in the data area
        int bytes; // number of data bytes in use (offset of next free byte)

#if PXSHM_OSSPINLOCK
        OSSpinLock lock; // spinlock guarding this buffer (OSX only)
#endif

#if PXSHM_FENCE
        // Flag/turn based mutual exclusion between the sending and the
        // receiving process (Peterson-style entry; note the turn check is
        // currently commented out at the use sites). Pads keep the two
        // flags on separate cache lines to avoid false sharing.
        volatile int flagSender;
        CmiMemorySMPSeparation_t pad1;
        volatile int flagReceiver;
        CmiMemorySMPSeparation_t pad2;
        volatile int turn;
#endif  

} sharedBufHeader;
134
135
// One mapped shared-memory buffer as seen by this process.
typedef struct {
#if PXSHM_LOCK
        sem_t *mutex;            // named POSIX semaphore guarding the buffer
#endif
        sharedBufHeader *header; // start of the mmap'ed region
        char *data;              // payload area, just past the header in the same mapping
} sharedBufData;
143
// One queued outgoing message: payload, caller's reference count
// (a reference is taken while the message sits in a send queue), and size.
typedef struct OutgoingMsgRec
{
  char *data;      // message payload
  int  *refcount;  // shared refcount; queueing holds one reference
  int   size;      // payload size in bytes
}
OutgoingMsgRec;
151
// Per-destination circular queue of messages that could not be written
// to the shared buffer immediately (buffer locked or full).
typedef struct {
        int size;       // capacity of the data array
        int begin;      // index of the first (oldest) element
        int end;        // index where the next element will be inserted
        int numEntries; // number of entries currently queued
        int rank;       // node-local rank of the destination this queue feeds
#if SENDQ_LIST
        int next;       // dest rank of next non-empty queue; -1 = list end, -2 = not linked
#endif
        OutgoingMsgRec *data; // circular array of queued messages

} PxshmSendQ;
164
// All pxshm state for this process (one global instance: pxshmContext).
typedef struct {
        int nodesize;           // number of processes on this physical node
        int noderank;           // this process's rank within the node
        int nodestart,nodeend;  // proc numbers for the start and end of this node
        char prefixStr[PREFIXSTRLEN]; // unique name prefix for shm/sem objects
        char **recvBufNames;    // shm object names, indexed by node-local rank (own entry NULL)
        char **sendBufNames;

        sharedBufData *recvBufs; // one receive buffer per node-local peer
        sharedBufData *sendBufs; // one send buffer per node-local peer

        PxshmSendQ **sendQs;     // overflow queues, one per peer (own entry NULL)


#if PXSHM_STATS
        int sendCount;
        int validCheckCount;
        int lockRecvCount;
        double validCheckTime;
        double sendTime;
        double commServerTime;
#endif

} PxshmContext;
189
190 #if SENDQ_LIST
191 static int sendQ_head_index = -1;
192 #endif
193
194 PxshmContext *pxshmContext=NULL; //global context
195
196
197 void calculateNodeSizeAndRank(char **);
198 void setupSharedBuffers();
199 void initAllSendQs();
200
201 void CmiExitPxshm();
202
// Fatal-signal handler installed by CmiInitPxshm: tears down the shm
// objects so they are not leaked in /dev/shm when the process dies.
// NOTE(review): CmiExitPxshm calls free()/shm_unlink(), which are not
// async-signal-safe; presumably acceptable since the process is dying.
static void cleanupOnAllSigs(int signo)
{
    CmiExitPxshm();
}
207
208 /******************
209  *      Initialization routine
210  *      currently just testing start up
211  * ****************/
212 void CmiInitPxshm(char **argv){
213         char *env;
214         MACHSTATE(3,"CminitPxshm start");
215
216         pxshmContext = (PxshmContext *)calloc(1,sizeof(PxshmContext));
217
218 #if CMK_NET_VERSION
219         if(Cmi_charmrun_pid <= 0){
220                 CmiAbort("pxshm must be run with charmrun");
221         }
222 #endif
223         calculateNodeSizeAndRank(argv);
224         if(pxshmContext->nodesize == 1) return;
225         
226         MACHSTATE1(3,"CminitPxshm  %d calculateNodeSizeAndRank",pxshmContext->nodesize);
227
228         env = getenv("CHARM_PXSHM_POOL_SIZE");
229         if (env) {
230             SHMBUFLEN = CmiReadSize(env);
231         }
232         env = getenv("CHARM_PXSHM_MESSAGE_MAX_SIZE");
233         if (env) {
234             SHMMAXSIZE = CmiReadSize(env);
235         }
236         if (SHMMAXSIZE > SHMBUFLEN)
237             CmiAbort("Error> Pxshm pool size is set too small in env variable CHARM_PXSHM_POOL_SIZE");
238
239         SENDQSTARTSIZE = 32 * pxshmContext->nodesize;
240
241         if (_Cmi_mynode == 0)
242             printf("Charm++> pxshm enabled: %d cores per node, buffer size: %.1fMB\n", pxshmContext->nodesize, SHMBUFLEN/1024.0/1024.0);
243
244 #if CMK_CRAYXE
245         srand(getpid());
246         int Cmi_charmrun_pid = rand();
247         PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
248 #elif !CMK_NET_VERSION
249         #error "need a unique number"
250 #endif
251         snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);
252
253         MACHSTATE2(3,"CminitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
254
255         setupSharedBuffers();
256
257         MACHSTATE2(3,"CminitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
258
259         initAllSendQs();
260         
261         MACHSTATE2(3,"CminitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);
262
263         MACHSTATE2(3,"CminitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);
264
265 #if PXSHM_STATS
266         pxshmContext->sendCount=0;
267         pxshmContext->sendTime=0.0;
268         pxshmContext->validCheckCount=0;
269         pxshmContext->validCheckTime=0.0;
270         pxshmContext->commServerTime = 0;
271         pxshmContext->lockRecvCount = 0;
272 #endif
273
274         signal(SIGSEGV, cleanupOnAllSigs);
275         signal(SIGFPE, cleanupOnAllSigs);
276         signal(SIGILL, cleanupOnAllSigs);
277         signal(SIGTERM, cleanupOnAllSigs);
278         signal(SIGABRT, cleanupOnAllSigs);
279         signal(SIGQUIT, cleanupOnAllSigs);
280         signal(SIGBUS, cleanupOnAllSigs);
281         signal(SIGINT, cleanupOnAllSigs);
282         signal(SIGTRAP, cleanupOnAllSigs);
283
284 #if 0
285         char name[64];
286         gethostname(name,64);
287         printf("[%d] name: %s\n", myrank, name);
288 #endif
289 };
290
291 /**************
292  * shutdown shmem objects and semaphores
293  *
294  * *******************/
295 static int pxshm_freed = 0;
296 void tearDownSharedBuffers();
297 void freeSharedBuffers();
298
// Tear down all pxshm state: unlink shm objects (if not already done after
// the startup barrier) and free every allocation made at init time.
// Safe to call more than once (pxshmContext is NULLed at the end).
void CmiExitPxshm(){
        if (pxshmContext == NULL) return; // never initialized, or already torn down
        if(pxshmContext->nodesize != 1){
                int i;
                if (!pxshm_freed)
                    tearDownSharedBuffers();
        
                // Find the first node-local rank other than ours. Because
                // allocBufNameStrings() carves ALL name strings out of one
                // malloc'ed chunk (own rank skipped, so the first peer's
                // string is the base of that chunk), freeing this single
                // pointer releases every name string at once.
                for(i=0;i<pxshmContext->nodesize;i++){
                        if(i != pxshmContext->noderank){
                                break;
                        }
                }
                free(pxshmContext->recvBufNames[i]);
                free(pxshmContext->sendBufNames[i]);

                free(pxshmContext->recvBufNames);
                free(pxshmContext->sendBufNames);

                free(pxshmContext->recvBufs);
                free(pxshmContext->sendBufs);

        }
#if PXSHM_STATS
CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
#endif
        free(pxshmContext);
        pxshmContext = NULL;
}
327
328 /******************
329  *Should this message be sent using PxShm or not ?
330  * ***********************/
331
332 /* dstNode is node number */
333 inline 
334 static int CmiValidPxshm(int node, int size){
335 #if PXSHM_STATS
336         pxshmContext->validCheckCount++;
337 #endif
338
339 /*      if(pxshmContext->nodesize == 1){
340                 return 0;
341         }*/
342         //replace by bitmap later
343         //if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN ){
344         return (node >= pxshmContext->nodestart && node <= pxshmContext->nodeend && size <= SHMMAXSIZE )? 1: 0;
345 };
346
347
348 inline int PxshmRank(int dstnode){
349         return dstnode - pxshmContext->nodestart;
350 }
351
352 inline void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount);
353 inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
354 inline int flushSendQ(PxshmSendQ *q);
355
356 inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
357   return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
358 }
359
360 /***************
361  *
362  *Send this message through shared memory
363  *if you cannot get lock, put it in the sendQ
364  *Before sending messages pick them from sendQ
365  *
366  * ****************************/
367
void CmiSendMessagePxshm(char *msg, int size, int dstnode, int *refcount)
{
#if PXSHM_STATS
        double _startSendTime = CmiWallTimer();
#endif

        LrtsPrepareEnvelope(msg, size);
        
        int dstRank = PxshmRank(dstnode);
        MEMDEBUG(CmiMemoryCheck());
  
        CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);
        
        sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
        PxshmSendQ *sendQ = pxshmContext->sendQs[dstRank];

        /* Try to acquire exclusive access to the destination buffer. Under
           OSSPINLOCK/LOCK the if() body below runs on FAILURE to acquire;
           under FENCE the sender raises its flag and backs off if the
           receiver's flag is already up (the turn check is disabled). */
#if PXSHM_OSSPINLOCK
        if(! OSSpinLockTry(&dstBuf->header->lock)){
#elif PXSHM_LOCK
        if(sem_trywait(dstBuf->mutex) < 0){
#elif PXSHM_FENCE
        dstBuf->header->flagSender = 1;
        dstBuf->header->turn = RECEIVER;
        CmiMemoryReadFence(0,0);
        CmiMemoryWriteFence(0,0);
        //if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
        if(dstBuf->header->flagReceiver){
                dstBuf->header->flagSender = 0;
#endif
                /* Failed to get the lock: queue the message and retain it
                   (the queue holds one reference until it is flushed). */
#if SENDQ_LIST
                if (sendQ->numEntries == 0 && sendQ->next == -2) {
                    /* queue just became non-empty: link it onto the head of
                       the non-empty-queue list */
                    sendQ->next = sendQ_head_index;
                    sendQ_head_index = dstRank;
                }
#endif
                pushSendQ(pxshmContext->sendQs[dstRank], msg, size, refcount);
                (*refcount)++;
                MEMDEBUG(CmiMemoryCheck());
                return;
        }else{

                /***
                 * We got the lock for this buffer
                 * first write all the messages in the sendQ and then write this guy
                 * */
                 if(pxshmContext->sendQs[dstRank]->numEntries == 0){
                        /* queue empty: write this message straight to the buffer */
                        int ret = sendMessage(msg,size,refcount,dstBuf,pxshmContext->sendQs[dstRank]);
#if SENDQ_LIST
                        if (sendQ->numEntries > 0 && sendQ->next == -2)
                        {
                                /* sendMessage overflowed and queued it: link the queue */
                                sendQ->next = sendQ_head_index;
                                sendQ_head_index = dstRank;
                        }
#endif
                        MACHSTATE(3,"Pxshm Send succeeded immediately");
                 }else{
                        (*refcount)+=2;/*this message should not get deleted when the queue is flushed*/
                        pushSendQ(pxshmContext->sendQs[dstRank],msg,size,refcount);
                        int sent = flushSendQ(sendQ);
                        (*refcount)--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
                        MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
                 }
                 /* release the destination buffer */

#if PXSHM_OSSPINLOCK
                 OSSpinLockUnlock(&dstBuf->header->lock);
#elif PXSHM_LOCK
                 sem_post(dstBuf->mutex);
#elif PXSHM_FENCE
                 CmiMemoryReadFence(0,0);                       
                 CmiMemoryWriteFence(0,0);
                 dstBuf->header->flagSender = 0;
#endif
        }
#if PXSHM_STATS
                pxshmContext->sendCount ++;
                pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
#endif
        MEMDEBUG(CmiMemoryCheck());

};
458
459 inline void emptyAllRecvBufs();
460 inline void flushAllSendQs();
461
462 /**********
463  * Extract all the messages from the recvBuffers you can
464  * Flush all sendQs
465  * ***/
// One polling step for the pxshm layer: drain every peer's receive buffer,
// then try to flush any messages parked in the overflow send queues.
inline void CommunicationServerPxshm(){
        
#if PXSHM_STATS
        double _startCommServerTime =CmiWallTimer();
#endif  
        
        MEMDEBUG(CmiMemoryCheck());
        emptyAllRecvBufs();
        flushAllSendQs();

#if PXSHM_STATS
        pxshmContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
#endif

        MEMDEBUG(CmiMemoryCheck());
};
482
483 static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
484         CommunicationServerPxshm();
485 }
486
487
/* Begin-idle hook: delegates to the machine layer's generic still-idle
 * notification.
 * NOTE(review): this calls CmiNotifyStillIdle(), not
 * CmiNotifyStillIdlePxshm() -- presumably intentional (the generic idle
 * path services pxshm too), but worth confirming against machine.c. */
static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
{
        CmiNotifyStillIdle(s);
}
492
493
494 void calculateNodeSizeAndRank(char **argv){
495         pxshmContext->nodesize=1;
496         MACHSTATE(3,"calculateNodeSizeAndRank start");
497         //CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
498         CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
499         MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);
500
501         pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);
502         
503         MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);
504         
505         pxshmContext->nodestart = _Cmi_mynode -pxshmContext->noderank;
506         
507         MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
508
509         pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;
510
511         if(pxshmContext->nodeend >= _Cmi_numnodes){
512                 pxshmContext->nodeend = _Cmi_numnodes-1;
513                 pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
514         }
515         
516         MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
517 }
518
519 void allocBufNameStrings(char ***bufName);
520 void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
521 /***************
522  *      calculate the name of the shared objects and semaphores
523  *      
524  *      name scheme
525  *      shared memory: charm_pxshm_<recvernoderank>_<sendernoderank>  
526  *  semaphore    : charm_pxshm_<recvernoderank>_<sendernoderank>.sem for semaphore for that shared object
527  *                the semaphore name used by us is the same as the shared memory object name
528  *                the posix library adds the semaphore tag // in linux at least . other machines might need more portable code
529  *
530  *      open these shared objects and semaphores
531  * *********/
// Build the shm object names, create/map all send and receive buffers,
// reset the send-side counters, then (after a barrier guaranteeing every
// process has opened its objects) unlink the names immediately -- the
// mappings remain valid, and the objects cannot be leaked on a crash.
void setupSharedBuffers(){
        int i=0;

        allocBufNameStrings(&(pxshmContext->recvBufNames));
        
        MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
        MEMDEBUG(CmiMemoryCheck());

        allocBufNameStrings((&pxshmContext->sendBufNames));
        
        MACHSTATE(3,"allocBufNameStrings for sendBufNames done");

        // Name scheme: <prefix>_<receiver global rank>_<sender global rank>,
        // so my recv buffer from peer i is the same object as peer i's send
        // buffer to me.
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
                        MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i)
                        snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
                        MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
                }
        }
        
        createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
        createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);
        
        // Zero the send-side counters before any traffic flows.
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        //CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
                        pxshmContext->sendBufs[i].header->count = 0;
                        pxshmContext->sendBufs[i].header->bytes = 0;
                }
        }

#if CMK_SMP && CMK_CRAYXE
        if (PMI_Barrier() != GNI_RC_SUCCESS) return;
#else
        if (CmiBarrier() != 0) return;
#endif
        freeSharedBuffers();
        pxshm_freed = 1;
}
572
573 void allocBufNameStrings(char ***bufName){
574         int i,count;
575         
576         int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
577         char *tmp = malloc(totalAlloc);
578         
579         MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
580
581         *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);
582         
583         for(i=0,count=0;i<pxshmContext->nodesize;i++){
584                 if(i != pxshmContext->noderank){
585                         (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
586                         count++;
587                 }else{
588                         (*bufName)[i] = NULL;
589                 }
590         }
591 }
592
593 void createShmObject(char *name,int size,char **pPtr);
594
595 void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
596         int i=0;
597         
598         *bufs = (sharedBufData *)calloc(pxshmContext->nodesize, sizeof(sharedBufData));
599         
600         for(i=0;i<pxshmContext->nodesize;i++){
601                 if(i != pxshmContext->noderank){
602                         createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
603                         memset(((*bufs)[i].header), 0, SHMBUFLEN+sizeof(sharedBufHeader));
604                         (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
605 #if PXSHM_OSSPINLOCK
606                         (*bufs)[i].header->lock = 0; // by convention(see man page) 0 means unlocked
607 #elif PXSHM_LOCK
608                         (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
609 #endif
610                 }else{
611                         (*bufs)[i].header = NULL;
612                         (*bufs)[i].data = NULL;
613 #if PXSHM_LOCK
614                         (*bufs)[i].mutex = NULL;
615 #endif
616                 }
617         }       
618 }
619
620
/* Create (or open) the POSIX shared-memory object `name`, size it to
 * `size` bytes and map it read/write into this process, returning the
 * mapping through *pPtr. The fd is closed after mmap: the mapping keeps
 * the object alive.
 * Fixes: mmap reports failure with MAP_FAILED, not NULL, so the old
 * `*pPtr != NULL` assert could never catch a failed mapping; ftruncate's
 * return value is now checked as well. */
void createShmObject(char *name,int size,char **pPtr){
        int fd=-1;
        int flags;      // opening flags for shared object
        int open_repeat_count = 0;
        int truncStatus;

        flags= O_RDWR | O_CREAT; // open file in read-write mode and create it if its not there
        
        /* retry up to 100 times; complain only after the first 10 failures */
        while(fd<0 && open_repeat_count < 100){
          open_repeat_count++;
          fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // only the owning user may read/write
          
          if(fd < 0 && open_repeat_count > 10){
            fprintf(stderr,"Error(attempt=%d) from shm_open %s while opening %s \n",open_repeat_count, strerror(errno),name);
            fflush(stderr);
          }
        }

        CmiAssert(fd >= 0);

        truncStatus = ftruncate(fd,size); // set the size of the shared memory object
        CmiAssert(truncStatus == 0);

        *pPtr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 
        CmiAssert(*pPtr != MAP_FAILED);

        close(fd);
}
647
648 void freeSharedBuffers(){
649         int i;
650         for(i= 0;i<pxshmContext->nodesize;i++){
651             if(i != pxshmContext->noderank){
652                 if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
653                     fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
654                 }
655 #if PXSHM_LOCK
656                 sem_unlink(pxshmContext->recvBufNames[i]);
657 #endif
658             }
659         }
660 };
661
662 void tearDownSharedBuffers(){
663         int i;
664         for(i= 0;i<pxshmContext->nodesize;i++){
665             if(i != pxshmContext->noderank){
666                 if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
667                     fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
668                 }
669 #if PXSHM_LOCK
670                 sem_close(pxshmContext->recvBufs[i].mutex);
671                 sem_close(pxshmContext->sendBufs[i].mutex);
672                 sem_unlink(pxshmContext->recvBufNames[i]);
673                 pxshmContext->recvBufs[i].mutex = NULL;
674                 pxshmContext->sendBufs[i].mutex = NULL;
675 #endif
676             }
677         }
678 };
679
680
681 void initSendQ(PxshmSendQ *q,int size,int rank);
682
683 void initAllSendQs(){
684         int i=0;
685         pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
686         for(i=0;i<pxshmContext->nodesize;i++){
687                 if(i != pxshmContext->noderank){
688                         (pxshmContext->sendQs)[i] = (PxshmSendQ *)calloc(1, sizeof(PxshmSendQ));
689                         initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE,i);
690                 }else{
691                         (pxshmContext->sendQs)[i] = NULL;
692                 }
693         }
694 };
695
696
697 /****************
698  *copy this message into the sharedBuf
699  If it does not succeed
700  *put it into the sendQ 
701  *NOTE: This method is called only after obtaining the corresponding mutex
702  * ********/
703 int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
704
705         if(dstBuf->header->bytes+size <= SHMBUFLEN){
706                 /**copy  this message to sharedBuf **/
707                 dstBuf->header->count++;
708                 memcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
709                 dstBuf->header->bytes += size;
710 //              MACHSTATE4(3,"Pxshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
711                 CmiFree(msg);
712                 return 1;
713         }
714         /***
715          * Shared Buffer is too full for this message
716          * **/
717         //printf("[%d] send buffer is too full\n", CmiMyPe());
718         pushSendQ(dstSendQ,msg,size,refcount);
719         (*refcount)++;
720 //      MACHSTATE3(3,"Pxshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
721         return 0;
722 }
723
724 inline OutgoingMsgRec* popSendQ(PxshmSendQ *q);
725
726 /****
727  *Try to send all the messages in the sendq to this destination rank
728  *NOTE: This method is called only after obtaining the corresponding mutex
729  * ************/
730
731 inline int flushSendQ(PxshmSendQ *dstSendQ){
732         sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstSendQ->rank]);
733         int count=dstSendQ->numEntries;
734         int sent=0;
735         while(count > 0){
736                 OutgoingMsgRec *ogm = popSendQ(dstSendQ);
737                 (*ogm->refcount)--;
738                 MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstSendQ->rank,ogm->refcount);
739                 int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
740                 if(ret==1){
741                         sent++;
742 #if CMK_NET_VERSION
743                         GarbageCollectMsg(ogm);
744 #endif
745                 }
746                 count--;
747         }
748         return sent;
749 }
750
751 inline void emptyRecvBuf(sharedBufData *recvBuf);
752
// For every peer whose receive buffer holds messages, try to lock the
// buffer and drain it. On lock failure the buffer is simply skipped --
// it will be retried on the next poll.
inline void emptyAllRecvBufs(){
        int i;
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
                        if(recvBuf->header->count > 0){

#if PXSHM_STATS
                                pxshmContext->lockRecvCount++;
#endif

                                /* Under OSSPINLOCK/LOCK the if() body runs on
                                   FAILURE to acquire; under FENCE the receiver
                                   raises its flag and backs off if the sender
                                   is active (turn check disabled). */
#if PXSHM_OSSPINLOCK
                                if(! OSSpinLockTry(&recvBuf->header->lock)){
#elif PXSHM_LOCK
                                if(sem_trywait(recvBuf->mutex) < 0){
#elif PXSHM_FENCE
                                recvBuf->header->flagReceiver = 1;
                                recvBuf->header->turn = SENDER;
                                CmiMemoryReadFence(0,0);
                                CmiMemoryWriteFence(0,0);
                                //if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
                                if((recvBuf->header->flagSender)){
                                        recvBuf->header->flagReceiver = 0;
#endif
                                }else{


                                        MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);                        
                                        emptyRecvBuf(recvBuf);

                                        /* release the buffer */
#if PXSHM_OSSPINLOCK
                                        OSSpinLockUnlock(&recvBuf->header->lock);
#elif PXSHM_LOCK
                                        sem_post(recvBuf->mutex);
#elif PXSHM_FENCE
                                        CmiMemoryReadFence(0,0);
                                        CmiMemoryWriteFence(0,0);
                                        recvBuf->header->flagReceiver = 0;
#endif

                                }
                        
                        }
                }
        }
};
799
// Try to drain every non-empty overflow send queue. With SENDQ_LIST the
// walk is restricted to the linked list of non-empty queues; queues that
// drain completely are unlinked and their `next` reset to -2 ("not on
// list"). Lock failures leave the queue for the next poll.
inline void flushAllSendQs(){
        int i;
#if SENDQ_LIST
        int index_prev = -1;

        i =  sendQ_head_index;
        while (i!= -1) {
                PxshmSendQ *sendQ = pxshmContext->sendQs[i];
                CmiAssert(i !=  pxshmContext->noderank);
                if(sendQ->numEntries > 0){
#else
        for(i=0;i<pxshmContext->nodesize;i++) {
                if (i == pxshmContext->noderank) continue;
                PxshmSendQ *sendQ = pxshmContext->sendQs[i];
                if(sendQ->numEntries > 0) {
#endif
        
                        /* Same try-lock protocol as the send path; note that
                           here the if() body runs on SUCCESS. */
#if PXSHM_OSSPINLOCK
                        if(OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)){
#elif PXSHM_LOCK
                        if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
#elif PXSHM_FENCE
                        pxshmContext->sendBufs[i].header->flagSender = 1;
                        pxshmContext->sendBufs[i].header->turn = RECEIVER;
                        CmiMemoryReadFence(0,0);                        
                        CmiMemoryWriteFence(0,0);
                        if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
#endif

                                MACHSTATE1(3,"flushSendQ %d",i);
                                flushSendQ(sendQ);
                                
                                /* release the buffer */
#if PXSHM_OSSPINLOCK    
                                OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
#elif PXSHM_LOCK
                                sem_post(pxshmContext->sendBufs[i].mutex);
#elif PXSHM_FENCE
                                CmiMemoryReadFence(0,0);                        
                                CmiMemoryWriteFence(0,0);
                                pxshmContext->sendBufs[i].header->flagSender = 0;
#endif
                        }else{

                                /* lost the race: lower our flag and move on */
#if PXSHM_FENCE
                          pxshmContext->sendBufs[i].header->flagSender = 0;
#endif                          

                        }

                }        
#if SENDQ_LIST
                /* advance the walk; unlink this queue if it is now empty */
                if (sendQ->numEntries == 0) {
                    if (index_prev != -1)
                        pxshmContext->sendQs[index_prev]->next = sendQ->next;
                    else
                        sendQ_head_index = sendQ->next;
                    i = sendQ->next;
                    sendQ->next = -2;
                }
                else {
                    index_prev = i;
                    i = sendQ->next;
                }
#endif
        }       
};
866
867 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);
868
869 void emptyRecvBuf(sharedBufData *recvBuf){
870         int numMessages = recvBuf->header->count;
871         int i=0;
872
873         char *ptr=recvBuf->data;
874
875         for(i=0;i<numMessages;i++){
876                 int size;
877                 int rank, srcpe, seqno, magic, i;
878                 unsigned int broot;
879                 char *msg = ptr;
880                 char *newMsg;
881
882 #if CMK_NET_VERSION
883                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
884                 size = CmiMsgHeaderGetLength(msg);
885 #else
886                 size = CmiGetMsgSize(msg);
887 #endif
888         
889                 newMsg = (char *)CmiAlloc(size);
890                 memcpy(newMsg,msg,size);
891
892 #if CMK_NET_VERSION
893                 handoverPxshmMessage(newMsg,size,rank,broot);
894 #else
895                 handleOneRecvedMsg(size, newMsg);
896 #endif
897                 
898                 ptr += size;
899
900                 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
901         }
902 #if 1
903   if(ptr - recvBuf->data != recvBuf->header->bytes){
904                 CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
905         }
906 #endif
907         CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
908         recvBuf->header->count=0;
909         recvBuf->header->bytes=0;
910 }
911
912
913 #if CMK_NET_VERSION
/**
 * Route one message copied out of a pxshm receive buffer to its consumer.
 *
 * @param newmsg     private (CmiAlloc'd) copy of the message; ownership is
 *                   transferred to the PE queue it is pushed onto
 * @param total_size message length in bytes (forwarded to the broadcast
 *                   helpers only)
 * @param rank       destination rank within this node, or a DGRAM_* special
 *                   value for broadcasts
 * @param broot      broadcast root (forwarded to spanning-tree/hypercube
 *                   forwarding only)
 *
 * NOTE(review): the CmiAssert(rank == 0) below appears to contradict the
 * DGRAM_BROADCAST / DGRAM_NODEBROADCAST comparisons that follow — unless
 * DGRAM_BROADCAST happens to equal 0, those branches are unreachable in
 * assert-enabled builds.  Confirm against the net layer's DGRAM constants.
 */
void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
	CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
	/* forward broadcasts down the spanning tree before local delivery */
	if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
	  || rank == DGRAM_NODEBROADCAST
#endif
	 ){
		SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
					}
#elif CMK_BROADCAST_HYPERCUBE
	/* forward broadcasts along the hypercube before local delivery */
	if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
	  || rank == DGRAM_NODEBROADCAST
#endif
	 ){
			SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
					}
#endif

		/* deliver locally: broadcasts go to PE 0, everything else to
		 * the addressed PE */
		switch (rank) {
	case DGRAM_BROADCAST: {
	  CmiPushPE(0, newmsg);
	  break;
      }
	default:
				{
					
	  CmiPushPE(rank, newmsg);
				}
	}    /* end of switch */
}
946 #endif
947
948
949 /**************************
950  *sendQ helper functions
951  * ****************/
952
953 void initSendQ(PxshmSendQ *q,int size, int rank){
954         q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));
955
956         q->size = size;
957         q->numEntries = 0;
958
959         q->begin = 0;
960         q->end = 0;
961
962         q->rank = rank;
963 #if SENDQ_LIST
964         q->next = -2;
965 #endif
966 }
967
968 void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount){
969         if(q->numEntries == q->size){
970                 //need to resize 
971                 OutgoingMsgRec *oldData = q->data;
972                 int newSize = q->size<<1;
973                 q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
974                 //copy head to the beginning of the new array
975                 CmiAssert(q->begin == q->end);
976
977                 CmiAssert(q->begin < q->size);
978                 memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));
979
980                 if(q->end!=0){
981                         memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
982                 }
983                 free(oldData);
984                 q->begin = 0;
985                 q->end = q->size;
986                 q->size = newSize;
987         }
988         OutgoingMsgRec *omg = &q->data[q->end];
989         omg->size = size;
990         omg->data = msg;
991         omg->refcount = refcount;
992         (q->end)++;
993         if(q->end >= q->size){
994                 q->end -= q->size;
995         }
996         q->numEntries++;
997 }
998
999 OutgoingMsgRec * popSendQ(PxshmSendQ *q){
1000         OutgoingMsgRec * ret;
1001         if(0 == q->numEntries){
1002                 return NULL;
1003         }
1004
1005         ret = &q->data[q->begin];
1006         (q->begin)++;
1007         if(q->begin >= q->size){
1008                 q->begin -= q->size;
1009         }
1010         
1011         q->numEntries--;
1012         return ret;
1013 }