/* bbe5645976f7402d2b09c8d22babafc071603a58
   [charm.git] / src / arch / util / machine-pxshm.c */
1 /** @file
 *
3  * pxshm --> posix shared memory based network layer for communication
4  * between processes on the same node
5  * This is not going to be the primary mode of communication 
6  * but only for messages below a certain size between
7  * processes on the same node
8  * for non-smp version only
9  * * @ingroup NET
10  * contains only pxshm code for 
11  * - CmiInitPxshm()
12  * - DeliverViaPxShm()
13  * - CommunicationServerPxshm()
14  * - CmiMachineExitPxshm()
15
16
17 There are three options here for synchronization:
18       PXSHM_FENCE is the default. It uses memory fences
19       PXSHM_OSSPINLOCK will cause OSSpinLock's to be used (available on OSX)
20       PXSHM_LOCK will cause POSIX semaphores to be used
21
22   created by 
23         Sayantan Chakravorty, sayantan@gmail.com ,21st March 2007
24 */
25
26 /**
27  * @addtogroup NET
28  * @{
29  */
30
31 #include <sys/types.h>
32 #include <sys/mman.h>
33 #include <unistd.h>
34 #include <sys/stat.h>
35 #include <fcntl.h>
36 #include <errno.h>
37 #include <signal.h>
38
39
40 /************** 
41    Determine which type of synchronization to use 
42 */
43 #if PXSHM_OSSPINLOCK
44 #include <libkern/OSAtomic.h>
45 #elif PXSHM_LOCK
46 #include <semaphore.h>
47 #else
48 /* Default to using fences */
49 #define PXSHM_FENCE 1
50 #endif
51
52
53 #define MEMDEBUG(x) //x
54
55 #define PXSHM_STATS 0
56
57 #define SENDQ_LIST     0
58
59 /*** The following code was copied verbatim from pcqueue.h file ***/
60 #if ! CMK_SMP
61 #undef CmiMemoryWriteFence
62 #if PXSHM_FENCE
63 #ifdef POWER_PC
64 #define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
65 #else
66 #define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
67 //#define CmiMemoryWriteFence(startPtr,nBytes) 
68 #endif
69 #else
70 #undef CmiMemoryWriteFence
71 #define CmiMemoryWriteFence(startPtr,nBytes)  
72 #endif
73
74 #undef CmiMemoryReadFence
75 #if PXSHM_FENCE
76 #ifdef POWER_PC
77 #define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
78 #else
79 #define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
80 //#define CmiMemoryReadFence(startPtr,nBytes) 
81 #endif
82 #else
83 #define CmiMemoryReadFence(startPtr,nBytes) 
84 #endif
85
86 #endif
87
88 /*
89 #if CMK_SMP
90 #error  "PXSHM can only be used in non-smp build of Charm++"
91 #endif
92 */
93
94 /***************************************************************************************/
95
96 enum entities {SENDER,RECEIVER};
97
98 /************************
99  *      Implementation currently assumes that
100  *      1) all nodes have the same number of processors
101  *  2) in the nodelist all processors in a node are listed in sequence
102  *   0 1 2 3      4 5 6 7 
103  *   -------      -------
104  *    node 1       node 2 
105  ************************/
106
107 #define NAMESTRLEN 60
108 #define PREFIXSTRLEN 50 
109
110 static int SHMBUFLEN   = (1024*1024*4);
111 static int SHMMAXSIZE  = (1024*1024);
112
113 static int SENDQSTARTSIZE  =  256;
114
115
116 /// This struct is used as the first portion of a shared memory region, followed by data
117 typedef struct {
118         int count; //number of messages
119         int bytes; //number of bytes
120
121 #if PXSHM_OSSPINLOCK
122         OSSpinLock lock;
123 #endif
124
125 #if PXSHM_FENCE
126         volatile int flagSender;
127         CmiMemorySMPSeparation_t pad1;
128         volatile int flagReceiver;
129         CmiMemorySMPSeparation_t pad2;
130         volatile int turn;
131 #endif  
132
133 } sharedBufHeader;
134
135
136 typedef struct {
137 #if PXSHM_LOCK
138         sem_t *mutex;
139 #endif
140         sharedBufHeader *header;        
141         char *data;
142 } sharedBufData;
143
144 typedef struct OutgoingMsgRec
145 {
146   char *data;
147   int  *refcount;
148   int   size;
149 }
150 OutgoingMsgRec;
151
152 typedef struct {
153         int size; //total size of data array
154         int begin; //position of first element
155         int end;        //position of next element
156         int numEntries; //number of entries
157         int rank;       // for dest rank
158 #if SENDQ_LIST
159         int next;         // next dstrank of non-empty queue
160 #endif
161         OutgoingMsgRec *data;
162
163 } PxshmSendQ;
164
165 typedef struct {
166         int nodesize;
167         int noderank;
168         int nodestart,nodeend;//proc numbers for the start and end of this node
169         char prefixStr[PREFIXSTRLEN];
170         char **recvBufNames;
171         char **sendBufNames;
172
173         sharedBufData *recvBufs;
174         sharedBufData *sendBufs;
175
176         PxshmSendQ **sendQs;
177
178
179 #if PXSHM_STATS
180         int sendCount;
181         int validCheckCount;
182         int lockRecvCount;
183         double validCheckTime;
184         double sendTime;
185         double commServerTime;
186 #endif
187
188 } PxshmContext;
189
190 #if SENDQ_LIST
191 static int sendQ_head_index = -1;
192 #endif
193
194 PxshmContext *pxshmContext=NULL; //global context
195
196
197 void calculateNodeSizeAndRank(char **);
198 void setupSharedBuffers();
199 void initAllSendQs();
200
201 void CmiExitPxshm();
202
203 static void cleanupOnAllSigs(int signo)
204 {
205     CmiExitPxshm();
206 }
207
208 /******************
209  *      Initialization routine
210  *      currently just testing start up
211  * ****************/
212 void CmiInitPxshm(char **argv){
213         char *env;
214         MACHSTATE(3,"CminitPxshm start");
215
216         pxshmContext = (PxshmContext *)calloc(1,sizeof(PxshmContext));
217
218 #if CMK_NET_VERSION
219         if(Cmi_charmrun_pid <= 0){
220                 CmiAbort("pxshm must be run with charmrun");
221         }
222 #endif
223         calculateNodeSizeAndRank(argv);
224         if(pxshmContext->nodesize == 1) return;
225         
226         MACHSTATE1(3,"CminitPxshm  %d calculateNodeSizeAndRank",pxshmContext->nodesize);
227
228         env = getenv("CHARM_PXSHM_POOL_SIZE");
229         if (env) {
230             SHMBUFLEN = CmiReadSize(env);
231         }
232         env = getenv("CHARM_PXSHM_MESSAGE_MAX_SIZE");
233         if (env) {
234             SHMMAXSIZE = CmiReadSize(env);
235         }
236         if (SHMMAXSIZE > SHMBUFLEN)
237             CmiAbort("Error> Pxshm pool size is set too small in env variable CHARM_PXSHM_POOL_SIZE");
238
239         SENDQSTARTSIZE = 32 * pxshmContext->nodesize;
240
241         if (_Cmi_mynode == 0)
242             CmiPrintf("Charm++> pxshm enabled: %d cores per node, buffer size: %.1fMB\n", pxshmContext->nodesize, SHMBUFLEN/1024.0/1024.0);
243
244 #if CMK_CRAYXE
245         srand(getpid());
246         int Cmi_charmrun_pid = rand();
247         PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
248 #elif !CMK_NET_VERSION
249         #error "need a unique number"
250 #endif
251         snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);
252
253         MACHSTATE2(3,"CminitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
254
255         setupSharedBuffers();
256
257         MACHSTATE2(3,"CminitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
258
259         initAllSendQs();
260         
261         MACHSTATE2(3,"CminitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);
262
263         MACHSTATE2(3,"CminitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);
264
265 #if PXSHM_STATS
266         pxshmContext->sendCount=0;
267         pxshmContext->sendTime=0.0;
268         pxshmContext->validCheckCount=0;
269         pxshmContext->validCheckTime=0.0;
270         pxshmContext->commServerTime = 0;
271         pxshmContext->lockRecvCount = 0;
272 #endif
273
274         signal(SIGSEGV, cleanupOnAllSigs);
275         signal(SIGFPE, cleanupOnAllSigs);
276         signal(SIGILL, cleanupOnAllSigs);
277         signal(SIGTERM, cleanupOnAllSigs);
278         signal(SIGABRT, cleanupOnAllSigs);
279         signal(SIGQUIT, cleanupOnAllSigs);
280         signal(SIGBUS, cleanupOnAllSigs);
281         signal(SIGINT, cleanupOnAllSigs);
282         signal(SIGTRAP, cleanupOnAllSigs);
283
284 #if 0
285         char name[64];
286         gethostname(name,64);
287         printf("[%d] name: %s\n", myrank, name);
288 #endif
289 };
290
291 /**************
292  * shutdown shmem objects and semaphores
293  *
294  * *******************/
295 static int pxshm_freed = 0;
296 void tearDownSharedBuffers();
297 void freeSharedBuffers();
298
299 void CmiExitPxshm(){
300         if (pxshmContext == NULL) return;
301         if(pxshmContext->nodesize != 1){
302                 int i;
303                 if (!pxshm_freed)
304                     tearDownSharedBuffers();
305         
306                 for(i=0;i<pxshmContext->nodesize;i++){
307                         if(i != pxshmContext->noderank){
308                                 break;
309                         }
310                 }
311                 free(pxshmContext->recvBufNames[i]);
312                 free(pxshmContext->sendBufNames[i]);
313
314                 free(pxshmContext->recvBufNames);
315                 free(pxshmContext->sendBufNames);
316
317                 free(pxshmContext->recvBufs);
318                 free(pxshmContext->sendBufs);
319
320         }
321 #if PXSHM_STATS
322 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
323 #endif
324         free(pxshmContext);
325         pxshmContext = NULL;
326 }
327
328 /******************
329  *Should this message be sent using PxShm or not ?
330  * ***********************/
331
332 /* dstNode is node number */
333 inline 
334 static int CmiValidPxshm(int node, int size){
335 #if PXSHM_STATS
336         pxshmContext->validCheckCount++;
337 #endif
338
339 /*      if(pxshmContext->nodesize == 1){
340                 return 0;
341         }*/
342         //replace by bitmap later
343         //if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN ){
344         return (node >= pxshmContext->nodestart && node <= pxshmContext->nodeend && size <= SHMMAXSIZE )? 1: 0;
345 };
346
347
348 inline int PxshmRank(int dstnode){
349         return dstnode - pxshmContext->nodestart;
350 }
351
352 inline void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount);
353 inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
354 inline int flushSendQ(PxshmSendQ *q);
355
356 inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
357   return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
358 }
359
360 /***************
361  *
362  *Send this message through shared memory
363  *if you cannot get lock, put it in the sendQ
364  *Before sending messages pick them from sendQ
365  *
366  * ****************************/
367
368 void CmiSendMessagePxshm(char *msg, int size, int dstnode, int *refcount)
369 {
370 #if PXSHM_STATS
371         double _startSendTime = CmiWallTimer();
372 #endif
373
374         LrtsPrepareEnvelope(msg, size);
375         
376         int dstRank = PxshmRank(dstnode);
377         MEMDEBUG(CmiMemoryCheck());
378   
379 /*
380         MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
381         MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
382 */
383
384         CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);
385         
386         sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
387         PxshmSendQ *sendQ = pxshmContext->sendQs[dstRank];
388
389 #if PXSHM_OSSPINLOCK
390         if(! OSSpinLockTry(&dstBuf->header->lock)){
391 #elif PXSHM_LOCK
392         if(sem_trywait(dstBuf->mutex) < 0){
393 #elif PXSHM_FENCE
394         dstBuf->header->flagSender = 1;
395         dstBuf->header->turn = RECEIVER;
396         CmiMemoryReadFence(0,0);
397         CmiMemoryWriteFence(0,0);
398         //if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
399         if(dstBuf->header->flagReceiver){
400                 dstBuf->header->flagSender = 0;
401 #endif
402                 /**failed to get the lock 
403                 insert into q and retain the message*/
404 #if SENDQ_LIST
405                 if (sendQ->numEntries == 0 && sendQ->next == -2) {
406                     sendQ->next = sendQ_head_index;
407                     sendQ_head_index = dstRank;
408                 }
409 #endif
410                 pushSendQ(pxshmContext->sendQs[dstRank], msg, size, refcount);
411                 (*refcount)++;
412                 MEMDEBUG(CmiMemoryCheck());
413                 return;
414         }else{
415
416                 /***
417                  * We got the lock for this buffer
418                  * first write all the messages in the sendQ and then write this guy
419                  * */
420                  if(pxshmContext->sendQs[dstRank]->numEntries == 0){
421                         // send message user event
422                         int ret = sendMessage(msg,size,refcount,dstBuf,pxshmContext->sendQs[dstRank]);
423 #if SENDQ_LIST
424                         if (sendQ->numEntries > 0 && sendQ->next == -2)
425                         {
426                                 sendQ->next = sendQ_head_index;
427                                 sendQ_head_index = dstRank;
428                         }
429 #endif
430                         MACHSTATE(3,"Pxshm Send succeeded immediately");
431                  }else{
432                         (*refcount)+=2;/*this message should not get deleted when the queue is flushed*/
433                         pushSendQ(pxshmContext->sendQs[dstRank],msg,size,refcount);
434 //                      MACHSTATE3(3,"Pxshm ogm %p pushed to sendQ length %d refcount %d",ogm,pxshmContext->sendQs[dstRank]->numEntries,ogm->refcount);
435                         int sent = flushSendQ(sendQ);
436                         (*refcount)--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
437                         MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
438                  }
439                  /* unlock the recvbuffer*/
440
441 #if PXSHM_OSSPINLOCK
442                  OSSpinLockUnlock(&dstBuf->header->lock);
443 #elif PXSHM_LOCK
444                  sem_post(dstBuf->mutex);
445 #elif PXSHM_FENCE
446                  CmiMemoryReadFence(0,0);                       
447                  CmiMemoryWriteFence(0,0);
448                  dstBuf->header->flagSender = 0;
449 #endif
450         }
451 #if PXSHM_STATS
452                 pxshmContext->sendCount ++;
453                 pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
454 #endif
455         MEMDEBUG(CmiMemoryCheck());
456
457 };
458
459 inline void emptyAllRecvBufs();
460 inline void flushAllSendQs();
461
462 /**********
463  * Extract all the messages from the recvBuffers you can
464  * Flush all sendQs
465  * ***/
466 inline void CommunicationServerPxshm(){
467         
468 #if PXSHM_STATS
469         double _startCommServerTime =CmiWallTimer();
470 #endif  
471         
472         MEMDEBUG(CmiMemoryCheck());
473         emptyAllRecvBufs();
474         flushAllSendQs();
475
476 #if PXSHM_STATS
477         pxshmContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
478 #endif
479
480         MEMDEBUG(CmiMemoryCheck());
481 };
482
483 static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
484         CommunicationServerPxshm();
485 }
486
487
488 static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
489 {
490         CmiNotifyStillIdle(s);
491 }
492
493
494 void calculateNodeSizeAndRank(char **argv){
495         pxshmContext->nodesize=1;
496         MACHSTATE(3,"calculateNodeSizeAndRank start");
497         //CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
498         CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
499         MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);
500
501         pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);
502         
503         MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);
504         
505         pxshmContext->nodestart = _Cmi_mynode -pxshmContext->noderank;
506         
507         MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
508
509         pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;
510
511         if(pxshmContext->nodeend >= _Cmi_numnodes){
512                 pxshmContext->nodeend = _Cmi_numnodes-1;
513                 pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
514         }
515         
516         MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
517 }
518
519 void allocBufNameStrings(char ***bufName);
520 void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
521 /***************
522  *      calculate the name of the shared objects and semaphores
523  *      
524  *      name scheme
525  *      shared memory: charm_pxshm_<recvernoderank>_<sendernoderank>  
526  *  semaphore    : charm_pxshm_<recvernoderank>_<sendernoderank>.sem for semaphore for that shared object
527  *                the semaphore name used by us is the same as the shared memory object name
528  *                the posix library adds the semaphore tag // in linux at least . other machines might need more portable code
529  *
530  *      open these shared objects and semaphores
531  * *********/
532 void setupSharedBuffers(){
533         int i=0;
534
535         allocBufNameStrings(&(pxshmContext->recvBufNames));
536         
537         MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
538         MEMDEBUG(CmiMemoryCheck());
539
540         allocBufNameStrings((&pxshmContext->sendBufNames));
541         
542         MACHSTATE(3,"allocBufNameStrings for sendBufNames done");
543
544         for(i=0;i<pxshmContext->nodesize;i++){
545                 if(i != pxshmContext->noderank){
546                         snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
547                         MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i)
548                         snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
549                         MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
550                 }
551         }
552         
553         createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
554         createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);
555         
556         for(i=0;i<pxshmContext->nodesize;i++){
557                 if(i != pxshmContext->noderank){
558                         //CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
559                         pxshmContext->sendBufs[i].header->count = 0;
560                         pxshmContext->sendBufs[i].header->bytes = 0;
561                 }
562         }
563
564         if (CmiBarrier() == 0) {
565             freeSharedBuffers();
566             pxshm_freed = 1;
567         }
568 }
569
570 void allocBufNameStrings(char ***bufName){
571         int i,count;
572         
573         int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
574         char *tmp = malloc(totalAlloc);
575         
576         MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
577
578         *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);
579         
580         for(i=0,count=0;i<pxshmContext->nodesize;i++){
581                 if(i != pxshmContext->noderank){
582                         (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
583                         count++;
584                 }else{
585                         (*bufName)[i] = NULL;
586                 }
587         }
588 }
589
590 void createShmObject(char *name,int size,char **pPtr);
591
592 void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
593         int i=0;
594         
595         *bufs = (sharedBufData *)calloc(pxshmContext->nodesize, sizeof(sharedBufData));
596         
597         for(i=0;i<pxshmContext->nodesize;i++){
598                 if(i != pxshmContext->noderank){
599                         createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
600                         memset(((*bufs)[i].header), 0, SHMBUFLEN+sizeof(sharedBufHeader));
601                         (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
602 #if PXSHM_OSSPINLOCK
603                         (*bufs)[i].header->lock = 0; // by convention(see man page) 0 means unlocked
604 #elif PXSHM_LOCK
605                         (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
606 #endif
607                 }else{
608                         (*bufs)[i].header = NULL;
609                         (*bufs)[i].data = NULL;
610 #if PXSHM_LOCK
611                         (*bufs)[i].mutex = NULL;
612 #endif
613                 }
614         }       
615 }
616
617
618 void createShmObject(char *name,int size,char **pPtr){
619         int fd=-1;
620         int flags;      // opening flags for shared object
621         int open_repeat_count = 0;
622
623         flags= O_RDWR | O_CREAT; // open file in read-write mode and create it if its not there
624         
625         while(fd<0 && open_repeat_count < 100){
626           open_repeat_count++;
627           fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // create the shared object with permissions for only the user to read and write
628           
629           if(fd < 0 && open_repeat_count > 10){
630             fprintf(stderr,"Error(attempt=%d) from shm_open %s while opening %s \n",open_repeat_count, strerror(errno),name);
631             fflush(stderr);
632           }
633         }
634
635         CmiAssert(fd >= 0);
636
637         ftruncate(fd,size); //set the size of the shared memory object
638
639         *pPtr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 
640         CmiAssert(*pPtr != NULL);
641
642         close(fd);
643 }
644
645 void freeSharedBuffers(){
646         int i;
647         for(i= 0;i<pxshmContext->nodesize;i++){
648             if(i != pxshmContext->noderank){
649                 if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
650                     fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
651                 }
652                 shm_unlink(pxshmContext->sendBufNames[i]);
653 #if PXSHM_LOCK
654                 sem_unlink(pxshmContext->sendBufNames[i]);
655                 sem_unlink(pxshmContext->recvBufNames[i]);
656 #endif
657             }
658         }
659 };
660
661 void tearDownSharedBuffers(){
662         int i;
663         for(i= 0;i<pxshmContext->nodesize;i++){
664             if(i != pxshmContext->noderank){
665                 if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
666                     fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
667                 }
668                 shm_unlink(pxshmContext->sendBufNames[i]);
669 #if PXSHM_LOCK
670                 sem_close(pxshmContext->recvBufs[i].mutex);
671                 sem_close(pxshmContext->sendBufs[i].mutex);
672                 sem_unlink(pxshmContext->sendBufNames[i]);
673                 sem_unlink(pxshmContext->recvBufNames[i]);
674                 pxshmContext->recvBufs[i].mutex = NULL;
675                 pxshmContext->sendBufs[i].mutex = NULL;
676 #endif
677             }
678         }
679 };
680
681
682 void initSendQ(PxshmSendQ *q,int size,int rank);
683
684 void initAllSendQs(){
685         int i=0;
686         pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
687         for(i=0;i<pxshmContext->nodesize;i++){
688                 if(i != pxshmContext->noderank){
689                         (pxshmContext->sendQs)[i] = (PxshmSendQ *)calloc(1, sizeof(PxshmSendQ));
690                         initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE,i);
691                 }else{
692                         (pxshmContext->sendQs)[i] = NULL;
693                 }
694         }
695 };
696
697
698 /****************
699  *copy this message into the sharedBuf
700  If it does not succeed
701  *put it into the sendQ 
702  *NOTE: This method is called only after obtaining the corresponding mutex
703  * ********/
704 int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
705
706         if(dstBuf->header->bytes+size <= SHMBUFLEN){
707                 /**copy  this message to sharedBuf **/
708                 dstBuf->header->count++;
709                 memcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
710                 dstBuf->header->bytes += size;
711 //              MACHSTATE4(3,"Pxshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
712                 CmiFree(msg);
713                 return 1;
714         }
715         /***
716          * Shared Buffer is too full for this message
717          * **/
718         //printf("[%d] send buffer is too full\n", CmiMyPe());
719         pushSendQ(dstSendQ,msg,size,refcount);
720         (*refcount)++;
721 //      MACHSTATE3(3,"Pxshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
722         return 0;
723 }
724
725 inline OutgoingMsgRec* popSendQ(PxshmSendQ *q);
726
727 /****
728  *Try to send all the messages in the sendq to this destination rank
729  *NOTE: This method is called only after obtaining the corresponding mutex
730  * ************/
731
732 inline int flushSendQ(PxshmSendQ *dstSendQ){
733         sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstSendQ->rank]);
734         int count=dstSendQ->numEntries;
735         int sent=0;
736         while(count > 0){
737                 OutgoingMsgRec *ogm = popSendQ(dstSendQ);
738                 (*ogm->refcount)--;
739                 MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstSendQ->rank,ogm->refcount);
740                 int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
741                 if(ret==1){
742                         sent++;
743 #if CMK_NET_VERSION
744                         GarbageCollectMsg(ogm);
745 #endif
746                 }
747                 count--;
748         }
749         return sent;
750 }
751
752 inline void emptyRecvBuf(sharedBufData *recvBuf);
753
754 inline void emptyAllRecvBufs(){
755         int i;
756         for(i=0;i<pxshmContext->nodesize;i++){
757                 if(i != pxshmContext->noderank){
758                         sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
759                         if(recvBuf->header->count > 0){
760
761 #if PXSHM_STATS
762                                 pxshmContext->lockRecvCount++;
763 #endif
764
765 #if PXSHM_OSSPINLOCK
766                                 if(! OSSpinLockTry(&recvBuf->header->lock)){
767 #elif PXSHM_LOCK
768                                 if(sem_trywait(recvBuf->mutex) < 0){
769 #elif PXSHM_FENCE
770                                 recvBuf->header->flagReceiver = 1;
771                                 recvBuf->header->turn = SENDER;
772                                 CmiMemoryReadFence(0,0);
773                                 CmiMemoryWriteFence(0,0);
774                                 //if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
775                                 if((recvBuf->header->flagSender)){
776                                         recvBuf->header->flagReceiver = 0;
777 #endif
778                                 }else{
779
780
781                                         MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);                        
782                                         emptyRecvBuf(recvBuf);
783
784 #if PXSHM_OSSPINLOCK
785                                         OSSpinLockUnlock(&recvBuf->header->lock);
786 #elif PXSHM_LOCK
787                                         sem_post(recvBuf->mutex);
788 #elif PXSHM_FENCE
789                                         CmiMemoryReadFence(0,0);
790                                         CmiMemoryWriteFence(0,0);
791                                         recvBuf->header->flagReceiver = 0;
792 #endif
793
794                                 }
795                         
796                         }
797                 }
798         }
799 };
800
801 inline void flushAllSendQs(){
802         int i;
803 #if SENDQ_LIST
804         int index_prev = -1;
805
806         i =  sendQ_head_index;
807         while (i!= -1) {
808                 PxshmSendQ *sendQ = pxshmContext->sendQs[i];
809                 CmiAssert(i !=  pxshmContext->noderank);
810                 if(sendQ->numEntries > 0){
811 #else
812         for(i=0;i<pxshmContext->nodesize;i++) {
813                 if (i == pxshmContext->noderank) continue;
814                 PxshmSendQ *sendQ = pxshmContext->sendQs[i];
815                 if(sendQ->numEntries > 0) {
816 #endif
817         
818 #if PXSHM_OSSPINLOCK
819                         if(OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)){
820 #elif PXSHM_LOCK
821                         if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
822 #elif PXSHM_FENCE
823                         pxshmContext->sendBufs[i].header->flagSender = 1;
824                         pxshmContext->sendBufs[i].header->turn = RECEIVER;
825                         CmiMemoryReadFence(0,0);                        
826                         CmiMemoryWriteFence(0,0);
827                         if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
828 #endif
829
830                                 MACHSTATE1(3,"flushSendQ %d",i);
831                                 flushSendQ(sendQ);
832                                 
833 #if PXSHM_OSSPINLOCK    
834                                 OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
835 #elif PXSHM_LOCK
836                                 sem_post(pxshmContext->sendBufs[i].mutex);
837 #elif PXSHM_FENCE
838                                 CmiMemoryReadFence(0,0);                        
839                                 CmiMemoryWriteFence(0,0);
840                                 pxshmContext->sendBufs[i].header->flagSender = 0;
841 #endif
842                         }else{
843
844 #if PXSHM_FENCE
845                           pxshmContext->sendBufs[i].header->flagSender = 0;
846 #endif                          
847
848                         }
849
850                 }        
851 #if SENDQ_LIST
852                 if (sendQ->numEntries == 0) {
853                     if (index_prev != -1)
854                         pxshmContext->sendQs[index_prev]->next = sendQ->next;
855                     else
856                         sendQ_head_index = sendQ->next;
857                     i = sendQ->next;
858                     sendQ->next = -2;
859                 }
860                 else {
861                     index_prev = i;
862                     i = sendQ->next;
863                 }
864 #endif
865         }       
866 };
867
868 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);
869
870 void emptyRecvBuf(sharedBufData *recvBuf){
871         int numMessages = recvBuf->header->count;
872         int i=0;
873
874         char *ptr=recvBuf->data;
875
876         for(i=0;i<numMessages;i++){
877                 int size;
878                 int rank, srcpe, seqno, magic, i;
879                 unsigned int broot;
880                 char *msg = ptr;
881                 char *newMsg;
882
883 #if CMK_NET_VERSION
884                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
885                 size = CmiMsgHeaderGetLength(msg);
886 #else
887                 size = CmiGetMsgSize(msg);
888 #endif
889         
890                 newMsg = (char *)CmiAlloc(size);
891                 memcpy(newMsg,msg,size);
892
893 #if CMK_NET_VERSION
894                 handoverPxshmMessage(newMsg,size,rank,broot);
895 #else
896                 handleOneRecvedMsg(size, newMsg);
897 #endif
898                 
899                 ptr += size;
900
901                 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
902         }
903 #if 1
904   if(ptr - recvBuf->data != recvBuf->header->bytes){
905                 CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
906         }
907 #endif
908         CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
909         recvBuf->header->count=0;
910         recvBuf->header->bytes=0;
911 }
912
913
#if CMK_NET_VERSION
/* Deliver one message (already copied out of the pxshm receive buffer) to
 * the local scheduler, re-forwarding broadcasts along the configured
 * topology first.
 *
 *  newmsg     - heap copy of the message; ownership passes to the PE queue
 *  total_size - message length in bytes
 *  rank       - destination rank within this node, or a DGRAM_*BROADCAST code
 *  broot      - broadcast root, passed through when re-forwarding
 *
 * NOTE(review): the CmiAssert(rank == 0) below appears to contradict the
 * DGRAM_BROADCAST / DGRAM_NODEBROADCAST handling that follows (those codes
 * are presumably nonzero) -- confirm which configurations actually compile
 * this CMK_NET_VERSION path.
 */
void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
	CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
	/* Broadcasts are forwarded to our spanning-tree children before local delivery. */
	if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
	  || rank == DGRAM_NODEBROADCAST
#endif
	 ){
		SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
					}
