/** @file machine-xpmem.c

 There are three options here for synchronization, selected at
 compile time by defining the corresponding macro:
      XPMEM_FENCE is the default. It uses memory fences.
      XPMEM_OSSPINLOCK causes OSSpinLocks to be used (available on OSX).
      XPMEM_LOCK causes POSIX semaphores to be used.

  created by
        Gengbin Zheng, September 2011
*/

/**
 * @addtogroup NET
 * @{
 */

#include <sys/types.h>
#include <sys/mman.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>

#include "xpmem.h"

/**************
   Determine which type of synchronization to use
*/
#if XPMEM_OSSPINLOCK
#include <libkern/OSAtomic.h>
#elif XPMEM_LOCK
#include <semaphore.h>
#else
/* Default to using fences */
#define XPMEM_FENCE 1
#endif

#define MEMDEBUG(x) //x

#define XPMEM_STATS    0

#define SENDQ_LIST     0

/*** The following code was copied verbatim from pcqueue.h file ***/
#undef CmiMemoryWriteFence
#if XPMEM_FENCE
#ifdef POWER_PC
#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#else
#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
//#define CmiMemoryWriteFence(startPtr,nBytes)
#endif
#else
#undef CmiMemoryWriteFence
#define CmiMemoryWriteFence(startPtr,nBytes)
#endif

#undef CmiMemoryReadFence
#if XPMEM_FENCE
#ifdef POWER_PC
#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
#else
#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
//#define CmiMemoryReadFence(startPtr,nBytes)
#endif
#else
#define CmiMemoryReadFence(startPtr,nBytes)
#endif

#if CMK_SMP
#error  "XPMEM can only be used in non-smp build of Charm++"
#endif

/***************************************************************************************/

enum entities {SENDER,RECEIVER};

/************************
 *      Implementation currently assumes that
 *      1) all nodes have the same number of processors
 *      2) in the nodelist all processors in a node are listed in sequence
 *         0 1 2 3      4 5 6 7
 *         -------      -------
 *          node 1       node 2
 ************************/

#define NAMESTRLEN 60
#define PREFIXSTRLEN 50

static int XPMEMBUFLEN  =   (1024*1024*4);
#define XPMEMMINSIZE     (1*1024)
#define XPMEMMAXSIZE     (1024*1024)

static int  SENDQSTARTSIZE  =  256;


/// This struct is used as the first portion of a shared memory region, followed by data
typedef struct {
        int count; //number of messages
        int bytes; //number of bytes

#if XPMEM_OSSPINLOCK
        OSSpinLock lock;
#endif

#if XPMEM_FENCE
        volatile int flagSender;
        CmiMemorySMPSeparation_t pad1;
        volatile int flagReceiver;
        CmiMemorySMPSeparation_t pad2;
        volatile int turn;
#endif

} sharedBufHeader;
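/* In the default XPMEM_FENCE mode the header above doubles as a lock-free
   try-lock: each side raises its own flag (flagSender/flagReceiver), hands
   the turn to the other side, and backs off if it then sees the peer's flag
   set.  This is essentially a Peterson-style handshake (the turn check is
   currently commented out in the send/receive paths); the fence macros
   defined above keep the flag and turn updates ordered. */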


typedef struct {
#if XPMEM_LOCK
        sem_t *mutex;
#endif
        sharedBufHeader *header;
        char *data;
        __s64  segid;
} sharedBufData;

typedef struct OutgoingMsgRec
{
  char *data;
  int  *refcount;
  int   size;
}
OutgoingMsgRec;

typedef struct {
        int size;       //total size of data array
        int begin;      //position of first element
        int end;        //position of next element
        int numEntries; //number of entries
        int rank;       // for dest rank
#if SENDQ_LIST
        int next;         // next dstrank of non-empty queue
#endif
        OutgoingMsgRec *data;

} XpmemSendQ;
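/* XpmemSendQ is a circular buffer of OutgoingMsgRec entries indexed by
   begin/end; pushSendQ() doubles the backing array when it fills up and
   popSendQ() dequeues from the front.  One queue is kept per destination
   rank to hold messages that could not be copied into the shared buffer
   immediately. */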

typedef struct {
        int nodesize;
        int noderank;
        int nodestart,nodeend;//proc numbers for the start and end of this node
        char prefixStr[PREFIXSTRLEN];
        char **recvBufNames;
        char **sendBufNames;

        sharedBufData *recvBufs;
        sharedBufData *sendBufs;

        XpmemSendQ **sendQs;

#if XPMEM_STATS
        int sendCount;
        int validCheckCount;
        int lockRecvCount;
        double validCheckTime;
        double sendTime;
        double commServerTime;
#endif

} XpmemContext;


#if SENDQ_LIST
static int sendQ_head_index = -1;
#endif

XpmemContext *xpmemContext=NULL; //global context


void calculateNodeSizeAndRank(char **);
void setupSharedBuffers();
void initAllSendQs();

void CmiExitXpmem();

static void cleanupOnAllSigs(int signo)
{
    CmiExitXpmem();
}

static int xpmem_fd;

/******************
 *      Initialization routine
 *      determines node size and rank, opens /dev/xpmem, and sets up
 *      the shared buffers and send queues
 * ****************/
void CmiInitXpmem(char **argv){
        char input[32];
        char *env;

        MACHSTATE(3,"CminitXpmem start");
        xpmemContext = (XpmemContext *)calloc(1,sizeof(XpmemContext));

#if CMK_NET_VERSION
        if(Cmi_charmrun_pid <= 0){
                CmiAbort("xpmem must be run with charmrun");
        }
#endif
        calculateNodeSizeAndRank(argv);

        MACHSTATE1(3,"CminitXpmem  %d calculateNodeSizeAndRank",xpmemContext->nodesize);

        if(xpmemContext->nodesize == 1) return;

        env = getenv("CHARM_XPMEM_SIZE");
        if (env) {
            XPMEMBUFLEN = CmiReadSize(env);
        }
        SENDQSTARTSIZE = 32 * xpmemContext->nodesize;

        if (_Cmi_mynode == 0)
            CmiPrintf("Charm++> xpmem enabled: %d cores per node, buffer size: %.1fMB\n", xpmemContext->nodesize, XPMEMBUFLEN/1024.0/1024.0);

        xpmem_fd = open("/dev/xpmem", O_RDWR);
        if (xpmem_fd == -1) {
            CmiAbort("Opening /dev/xpmem failed");
        }

#if CMK_CRAYXE
        srand(getpid());
        int Cmi_charmrun_pid = rand();
        PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
#elif !CMK_NET_VERSION
        #error "need a unique number"
#endif
        snprintf(&(xpmemContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_xpmem_%d",Cmi_charmrun_pid);

        MACHSTATE2(3,"CminitXpmem %s %d pre setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);

        setupSharedBuffers();

        MACHSTATE2(3,"CminitXpmem %s %d setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);

        initAllSendQs();

        MACHSTATE2(3,"CminitXpmem %s %d initAllSendQs",xpmemContext->prefixStr,xpmemContext->nodesize);

        MACHSTATE2(3,"CminitXpmem %s %d done",xpmemContext->prefixStr,xpmemContext->nodesize);

        signal(SIGSEGV, cleanupOnAllSigs);
        signal(SIGFPE, cleanupOnAllSigs);
        signal(SIGILL, cleanupOnAllSigs);
        signal(SIGTERM, cleanupOnAllSigs);
        signal(SIGABRT, cleanupOnAllSigs);
        signal(SIGQUIT, cleanupOnAllSigs);
        signal(SIGBUS, cleanupOnAllSigs);
}

/**************
 * shutdown shmem objects and semaphores
 *
 * *******************/
void tearDownSharedBuffers();

void CmiExitXpmem(){
        int i=0;

        if (xpmemContext == NULL) return;

        if(xpmemContext->nodesize != 1) {
                tearDownSharedBuffers();

                /* all name strings share one allocation (see allocBufNameStrings);
                   free it through the first slot that belongs to a peer rank */
                for(i=0;i<xpmemContext->nodesize;i++){
                        if(i != xpmemContext->noderank){
                                break;
                        }
                }
                free(xpmemContext->recvBufNames[i]);
                free(xpmemContext->sendBufNames[i]);

                free(xpmemContext->recvBufNames);
                free(xpmemContext->sendBufNames);

                free(xpmemContext->recvBufs);
                free(xpmemContext->sendBufs);

        }
#if XPMEM_STATS
        CmiPrintf("[%d] sendCount %d sendTime %.6lf validCheckCount %d validCheckTime %.6lf commServerTime %.6lf lockRecvCount %d \n",_Cmi_mynode,xpmemContext->sendCount,xpmemContext->sendTime,xpmemContext->validCheckCount,xpmemContext->validCheckTime,xpmemContext->commServerTime,xpmemContext->lockRecvCount);
#endif
        free(xpmemContext);
        xpmemContext = NULL;
}

/******************
 * Should this message be sent using XPMEM or not ?
 * ***********************/

/* dstNode is node number */
inline
static int CmiValidXpmem(int node, int size){
#if XPMEM_STATS
        xpmemContext->validCheckCount++;
#endif
        //replace by bitmap later
        //if(dst >= xpmemContext->nodestart && dst <= xpmemContext->nodeend && size < XPMEMMAXSIZE && size > XPMEMMINSIZE){
        return (node >= xpmemContext->nodestart && node <= xpmemContext->nodeend && size <= XPMEMMAXSIZE )? 1: 0;
}


inline int XpmemRank(int dstnode){
        return dstnode - xpmemContext->nodestart;
}

inline void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount);
inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ);
inline int flushSendQ(XpmemSendQ *sendQ);

inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,XpmemSendQ *dstSendQ){
  return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
}

/***************
 *
 * Send this message through shared memory.
 * If the lock cannot be obtained, put the message in the sendQ;
 * before sending new messages, drain the sendQ first.
 *
 * ****************************/

void CmiSendMessageXpmem(char *msg, int size, int dstnode, int *refcount)
{
#if XPMEM_STATS
        double _startSendTime = CmiWallTimer();
#endif

        LrtsPrepareEnvelope(msg, size);

        int dstRank = XpmemRank(dstnode);
        MEMDEBUG(CmiMemoryCheck());

        MACHSTATE4(3,"Send Msg Xpmem msg %p size %d dst %d dstRank %d",msg,size,dstnode,dstRank);

        CmiAssert(dstRank >=0 && dstRank != xpmemContext->noderank);

        sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstRank]);
        XpmemSendQ *sendQ = xpmemContext->sendQs[dstRank];

#if XPMEM_OSSPINLOCK
        if(! OSSpinLockTry(&dstBuf->header->lock)){
#elif XPMEM_LOCK
        if(sem_trywait(dstBuf->mutex) < 0){
#elif XPMEM_FENCE
        dstBuf->header->flagSender = 1;
        dstBuf->header->turn = RECEIVER;
        CmiMemoryReadFence(0,0);
        CmiMemoryWriteFence(0,0);
        //if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
        if(dstBuf->header->flagReceiver){
                dstBuf->header->flagSender = 0;
#endif
                /**failed to get the lock
                insert into q and retain the message*/
#if SENDQ_LIST
                if (sendQ->numEntries == 0 && sendQ->next == -2) {
                    sendQ->next = sendQ_head_index;
                    sendQ_head_index = dstRank;
                }
#endif
                pushSendQ(sendQ, msg, size, refcount);
                (*refcount)++;
                MEMDEBUG(CmiMemoryCheck());
                return;
        }else{
                /***
                 * We got the lock for this buffer
                 * first write all the messages in the sendQ and then write this guy
                 * */
                 if(sendQ->numEntries == 0){
                        // send message user event
                        int ret = sendMessage(msg,size,refcount,dstBuf,sendQ);
#if SENDQ_LIST
                        if (sendQ->numEntries > 0 && sendQ->next == -2)
                        {
                                sendQ->next = sendQ_head_index;
                                sendQ_head_index = dstRank;
                        }
#endif
                        MACHSTATE(3,"Xpmem Send succeeded immediately");
                 }else{
                        (*refcount)+=2;/*this message should not get deleted when the queue is flushed*/
                        pushSendQ(sendQ,msg,size,refcount);
                        MACHSTATE3(3,"Xpmem msg %p pushed to sendQ length %d refcount %d",msg,sendQ->numEntries,*refcount);
                        int sent = flushSendQ(sendQ);
                        (*refcount)--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
                        MACHSTATE1(3,"Xpmem flushSendQ sent %d messages",sent);
                 }
                 /* unlock the recvbuffer*/

#if XPMEM_OSSPINLOCK
                 OSSpinLockUnlock(&dstBuf->header->lock);
#elif XPMEM_LOCK
                 sem_post(dstBuf->mutex);
#elif XPMEM_FENCE
                 CmiMemoryReadFence(0,0);
                 CmiMemoryWriteFence(0,0);
                 dstBuf->header->flagSender = 0;
#endif
        }
#if XPMEM_STATS
        xpmemContext->sendCount ++;
        xpmemContext->sendTime += (CmiWallTimer()-_startSendTime);
#endif
        MEMDEBUG(CmiMemoryCheck());

}

inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/**********
 * Extract all the messages from the recvBuffers you can
 * Flush all sendQs
 * ***/
inline void CommunicationServerXpmem()
{
#if XPMEM_STATS
        double _startCommServerTime =CmiWallTimer();
#endif
        MEMDEBUG(CmiMemoryCheck());

        emptyAllRecvBufs();
        flushAllSendQs();

#if XPMEM_STATS
        xpmemContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
#endif
        MEMDEBUG(CmiMemoryCheck());
}

static void CmiNotifyStillIdleXpmem(CmiIdleState *s){
        CommunicationServerXpmem();
}


static void CmiNotifyBeginIdleXpmem(CmiIdleState *s)
{
        CmiNotifyStillIdle(s);
}

void calculateNodeSizeAndRank(char **argv)
{
        xpmemContext->nodesize=1;
        MACHSTATE(3,"calculateNodeSizeAndRank start");
        //CmiGetArgIntDesc(argv, "+nodesize", &(xpmemContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
        CmiGetArgIntDesc(argv, "+nodesize", &(xpmemContext->nodesize),"Number of cores in this node");
        MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",xpmemContext->nodesize);

        xpmemContext->noderank = _Cmi_mynode % (xpmemContext->nodesize);

        MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",xpmemContext->noderank);

        xpmemContext->nodestart = _Cmi_mynode - xpmemContext->noderank;

        MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");

        xpmemContext->nodeend = xpmemContext->nodestart + xpmemContext->nodesize -1;

        if(xpmemContext->nodeend >= _Cmi_numnodes){
                xpmemContext->nodeend = _Cmi_numnodes-1;
                xpmemContext->nodesize = (xpmemContext->nodeend - xpmemContext->nodestart) +1;
        }

        MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",xpmemContext->nodestart,xpmemContext->nodesize,xpmemContext->noderank);
}

void allocBufNameStrings(char ***bufName);
void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames);
void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames);
void removeXpmemFiles();

/***************
 *      calculate the names of the shared objects and semaphores
 *
 *      name scheme
 *      shared memory: charm_xpmem_<pid>_<recvernoderank>_<sendernoderank>
 *        e.g. charm_xpmem_<pid>_3_1 names the buffer through which processor 1 sends to processor 3
 *      semaphore    : same name as the shared memory object; only used when XPMEM_LOCK is defined
 *                (the posix library adds its own semaphore prefix, in linux at least;
 *                 other machines might need more portable code)
 *
 *      open these shared objects and semaphores
 * *********/
void setupSharedBuffers(){
        int i=0;

        allocBufNameStrings(&(xpmemContext->recvBufNames));

        allocBufNameStrings(&(xpmemContext->sendBufNames));

        for(i=0;i<xpmemContext->nodesize;i++){
                if(i != xpmemContext->noderank){
                        snprintf(xpmemContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",xpmemContext->prefixStr,xpmemContext->noderank+xpmemContext->nodestart,i+xpmemContext->nodestart);
                        MACHSTATE2(3,"recvBufName %s with rank %d",xpmemContext->recvBufNames[i],i);
                        snprintf(xpmemContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",xpmemContext->prefixStr,i+xpmemContext->nodestart,xpmemContext->noderank+xpmemContext->nodestart);
                        MACHSTATE2(3,"sendBufName %s with rank %d",xpmemContext->sendBufNames[i],i);
                }
        }

        createRecvXpmemAndSems(&(xpmemContext->recvBufs),xpmemContext->recvBufNames);
        CmiBarrier();
        createSendXpmemAndSems(&(xpmemContext->sendBufs),xpmemContext->sendBufNames);
        CmiBarrier();
        removeXpmemFiles();

        for(i=0;i<xpmemContext->nodesize;i++){
                if(i != xpmemContext->noderank){
                        //CmiAssert(xpmemContext->sendBufs[i].header->count == 0);
                        xpmemContext->sendBufs[i].header->count = 0;
                        xpmemContext->sendBufs[i].header->bytes = 0;
                }
        }
}

void allocBufNameStrings(char ***bufName)
{
        int i,count;
        int totalAlloc = sizeof(char)*NAMESTRLEN*(xpmemContext->nodesize-1);
        char *tmp = malloc(totalAlloc);

        MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);

        *bufName = (char **)malloc(sizeof(char *)*xpmemContext->nodesize);
        for(i=0,count=0;i<xpmemContext->nodesize;i++){
                if(i != xpmemContext->noderank){
                        (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
                        count++;
                }else{
                        (*bufName)[i] = NULL;
                }
        }
}

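/* Create an exportable shared region: mmap anonymous memory and register it
   with the XPMEM driver via XPMEM_CMD_MAKE.  Returns the segid that peer
   processes pass to attachXpmemObject(), or -1 if the mapping failed. */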
__s64 createXpmemObject(int size,char **pPtr)
{
        struct xpmem_cmd_make make_info;
        int ret;

        /* fd is ignored for MAP_ANONYMOUS; pass -1 for portability */
        *pPtr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
        if (*pPtr == MAP_FAILED) {
            perror("Creating mapping.");
            return -1;
        }
        make_info.vaddr = (__u64) *pPtr;
        make_info.size = size;
        make_info.permit_type = XPMEM_PERMIT_MODE;
        make_info.permit_value = (__u64) 0600;
        ret = ioctl(xpmem_fd, XPMEM_CMD_MAKE, &make_info);
        if (ret != 0) {
            perror("xpmem_make");
            CmiAbort("xpmem_make");
        }
        return make_info.segid;
}

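/* Map a region exported by a peer: XPMEM_CMD_GET turns the segid into an
   access permit (apid), and XPMEM_CMD_ATTACH maps the peer's pages into this
   process's address space at the returned vaddr. */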
void attachXpmemObject(__s64 segid, int size, char **pPtr)
{
       int ret;
       __s64 apid;
       struct xpmem_cmd_get get_info;
       struct xpmem_cmd_attach attach_info;

       get_info.segid = segid;
       get_info.flags = XPMEM_RDWR;
       get_info.permit_type = XPMEM_PERMIT_MODE;
       get_info.permit_value = (__u64) NULL;
       ret = ioctl(xpmem_fd, XPMEM_CMD_GET, &get_info);
       if (ret != 0) {
               CmiAbort("xpmem_get");
       }
       apid = get_info.apid;

       attach_info.apid = get_info.apid;
       attach_info.offset = 0;
       attach_info.size = size;
       attach_info.vaddr = (__u64) NULL;
       attach_info.fd = xpmem_fd;
       attach_info.flags = 0;

       ret = ioctl(xpmem_fd, XPMEM_CMD_ATTACH, &attach_info);
       if (ret != 0) {
               CmiAbort("xpmem_attach");
       }

       *pPtr = (char *)attach_info.vaddr;
}

void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames){
        int i=0;
        __s64 *segid_arr;
        int size, pagesize = getpagesize();

        *bufs = (sharedBufData *)calloc(xpmemContext->nodesize, sizeof(sharedBufData));
        segid_arr = malloc(sizeof(__s64)*xpmemContext->nodesize);

        size = XPMEMBUFLEN+sizeof(sharedBufHeader);
        size = ((~(pagesize-1))&(size+pagesize-1));

        for(i=0;i<xpmemContext->nodesize;i++){
            if(i != xpmemContext->noderank)  {
                (*bufs)[i].segid = segid_arr[i] = createXpmemObject(size,(char **)&((*bufs)[i].header));
                memset(((*bufs)[i].header), 0, size);
                (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
#if XPMEM_OSSPINLOCK
                (*bufs)[i].header->lock = 0; // by convention (see man page) 0 means unlocked
#elif XPMEM_LOCK
                (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
#endif
            }else{
                (*bufs)[i].header = NULL;
                (*bufs)[i].data = NULL;
#if XPMEM_LOCK
                (*bufs)[i].mutex = NULL;
#endif
            }
        }

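        /* Publish the segids of this node-rank's receive buffers in a small
           per-process file (.xpmem.<mynode>) so that the other ranks on this
           node can look them up and attach in createSendXpmemAndSems(). */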
        int fd;
        char fname[128];
        sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+xpmemContext->noderank);
        fd = open(fname, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR);
        if (fd == -1) {
          CmiAbort("createRecvXpmemAndSems: failed to create segid file");
        }
        write(fd, segid_arr, sizeof(__s64)*xpmemContext->nodesize);
        close(fd);
        free(segid_arr);
}

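/* For every peer on this node, read the segid it published in its
   .xpmem.<node> file and attach to that buffer; the attached region becomes
   our send buffer towards that peer. */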
void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames)
{
        int i;
        int size, pagesize;

        pagesize = getpagesize();
        size = XPMEMBUFLEN+sizeof(sharedBufHeader);
        size = ((~(pagesize-1))&(size+pagesize-1));

        *bufs = (sharedBufData *)calloc(xpmemContext->nodesize, sizeof(sharedBufData));

        for(i=0;i<xpmemContext->nodesize;i++){
            if(i != xpmemContext->noderank)  {
                __s64 segid;
                char fname[128];
                int fd;
                sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+i);
                fd = open(fname, O_RDONLY);
                if (fd == -1) {
                    CmiAbort("createSendXpmemAndSems: failed to open segid file");
                }
                lseek(fd, xpmemContext->noderank*sizeof(__s64), SEEK_SET);
                read(fd, &segid, sizeof(__s64));
                close(fd);
                (*bufs)[i].segid = segid;
                attachXpmemObject(segid, size,(char **)&((*bufs)[i].header));
                memset(((*bufs)[i].header), 0, XPMEMBUFLEN+sizeof(sharedBufHeader));
                (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
#if XPMEM_OSSPINLOCK
                (*bufs)[i].header->lock = 0; // by convention (see man page) 0 means unlocked
#elif XPMEM_LOCK
                (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
#endif
            }else{
                (*bufs)[i].header = NULL;
                (*bufs)[i].data = NULL;
#if XPMEM_LOCK
                (*bufs)[i].mutex = NULL;
#endif
            }
        }
}

void removeXpmemFiles()
{
        char fname[64];
        sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+xpmemContext->noderank);
        unlink(fname);
}

void tearDownSharedBuffers(){
        int i;
        for(i= 0;i<xpmemContext->nodesize;i++){
            if(i != xpmemContext->noderank){
#if XPMEM_LOCK
                sem_close(xpmemContext->recvBufs[i].mutex);
                sem_close(xpmemContext->sendBufs[i].mutex);
                sem_unlink(xpmemContext->sendBufNames[i]);
                sem_unlink(xpmemContext->recvBufNames[i]);
                xpmemContext->recvBufs[i].mutex = NULL;
                xpmemContext->sendBufs[i].mutex = NULL;
#endif
            }
        }
}

void initSendQ(XpmemSendQ *q,int size,int rank);

void initAllSendQs(){
        int i=0;
        xpmemContext->sendQs = (XpmemSendQ **) malloc(sizeof(XpmemSendQ *)*xpmemContext->nodesize);
        for(i=0;i<xpmemContext->nodesize;i++){
                if(i != xpmemContext->noderank){
                        xpmemContext->sendQs[i] = (XpmemSendQ *)calloc(1, sizeof(XpmemSendQ));
                        initSendQ((xpmemContext->sendQs)[i],SENDQSTARTSIZE,i);
                }else{
                        xpmemContext->sendQs[i] = NULL;
                }
        }
}


/****************
 * copy this message into the sharedBuf
 * If it does not succeed
 * put it into the sendQ
 * NOTE: This method is called only after obtaining the corresponding mutex
 * ********/
int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ){

        if(dstBuf->header->bytes+size <= XPMEMBUFLEN){
                /**copy  this message to sharedBuf **/
                dstBuf->header->count++;
                CmiMemcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
                dstBuf->header->bytes += size;
                MACHSTATE4(3,"Xpmem send done msg %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",msg,size,dstBuf->header->count,dstBuf->header->bytes);
                CmiFree(msg);
                return 1;
        }
        /***
         * Shared Buffer is too full for this message
         * **/
        //printf("[%d] send buffer is too full\n", CmiMyPe());
        pushSendQ(dstSendQ,msg,size,refcount);
        (*refcount)++;
        MACHSTATE3(3,"Xpmem send msg %p size %d queued refcount %d",msg,size,*refcount);
        return 0;
}

inline OutgoingMsgRec* popSendQ(XpmemSendQ *q);

/****
 * Try to send all the messages in the sendq to this destination rank
 * NOTE: This method is called only after obtaining the corresponding mutex
 * ************/

inline int flushSendQ(XpmemSendQ  *dstSendQ){
        sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstSendQ->rank]);
        int count=dstSendQ->numEntries;
        int sent=0;
        while(count > 0){
                OutgoingMsgRec *ogm = popSendQ(dstSendQ);
                (*ogm->refcount)--;
                MACHSTATE4(3,"Xpmem trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstSendQ->rank,*ogm->refcount);
                int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
                if(ret==1){
                        sent++;
#if CMK_NET_VERSION
                        GarbageCollectMsg(ogm);
#endif
                }
                count--;
        }
        return sent;
}

inline void emptyRecvBuf(sharedBufData *recvBuf);

inline void emptyAllRecvBufs(){
        int  i;
        for(i=0;i<xpmemContext->nodesize;i++){
                if(i != xpmemContext->noderank){
                        sharedBufData *recvBuf = &(xpmemContext->recvBufs[i]);
                        if(recvBuf->header->count > 0){

#if XPMEM_STATS
                                xpmemContext->lockRecvCount++;
#endif

#if XPMEM_OSSPINLOCK
                                if(! OSSpinLockTry(&recvBuf->header->lock)){
#elif XPMEM_LOCK
                                if(sem_trywait(recvBuf->mutex) < 0){
#elif XPMEM_FENCE
                                recvBuf->header->flagReceiver = 1;
                                recvBuf->header->turn = SENDER;
                                CmiMemoryReadFence(0,0);
                                CmiMemoryWriteFence(0,0);
                                //if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
                                if((recvBuf->header->flagSender)){
                                        recvBuf->header->flagReceiver = 0;
#endif
                                }else{

                                        MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
                                        emptyRecvBuf(recvBuf);

#if XPMEM_OSSPINLOCK
                                        OSSpinLockUnlock(&recvBuf->header->lock);
#elif XPMEM_LOCK
                                        sem_post(recvBuf->mutex);
#elif XPMEM_FENCE
                                        CmiMemoryReadFence(0,0);
                                        CmiMemoryWriteFence(0,0);
                                        recvBuf->header->flagReceiver = 0;
#endif

                                }

                        }
                }
        }
}

inline void flushAllSendQs(){
        int i;
#if SENDQ_LIST
        int index_prev = -1;

        i =  sendQ_head_index;
        while (i!= -1) {
                XpmemSendQ *sendQ = xpmemContext->sendQs[i];
                CmiAssert(i !=  xpmemContext->noderank);
                if(sendQ->numEntries > 0){
#else
        for(i=0;i<xpmemContext->nodesize;i++) {
                if (i == xpmemContext->noderank) continue;
                XpmemSendQ *sendQ = xpmemContext->sendQs[i];
                if(sendQ->numEntries > 0) {
#endif

#if XPMEM_OSSPINLOCK
                        if(OSSpinLockTry(&xpmemContext->sendBufs[i].header->lock)){
#elif XPMEM_LOCK
                        if(sem_trywait(xpmemContext->sendBufs[i].mutex) >= 0){
#elif XPMEM_FENCE
                        xpmemContext->sendBufs[i].header->flagSender = 1;
                        xpmemContext->sendBufs[i].header->turn = RECEIVER;
                        CmiMemoryReadFence(0,0);
                        CmiMemoryWriteFence(0,0);
                        if(!(xpmemContext->sendBufs[i].header->flagReceiver && xpmemContext->sendBufs[i].header->turn == RECEIVER)){
#endif

                                MACHSTATE1(3,"flushSendQ %d",i);
                                flushSendQ(sendQ);

#if XPMEM_OSSPINLOCK
                                OSSpinLockUnlock(&xpmemContext->sendBufs[i].header->lock);
#elif XPMEM_LOCK
                                sem_post(xpmemContext->sendBufs[i].mutex);
#elif XPMEM_FENCE
                                CmiMemoryReadFence(0,0);
                                CmiMemoryWriteFence(0,0);
                                xpmemContext->sendBufs[i].header->flagSender = 0;
#endif
                        }else{

#if XPMEM_FENCE
                          xpmemContext->sendBufs[i].header->flagSender = 0;
#endif

                        }
                }
#if SENDQ_LIST
                if (sendQ->numEntries == 0) {
                    if (index_prev != -1)
                        xpmemContext->sendQs[index_prev]->next = sendQ->next;
                    else
                        sendQ_head_index = sendQ->next;
                    i = sendQ->next;
                    sendQ->next = -2;
                }
                else {
                    index_prev = i;
                    i = sendQ->next;
                }
#endif
        }
}


void static inline handoverXpmemMessage(char *newmsg,int total_size,int rank,int broot);

void emptyRecvBuf(sharedBufData *recvBuf){
        int numMessages = recvBuf->header->count;
        int i;

        char *ptr=recvBuf->data;

        for(i=0;i<numMessages;i++){
                int size;
                int rank, srcpe, seqno, magic;
                unsigned int broot;
                char *msg = ptr;
                char *newMsg;

#if CMK_NET_VERSION
                DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
                size = CmiMsgHeaderGetLength(msg);
#else
                size = CmiGetMsgSize(msg);
#endif

                newMsg = (char *)CmiAlloc(size);
                memcpy(newMsg,msg,size);

#if CMK_NET_VERSION
                handoverXpmemMessage(newMsg,size,rank,broot);
#else
                handleOneRecvedMsg(size, newMsg);
#endif

                ptr += size;

                MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
        }
#if 1
        if(ptr - recvBuf->data != recvBuf->header->bytes){
                CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
        }
#endif
        CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
        recvBuf->header->count=0;
        recvBuf->header->bytes=0;
}


#if CMK_NET_VERSION
void static inline handoverXpmemMessage(char *newmsg,int total_size,int rank,int broot){
        CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || rank == DGRAM_NODEBROADCAST
#endif
         ){
                SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
        }
#elif CMK_BROADCAST_HYPERCUBE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || rank == DGRAM_NODEBROADCAST
#endif
         ){
                SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
        }
#endif

        switch (rank) {
        case DGRAM_BROADCAST: {
                CmiPushPE(0, newmsg);
                break;
        }
        default:
        {
                CmiPushPE(rank, newmsg);
        }
        }    /* end of switch */
}
#endif


/**************************
 * sendQ helper functions
 * ****************/

void initSendQ(XpmemSendQ *q,int size,int rank){
        q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));

        q->size = size;
        q->numEntries = 0;

        q->begin = 0;
        q->end = 0;

        q->rank = rank;
#if SENDQ_LIST
        q->next = -2;
#endif
}

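/* Append a message to the circular send queue, doubling the backing array
   when it is full.  On resize the wrapped contents are unrolled so that the
   oldest entry ends up at index 0. */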
void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount){
        if(q->numEntries == q->size){
                //need to resize
                OutgoingMsgRec *oldData = q->data;
                int newSize = q->size<<1;
                q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
                //copy head to the beginning of the new array
                CmiAssert(q->begin == q->end);

                CmiAssert(q->begin < q->size);
                memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));

                if(q->end!=0){
                        memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
                }
                free(oldData);
                q->begin = 0;
                q->end = q->size;
                q->size = newSize;
        }
        OutgoingMsgRec *omg = &q->data[q->end];
        omg->size = size;
        omg->data = msg;
        omg->refcount = refcount;
        (q->end)++;
        if(q->end >= q->size){
                q->end -= q->size;
        }
        q->numEntries++;
}

OutgoingMsgRec * popSendQ(XpmemSendQ *q){
        OutgoingMsgRec * ret;
        if(0 == q->numEntries){
                return NULL;
        }

        ret = &q->data[q->begin];
        (q->begin)++;
        if(q->begin >= q->size){
                q->begin -= q->size;
        }

        q->numEntries--;
        return ret;
}