Simple bug fixed.
charm.git: src/arch/net/machine-pxshm.c
/** @file
 * pxshm --> posix shared memory based network layer for communication
 * between processes on the same node.
 * This is not meant to be the primary mode of communication, but is used
 * only for messages below a certain size between processes on the same node.
 * For the non-smp version only.
 * @ingroup NET
 * contains only pxshm code for
 * - CmiInitPxshm()
 * - DeliverViaPxShm()
 * - CommunicationServerPxshm()
 * - CmiMachineExitPxshm()
 *
 * created by
 * Sayantan Chakravorty, sayantan@gmail.com, 21st March 2007
 */

/**
 * @addtogroup NET
 * @{
 */

#include <sys/types.h>
#include <sys/mman.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <semaphore.h>

#define MEMDEBUG(x) //x

#define PXSHM_STATS 1

/************************
 *  Implementation currently assumes that
 *  1) all nodes have the same number of processors
 *  2) in the nodelist all processors in a node are listed in sequence
 *     0 1 2 3      4 5 6 7
 *     -------      -------
 *      node 1       node 2
 ************************/
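/* Illustrative example (hypothetical run with at least 8 PEs): with +nodesize 4,
 * PE 6 ends up with noderank 2, nodestart 4 and nodeend 7; see
 * calculateNodeSizeAndRank() below. */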

#define NAMESTRLEN 50
#define PREFIXSTRLEN 30

#define SHMBUFLEN 1000000

#define SENDQSTARTSIZE 128

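/* A brief note on these constants (descriptive only, inferred from the code below):
 * NAMESTRLEN     - maximum length of a shared memory object / semaphore name
 * PREFIXSTRLEN   - maximum length of the per-run name prefix (charm_pxshm_<pid>)
 * SHMBUFLEN      - bytes of message data in each shared buffer, excluding the header
 * SENDQSTARTSIZE - initial capacity of each PxshmSendQ; doubled whenever it fills up
 */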
typedef struct {
    int count; //number of messages
    int bytes; //number of bytes
} sharedBufHeader;

typedef struct {
    sem_t *mutex;
    sharedBufHeader *header;
    char *data;
} sharedBufData;
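/* Layout note (inferred from sendMessage()/emptyRecvBuf()): messages are packed
 * back to back in data[]; header->bytes is the total payload currently stored and
 * header->count the number of messages. The receiver walks data[] using
 * CmiMsgHeaderGetLength() on each message and then resets both fields to 0. */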

typedef struct {
    int size; //total size of data array
    int begin; //position of first element
    int end;   //position of next element
    int numEntries; //number of entries

    OutgoingMsg *data;

} PxshmSendQ;
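/* PxshmSendQ is a circular queue of OutgoingMsg handles: begin/end wrap around
 * modulo size (see pushSendQ()/popSendQ() below), and the backing array is grown
 * by doubling when numEntries reaches size. */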

typedef struct {
    int nodesize;
    int noderank;
    int nodestart,nodeend;//proc numbers for the start and end of this node
    char prefixStr[PREFIXSTRLEN];
    char **recvBufNames;
    char **sendBufNames;

    sharedBufData *recvBufs;
    sharedBufData *sendBufs;

    PxshmSendQ **sendQs;


#if PXSHM_STATS
    int sendCount;
    int validCheckCount;
    double validCheckTime;
    double sendTime;
    double commServerTime;
#endif

} PxshmContext;
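/* Buffer pairing (inferred from the naming scheme in setupSharedBuffers()):
 * recvBufs[i] is the buffer that node-local rank i writes into and this process
 * drains, while sendBufs[i] is the buffer this process writes into and rank i
 * drains; the entries for our own rank are left NULL. */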


PxshmContext *pxshmContext=NULL; //global context


void calculateNodeSizeAndRank(char **);
void setupSharedBuffers();
void initAllSendQs();

/******************
 *  Initialization routine
 *  currently just testing start up
 ******************/
void CmiInitPxshm(char **argv){
    MACHSTATE(3,"CminitPxshm start");
    pxshmContext = (PxshmContext *)malloc(sizeof(PxshmContext));

    if(Cmi_charmrun_pid <= 0){
        CmiAbort("pxshm must be run with charmrun");
    }

#if PXSHM_STATS
    /* initialize the counters before the possible early return below, so the
       statistics printed in CmiExitPxshm() are always well defined */
    pxshmContext->sendCount=0;
    pxshmContext->sendTime=0.0;
    pxshmContext->validCheckCount=0;
    pxshmContext->validCheckTime=0.0;
    pxshmContext->commServerTime = 0;
#endif

    calculateNodeSizeAndRank(argv);
    if(pxshmContext->nodesize == 1){
        return;
    }

    MACHSTATE1(3,"CminitPxshm  %d calculateNodeSizeAndRank",pxshmContext->nodesize);

    snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);


    MACHSTATE2(3,"CminitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);

    setupSharedBuffers();

    MACHSTATE2(3,"CminitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);


    initAllSendQs();

    MACHSTATE2(3,"CminitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);

    MACHSTATE2(3,"CminitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);

};

/**************
 * shutdown shmem objects and semaphores
 *
 **************/
void tearDownSharedBuffers();

void CmiExitPxshm(){
    int i=0;

    if(pxshmContext->nodesize != 1){
        tearDownSharedBuffers();

        for(i=0;i<pxshmContext->nodesize;i++){
            if(i != pxshmContext->noderank){
                /* all the name strings of an array live in one contiguous
                   malloc()ed block (see allocBufNameStrings()), whose base is
                   stored in the first non-noderank entry; freeing that entry
                   releases all of them */
                free(pxshmContext->recvBufNames[i]);
                free(pxshmContext->sendBufNames[i]);
                break;
            }
        }
        free(pxshmContext->recvBufNames);
        free(pxshmContext->sendBufNames);

        free(pxshmContext->recvBufs);
        free(pxshmContext->sendBufs);

    }
#if PXSHM_STATS
    CmiPrintf("[%d] sendCount %d sendTime %.6lf validCheckCount %d validCheckTime %.6lf commServerTime %.6lf \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime);
#endif
    free(pxshmContext);
}

/******************
 * Should this message be sent using PxShm or not ?
 ******************/

inline int CmiValidPxshm(OutgoingMsg ogm, OtherNode node){
#if PXSHM_STATS
    pxshmContext->validCheckCount++;
#endif

/*  if(pxshmContext->nodesize == 1){
        return 0;
    }*/
    //replace by bitmap later
    if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN ){
        return 1;
    }else{
        return 0;
    }
};


inline int PxshmRank(int dst){
    return dst - pxshmContext->nodestart;
}

inline void pushSendQ(PxshmSendQ *q,OutgoingMsg msg);
inline int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
inline int flushSendQ(int dstRank);

/***************
 * Send this message through shared memory.
 * If you cannot get the lock, put it in the sendQ.
 * Before sending messages pick them from the sendQ.
 ***************/

void CmiSendMessagePxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot){
#if PXSHM_STATS
    double _startSendTime = CmiWallTimer();
#endif

    int dstRank = PxshmRank(ogm->dst);
    MEMDEBUG(CmiMemoryCheck());

    DgramHeaderMake(ogm->data,rank,ogm->src,Cmi_charmrun_pid,1, broot);


    MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);

    CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);

    sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);

    if(sem_trywait(dstBuf->mutex) < 0){

        /**failed to get the lock
        insert into q and retain the message*/

        pushSendQ(pxshmContext->sendQs[dstRank],ogm);
        ogm->refcount++;
        MEMDEBUG(CmiMemoryCheck());
        return;
    }else{

        /***
         * We got the lock for this buffer
         * first write all the messages in the sendQ and then write this guy
         */
        if(pxshmContext->sendQs[dstRank]->numEntries == 0){
            int ret = sendMessage(ogm,dstBuf,pxshmContext->sendQs[dstRank]);
            MACHSTATE(3,"Pxshm Send succeeded immediately");
        }else{
            ogm->refcount+=2;/*this message should not get deleted when the queue is flushed*/
            pushSendQ(pxshmContext->sendQs[dstRank],ogm);
            MACHSTATE3(3,"Pxshm ogm %p pushed to sendQ length %d refcount %d",ogm,pxshmContext->sendQs[dstRank]->numEntries,ogm->refcount);
            int sent = flushSendQ(dstRank);
            ogm->refcount--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
            MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
        }
        /* unlock the recvbuffer*/

        sem_post(dstBuf->mutex);
    }
#if PXSHM_STATS
    pxshmContext->sendCount ++;
    pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
#endif
    MEMDEBUG(CmiMemoryCheck());

};

inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/**********
 * Extract all the messages from the recvBuffers you can
 * Flush all sendQs
 **********/
inline void CommunicationServerPxshm(){
#if PXSHM_STATS
    double _startCommServerTime =CmiWallTimer();
#endif

    MEMDEBUG(CmiMemoryCheck());
    emptyAllRecvBufs();
    flushAllSendQs();

#if PXSHM_STATS
    pxshmContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
#endif
    MEMDEBUG(CmiMemoryCheck());
};

static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
    CommunicationServerPxshm();
}


static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
{
    CmiNotifyStillIdle(s);
}


void calculateNodeSizeAndRank(char **argv){
    pxshmContext->nodesize=1;
    MACHSTATE(3,"calculateNodeSizeAndRank start");
    //CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for non-smp case). Used by the shared memory communication layer");
    CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
    MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);

    pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);

    MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);

    pxshmContext->nodestart = _Cmi_mynode - pxshmContext->noderank;

    MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");

    pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;

    if(pxshmContext->nodeend >= _Cmi_numnodes){
        pxshmContext->nodeend = _Cmi_numnodes-1;
        pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
    }

    MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
}

void allocBufNameStrings(char ***bufName);
void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
/***************
 *  calculate the names of the shared objects and semaphores
 *
 *  name scheme
 *  shared memory: charm_pxshm_<recvernoderank>_<sendernoderank>
 *  semaphore    : charm_pxshm_<recvernoderank>_<sendernoderank>.sem for the semaphore of that shared object
 *                 the semaphore name we pass in is the same as the shared memory object name;
 *                 the POSIX library adds the semaphore tag itself (on Linux at least; other
 *                 machines might need more portable code)
 *
 *  open these shared objects and semaphores
 ***************/
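/* Illustrative example (hypothetical charmrun pid 1234): the buffer in which
 * PE 0 receives messages from PE 1 would be named "charm_pxshm_1234_0_1"; the
 * same string is passed to sem_open() for its semaphore. */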
void setupSharedBuffers(){
    int i=0;

    allocBufNameStrings(&(pxshmContext->recvBufNames));

    MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
    MEMDEBUG(CmiMemoryCheck());

    allocBufNameStrings(&(pxshmContext->sendBufNames));

    MACHSTATE(3,"allocBufNameStrings for sendBufNames done");

    for(i=0;i<pxshmContext->nodesize;i++){
        if(i != pxshmContext->noderank){
            snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
            MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i);
            snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
            MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
        }
    }

    createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
    createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);

    for(i=0;i<pxshmContext->nodesize;i++){
        if(i != pxshmContext->noderank){
            CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
            pxshmContext->sendBufs[i].header->count = 0;
            pxshmContext->sendBufs[i].header->bytes = 0;
        }
    }
}

void allocBufNameStrings(char ***bufName){
    int i,count;

    int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
    char *tmp = malloc(totalAlloc);

    MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);

    *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);

    for(i=0,count=0;i<pxshmContext->nodesize;i++){
        if(i != pxshmContext->noderank){
            (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
            count++;
        }else{
            (*bufName)[i] = NULL;
        }
    }
}
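/* Note: all nodesize-1 name strings share the single malloc()ed block tmp; the
 * entry for our own noderank stays NULL. CmiExitPxshm() therefore only needs to
 * free the base of that block (plus the pointer array itself). */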

void createShmObject(char *name,int size,char **pPtr);

void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
    int i=0;

    *bufs = (sharedBufData *)malloc(sizeof(sharedBufData)*pxshmContext->nodesize);

    for(i=0;i<pxshmContext->nodesize;i++){
        if(i != pxshmContext->noderank){
            createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
            (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
            (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
        }else{
            (*bufs)[i].header = NULL;
            (*bufs)[i].data = NULL;
            (*bufs)[i].mutex = NULL;
        }
    }
}



void createShmObject(char *name,int size,char **pPtr){
    int fd;
    int flags;  // opening flags for shared object

    flags= O_RDWR | O_CREAT; // open file in read-write mode and create it if it's not there

    fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // create the shared object with permissions for only the user to read and write

    if(fd < 0){
        fprintf(stderr,"Error from shm_open %s\n",strerror(errno));
    }
    CmiAssert(fd >= 0);

    ftruncate(fd,size); //set the size of the shared memory object

    *pPtr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    CmiAssert(*pPtr != MAP_FAILED); // mmap reports failure with MAP_FAILED, not NULL

    close(fd);
}

void tearDownSharedBuffers(){
    int i;
    for(i= 0;i<pxshmContext->nodesize;i++){
        if(i != pxshmContext->noderank){
            shm_unlink(pxshmContext->recvBufNames[i]);
            sem_close(pxshmContext->recvBufs[i].mutex);
            sem_unlink(pxshmContext->recvBufNames[i]);

            sem_close(pxshmContext->sendBufs[i].mutex);
        }
    }
};
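/* Only the recv-side names are shm_unlink()ed / sem_unlink()ed here: each
 * send-side object is some other process's recv-side object, so its owner is
 * the one that unlinks it (an observation about the code above, not a
 * documented guarantee). */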


void initSendQ(PxshmSendQ *q,int size);

void initAllSendQs(){
    int i=0;
    pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
    for(i=0;i<pxshmContext->nodesize;i++){
        if(i != pxshmContext->noderank){
            (pxshmContext->sendQs)[i] = (PxshmSendQ *)malloc(sizeof(PxshmSendQ));
            initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE);
        }else{
            (pxshmContext->sendQs)[i] = NULL;
        }
    }
};


/****************
 * Copy this message into the shared buffer.
 * If that does not succeed, put it into the sendQ.
 * NOTE: This method is called only after obtaining the corresponding mutex.
 ****************/
int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,PxshmSendQ *dstSendQ){

    if(dstBuf->header->bytes+ogm->size <= SHMBUFLEN){
        /**copy this message to sharedBuf **/
        dstBuf->header->count++;
        memcpy(dstBuf->data+dstBuf->header->bytes,ogm->data,ogm->size);
        dstBuf->header->bytes += ogm->size;
        MACHSTATE4(3,"Pxshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
        return 1;
    }
    /***
     * Shared Buffer is too full for this message
     ***/
    pushSendQ(dstSendQ,ogm);
    ogm->refcount++;
    MACHSTATE3(3,"Pxshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
    return 0;
}

inline OutgoingMsg popSendQ(PxshmSendQ *q);

/****
 * Try to send all the messages in the sendQ to this destination rank.
 * NOTE: This method is called only after obtaining the corresponding mutex.
 ****/

inline int flushSendQ(int dstRank){
    sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
    PxshmSendQ *dstSendQ = pxshmContext->sendQs[dstRank];

    int count=dstSendQ->numEntries;
    int sent=0;
    while(count > 0){
        OutgoingMsg ogm = popSendQ(dstSendQ);
        ogm->refcount--;
        MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
        int ret = sendMessage(ogm,dstBuf,dstSendQ);
        if(ret==1){
            sent++;
            GarbageCollectMsg(ogm);
        }
        count--;
    }
    return sent;
}

inline void emptyRecvBuf(sharedBufData *recvBuf);

inline void emptyAllRecvBufs(){
    int i;

    for(i=0;i<pxshmContext->nodesize;i++){
        if(i != pxshmContext->noderank){
            sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
            if(recvBuf->header->count > 0){
                if(sem_trywait(recvBuf->mutex) < 0){
                    /* the sender currently holds the lock; try again on the next call */
                }else{
                    MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
                    emptyRecvBuf(recvBuf);
                    sem_post(recvBuf->mutex);
                }
            }
        }
    }


};

inline void flushAllSendQs(){
    int i=0;

    for(i=0;i<pxshmContext->nodesize;i++){
        if(i != pxshmContext->noderank && pxshmContext->sendQs[i]->numEntries > 0){
            if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
                MACHSTATE1(3,"flushSendQ %d",i);
                flushSendQ(i);
                sem_post(pxshmContext->sendBufs[i].mutex);
            }
        }
    }
};

static inline void handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);

void emptyRecvBuf(sharedBufData *recvBuf){
    int numMessages = recvBuf->header->count;
    int i=0;

    char *ptr=recvBuf->data;

    for(i=0;i<numMessages;i++){
        int size;
        int rank, srcpe, seqno, magic;
        unsigned int broot;
        char *msg = ptr;
        char *newMsg;

        DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
        size = CmiMsgHeaderGetLength(msg);


        newMsg = (char *)CmiAlloc(size);
        memcpy(newMsg,msg,size);


        handoverPxshmMessage(newMsg,size,rank,broot);

        ptr += size;

        MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
    }

    CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);

    recvBuf->header->count=0;
    recvBuf->header->bytes=0;
}


static inline void handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
    CmiAssert(rank == 0);
#if CMK_BROADCAST_SPANNING_TREE
    if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
        || rank == DGRAM_NODEBROADCAST
#endif
       ){
        SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
    }
#elif CMK_BROADCAST_HYPERCUBE
    if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
        || rank == DGRAM_NODEBROADCAST
#endif
       ){
        SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
    }
#endif

    switch (rank) {
    case DGRAM_BROADCAST: {
        CmiPushPE(0, newmsg);
        break;
    }
    default:
    {
        CmiPushPE(rank, newmsg);
    }
    }    /* end of switch */
}


/**************************
 * sendQ helper functions
 **************************/

void initSendQ(PxshmSendQ *q,int size){
    q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*size);

    q->size = size;
    q->numEntries = 0;

    q->begin = 0;
    q->end = 0;
}

void pushSendQ(PxshmSendQ *q,OutgoingMsg msg){
    if(q->numEntries == q->size){
        //need to resize
        OutgoingMsg *oldData = q->data;
        int newSize = q->size<<1;
        q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*newSize);
        //copy head to the beginning of the new array

        CmiAssert(q->begin == q->end);

        CmiAssert(q->begin < q->size);
        memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsg)*(q->size - q->begin));

        if(q->end != 0){
            memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsg)*(q->end));
        }
        free(oldData);
        q->begin = 0;
        q->end = q->size;
        q->size = newSize;
    }
    q->data[q->end] = msg;
    (q->end)++;
    if(q->end >= q->size){
        q->end -= q->size;
    }
    q->numEntries++;
}

OutgoingMsg popSendQ(PxshmSendQ *q){
    OutgoingMsg ret;
    if(0 == q->numEntries){
        return NULL;
    }

    ret = q->data[q->begin];
    (q->begin)++;
    if(q->begin >= q->size){
        q->begin -= q->size;
    }

    q->numEntries--;
    return ret;
}