/** @file machine-pxshm.c
 * pxshm --> POSIX shared memory based network layer for communication
 * between processes on the same node.
 * This is not going to be the primary mode of communication,
 * but is used only for messages below a certain size between
 * processes on the same node.
 * For the non-smp version only.
 * @ingroup NET
 * Contains only pxshm code for:
 * - CmiInitPxshm()
 * - CmiSendMessagePxshm()
 * - CommunicationServerPxshm()
 * - CmiExitPxshm()
 *
 * created by
 *      Sayantan Chakravorty, sayantan@gmail.com, 21st March 2007
 */

/**
 * @addtogroup NET
 * @{
 */

#include <sys/types.h>
#include <sys/mman.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <semaphore.h>
#include <stdio.h>   /* fprintf */
#include <stdlib.h>  /* malloc, free */
#include <string.h>  /* memcpy, strerror */

#define MEMDEBUG(x) //x

#define PXSHM_STATS 0

/************************
 *      Implementation currently assumes that
 *      1) all nodes have the same number of processors
 *      2) in the nodelist all processors in a node are listed in sequence
 *         0 1 2 3      4 5 6 7
 *         -------      -------
 *          node 1       node 2
 ************************/

#define NAMESTRLEN 50       /* max length of a shared object / semaphore name */
#define PREFIXSTRLEN 30     /* max length of the per-job name prefix */

#define SHMBUFLEN 1000000   /* bytes of message data in each shared buffer */

#define SENDQSTARTSIZE 128  /* initial sendQ capacity; doubled on overflow */

typedef struct {
        int count; //number of messages
        int bytes; //number of bytes
} sharedBufHeader;

typedef struct {
        sem_t *mutex;
        sharedBufHeader *header;
        char *data;
} sharedBufData;

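/* Per-destination queue of messages that could not be written to the shared
 * buffer immediately (lock contention or a full buffer). Implemented as a
 * growable circular array: begin/end wrap modulo size, and pushSendQ()
 * doubles the array when numEntries reaches size. */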
typedef struct {
        int size; //total size of data array
        int begin; //position of first element
        int end;   //position of next element
        int numEntries; //number of entries

        OutgoingMsg *data;

} PxshmSendQ;

typedef struct {
        int nodesize;
        int noderank;
        int nodestart,nodeend; //proc numbers for the start and end of this node
        char prefixStr[PREFIXSTRLEN];
        char **recvBufNames;
        char **sendBufNames;

        sharedBufData *recvBufs;
        sharedBufData *sendBufs;

        PxshmSendQ **sendQs;

#if PXSHM_STATS
        int sendCount;
        int validCheckCount;
        double validCheckTime;
        double sendTime;
        double commServerTime;
#endif

} PxshmContext;

PxshmContext *pxshmContext=NULL; //global context

void calculateNodeSizeAndRank(char **);
void setupSharedBuffers();
void initAllSendQs();

/******************
 *      Initialization routine: allocates the context and sets up
 *      the shared buffers and send queues
 * ****************/
void CmiInitPxshm(char **argv){
        MACHSTATE(3,"CmiInitPxshm start");
        pxshmContext = (PxshmContext *)malloc(sizeof(PxshmContext));

#if PXSHM_STATS
        /* initialize the counters before any early return, so that
           CmiExitPxshm always prints defined values */
        pxshmContext->sendCount=0;
        pxshmContext->sendTime=0.0;
        pxshmContext->validCheckCount=0;
        pxshmContext->validCheckTime=0.0;
        pxshmContext->commServerTime=0.0;
#endif

        if(Cmi_charmrun_pid <= 0){
                CmiAbort("pxshm must be run with charmrun");
        }
        calculateNodeSizeAndRank(argv);
        if(pxshmContext->nodesize == 1){
                return;
        }

        MACHSTATE1(3,"CmiInitPxshm %d calculateNodeSizeAndRank",pxshmContext->nodesize);

        snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);

        MACHSTATE2(3,"CmiInitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

        setupSharedBuffers();

        MACHSTATE2(3,"CmiInitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

        initAllSendQs();

        MACHSTATE2(3,"CmiInitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);

        MACHSTATE2(3,"CmiInitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);
}

/**************
 * shutdown shmem objects and semaphores
 *
 * *******************/
void tearDownSharedBuffers();

void CmiExitPxshm(){
        int i=0;

        if(pxshmContext->nodesize != 1){
                tearDownSharedBuffers();

                /* find the first rank other than ourselves: its name string
                 * is the base of the single block allocated in
                 * allocBufNameStrings(), so freeing it frees all the names */
                for(i=0;i<pxshmContext->nodesize;i++){
                        if(i != pxshmContext->noderank){
                                break;
                        }
                }
                free(pxshmContext->recvBufNames[i]);
                free(pxshmContext->sendBufNames[i]);

                free(pxshmContext->recvBufNames);
                free(pxshmContext->sendBufNames);

                free(pxshmContext->recvBufs);
                free(pxshmContext->sendBufs);

        }
#if PXSHM_STATS
        CmiPrintf("[%d] sendCount %d sendTime %.6lf validCheckCount %d validCheckTime %.6lf commServerTime %.6lf \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime);
#endif
        free(pxshmContext);
}

/******************
 * Should this message be sent using PxShm or not?
 * ***********************/

inline int CmiValidPxshm(OutgoingMsg ogm, OtherNode node){
#if PXSHM_STATS
        pxshmContext->validCheckCount++;
#endif

/*      if(pxshmContext->nodesize == 1){
                return 0;
        }*/
        //replace by bitmap later
        if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN){
                return 1;
        }else{
                return 0;
        }
}

inline int PxshmRank(int dst){
        return dst - pxshmContext->nodestart;
}

inline void pushSendQ(PxshmSendQ *q,OutgoingMsg msg);
inline int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
inline int flushSendQ(int dstRank);

/***************
 * Send this message through shared memory.
 * If the lock cannot be obtained, put the message in the sendQ.
 * Before sending new messages, drain any messages already in the sendQ.
 * ****************************/
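/* Reference-count protocol (as implemented below): a message that is queued
 * rather than copied gains a reference so the caller's cleanup does not free
 * it; flushSendQ() drops that reference and calls GarbageCollectMsg() once
 * the copy into the shared buffer succeeds. */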

void CmiSendMessagePxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot){
#if PXSHM_STATS
        double _startSendTime = CmiWallTimer();
#endif

        int dstRank = PxshmRank(ogm->dst);
        MEMDEBUG(CmiMemoryCheck());

        DgramHeaderMake(ogm->data,rank,ogm->src,Cmi_charmrun_pid,1,broot);

        MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);

        CmiAssert(dstRank >= 0 && dstRank != pxshmContext->noderank);

        sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);

        if(sem_trywait(dstBuf->mutex) < 0){
                /* failed to get the lock:
                   insert into the sendQ and retain the message */

                pushSendQ(pxshmContext->sendQs[dstRank],ogm);
                ogm->refcount++;
                MEMDEBUG(CmiMemoryCheck());
#if PXSHM_STATS
                pxshmContext->sendCount++;
                pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
#endif
                return;
        }else{
                /***
                 * We got the lock for this buffer:
                 * first write all the messages in the sendQ and then write this one
                 * */
                if(pxshmContext->sendQs[dstRank]->numEntries == 0){
                        int ret = sendMessage(ogm,dstBuf,pxshmContext->sendQs[dstRank]);
                        MACHSTATE(3,"Pxshm Send succeeded immediately");
                }else{
                        ogm->refcount+=2;/*this message should not get deleted when the queue is flushed*/
                        pushSendQ(pxshmContext->sendQs[dstRank],ogm);
                        MACHSTATE3(3,"Pxshm ogm %p pushed to sendQ length %d refcount %d",ogm,pxshmContext->sendQs[dstRank]->numEntries,ogm->refcount);
                        int sent = flushSendQ(dstRank);
                        ogm->refcount--; /*if it has been sent, it can be deleted by the caller; if not, it will be deleted when the queue is flushed*/
                        MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
                }
                /* unlock the destination's recv buffer */
                sem_post(dstBuf->mutex);
        }
#if PXSHM_STATS
        pxshmContext->sendCount++;
        pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
#endif
        MEMDEBUG(CmiMemoryCheck());

}

inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/**********
 * Extract all the messages from the recvBuffers you can
 * Flush all sendQs
 * ***/
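/* Progress engine for pxshm. In this section it is reached via the idle-time
 * hooks below; the surrounding machine layer is expected to call it from its
 * communication-service loop as well. Non-smp, so there is no dedicated comm
 * thread and progress happens only when this runs. */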
inline void CommunicationServerPxshm(){
#if PXSHM_STATS
        double _startCommServerTime = CmiWallTimer();
#endif

        MEMDEBUG(CmiMemoryCheck());
        emptyAllRecvBufs();
        flushAllSendQs();

#if PXSHM_STATS
        pxshmContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
#endif
        MEMDEBUG(CmiMemoryCheck());
}

static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
        CommunicationServerPxshm();
}

static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
{
  CmiNotifyStillIdle(s);
}

void calculateNodeSizeAndRank(char **argv){
        pxshmContext->nodesize=1;
        MACHSTATE(3,"calculateNodeSizeAndRank start");
        //CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for non-smp case). Used by the shared memory communication layer");
        CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
        MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);

        pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);

        MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);

        pxshmContext->nodestart = _Cmi_mynode - pxshmContext->noderank;

        MACHSTATE(3,"calculateNodeSizeAndRank nodestart");

        pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize - 1;

        if(pxshmContext->nodeend >= _Cmi_numnodes){
                pxshmContext->nodeend = _Cmi_numnodes-1;
                pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) + 1;
        }

        MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
}

void allocBufNameStrings(char ***bufName);
void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
/***************
 *      Calculate the names of the shared objects and semaphores.
 *
 *      Name scheme:
 *      shared memory: charm_pxshm_<receivernoderank>_<sendernoderank>
 *      semaphore    : charm_pxshm_<receivernoderank>_<sendernoderank>.sem
 *                     The semaphore name we pass is the same as the shared
 *                     memory object name; the POSIX library adds its own
 *                     semaphore tag (on Linux at least; other systems might
 *                     need more portable code).
 *
 *      Then open these shared objects and semaphores.
 * *********/
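/* e.g. with Cmi_charmrun_pid 1234, the buffer written by pe 0 and read by
 * pe 2 is the shm object (and semaphore) named "charm_pxshm_1234_2_0" */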
void setupSharedBuffers(){
        int i=0;

        allocBufNameStrings(&(pxshmContext->recvBufNames));

        MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
        MEMDEBUG(CmiMemoryCheck());

        allocBufNameStrings(&(pxshmContext->sendBufNames));

        MACHSTATE(3,"allocBufNameStrings for sendBufNames done");

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
                        MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i);
                        snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
                        MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
                }
        }

        createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
        createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
                        pxshmContext->sendBufs[i].header->count = 0;
                        pxshmContext->sendBufs[i].header->bytes = 0;
                }
        }
}

void allocBufNameStrings(char ***bufName){
        int i,count;

        /* one contiguous block holds all nodesize-1 name strings */
        int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
        char *tmp = malloc(totalAlloc);

        MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);

        *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);

        for(i=0,count=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
                        count++;
                }else{
                        (*bufName)[i] = NULL;
                }
        }
}

void createShmObject(char *name,int size,char **pPtr);

void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
        int i=0;

        *bufs = (sharedBufData *)malloc(sizeof(sharedBufData)*pxshmContext->nodesize);

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
                        (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
                        (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1); /* initial value 1: unlocked */
                }else{
                        (*bufs)[i].header = NULL;
                        (*bufs)[i].data = NULL;
                        (*bufs)[i].mutex = NULL;
                }
        }
}

void createShmObject(char *name,int size,char **pPtr){
        int fd;
        int flags;      // opening flags for shared object

        flags = O_RDWR | O_CREAT; // open in read-write mode and create it if it's not there

        fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // create the shared object with read/write permission for the user only

        if(fd < 0){
                fprintf(stderr,"Error from shm_open %s\n",strerror(errno));
        }
        CmiAssert(fd >= 0);

        ftruncate(fd,size); //set the size of the shared memory object

        *pPtr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        CmiAssert(*pPtr != MAP_FAILED); //mmap returns MAP_FAILED, not NULL, on error

        close(fd); //the mapping stays valid after the descriptor is closed
}

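/* POSIX semantics: shm_unlink()/sem_unlink() only remove the name; the
 * underlying object persists until every process has unmapped/closed it,
 * so tearing down here is safe even if peers are still draining buffers. */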
void tearDownSharedBuffers(){
        int i;
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        /* each process unlinks its receive-side names; the
                           matching send-side objects are unlinked by their receivers */
                        shm_unlink(pxshmContext->recvBufNames[i]);
                        sem_close(pxshmContext->recvBufs[i].mutex);
                        sem_unlink(pxshmContext->recvBufNames[i]);

                        sem_close(pxshmContext->sendBufs[i].mutex);
                }
        }
}


void initSendQ(PxshmSendQ *q,int size);

void initAllSendQs(){
        int i=0;
        pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        (pxshmContext->sendQs)[i] = (PxshmSendQ *)malloc(sizeof(PxshmSendQ));
                        initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE);
                }else{
                        (pxshmContext->sendQs)[i] = NULL;
                }
        }
}


/****************
 * Copy this message into the shared buffer.
 * If there is not enough room, put it into the sendQ instead.
 * NOTE: This method is called only after obtaining the corresponding mutex.
 * ********/
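/* Messages are packed back to back in dstBuf->data with no per-message
 * framing; the receiver recovers message boundaries from each Converse
 * header via CmiMsgHeaderGetLength() (see emptyRecvBuf()). */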
int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,PxshmSendQ *dstSendQ){

        if(dstBuf->header->bytes+ogm->size <= SHMBUFLEN){
                /**copy this message to the sharedBuf **/
                dstBuf->header->count++;
                memcpy(dstBuf->data+dstBuf->header->bytes,ogm->data,ogm->size);
                dstBuf->header->bytes += ogm->size;
                MACHSTATE4(3,"Pxshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
                return 1;
        }
        /***
         * Shared Buffer is too full for this message
         * **/
        pushSendQ(dstSendQ,ogm);
        ogm->refcount++;
        MACHSTATE3(3,"Pxshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
        return 0;
}

inline OutgoingMsg popSendQ(PxshmSendQ *q);

/****
 * Try to send all the messages in the sendQ to this destination rank.
 * NOTE: This method is called only after obtaining the corresponding mutex.
 * ************/

inline int flushSendQ(int dstRank){
        sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
        PxshmSendQ *dstSendQ = pxshmContext->sendQs[dstRank];

        int count = dstSendQ->numEntries;
        int sent = 0;
        while(count > 0){
                OutgoingMsg ogm = popSendQ(dstSendQ);
                ogm->refcount--;
                MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
                int ret = sendMessage(ogm,dstBuf,dstSendQ);
                if(ret==1){
                        sent++;
                        GarbageCollectMsg(ogm);
                }
                count--;
        }
        return sent;
}

inline void emptyRecvBuf(sharedBufData *recvBuf);

inline void emptyAllRecvBufs(){
        int i;

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank){
                        sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);

                        /* if the sender holds the lock, skip this buffer and
                           try again on the next call */
                        if(sem_trywait(recvBuf->mutex) >= 0){
                                MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
                                emptyRecvBuf(recvBuf);
                                sem_post(recvBuf->mutex);
                        }
                }
        }
}

inline void flushAllSendQs(){
        int i=0;

        for(i=0;i<pxshmContext->nodesize;i++){
                if(i != pxshmContext->noderank && pxshmContext->sendQs[i]->numEntries > 0){
                        if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
                                MACHSTATE1(3,"flushSendQ %d",i);
                                flushSendQ(i);
                                sem_post(pxshmContext->sendBufs[i].mutex);
                        }
                }
        }
}

static inline void handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);

void emptyRecvBuf(sharedBufData *recvBuf){
        int numMessages = recvBuf->header->count;
        int i=0;

        char *ptr = recvBuf->data;

        for(i=0;i<numMessages;i++){
                int size;
                int rank, srcpe, seqno, magic;
                unsigned int broot;
                char *msg = ptr;
                char *newMsg;

                DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
                size = CmiMsgHeaderGetLength(msg);

                /* copy out of the shared buffer before handing the message over */
                newMsg = (char *)CmiAlloc(size);
                memcpy(newMsg,msg,size);

                handoverPxshmMessage(newMsg,size,rank,broot);

                ptr += size;

                MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
        }

        CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);

        recvBuf->header->count=0;
        recvBuf->header->bytes=0;
}

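/* Deliver a message pulled out of a shared buffer. A rank of DGRAM_BROADCAST
 * (or DGRAM_NODEBROADCAST) means the message must first be forwarded along
 * the spanning tree / hypercube before being enqueued locally. */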
static inline void handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
        CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || rank == DGRAM_NODEBROADCAST
#endif
         ){
                SendSpanningChildren(NULL, 0, total_size, newmsg, broot, rank);
        }
#elif CMK_BROADCAST_HYPERCUBE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || rank == DGRAM_NODEBROADCAST
#endif
         ){
                SendHypercube(NULL, 0, total_size, newmsg, broot, rank);
        }
#endif

        switch (rank) {
        case DGRAM_BROADCAST:
                CmiPushPE(0, newmsg);
                break;
        default:
                CmiPushPE(rank, newmsg);
        }    /* end of switch */
}


/**************************
 * sendQ helper functions
 * ****************/

void initSendQ(PxshmSendQ *q,int size){
        q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*size);

        q->size = size;
        q->numEntries = 0;

        q->begin = 0;
        q->end = 0;
}

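/* When the circular array fills up (numEntries == size, i.e. begin == end),
 * pushSendQ() doubles the capacity and linearizes the contents: the entries
 * from begin..size-1 move to the front of the new array, followed by the
 * wrapped entries 0..end-1. */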
void pushSendQ(PxshmSendQ *q,OutgoingMsg msg){
        if(q->numEntries == q->size){
                //need to resize
                OutgoingMsg *oldData = q->data;
                int newSize = q->size<<1;
                q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*newSize);

                //copy the head of the queue to the beginning of the new array
                CmiAssert(q->begin == q->end);
                CmiAssert(q->begin < q->size);
                memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsg)*(q->size - q->begin));

                if(q->end != 0){
                        memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsg)*(q->end));
                }
                free(oldData);
                q->begin = 0;
                q->end = q->size;
                q->size = newSize;
        }
        q->data[q->end] = msg;
        (q->end)++;
        if(q->end >= q->size){
                q->end -= q->size;
        }
        q->numEntries++;
}

OutgoingMsg popSendQ(PxshmSendQ *q){
        OutgoingMsg ret;
        if(0 == q->numEntries){
                return NULL;
        }

        ret = q->data[q->begin];
        (q->begin)++;
        if(q->begin >= q->size){
                q->begin -= q->size;
        }

        q->numEntries--;
        return ret;
}

/*@}*/