#elif CMK_BROADCAST_HYPERCUBE
	/* Hypercube topology: forward along the cube dimensions instead. */
	if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
	  || rank == DGRAM_NODEBROADCAST
#endif
	 ){
			SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
					}
#endif

	/* Local delivery: broadcasts go to PE 0, point-to-point to the target rank. */
		switch (rank) {
	case DGRAM_BROADCAST: {
	  CmiPushPE(0, newmsg);
	  break;
      }
	default:
				{
					
	  CmiPushPE(rank, newmsg);
				}
	}    /* end of switch */
}
#endif
948
949
950 /**************************
951  *sendQ helper functions
952  * ****************/
953
954 void initSendQ(PxshmSendQ *q,int size, int rank){
955         q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));
956
957         q->size = size;
958         q->numEntries = 0;
959
960         q->begin = 0;
961         q->end = 0;
962
963         q->rank = rank;
964 #if SENDQ_LIST
965         q->next = -2;
966 #endif
967 }
968
969 void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount){
970         if(q->numEntries == q->size){
971                 //need to resize 
972                 OutgoingMsgRec *oldData = q->data;
973                 int newSize = q->size<<1;
974                 q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
975                 //copy head to the beginning of the new array
976                 CmiAssert(q->begin == q->end);
977
978                 CmiAssert(q->begin < q->size);
979                 memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));
980
981                 if(q->end!=0){
982                         memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
983                 }
984                 free(oldData);
985                 q->begin = 0;
986                 q->end = q->size;
987                 q->size = newSize;
988         }
989         OutgoingMsgRec *omg = &q->data[q->end];
990         omg->size = size;
991         omg->data = msg;
992         omg->refcount = refcount;
993         (q->end)++;
994         if(q->end >= q->size){
995                 q->end -= q->size;
996         }
997         q->numEntries++;
998 }
999
1000 OutgoingMsgRec * popSendQ(PxshmSendQ *q){
1001         OutgoingMsgRec * ret;
1002         if(0 == q->numEntries){
1003                 return NULL;
1004         }
1005
1006         ret = &q->data[q->begin];
1007         (q->begin)++;
1008         if(q->begin >= q->size){
1009                 q->begin -= q->size;
1010         }
1011         
1012         q->numEntries--;
1013         return ret;
1014 }