1 /** @file machine-xpmem.c
2
3     Shared-memory message passing between processes on the same physical node, implemented on top of XPMEM.
4 There are three options here for synchronization:
5       XPMEM_FENCE is the default. It uses memory fences
6       XPMEM_OSSPINLOCK will cause OSSpinLocks to be used (available on OS X)
7       XPMEM_LOCK will cause POSIX semaphores to be used
8
9   created by 
10         Gengbin Zheng, September 2011
11 */
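/* A minimal sketch of the XPMEM_LOCK option, assuming POSIX named semaphores
   are available.  It mirrors what this file does later (sem_open in
   createRecvXpmemAndSems/createSendXpmemAndSems, sem_trywait/sem_post around
   the shared buffer, sem_unlink in freeSharedBuffers); the identifiers bufName
   and semaphoreExample are hypothetical and exist only for illustration. */
#if 0
#include <semaphore.h>
#include <fcntl.h>
#include <sys/stat.h>

static void semaphoreExample(const char *bufName)
{
        sem_t *mutex = sem_open(bufName, O_CREAT, S_IRUSR | S_IWUSR, 1); /* initial count 1 */

        if (sem_trywait(mutex) == 0) {          /* non-blocking acquire */
                /* ... read or write the shared buffer ... */
                sem_post(mutex);                /* release */
        }

        sem_close(mutex);                       /* drop this process's handle */
        sem_unlink(bufName);                    /* remove the name once everyone is done */
}
#endif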
12
13 /**
14  * @addtogroup NET
15  * @{
16  */
17
18 #include <sys/types.h>
19 #include <sys/mman.h>
20 #include <unistd.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <errno.h>
24 #include <signal.h>
25
26 #include "xpmem.h"
27
28 /************** 
29    Determine which type of synchronization to use 
30 */
31 #if XPMEM_OSSPINLOCK
32 #include <libkern/OSAtomic.h>
33 #elif XPMEM_LOCK
34 #include <semaphore.h>
35 #else
36 /* Default to using fences */
37 #define XPMEM_FENCE 1
38 #endif
39
40 #define MEMDEBUG(x) //x
41
42 #define XPMEM_STATS    0
43
44 #define SENDQ_LIST     0
45
46 /*** The following code was copied verbatim from pcqueue.h file ***/
47 #undef CmiMemoryWriteFence
48 #if XPMEM_FENCE
49 #ifdef POWER_PC
50 #define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
51 #else
52 #define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
53 //#define CmiMemoryWriteFence(startPtr,nBytes) 
54 #endif
55 #else
56 #undef CmiMemoryWriteFence
57 #define CmiMemoryWriteFence(startPtr,nBytes)  
58 #endif
59
60 #undef CmiMemoryReadFence
61 #if XPMEM_FENCE
62 #ifdef POWER_PC
63 #define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
64 #else
65 #define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
66 //#define CmiMemoryReadFence(startPtr,nBytes) 
67 #endif
68 #else
69 #define CmiMemoryReadFence(startPtr,nBytes) 
70 #endif
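/* Sketch of how the fences above are paired in the default XPMEM_FENCE mode,
   as used by CmiSendMessageXpmem, emptyAllRecvBufs and flushAllSendQs further
   down (sharedBufHeader and the SENDER/RECEIVER constants are declared below).
   Each side raises its flag, gives the peer the turn, fences, and proceeds
   only if the peer is not inside: a try-lock flavor of Dekker/Peterson mutual
   exclusion.  The helper names senderTryEnter/receiverTryEnter are hypothetical. */
#if 0
static int senderTryEnter(sharedBufHeader *h)
{
        h->flagSender = 1;
        h->turn = RECEIVER;                     /* yield the tie to the receiver */
        CmiMemoryReadFence(0,0);
        CmiMemoryWriteFence(0,0);               /* publish the flag before testing */
        if (h->flagReceiver) {                  /* receiver may be draining the buffer */
                h->flagSender = 0;              /* back off; caller queues the message */
                return 0;
        }
        return 1;                               /* caller resets flagSender when done */
}

static int receiverTryEnter(sharedBufHeader *h)
{
        h->flagReceiver = 1;
        h->turn = SENDER;
        CmiMemoryReadFence(0,0);
        CmiMemoryWriteFence(0,0);
        if (h->flagSender) {                    /* a sender is appending; retry later */
                h->flagReceiver = 0;
                return 0;
        }
        return 1;                               /* caller resets flagReceiver when done */
}
#endif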
71
72 #if CMK_SMP
73 #error  "XPMEM can only be used in a non-SMP build of Charm++"
74 #endif
75
76 /***************************************************************************************/
77
78 enum entities {SENDER,RECEIVER};
79
80 /************************
81  *      Implementation currently assumes that
82  *      1) all nodes have the same number of processors
83  *  2) in the nodelist all processors in a node are listed in sequence
84  *   0 1 2 3      4 5 6 7 
85  *   -------      -------
86  *    node 1       node 2 
87  ************************/
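/* A worked example of the layout assumed above, following the arithmetic in
   calculateNodeSizeAndRank() below; the values (8 processes, +nodesize 4,
   _Cmi_mynode == 6) are made up for illustration. */
#if 0
static void nodeLayoutExample(void)
{
        int mynode    = 6, nodesize = 4;          /* hypothetical values        */
        int noderank  = mynode % nodesize;        /* 6 % 4     = 2              */
        int nodestart = mynode - noderank;        /* 6 - 2     = 4              */
        int nodeend   = nodestart + nodesize - 1; /* 4 + 4 - 1 = 7              */
        /* processes 4..7 form one node and exchange messages through XPMEM;
           everything outside [nodestart,nodeend] uses the regular network path */
}
#endif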
88
89 #define NAMESTRLEN 60
90 #define PREFIXSTRLEN 50 
91
92 static int XPMEMBUFLEN  =   (1024*1024*4);
93 #define XPMEMMINSIZE     (1*1024)
94 #define XPMEMMAXSIZE     (1024*1024)
95
96 static int  SENDQSTARTSIZE  =  256;
97
98
99 /// This struct is used as the first portion of a shared memory region, followed by data
100 typedef struct {
101         int count; //number of messages
102         int bytes; //number of bytes
103
104 #if XPMEM_OSSPINLOCK
105         OSSpinLock lock;
106 #endif
107
108 #if XPMEM_FENCE
109         volatile int flagSender;
110         CmiMemorySMPSeparation_t pad1;
111         volatile int flagReceiver;
112         CmiMemorySMPSeparation_t pad2;
113         volatile int turn;
114 #endif  
115
116 } sharedBufHeader;
117
118
119 typedef struct {
120 #if XPMEM_LOCK
121         sem_t *mutex;
122 #endif
123         sharedBufHeader *header;        
124         char *data;
125         __s64  segid;
126 } sharedBufData;
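/* Sketch of how one shared buffer is wired up: a single XPMEM segment of
   XPMEMBUFLEN + sizeof(sharedBufHeader) bytes (rounded up to a page) starts
   with the header and is followed by the message bytes.  This is how
   createRecvXpmemAndSems/createSendXpmemAndSems below derive the data pointer;
   the helper name initSharedBuf is hypothetical. */
#if 0
static void initSharedBuf(sharedBufData *buf, char *segment)
{
        buf->header = (sharedBufHeader *)segment;         /* control fields first   */
        buf->data   = segment + sizeof(sharedBufHeader);  /* then the payload area  */
        buf->header->count = 0;                           /* no messages yet        */
        buf->header->bytes = 0;                           /* no payload bytes yet   */
}
#endif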
127
128 typedef struct OutgoingMsgRec
129 {
130   char *data;
131   int  *refcount;
132   int   size;
133 }
134 OutgoingMsgRec;
135
136 typedef struct {
137         int size;       //total size of data array
138         int begin;      //position of first element
139         int end;        //position of next element
140         int numEntries; //number of entries
141         int rank;       // for dest rank
142 #if SENDQ_LIST
143         int next;         // next dstrank of non-empty queue
144 #endif
145         OutgoingMsgRec *data;
146
147 } XpmemSendQ;
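/* Notes on XpmemSendQ, as used below: data[] is a circular buffer where begin
   is the oldest entry and end is the next free slot, both wrapping modulo size
   (see pushSendQ/popSendQ at the end of this file).  With SENDQ_LIST enabled,
   the "next" field threads the non-empty queues into a singly linked list
   headed by sendQ_head_index, so flushAllSendQs only visits ranks that have
   pending messages: next == -2 means "not on the list", -1 terminates the list.
   A minimal sketch; the helper name sendQListPush is hypothetical. */
#if 0
#if SENDQ_LIST
static void sendQListPush(XpmemSendQ *q, int dstRank)
{
        if (q->numEntries == 0 && q->next == -2) {  /* queue is about to become non-empty */
                q->next = sendQ_head_index;         /* link in front of the current head  */
                sendQ_head_index = dstRank;
        }
}
#endif
#endif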
148
149 typedef struct {
150         int nodesize;
151         int noderank;
152         int nodestart,nodeend;//proc numbers for the start and end of this node
153         char prefixStr[PREFIXSTRLEN];
154         char **recvBufNames;
155         char **sendBufNames;
156
157         sharedBufData *recvBufs;
158         sharedBufData *sendBufs;
159
160         XpmemSendQ **sendQs;
161
162 #if XPMEM_STATS
163         int sendCount;
164         int validCheckCount;
165         int lockRecvCount;
166         double validCheckTime;
167         double sendTime;
168         double commServerTime;
169 #endif
170
171 } XpmemContext;
172
173
174 #if SENDQ_LIST
175 static int sendQ_head_index = -1;
176 #endif
177
178 XpmemContext *xpmemContext=NULL; //global context
179
180
181 void calculateNodeSizeAndRank(char **);
182 void setupSharedBuffers();
183 void initAllSendQs();
184
185 void CmiExitXpmem();
186
187 static void cleanupOnAllSigs(int signo)
188 {
189     CmiExitXpmem();
190 }
191
192 static int xpmem_fd;
193
194 /******************
195  *      Initialization routine
196  *      currently just testing start up
197  * ****************/
198 void CmiInitXpmem(char **argv){
199         char input[32];
200         char *env;
201
202         MACHSTATE(3,"CmiInitXpmem start");
203         xpmemContext = (XpmemContext *)calloc(1,sizeof(XpmemContext));
204
205 #if CMK_NET_VERSION
206         if(Cmi_charmrun_pid <= 0){
207                 CmiAbort("xpmem must be run with charmrun");
208         }
209 #endif
210         calculateNodeSizeAndRank(argv);
211
212         MACHSTATE1(3,"CmiInitXpmem  %d calculateNodeSizeAndRank",xpmemContext->nodesize);
213
214         if(xpmemContext->nodesize == 1) return;
215         
216         env = getenv("CHARM_XPMEM_SIZE");
217         if (env) {
218             XPMEMBUFLEN = CmiReadSize(env);
219         }
220         SENDQSTARTSIZE = 32 * xpmemContext->nodesize;
221
222         if (_Cmi_mynode == 0)
223             CmiPrintf("Charm++> xpmem enabled: %d cores per node, buffer size: %.1fMB\n", xpmemContext->nodesize, XPMEMBUFLEN/1024.0/1024.0);
224
225         xpmem_fd = open("/dev/xpmem", O_RDWR);
226         if (xpmem_fd == -1) {
227             CmiAbort("Cannot open /dev/xpmem");
228         }
229
230 #if CMK_CRAYXE
231         srand(getpid());
232         int Cmi_charmrun_pid = rand();
233         PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
234 #elif !CMK_NET_VERSION
235         #error "need a unique number"
236 #endif
237         snprintf(&(xpmemContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_xpmem_%d",Cmi_charmrun_pid);
238
239         MACHSTATE2(3,"CmiInitXpmem %s %d pre setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);
240
241         setupSharedBuffers();
242
243         MACHSTATE2(3,"CmiInitXpmem %s %d setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);
244
245         initAllSendQs();
246         
247         MACHSTATE2(3,"CmiInitXpmem %s %d initAllSendQs",xpmemContext->prefixStr,xpmemContext->nodesize);
248
249         MACHSTATE2(3,"CmiInitXpmem %s %d done",xpmemContext->prefixStr,xpmemContext->nodesize);
250
251         signal(SIGSEGV, cleanupOnAllSigs);
252         signal(SIGFPE, cleanupOnAllSigs);
253         signal(SIGILL, cleanupOnAllSigs);
254         signal(SIGTERM, cleanupOnAllSigs);
255         signal(SIGABRT, cleanupOnAllSigs);
256         signal(SIGQUIT, cleanupOnAllSigs);
257         signal(SIGBUS, cleanupOnAllSigs);
258 };
259
260 /**************
261  * shutdown shmem objects and semaphores
262  *
263  * *******************/
264 static int pxshm_freed = 0;
265 void tearDownSharedBuffers();
266 void freeSharedBuffers();
267
268 void CmiExitXpmem(){
269         int i=0;
270         
271         if (xpmemContext == NULL) return;
272
273         if(xpmemContext->nodesize != 1) {
274                 //tearDownSharedBuffers();
275         
276                 for(i=0;i<xpmemContext->nodesize;i++){
277                         if(i != xpmemContext->noderank){
278                                 break;
279                         }
280                 }
281                 free(xpmemContext->recvBufNames[i]);
282                 free(xpmemContext->sendBufNames[i]);
283
284                 free(xpmemContext->recvBufNames);
285                 free(xpmemContext->sendBufNames);
286
287                 free(xpmemContext->recvBufs);
288                 free(xpmemContext->sendBufs);
289
290         }
291 #if XPMEM_STATS
292         CmiPrintf("[%d] sendCount %d sendTime %.6lf validCheckCount %d validCheckTime %.6lf commServerTime %.6lf lockRecvCount %d \n",_Cmi_mynode,xpmemContext->sendCount,xpmemContext->sendTime,xpmemContext->validCheckCount,xpmemContext->validCheckTime,xpmemContext->commServerTime,xpmemContext->lockRecvCount);
293 #endif
294         free(xpmemContext);
295         xpmemContext = NULL;
296 }
297
298 /******************
299  * Should this message be sent using XPMEM or not?
300  * ***********************/
301
302 /* dstNode is node number */
303 inline 
304 static int CmiValidXpmem(int node, int size){
305 #if XPMEM_STATS
306         xpmemContext->validCheckCount++;
307 #endif
308         //replace by bitmap later
309         //if(dst >= xpmemContext->nodestart && dst <= xpmemContext->nodeend && size < XPMEMMAXSIZE && size > XPMEMMINSIZE){
310         return (node >= xpmemContext->nodestart && node <= xpmemContext->nodeend && size <= XPMEMMAXSIZE )? 1: 0;
311 };
312
313
314 inline int XpmemRank(int dstnode){
315         return dstnode - xpmemContext->nodestart;
316 }
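/* Sketch of how the two checks above fit into a send path: messages bound for
   a process on the same physical node that are not too large go through the
   shared buffers, everything else falls back to the normal network send.  The
   surrounding function and the fallback name are hypothetical; the real
   integration point depends on the machine layer. */
#if 0
static void sendExample(char *msg, int size, int dstnode, int *refcount)
{
        if (CmiValidXpmem(dstnode, size)) {
                /* same node and size <= XPMEMMAXSIZE: copy through shared memory */
                CmiSendMessageXpmem(msg, size, dstnode, refcount);
        } else {
                /* off-node or oversized: use the regular network path */
                /* MachineNetworkSend(dstnode, size, msg);  (hypothetical) */
        }
}
#endif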
317
318 inline void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount);
319 inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ);
320 inline int flushSendQ(XpmemSendQ *sendQ);
321
322 inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,XpmemSendQ *dstSendQ){
323   return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
324 }
325
326 /***************
327  *
328  *Send this message through shared memory
329  *if you cannot get lock, put it in the sendQ
330  *Before sending messages pick them from sendQ
331  *
332  * ****************************/
333
334 void CmiSendMessageXpmem(char *msg, int size, int dstnode, int *refcount)
335 {
336 #if XPMEM_STATS
337         double _startSendTime = CmiWallTimer();
338 #endif
339
340         LrtsPrepareEnvelope(msg, size);
341         
342         int dstRank = XpmemRank(dstnode);
343         MEMDEBUG(CmiMemoryCheck());
344   
345         MACHSTATE4(3,"Send Msg Xpmem msg %p size %d dst %d dstRank %d",msg,size,dstnode,dstRank);
347
348         CmiAssert(dstRank >=0 && dstRank != xpmemContext->noderank);
349         
350         sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstRank]);
351         XpmemSendQ *sendQ = xpmemContext->sendQs[dstRank];
352
353 #if XPMEM_OSSPINLOCK
354         if(! OSSpinLockTry(&dstBuf->header->lock)){
355 #elif XPMEM_LOCK
356         if(sem_trywait(dstBuf->mutex) < 0){
357 #elif XPMEM_FENCE
358         dstBuf->header->flagSender = 1;
359         dstBuf->header->turn = RECEIVER;
360         CmiMemoryReadFence(0,0);
361         CmiMemoryWriteFence(0,0);
362         //if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
363         if(dstBuf->header->flagReceiver){
364                 dstBuf->header->flagSender = 0;
365 #endif
366                 /**failed to get the lock 
367                 insert into q and retain the message*/
368 #if SENDQ_LIST
369                 if (sendQ->numEntries == 0 && sendQ->next == -2) {
370                     sendQ->next = sendQ_head_index;
371                     sendQ_head_index = dstRank;
372                 }
373 #endif
374                 pushSendQ(sendQ, msg, size, refcount);
375                 (*refcount)++;
376                 MEMDEBUG(CmiMemoryCheck());
377                 return;
378         }else{
379                 /***
380                  * We got the lock for this buffer
381                  * first write all the messages in the sendQ and then write this guy
382                  * */
383                  if(sendQ->numEntries == 0){
384                         // send message user event
385                         int ret = sendMessage(msg,size,refcount,dstBuf,sendQ);
386 #if SENDQ_LIST
387                         if (sendQ->numEntries > 0 && sendQ->next == -2) 
388                         {
389                                 sendQ->next = sendQ_head_index;
390                                 sendQ_head_index = dstRank;
391                         }
392 #endif
393                         MACHSTATE(3,"Xpmem Send succeeded immediately");
394                  }else{
395                         (*refcount)+=2;/*this message should not get deleted when the queue is flushed*/
396                         pushSendQ(sendQ,msg,size,refcount);
397                         MACHSTATE3(3,"Xpmem msg %p pushed to sendQ length %d refcount %d",msg,sendQ->numEntries,*refcount);
398                         int sent = flushSendQ(sendQ);
399                         (*refcount)--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
400                         MACHSTATE1(3,"Xpmem flushSendQ sent %d messages",sent);
401                  }
402                  /* unlock the destination's shared buffer */
403
404 #if XPMEM_OSSPINLOCK
405                  OSSpinLockUnlock(&dstBuf->header->lock);
406 #elif XPMEM_LOCK
407                  sem_post(dstBuf->mutex);
408 #elif XPMEM_FENCE
409                  CmiMemoryReadFence(0,0);                       
410                  CmiMemoryWriteFence(0,0);
411                  dstBuf->header->flagSender = 0;
412 #endif
413         }
414 #if XPMEM_STATS
415                 xpmemContext->sendCount ++;
416                 xpmemContext->sendTime += (CmiWallTimer()-_startSendTime);
417 #endif
418         MEMDEBUG(CmiMemoryCheck());
419
420 };
421
422 inline void emptyAllRecvBufs();
423 inline void flushAllSendQs();
424
425 /**********
426  * Extract all the messages from the recvBuffers you can
427  * Flush all sendQs
428  * ***/
429 inline void CommunicationServerXpmem()
430 {
431 #if XPMEM_STATS
432         double _startCommServerTime =CmiWallTimer();
433 #endif  
434         MEMDEBUG(CmiMemoryCheck());
435
436         emptyAllRecvBufs();
437         flushAllSendQs();
438
439 #if XPMEM_STATS
440         xpmemContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
441 #endif
442         MEMDEBUG(CmiMemoryCheck());
443 };
444
445 static void CmiNotifyStillIdleXpmem(CmiIdleState *s){
446         CommunicationServerXpmem();
447 }
448
449
450 static void CmiNotifyBeginIdleXpmem(CmiIdleState *s)
451 {
452         CmiNotifyStillIdle(s);
453 }
454
455 void calculateNodeSizeAndRank(char **argv)
456 {
457         xpmemContext->nodesize=1;
458         MACHSTATE(3,"calculateNodeSizeAndRank start");
459         //CmiGetArgIntDesc(argv, "+nodesize", &(xpmemContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
460         CmiGetArgIntDesc(argv, "+nodesize", &(xpmemContext->nodesize),"Number of cores in this node");
461         MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",xpmemContext->nodesize);
462
463         xpmemContext->noderank = _Cmi_mynode % (xpmemContext->nodesize);
464         
465         MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",xpmemContext->noderank);
466         
467         xpmemContext->nodestart = _Cmi_mynode -xpmemContext->noderank;
468         
469         MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
470
471         xpmemContext->nodeend = xpmemContext->nodestart + xpmemContext->nodesize -1;
472
473         if(xpmemContext->nodeend >= _Cmi_numnodes){
474                 xpmemContext->nodeend = _Cmi_numnodes-1;
475                 xpmemContext->nodesize = (xpmemContext->nodeend - xpmemContext->nodestart) +1;
476         }
477         
478         MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",xpmemContext->nodestart,xpmemContext->nodesize,xpmemContext->noderank);
479 }
480
481 void allocBufNameStrings(char ***bufName);
482 void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames);
483 void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames);
484 void removeXpmemFiles();
485
486 /***************
487  *      calculate the names of the shared objects and semaphores
488  *
489  *      name scheme
490  *      shared memory: <prefix>_<receivernoderank>_<sendernoderank>, where <prefix> is charm_xpmem_<pid>
491  *      semaphore    : the same string is reused as the named-semaphore name for that shared object
492  *                     (the POSIX library adds its own semaphore tag, at least on Linux; other
493  *                      systems might need more portable code here)
494  *
495  *      open these shared objects and semaphores
496  * *********/
497 void setupSharedBuffers(){
498         int i=0;
499         
500         allocBufNameStrings(&(xpmemContext->recvBufNames));
501         
502         allocBufNameStrings((&xpmemContext->sendBufNames));
503         
504         for(i=0;i<xpmemContext->nodesize;i++){
505                 if(i != xpmemContext->noderank){
506                         snprintf(xpmemContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",xpmemContext->prefixStr,xpmemContext->noderank+xpmemContext->nodestart,i+xpmemContext->nodestart);
507                         MACHSTATE2(3,"recvBufName %s with rank %d",xpmemContext->recvBufNames[i],i)
508                         snprintf(xpmemContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",xpmemContext->prefixStr,i+xpmemContext->nodestart,xpmemContext->noderank+xpmemContext->nodestart);
509                         MACHSTATE2(3,"sendBufName %s with rank %d",xpmemContext->sendBufNames[i],i);
510                 }
511         }
512         
513         createRecvXpmemAndSems(&(xpmemContext->recvBufs),xpmemContext->recvBufNames);
514         CmiBarrier();
515         createSendXpmemAndSems(&(xpmemContext->sendBufs),xpmemContext->sendBufNames);
516         CmiBarrier();
517         removeXpmemFiles();
518         freeSharedBuffers();
519         
520         for(i=0;i<xpmemContext->nodesize;i++){
521                 if(i != xpmemContext->noderank){
522                         //CmiAssert(xpmemContext->sendBufs[i].header->count == 0);
523                         xpmemContext->sendBufs[i].header->count = 0;
524                         xpmemContext->sendBufs[i].header->bytes = 0;
525                 }
526         }
527 }
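/* Worked example of the naming scheme built above, assuming a hypothetical
   Cmi_charmrun_pid of 1234 (prefix "charm_xpmem_1234") and a node starting at
   global rank 0.  The receiver's rank comes first, so rank 0's send buffer
   towards rank 1 is the same object as rank 1's receive buffer from rank 0.
   Under XPMEM_LOCK the same strings double as the named-semaphore names. */
#if 0
static void nameExample(void)
{
        char recvName[NAMESTRLEN], sendName[NAMESTRLEN];
        /* same formatting as setupSharedBuffers(), with made-up ranks */
        snprintf(recvName, NAMESTRLEN-1, "%s_%d_%d", "charm_xpmem_1234", 0, 1); /* "charm_xpmem_1234_0_1" */
        snprintf(sendName, NAMESTRLEN-1, "%s_%d_%d", "charm_xpmem_1234", 1, 0); /* "charm_xpmem_1234_1_0" */
}
#endif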
528
529 void allocBufNameStrings(char ***bufName)
530 {
531         int i,count;
532         int totalAlloc = sizeof(char)*NAMESTRLEN*(xpmemContext->nodesize-1);
533         char *tmp = malloc(totalAlloc);
534         
535         MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
536
537         *bufName = (char **)malloc(sizeof(char *)*xpmemContext->nodesize);
538         for(i=0,count=0;i<xpmemContext->nodesize;i++){
539                 if(i != xpmemContext->noderank){
540                         (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
541                         count++;
542                 }else{
543                         (*bufName)[i] = NULL;
544                 }
545         }
546 }
547
548 __s64 createXpmemObject(int size,char **pPtr)
549 {
550         struct xpmem_cmd_make make_info;
551         int ret;
552
553         *pPtr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
554         if (*pPtr == MAP_FAILED) {
555             perror("mmap failed while creating the shared buffer");
556             CmiAbort("createXpmemObject: mmap failed");
557         }
558         make_info.vaddr = (__u64) *pPtr;
559         make_info.size = size;
560         make_info.permit_type = XPMEM_PERMIT_MODE;
561         make_info.permit_value = (__u64) 0600;
562         ret = ioctl(xpmem_fd, XPMEM_CMD_MAKE, &make_info);
563         if (ret != 0) {
564             perror("xpmem_make");
565             CmiAbort("xpmem_make");
566         }
567         return make_info.segid;
568 }
569
570 void attachXpmemObject(__s64 segid, int size, char **pPtr)
571 {
572        int ret;
573        __s64 apid;
574        struct xpmem_cmd_get get_info;
575        struct xpmem_cmd_attach attach_info;
576
577        get_info.segid = segid;
578        get_info.flags = XPMEM_RDWR;
579        get_info.permit_type = XPMEM_PERMIT_MODE;
580        get_info.permit_value = (__u64) NULL;
581        ret = ioctl(xpmem_fd, XPMEM_CMD_GET, &get_info);
582        if (ret != 0) {
583                CmiAbort("xpmem_get");
584        }
585        apid = get_info.apid;
586
587        attach_info.apid = get_info.apid;
588        attach_info.offset = 0;
589        attach_info.size = size;
590        attach_info.vaddr = (__u64) NULL;
591        attach_info.fd = xpmem_fd;
592        attach_info.flags = 0;
593
594        ret = ioctl(xpmem_fd, XPMEM_CMD_ATTACH, &attach_info);
595        if (ret != 0) {
596                CmiAbort("xpmem_attach");
597        }
598
599        *pPtr = (void *)attach_info.vaddr;
600 }
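/* Sketch of the overall XPMEM hand-off implemented by the two functions above
   together with createRecvXpmemAndSems/createSendXpmemAndSems below: the
   owning process mmaps an anonymous region and registers it with
   XPMEM_CMD_MAKE to obtain a segid, publishes the segid (here via a small
   ".xpmem.<rank>" file), and the peer turns the segid into an apid with
   XPMEM_CMD_GET and maps it with XPMEM_CMD_ATTACH.  The helper names
   shareWithPeer/mapFromPeer are hypothetical. */
#if 0
static char *shareWithPeer(int size, __s64 *segidOut)
{
        char *ptr;
        *segidOut = createXpmemObject(size, &ptr);  /* owner: mmap + XPMEM_CMD_MAKE   */
        /* ... write *segidOut somewhere the peer can read it ... */
        return ptr;
}

static char *mapFromPeer(__s64 segid, int size)
{
        char *ptr;
        attachXpmemObject(segid, size, &ptr);       /* peer: XPMEM_CMD_GET + ATTACH   */
        return ptr;
}
#endif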
601
602 void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames){
603         int i=0;
604         __s64 *segid_arr;
605         int size, pagesize = getpagesize();
606         
607         *bufs = (sharedBufData *)calloc(xpmemContext->nodesize, sizeof(sharedBufData));
608         segid_arr = malloc(sizeof(__s64)*xpmemContext->nodesize);
609         
610         size = XPMEMBUFLEN+sizeof(sharedBufHeader);
611         size = ((~(pagesize-1))&(size+pagesize-1));
612
613         for(i=0;i<xpmemContext->nodesize;i++){
614             if(i != xpmemContext->noderank)  {
615                 (*bufs)[i].segid = segid_arr[i] = createXpmemObject(size,(char **)&((*bufs)[i].header));
616                 memset(((*bufs)[i].header), 0, size);
617                 (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
618 #if XPMEM_OSSPINLOCK
619                 (*bufs)[i].header->lock = 0; // by convention (see the man page), 0 means unlocked
620 #elif XPMEM_LOCK
621                 (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
622 #endif
623             }else{
624                 (*bufs)[i].header = NULL;
625                 (*bufs)[i].data = NULL;
626 #if XPMEM_LOCK
627                 (*bufs)[i].mutex = NULL;
628 #endif
629             }
630         }       
631
632         int fd;
633         char fname[128];
634         sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+xpmemContext->noderank);
635         fd = open(fname, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR);
636         if (fd == -1) {
637           CmiAbort("createRecvXpmemAndSems: cannot create segid file");
638         }
639         write(fd, segid_arr, sizeof(__s64)*xpmemContext->nodesize);
640         close(fd);
641         free(segid_arr);
642 }
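/* The size arithmetic above rounds the buffer up to a whole number of pages,
   since XPMEM deals in page-granular regions.  For example, with a 4096-byte
   page, a request of 4*1024*1024 bytes plus a 64-byte header (4194368 bytes,
   taking 64 as an illustrative header size) rounds up to 4198400 bytes, i.e.
   1025 pages.  The helper name roundUpToPage is hypothetical. */
#if 0
static int roundUpToPage(int size, int pagesize)
{
        /* identical to: size = ((~(pagesize-1)) & (size+pagesize-1)); */
        return (size + pagesize - 1) & ~(pagesize - 1);
}
#endif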
643
644 void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames)
645 {
646         int i;
647         int size, pagesize;
648
649         pagesize = getpagesize();
650         size = XPMEMBUFLEN+sizeof(sharedBufHeader);
651         size = ((~(pagesize-1))&(size+pagesize-1));
652
653         *bufs = (sharedBufData *)calloc(xpmemContext->nodesize, sizeof(sharedBufData));
654
655         for(i=0;i<xpmemContext->nodesize;i++){
656             if(i != xpmemContext->noderank)  {
657                 __s64 segid;
658                  char fname[128];
659                  int fd;
660                  sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+i);
661                  fd = open(fname, O_RDONLY);
662                  if (fd == -1) {
663                      CmiAbort("createSendXpmemAndSems: cannot open segid file");
664                  }
665                 lseek(fd, xpmemContext->noderank*sizeof(__s64), SEEK_SET);
666                 read(fd, &segid, sizeof(__s64));
667                 close(fd);
668                 (*bufs)[i].segid = segid;
669                 attachXpmemObject(segid, size,(char **)&((*bufs)[i].header));
670                 memset(((*bufs)[i].header), 0, XPMEMBUFLEN+sizeof(sharedBufHeader));
671                 (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
672 #if XPMEM_OSSPINLOCK
673                 (*bufs)[i].header->lock = 0; // by convention (see the man page), 0 means unlocked
674 #elif XPMEM_LOCK
675                 (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
676 #endif
677             }else{
678                 (*bufs)[i].header = NULL;
679                 (*bufs)[i].data = NULL;
680 #if XPMEM_LOCK
681                 (*bufs)[i].mutex = NULL;
682 #endif
683             }
684         }
685 }
686
687 void removeXpmemFiles()
688 {
689         char fname[64];
690         sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+xpmemContext->noderank);
691         unlink(fname);
692 }
693
694 void freeSharedBuffers(){
695         int i;
696         for(i= 0;i<xpmemContext->nodesize;i++){
697             if(i != xpmemContext->noderank){
698 #if XPMEM_LOCK
699                 sem_unlink(xpmemContext->sendBufNames[i]);
700                 sem_unlink(xpmemContext->recvBufNames[i]);
701 #endif
702             }
703         }
704 }
705
706 void tearDownSharedBuffers(){
707         int i;
708         for(i= 0;i<xpmemContext->nodesize;i++){
709             if(i != xpmemContext->noderank){
710 #if XPMEM_LOCK
711                 sem_close(xpmemContext->recvBufs[i].mutex);
712                 sem_close(xpmemContext->sendBufs[i].mutex);
713                 sem_unlink(xpmemContext->sendBufNames[i]);
714                 sem_unlink(xpmemContext->recvBufNames[i]);
715                 xpmemContext->recvBufs[i].mutex = NULL;
716                 xpmemContext->sendBufs[i].mutex = NULL;
717 #endif
718             }
719         }
720 };
721
722 void initSendQ(XpmemSendQ *q,int size,int rank);
723
724 void initAllSendQs(){
725         int i=0;
726         xpmemContext->sendQs = (XpmemSendQ **) malloc(sizeof(XpmemSendQ *)*xpmemContext->nodesize);
727         for(i=0;i<xpmemContext->nodesize;i++){
728                 if(i != xpmemContext->noderank){
729                         xpmemContext->sendQs[i] = (XpmemSendQ *)calloc(1, sizeof(XpmemSendQ));
730                         initSendQ((xpmemContext->sendQs)[i],SENDQSTARTSIZE,i);
731                 }else{
732                         xpmemContext->sendQs[i] = NULL;
733                 }
734         }
735 };
736
737
738 /****************
739  * Copy this message into the shared buffer.
740  * If it does not succeed, put it into the sendQ.
741  * NOTE: This method is called only after obtaining the corresponding mutex.
742  * ********/
744 int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ){
745
746         if(dstBuf->header->bytes+size <= XPMEMBUFLEN){
747                 /**copy  this message to sharedBuf **/
748                 dstBuf->header->count++;
749                 CmiMemcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
750                 dstBuf->header->bytes += size;
751                 MACHSTATE4(3,"Xpmem send done msg %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",msg,size,dstBuf->header->count,dstBuf->header->bytes);
752                 CmiFree(msg);
753                 return 1;
754         }
755         /***
756          * Shared Buffer is too full for this message
757          * **/
758         //printf("[%d] send buffer is too full\n", CmiMyPe());
759         pushSendQ(dstSendQ,msg,size,refcount);
760         (*refcount)++;
761         MACHSTATE3(3,"Xpmem send msg %p size %d queued refcount %d",msg,size,*refcount);
762         return 0;
763 }
764
765 inline OutgoingMsgRec* popSendQ(XpmemSendQ *q);
766
767 /****
768  *Try to send all the messages in the sendq to this destination rank
769  *NOTE: This method is called only after obtaining the corresponding mutex
770  * ************/
771
772 inline int flushSendQ(XpmemSendQ  *dstSendQ){
773         sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstSendQ->rank]);
774         int count=dstSendQ->numEntries;
775         int sent=0;
776         while(count > 0){
777                 OutgoingMsgRec *ogm = popSendQ(dstSendQ);
778                 (*ogm->refcount)--;
779                 MACHSTATE4(3,"Xpmem trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstSendQ->rank,*ogm->refcount);
780                 int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
781                 if(ret==1){
782                         sent++;
783 #if CMK_NET_VERSION
784                         GarbageCollectMsg(ogm);
785 #endif
786                 }
787                 count--;
788         }
789         return sent;
790 }
791
792 inline void emptyRecvBuf(sharedBufData *recvBuf);
793
794 inline void emptyAllRecvBufs(){
795         int  i;
796         for(i=0;i<xpmemContext->nodesize;i++){
797                 if(i != xpmemContext->noderank){
798                         sharedBufData *recvBuf = &(xpmemContext->recvBufs[i]);
799                         if(recvBuf->header->count > 0){
800
801 #if XPMEM_STATS
802                                 xpmemContext->lockRecvCount++;
803 #endif
804
805 #if XPMEM_OSSPINLOCK
806                                 if(! OSSpinLockTry(&recvBuf->header->lock)){
807 #elif XPMEM_LOCK
808                                 if(sem_trywait(recvBuf->mutex) < 0){
809 #elif XPMEM_FENCE
810                                 recvBuf->header->flagReceiver = 1;
811                                 recvBuf->header->turn = SENDER;
812                                 CmiMemoryReadFence(0,0);
813                                 CmiMemoryWriteFence(0,0);
814                                 //if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
815                                 if((recvBuf->header->flagSender)){
816                                         recvBuf->header->flagReceiver = 0;
817 #endif
818                                 }else{
819
820
821                                         MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);                        
822                                         emptyRecvBuf(recvBuf);
823
824 #if XPMEM_OSSPINLOCK
825                                         OSSpinLockUnlock(&recvBuf->header->lock);
826 #elif XPMEM_LOCK
827                                         sem_post(recvBuf->mutex);
828 #elif XPMEM_FENCE
829                                         CmiMemoryReadFence(0,0);
830                                         CmiMemoryWriteFence(0,0);
831                                         recvBuf->header->flagReceiver = 0;
832 #endif
833
834                                 }
835                         
836                         }
837                 }
838         }
839 };
840
841 inline void flushAllSendQs(){
842         int i;
843 #if SENDQ_LIST
844         int index_prev = -1;
845
846         i =  sendQ_head_index;
847         while (i!= -1) {
848                 XpmemSendQ *sendQ = xpmemContext->sendQs[i];
849                 CmiAssert(i !=  xpmemContext->noderank);
850                 if(sendQ->numEntries > 0){
851 #else
852         for(i=0;i<xpmemContext->nodesize;i++) {
853                 if (i == xpmemContext->noderank) continue;
854                 XpmemSendQ *sendQ = xpmemContext->sendQs[i];
855                 if(sendQ->numEntries > 0) {
856 #endif
857         
858 #if XPMEM_OSSPINLOCK
859                         if(OSSpinLockTry(&xpmemContext->sendBufs[i].header->lock)){
860 #elif XPMEM_LOCK
861                         if(sem_trywait(xpmemContext->sendBufs[i].mutex) >= 0){
862 #elif XPMEM_FENCE
863                         xpmemContext->sendBufs[i].header->flagSender = 1;
864                         xpmemContext->sendBufs[i].header->turn = RECEIVER;
865                         CmiMemoryReadFence(0,0);                        
866                         CmiMemoryWriteFence(0,0);
867                         if(!(xpmemContext->sendBufs[i].header->flagReceiver && xpmemContext->sendBufs[i].header->turn == RECEIVER)){
868 #endif
869
870                                 MACHSTATE1(3,"flushSendQ %d",i);
871                                 flushSendQ(sendQ);
872
873 #if XPMEM_OSSPINLOCK    
874                                 OSSpinLockUnlock(&xpmemContext->sendBufs[i].header->lock);
875 #elif XPMEM_LOCK
876                                 sem_post(xpmemContext->sendBufs[i].mutex);
877 #elif XPMEM_FENCE
878                                 CmiMemoryReadFence(0,0);                        
879                                 CmiMemoryWriteFence(0,0);
880                                 xpmemContext->sendBufs[i].header->flagSender = 0;
881 #endif
882                         }else{
883
884 #if XPMEM_FENCE
885                           xpmemContext->sendBufs[i].header->flagSender = 0;
886 #endif                          
887
888                         }
889                 }        
890 #if SENDQ_LIST
891                 if (sendQ->numEntries == 0) {
892                     if (index_prev != -1)
893                         xpmemContext->sendQs[index_prev]->next = sendQ->next;
894                     else
895                         sendQ_head_index = sendQ->next;
896                     i = sendQ->next;
897                     sendQ->next = -2;
898                 }
899                 else {
900                     index_prev = i;
901                     i = sendQ->next;
902                 }
903 #endif
904         }       
905 };
906
907 static inline void handoverXpmemMessage(char *newmsg,int total_size,int rank,int broot);
908
909 void emptyRecvBuf(sharedBufData *recvBuf){
910         int numMessages = recvBuf->header->count;
911         int i;
912
913         char *ptr=recvBuf->data;
914
915         for(i=0;i<numMessages;i++){
916                 int size;
917                 int rank, srcpe, seqno, magic;
918                 unsigned int broot;
919                 char *msg = ptr;
920                 char *newMsg;
921
922 #if CMK_NET_VERSION
923                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
924                 size = CmiMsgHeaderGetLength(msg);
925 #else
926                 size = CmiGetMsgSize(msg);
927 #endif
928         
929                 newMsg = (char *)CmiAlloc(size);
930                 memcpy(newMsg,msg,size);
931
932 #if CMK_NET_VERSION
933                 handoverXpmemMessage(newMsg,size,rank,broot);
934 #else
935                 handleOneRecvedMsg(size, newMsg);
936 #endif
937                 
938                 ptr += size;
939
940                 MACHSTATE3(3,"message of size %d recvd, ends at ptr-data %d, total bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
941         }
942 #if 1
943   if(ptr - recvBuf->data != recvBuf->header->bytes){
944                 CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
945         }
946 #endif
947         CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
948         recvBuf->header->count=0;
949         recvBuf->header->bytes=0;
950 }
951
952
953 #if CMK_NET_VERSION
954 static inline void handoverXpmemMessage(char *newmsg,int total_size,int rank,int broot){
955         CmiAssert(rank == 0);
956 #if CMK_BROADCAST_SPANNING_TREE
957         if (rank == DGRAM_BROADCAST
958 #if CMK_NODE_QUEUE_AVAILABLE
959           || rank == DGRAM_NODEBROADCAST
960 #endif
961          ){
962                 SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
963         }
964 #elif CMK_BROADCAST_HYPERCUBE
965         if (rank == DGRAM_BROADCAST
966 #if CMK_NODE_QUEUE_AVAILABLE
967           || rank == DGRAM_NODEBROADCAST
968 #endif
969          ){
970                         SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
971         }
972 #endif
973
974         switch (rank) {
975         case DGRAM_BROADCAST:
976                 CmiPushPE(0, newmsg);
977                 break;
978         default:
979                 CmiPushPE(rank, newmsg);
980                 break;
981         }    /* end of switch */
985 }
986 #endif
987
988
989 /**************************
990  *sendQ helper functions
991  * ****************/
992
993 void initSendQ(XpmemSendQ *q,int size,int rank){
994         q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));
995
996         q->size = size;
997         q->numEntries = 0;
998
999         q->begin = 0;
1000         q->end = 0;
1001
1002         q->rank = rank;
1003 #if SENDQ_LIST
1004         q->next = -2;
1005 #endif
1006 }
1007
1008 void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount){
1009         if(q->numEntries == q->size){
1010                 //need to resize 
1011                 OutgoingMsgRec *oldData = q->data;
1012                 int newSize = q->size<<1;
1013                 q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
1014                 //copy head to the beginning of the new array
1015                 CmiAssert(q->begin == q->end);
1016
1017                 CmiAssert(q->begin < q->size);
1018                 memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));
1019
1020                 if(q->end!=0){
1021                         memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
1022                 }
1023                 free(oldData);
1024                 q->begin = 0;
1025                 q->end = q->size;
1026                 q->size = newSize;
1027         }
1028         OutgoingMsgRec *omg = &q->data[q->end];
1029         omg->size = size;
1030         omg->data = msg;
1031         omg->refcount = refcount;
1032         (q->end)++;
1033         if(q->end >= q->size){
1034                 q->end -= q->size;
1035         }
1036         q->numEntries++;
1037 }
1038
1039 OutgoingMsgRec * popSendQ(XpmemSendQ *q){
1040         OutgoingMsgRec * ret;
1041         if(0 == q->numEntries){
1042                 return NULL;
1043         }
1044
1045         ret = &q->data[q->begin];
1046         (q->begin)++;
1047         if(q->begin >= q->size){
1048                 q->begin -= q->size;
1049         }
1050         
1051         q->numEntries--;
1052         return ret;
1053 }
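/* A small usage sketch of the sendQ ring buffer above: entries are appended at
   end and consumed from begin, both wrapping modulo size; when the queue fills
   up, pushSendQ doubles the array and unrolls the ring so the oldest entry
   lands at index 0 again.  The driver below is hypothetical and only shows the
   expected call pattern. */
#if 0
static void sendQExample(void)
{
        XpmemSendQ q;
        int refcount = 0;
        int m;
        OutgoingMsgRec *rec;

        initSendQ(&q, 4, /*rank=*/1);               /* tiny queue so the resize triggers */

        for (m = 0; m < 5; m++)                     /* the 5th push doubles size to 8    */
                pushSendQ(&q, /*msg=*/NULL, /*size=*/128, &refcount);

        while ((rec = popSendQ(&q)) != NULL)        /* drain in FIFO order               */
                ;  /* the real flush path calls sendMessageRec(rec, dstBuf, &q) here     */

        free(q.data);
}
#endif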