Enable MPI_SMP.
[charm.git] / src / arch / shmem / machine.c
1 /** @file
2  * Shared memory machine layer
3  * @ingroup Machine
4  * This is a complete port, but could be made considerably more efficient
5  * by handling asynchronous messages correctly, ie. without doing
6  * an extra copy and synchronous send
7  * @{
8  */
9
10 #include <stdlib.h>
11 #include <unistd.h>
12 #include <malloc.h>
13 #include "converse.h"
14
15 #include "mem-arena.h"
16
17 #include CMK_SHMEM_H
18
19 #define USE_SWAP                                       0
20
21 /*
22  * Some constants
23  */
24 enum boolean {false = 0, true = 1};
25 enum {list_empty = -1 };
26
27
28 /*
29  * Local declarations for Cmi, used by common code
30  */
31 CpvDeclare(void*, CmiLocalQueue);
32 int _Cmi_mype;
33 int _Cmi_numpes;
34 int _Cmi_myrank;
35
36 /*
37  * Local queue functions, used by common code to store messages 
38  * to my own node efficiently.  These are used when 
39  * CMK_CMIDELIVERS_USE_COMMON_CODE is true.
40  */
41 /*
42  * Distributed list declarations.  This linked list goes across machines,
43  * storing all the messages for this node until this processor copies them
44  * into local memory.
45  */
46 typedef struct McDistListS
47 {
48   int nxt_node;
49   int msg_sz;
50   struct McMsgHdrS *nxt_addr;
51 } McDistList;
52
53 typedef struct McMsgHdrS
54 {
55   McDistList list_node;
56   enum {Unknown, Message, BcastMessage } msg_type;
57   enum boolean received_f;
58   union
59   {
60     struct McMsgHdrS *ptr;
61     int count;
62   } bcast;
63   int bcast_msg_size;
64   int handler;
65 } McMsgHdr;
66
67
68 /*
69  * Mc functions, used in machine.c only.
70  */
71 static void McInit();
72 static void McInitList();
73 static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe);
74 static void McRetrieveRemote(void);
75 static void McCleanUpInTransit(void);
76
77 /*
78  * These declarations are for a local linked list to hold messages which
79  * have been copied into local memory.  It is a modified version of the
80  * Origin2000 code with the locks removed.
81  */
82 /* Allocation block size, to reduce num of mallocs */
83 #define BLK_LEN  512  
84
85 typedef struct McQueueS
86 {
87   void     **blk;
88   unsigned int blk_len;
89   unsigned int first;
90   unsigned int len;
91 } McQueue;
92
93 static McQueue *McQueueCreate(void);
94 static void McQueueAddToBack(McQueue *queue, void *element);
95 static void *McQueueRemoveFromFront(McQueue *queue);
96 static void *McQueueRemoveFromBack(McQueue *queue);
97
98 /*************************************************************
99  * static variable declarations
100  */
101 /*
102  *  Local queues used for mem management.
103  *
104  * These queues hold outgoing messages which will be picked up by
105  * receiver PEs.  Garbage collection works by scanning the 
106  * in_transit_queue for messages, freeing delivered ones, and moving
107  * others to in_transit_tmp_queue.  Then the pointers are swapped,
108  * so in_transit_queue contains all the undelivered messages, and
109  * in_transit_tmp_queue is empty.
110  */
111 static McQueue *in_transit_queue;
112 static McQueue *in_transit_tmp_queue;
113
114 /* tmp_queue is used to invert the order of incoming messages */
115 static McQueue *tmp_queue;  
116
117 /* received_queue holds all the messages which have been moved
118  * into local memory.  Messages are dequede from here.
119  */
120 static McQueue *received_queue;
121
122 /* received_token_queue saves incoming broadcast-message tokens,
123  * until McRetrieveRemote is done with them.
124  */
125 static McQueue *received_token_queue;
126
127 /* outgoing broadcast message queue, holds messages until all receivers have
128  * picked it up
129  */
130 static McQueue *broadcast_queue;
131 static McQueue *broadcast_tmp_queue;
132
133 /*
134  * head is the pointer to my next incoming message.
135  */
136 static McDistList head;
137
138 #if CMK_ARENA_MALLOC
139 /* lock allocated from symmetric heap */
140 static long *my_lock;
141 static long *head_lock;
142 static long *bcast_lock;
143 #else
144 /*
145  *  We require statically allocated variables for locks.  This defines
146  *  the max number of processors available.
147  */
148 #define MAX_PES 2048
149
150 /* Static variables are necessary for locks. */
151 static long *my_lock;
152 static long head_lock[MAX_PES];
153 static long bcast_lock[MAX_PES];
154 #endif
155
156 int McChecksum(char *msg, int size)
157 {
158   int chksm;
159   int i;
160
161   chksm=0xff;
162   for(i=0; i < size; i++)
163     chksm ^= *(msg+i);
164   return chksm;
165 }
166
167 void CmiPushPE(int pe,void *msg)
168 {
169   CmiAbort("Not implemented!");
170   /* McEnqueueRemote(msg,ALIGN8(size),pe); */
171 }
172
173 /**********************************************************************
174  *  CMI Functions START HERE
175  */
176
177
178 /**********************************************************************
179  * Cmi Message calls.  This implementation uses sync-type sends for
180  * everything.  An async interface would be efficient, and not difficult
181  * to add
182  */
183 void CmiSyncSendFn(int dest_pe, int size, char *msg)
184 {
185   McMsgHdr *dup_msg;
186
187   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
188   memcpy(dup_msg,msg,size);
189   dup_msg->msg_type = Message;
190
191   McRetrieveRemote();
192
193   if (dest_pe == _Cmi_mype)
194     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dup_msg);
195   else
196   {
197     McEnqueueRemote(dup_msg,ALIGN8(size),dest_pe); 
198   }
199   CQdCreate(CpvAccess(cQdState), 1);
200 }
201
202 CmiCommHandle CmiAsyncSendFn(int dest_pe, int size, char *msg)
203 {
204   CmiSyncSendFn(dest_pe, size, msg);
205   return 0;
206 }
207
208 void CmiFreeSendFn(int dest_pe, int size, char *msg)
209 {
210   /* No need to copy message, since we will immediately free it */
211   McRetrieveRemote();
212   ((McMsgHdr *)msg)->msg_type = Message;
213
214   if (dest_pe == _Cmi_mype)
215     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
216   else
217   {
218     McEnqueueRemote(msg,size,dest_pe); 
219   }
220   CQdCreate(CpvAccess(cQdState), 1);
221 }
222
223 void CmiSyncBroadcastFn(int size, char *msg)
224 {
225   int i;
226   McMsgHdr *dup_msg;
227   McMsgHdr bcast_msg_tok;
228   McMsgHdr *dup_tok;
229   int hdr_size;
230
231   /*
232    * Copy user's message, and set count to the correct number of recients
233    */
234   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
235   memcpy(dup_msg,msg,size);
236   dup_msg->bcast.count = _Cmi_numpes - 1;
237   /*
238   CmiPrintf("PE %d broadcast handler=%d\n",_Cmi_mype,dup_msg->handler);
239   */
240   /*
241    * Make the broadcast token point to the copied message
242    */
243   bcast_msg_tok.msg_type = BcastMessage;
244   bcast_msg_tok.bcast.ptr = dup_msg;
245   bcast_msg_tok.bcast_msg_size = size;
246
247   hdr_size = ALIGN8(sizeof(McMsgHdr));
248
249   /*
250    * Enqueue copies of the token message on other nodes.  This code should
251    * be similar to CmiSyncSend
252    */
253   for(i=0; i<_Cmi_numpes; i++)
254     if (i != _Cmi_mype)
255     {
256       dup_tok = (McMsgHdr *)CmiAlloc(ALIGN8(hdr_size));
257       memcpy(dup_tok,&bcast_msg_tok,hdr_size);
258       McEnqueueRemote(dup_tok,hdr_size,i); 
259     }
260   /*
261    * The token message will be deleted as a normal message,
262    * but the message being broadcast needs to be saved for future
263    * garbage collection.
264    */
265   McQueueAddToBack(broadcast_queue,dup_msg);
266   CQdCreate(CpvAccess(cQdState), _Cmi_numpes-1);
267 }
268
269 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
270 {
271   CmiSyncBroadcastFn(size,msg);
272   return 0;
273 }
274
275 void CmiFreeBroadcastFn(int size, char *msg)
276 {
277   CmiSyncBroadcastFn(size,msg);
278   CmiFree(msg);
279 }
280
281 void CmiSyncBroadcastAllFn(int size, char *msg)
282 {
283   int i;
284   CmiSyncBroadcastFn(size,msg);
285   CmiSyncSendFn(_Cmi_mype, size, msg);
286 }
287
288 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
289 {
290   CmiSyncBroadcastAllFn(size,msg);
291   return 0;
292 }
293
294 void CmiFreeBroadcastAllFn(int size, char *msg)
295 {
296   CmiSyncBroadcastAllFn(size,msg);
297   CmiFree(msg);
298 }
299
300
301 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg)
302 {
303   int i;
304   McMsgHdr *dup_msg;
305   McMsgHdr bcast_msg_tok;
306   McMsgHdr *dup_tok;
307   int hdr_size;
308   int n_remote_pes;
309
310   /*
311    * Count how many remote PEs, and send to the local PE if it is in the list
312    */
313   /*
314   CmiPrintf("CmiSyncListSendFn: size %d handler %d\n",
315             size,((McMsgHdr *)msg)->handler);
316   CmiPrintf("CmiSyncListSendFn: size %d npes %d\n",size,npes);
317   */
318   n_remote_pes = 0;
319   for (i=0; i < npes; i++)
320   {
321     if (pes[i] == _Cmi_mype)
322       CmiSyncSendFn(_Cmi_mype, size, msg);
323     else
324       n_remote_pes++;
325   }
326   if (n_remote_pes == 0)  // Nothing to do
327     return;
328   
329   /*
330    * Copy user's message, and set count to the correct number of recients
331    */
332   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
333   memcpy(dup_msg,msg,size);
334   dup_msg->bcast.count = n_remote_pes;
335   /*
336    * Make the broadcast token point to the copied message
337    */
338   bcast_msg_tok.msg_type = BcastMessage;
339   bcast_msg_tok.bcast.ptr = dup_msg;
340   bcast_msg_tok.bcast_msg_size = size;
341
342   hdr_size = ALIGN8(sizeof(McMsgHdr));
343
344   /*
345    * Enqueue copies of the token message on other nodes.  This code should
346    * be similar to CmiSyncSend
347    */
348   for(i=0; i<npes; i++)
349     if (pes[i] != _Cmi_mype)
350     {
351       dup_tok = (McMsgHdr *)CmiAlloc(ALIGN8(hdr_size));
352       memcpy(dup_tok,&bcast_msg_tok,hdr_size);
353       McEnqueueRemote(dup_tok,hdr_size,pes[i]); 
354       CQdCreate(CpvAccess(cQdState), 1);
355     }
356   /*
357    * The token message will be deleted as a normal message,
358    * but the message being broadcast needs to be saved for future
359    * garbage collection.
360    */
361   McQueueAddToBack(broadcast_queue,dup_msg);
362 }
363
364 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg)
365 {
366   CmiSyncListSendFn(npes, pes, size, msg);
367   return 0;
368 }
369
370 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg)
371 {
372   CmiSyncListSendFn(npes,pes,size,msg);
373   CmiFree(msg);
374 }
375
376 typedef struct {
377   char header[CmiMsgHeaderSizeBytes];
378   CmiGroup grp;
379   int size;
380   char *user_msg;
381 } McMultiMsg;
382
383 CpvDeclare(int,McMulticastWaitHandler);
384
385 void CmiSyncMulticastFn(CmiGroup grp, int size, char *msg)
386 {
387   int npes;
388   int *pes;
389   McMultiMsg multi_msg;
390   
391   /*
392   CmiPrintf("CmiSyncMulticastFn: size %d handler %d\n",
393             size,((McMsgHdr *)msg)->handler);
394    */
395   /*
396    *  Check for group, and busy-wait, if necessary, for group info
397    */
398   CmiLookupGroup(grp, &npes, &pes);
399   if (pes != 0)
400     CmiSyncListSendFn( npes, pes, size, msg);
401   else
402   {
403     multi_msg.grp = grp;
404     multi_msg.size = size;
405     multi_msg.user_msg = (char *) CmiAlloc(ALIGN8(size));
406     memcpy(multi_msg.user_msg,msg,size);
407
408     CmiSetHandler(&multi_msg,CpvAccess(McMulticastWaitHandler));
409     CmiSyncSendFn(CmiMyPe(),sizeof(McMultiMsg),(char *)&multi_msg);
410   }
411 }
412
413 void McMulticastWaitFn(McMultiMsg *msg)
414 {
415   CmiFreeMulticastFn(msg->grp,msg->size,msg->user_msg);
416 }
417
418 void CmiMulticastInit(void)
419 {
420   CpvInitialize(int,McMulticastWaitHandler);
421   CpvAccess(McMulticastWaitHandler) = CmiRegisterHandler(McMulticastWaitFn);
422 }
423
424 CmiCommHandle CmiAsyncMulticastFn(CmiGroup grp, int size, char *msg)
425 {
426   CmiSyncMulticastFn(grp, size, msg);
427   return 0;
428 }
429
430 void CmiFreeMulticastFn(CmiGroup grp, int size, char *msg)
431 {
432   CmiSyncMulticastFn(grp, size, msg);
433   CmiFree(msg);
434 }
435
436
437 /***********************************************************************
438  *
439  * Abort function:
440  *
441  ************************************************************************/
442
443 void CmiAbort(const char *message)
444 {
445   CmiError(message);
446   /* *(char*)NULL = 0; */
447   exit(1);
448 }
449
450 /**********************************************************************
451  * CMI utility functions for startup, shutdown, and other miscellaneous
452  * activities.
453  */
454
455 /*
456  * This port uses the common CmiDeliver code, so we only provide
457  * CmiGetNonLocal()
458  */
459 void *CmiGetNonLocal()
460 {
461   McRetrieveRemote();
462
463   return (void *)McQueueRemoveFromFront(received_queue);
464 }
465
466 static char     **Cmi_argv;
467 static char     **Cmi_argvcopy;
468 static CmiStartFn Cmi_startfn;   /* The start function */
469 static int        Cmi_usrsched;  /* Continue after start function finishes? */
470
471 static void CommunicationServerThread(int sleepTime)
472 {
473 #if CMK_SMP
474   CommunicationServer(sleepTime);
475 #endif
476 #if CMK_IMMEDIATE_MSG
477   CmiHandleImmediate();
478 #endif
479 }
480
481 void CmiNotifyIdle(void)
482 {
483   /* Use this opportunity to clean up the in_transit_queue */
484   McCleanUpInTransit();
485 }
486
487 static void CmiNotifyBeginIdle(void *s)
488 {
489   /* Use this opportunity to clean up the in_transit_queue */
490   McCleanUpInTransit();
491 }
492
493 static void CmiNotifyStillIdle(void *s)
494 {
495   McRetrieveRemote();
496 }
497
498 static void ConverseRunPE(int everReturn)
499 {
500   char** CmiMyArgv;
501
502   if (CmiMyRank())
503     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
504   else
505     CmiMyArgv=Cmi_argv;
506
507   CthInit(CmiMyArgv);
508
509   ConverseCommonInit(CmiMyArgv);
510
511   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
512       (CcdVoidFn) CmiNotifyBeginIdle, (void *) NULL);
513   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
514       (CcdVoidFn) CmiNotifyStillIdle, (void *) NULL);
515
516   /* communication thread */
517   if (CmiMyRank() == CmiMyNodeSize()) {
518     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
519     while (1) CommunicationServerThread(5);
520   }
521   else {  /* worker thread */
522   if (!everReturn) {
523     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
524     if (Cmi_usrsched==0) CsdScheduler(-1);
525     ConverseExit();
526   }
527   }
528 }
529
530 void arena_init();
531
532 void 
533 ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
534 {
535   Cmi_argvcopy = CmiCopyArgs(argv);
536   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
537
538   McInit();
539 #if CMK_ARENA_MALLOC
540   arena_init();
541 #endif
542
543   {
544   int debug = CmiGetArgFlag(argv,"++debug");
545   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
546   if (debug || debug_no_pause)
547   {   /*Pause so user has a chance to start and attach debugger*/
548 #if CMK_HAS_GETPID
549     printf("CHARMDEBUG> Processor %d has PID %d\n",CmiMyNode(),getpid());
550     fflush(stdout);
551     if (!debug_no_pause)
552       sleep(10);
553 #else
554     printf("++debug ignored.\n");
555 #endif
556   }
557   }
558
559   /* CmiStartThreads(argv); */
560   ConverseRunPE(initret);
561 }
562
563 void ConverseExit()
564 {
565 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
566   if (CmiMyPe() == 0){
567     CmiPrintf("End of program\n");
568   }
569 #endif
570   ConverseCommonExit();
571 #if CMK_CRAYXT || CMK_CRAYXE
572   shmem_finalize();
573 #endif
574   exit(0);
575 }
576
577 /* lock
578 */
579
580 #if CMK_SHMEM_LOCK
581
582 void set_lock(long *lock, int pe)
583 {
584   while (shmem_long_swap(lock, 1L, pe)) {
585     //shmem_long_wait(&tmp, 1);
586   }
587 //printf("set lock end: %ld. \n", *lock);
588 }
589
590 void clear_lock(long *lock, int pe)
591 {
592 //printf("[%d] clear lock on %d %ld. \n", _Cmi_mype, pe, *lock);
593   shmem_long_swap(lock, 0L, pe);
594 //  shmem_long_p(lock, 0L, pe);
595 //printf("clear lock end lock:%ld\n", *lock);
596 }
597
598 #else
599
600 #if 1
601 #define set_lock(lock, pe)  shmem_set_lock(lock)
602 #define clear_lock(lock, pe)  shmem_clear_lock(lock)
603 #else              /* for debugging */
604 void set_lock(long *lock, int pe)
605 {
606   //printf("[%d] set_lock %d %d\n", CmiMyPe(), pe, *lock);
607   shmem_set_lock(lock);
608   // while (shmem_test_lock(lock)) usleep(1);
609   //printf("[%d] after set_lock %d %d\n", CmiMyPe(), pe, *lock);
610 }
611 void clear_lock(long *lock, int pe)
612 {
613   //printf("[%d] free_lock %d %d\n", CmiMyPe(), pe, *lock);
614   shmem_clear_lock(lock);
615   //printf("[%d] after free_lock %d %d\n", CmiMyPe(), pe, *lock);
616 }
617 #endif
618 #endif
619
620 /**********************************************************************
621  * Mc Functions:
622  * Mc functions are used internally in machine.c only
623  */
624
625 static void McInit()
626 {
627   CMK_SHMEM_INIT;
628
629 #if CMK_CRAYXT || CMK_CRAYXE
630   _Cmi_mype = shmem_my_pe();
631   _Cmi_numpes = shmem_n_pes();
632 #else
633   _Cmi_mype = _my_pe();
634   _Cmi_numpes = _num_pes();
635 #endif
636   _Cmi_myrank = 0;
637
638   CpvInitialize(void *, CmiLocalQueue);
639   CpvAccess(CmiLocalQueue) = CdsFifo_Create();
640
641   McInitList();
642 }
643
644 static void McInitList()
645 {
646   int i;
647
648   received_queue = McQueueCreate();
649   tmp_queue = McQueueCreate();
650   received_token_queue = McQueueCreate();
651   broadcast_queue = McQueueCreate();
652   broadcast_tmp_queue = McQueueCreate();
653   in_transit_tmp_queue = McQueueCreate();
654   in_transit_queue = McQueueCreate();
655
656   head.nxt_node = list_empty;
657   head.nxt_addr = NULL;
658   head.msg_sz = 0;
659
660 #if CMK_ARENA_MALLOC
661   head_lock = shmalloc(sizeof(long)*_Cmi_numpes);
662   _MEMCHECK(head_lock);
663   bcast_lock = shmalloc(sizeof(long)*_Cmi_numpes);
664   _MEMCHECK(bcast_lock);
665 #else
666   if (_Cmi_numpes > MAX_PES)
667   {
668     CmiPrintf("Not enough processors allocated in machine.c.\n");
669     CmiPrintf("Change MAX_PES in machine.c to at least %d and recompile Converse\n",
670     _Cmi_numpes);
671   }
672 #endif
673   for(i=0; i < _Cmi_numpes; i++)
674   {
675     head_lock[i] = 0;
676     bcast_lock[i] = 0;
677   }
678   my_lock = &(head_lock[_Cmi_mype]);
679 /*
680   clear_lock(my_lock, _Cmi_mype);
681   clear_lock(&bcast_lock[_Cmi_mype], _Cmi_mype);
682 */
683   shmem_barrier_all();
684 }
685
686 static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe)
687 {
688  /*
689   * To enqueue on a remote node, we should:
690   * 0. Free any delivered messages from the message_in_transit list.
691   * 1. Add message in the "message_in_transit" list
692   * 2. Fill in the fields in the message header
693   * 3. Lock the head pointer on the remote node.
694   * 4. Swap the list pointer with that on the other node.
695   * 5. Release lock
696   */
697
698   McDistList tmp_link;
699   McDistList *msg_link;
700
701   /*  CmiPrintf("PE %d outgoing msg = %d msg_type = %d size = %d dst_pe=%d\n",
702             _Cmi_mype,msg,((McMsgHdr *)msg)->msg_type,msg_sz, dst_pe); */
703   /* 0. Free any delivered messages from the in_transit_queue list. */
704   McCleanUpInTransit();
705
706   /* 1. Add message in the "in_transit_queue" list */
707   McQueueAddToBack(in_transit_queue,msg);
708
709   /* 2. Fill in the fields in the message header */
710   msg_link = &(((McMsgHdr *)msg)->list_node);
711   ((McMsgHdr *)msg)->received_f = false;
712
713   /* Set list fields to point back to this processor, this message.  */
714   tmp_link.nxt_node = _Cmi_mype;
715   tmp_link.nxt_addr = msg;
716   tmp_link.msg_sz = msg_sz;
717
718   /* 3. Lock the head pointer on the remote node.
719      Acquire lock on the destination queue.  If locks turn oout to
720      be inefficient, use fetch and increment to imp. lock
721    */
722
723   set_lock(&head_lock[dst_pe], dst_pe);
724
725
726   /* 4. Swap the list pointer with that on the other node.
727    */
728   /* First, get current head pointer, and stick it in this 
729    * message data area.
730    */
731 #if !USE_SWAP
732   shmem_int_get((int*)msg_link, (int*)&head, sizeof(McDistList)/sizeof(int), dst_pe);
733   /* Next, write the new message into the top of the list */
734   shmem_int_put((int*)&head, (int*)&tmp_link, sizeof(McDistList)/sizeof(int),dst_pe);
735   shmem_quiet();
736 #else
737   {
738   int i, n = sizeof(McDistList)/sizeof(int);
739   int *dst = (int*)&head;
740   int *src = (int*)&tmp_link;
741   int *olddst = (int*)msg_link;
742   for (i=0; i<n; i++)  olddst[i] = shmem_int_swap(dst+i, src[i], dst_pe);
743   }
744 #endif
745
746 #ifdef DEBUG
747   printf("[%d] Adding Message to pe %d\n",_Cmi_mype,dst_pe);
748   printf("[%d]   nxt_node = %d\n",_Cmi_mype,tmp_link.nxt_node);
749   printf("[%d]   nxt_addr = %x\n",_Cmi_mype,tmp_link.nxt_addr);
750   printf("[%d]   msg_sz = %x\n",_Cmi_mype,tmp_link.msg_sz);
751   printf("[%d] Old Message is now at %x\n",_Cmi_mype,msg_link);
752   printf("[%d]   nxt_node = %d\n",_Cmi_mype,msg_link->nxt_node);
753   printf("[%d]   nxt_addr = %x\n",_Cmi_mype,msg_link->nxt_addr);
754   printf("[%d]   msg_sz = %x\n",_Cmi_mype,msg_link->msg_sz);
755 #endif
756
757   /* 5. Release lock */
758   clear_lock(&head_lock[dst_pe], dst_pe);
759 }
760
761 static void McRetrieveRemote(void)
762 {
763   /*
764    * The local host should retrieve messages from the distributed list
765    * and put them in local memory, in a messages queue.
766    * Steps:
767    * 0) Lock list pointer.
768    * 1) Replace list pointer with NULL and unlock list
769    * 2) Get each message into local memory
770    * 3) Enqueue list into local message queue, in reverse order
771    */
772
773   McDistList list_head;
774   McDistList *cur_node;
775   McMsgHdr *cur_msg; 
776   int received_f;
777   enum boolean bcast_msg;
778   McMsgHdr *bcast_ptr;
779
780   /* Get the head of the list */
781
782   if (head.nxt_node == list_empty)  /* apparently there are no messages */
783     return;
784
785   /* 0) Lock list pointer. */
786   set_lock(my_lock, _Cmi_mype);
787
788   /* 1) Replace list pointer with NULL and unlock list */
789   list_head = head;
790   head.nxt_node = list_empty;
791   head.nxt_addr = NULL;
792   head.msg_sz = 0;
793   clear_lock(my_lock, _Cmi_mype);
794
795   /* 2) Get each message into local memory
796    * Start copying the messages into local memory, putting messages into
797    * a local list for future reversing.
798    */
799   cur_node = &list_head;
800   received_f = true;
801
802   while (cur_node->nxt_node != list_empty)
803   {
804     cur_msg = (McMsgHdr *)CmiAlloc(ALIGN8(cur_node->msg_sz));
805     if (cur_msg ==NULL)
806     {
807       CmiError("%s:%d Cannot Allocate Memory\n",__FILE__,__LINE__);
808       exit(1);
809     }
810
811     shmem_get64((long*)cur_msg, (long*)cur_node->nxt_addr,
812               ALIGN8(cur_node->msg_sz)/8, cur_node->nxt_node);
813
814     /*    CmiPrintf("PE %d incoming msg = %d msg_type = %d, size = %d\n",
815               _Cmi_mype,cur_msg,cur_msg->msg_type,cur_node->msg_sz);*/
816
817     /* If it is a broadcast message, retrieve the actual message */
818     if (cur_msg->msg_type == BcastMessage)
819     {
820
821       bcast_msg = true;
822       bcast_ptr = (McMsgHdr *)CmiAlloc(ALIGN8(cur_msg->bcast_msg_size));
823       set_lock(&bcast_lock[cur_node->nxt_node], cur_node->nxt_node);
824
825       /*
826       CmiPrintf(
827         "PE %d getting message from node %d at addr %d to %d, size=%d\n",
828         _Cmi_mype,cur_node->nxt_node,cur_msg->bcast.ptr,bcast_ptr,
829         cur_msg->bcast_msg_size
830         );
831         */
832       /* Get the message */
833       shmem_get64((long*)bcast_ptr,(long*)cur_msg->bcast.ptr,
834                 ALIGN8(cur_msg->bcast_msg_size)/8,cur_node->nxt_node);
835       /* Decrement the count, and write it back to the original node. */
836       /*
837       CmiPrintf(
838       "PE %d received broadcast message count=%d size=%d handler=%d\n",
839       _Cmi_mype,bcast_ptr->bcast.count,
840       cur_msg->bcast_msg_size,bcast_ptr->handler
841       );
842       */
843
844       bcast_ptr->bcast.count--;
845
846       shmem_int_put(&(cur_msg->bcast.ptr->bcast.count),
847                 &bcast_ptr->bcast.count,1,cur_node->nxt_node);
848       shmem_quiet();
849       clear_lock(&bcast_lock[cur_node->nxt_node], cur_node->nxt_node);
850     }
851     else bcast_msg = false;
852
853     /* Mark the remote message for future deletion */
854     shmem_int_put(&(cur_node->nxt_addr->received_f),&received_f,
855               1, cur_node->nxt_node);
856     shmem_quiet();
857
858     /* Add to list for reversing */
859     if (bcast_msg)
860     {
861       McQueueAddToBack(received_token_queue,cur_msg);
862       McQueueAddToBack(tmp_queue,bcast_ptr);
863     }
864     else 
865       McQueueAddToBack(tmp_queue,cur_msg);
866
867     /* Move pointer to next message */
868     cur_node = &(cur_msg->list_node);
869   }
870
871   /* 3) Enqueue list into local message queue, in reverse order */
872   while ((cur_msg = McQueueRemoveFromBack(tmp_queue)) != NULL)  {
873     McQueueAddToBack(received_queue,cur_msg);
874   }
875
876   /* 4) Delete broadcast-message tokens */
877   while ((cur_msg = McQueueRemoveFromBack(received_token_queue)) != NULL)  {
878     CmiFree(cur_msg);
879   }
880   return;
881 }
882
883 static void McCleanUpInTransit(void)
884 {
885   McMsgHdr *msg;
886   McQueue *swap_ptr;
887
888   /* Check broadcast message queue, to see if messages have been retrieved
889    */
890   while ((msg = (McMsgHdr *)McQueueRemoveFromFront(broadcast_queue)) 
891          != NULL)
892   {
893     if (msg->bcast.count == 0)
894     {
895       /* 
896          CmiPrintf("PE %d freeing broadcast message at %d\n",_Cmi_mype,msg);
897        */
898       CmiFree(msg);
899     }
900     else
901     {
902       McQueueAddToBack(broadcast_tmp_queue,msg);
903     }
904   }
905   /*
906    * swap queues, so tmp_queue is now empty, and in_transit_queue has
907    * only non-received messages.
908    */
909   swap_ptr = broadcast_tmp_queue;
910   broadcast_tmp_queue = broadcast_queue;
911   broadcast_queue = swap_ptr;
912
913   /* 
914    * Free received messages, and move others to tmp_queue.  Similar to
915    * above
916    */
917   while ((msg = (McMsgHdr *)McQueueRemoveFromFront(in_transit_queue)) 
918          != NULL)
919   {
920     if (msg->received_f)
921     {
922       CmiFree(msg);
923     }
924     else
925     {
926       McQueueAddToBack(in_transit_tmp_queue,msg);
927     }
928   }
929   /*
930    * swap queues, so tmp_queue is now empty, and in_transit_queue has
931    * only non-received messages.
932    */
933   swap_ptr = in_transit_tmp_queue;
934   in_transit_tmp_queue = in_transit_queue;
935   in_transit_queue = swap_ptr;
936 #ifdef DEBUG
937   CmiPrintf("[%d] done in_transit_queue = %d, tmp_queue = %d\n",
938         _Cmi_mype,in_transit_queue->len,in_transit_tmp_queue->len);
939 #endif
940 }
941
942 /*******************************************************************
943  * The following internal functions implements FIFO queues for
944  * messages in the local address space.  This is used for the
945  * received_queue, the in_transit_queue, and tmp_queue.  Code
946  * originally comes from the Origin2000 port, with modifications.
947  */
948 static void **McQueueAllocBlock(unsigned int len)
949 {
950   void ** blk;
951
952   blk=(void **)malloc(len*sizeof(void *));
953   if(blk==(void **)0) {
954     CmiError("Cannot Allocate Memory!\n");
955     abort();
956   }
957   return blk;
958 }
959
960 static void 
961 McQueueSpillBlock(void **srcblk, void **destblk, 
962              unsigned int first, unsigned int len)
963 {
964   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
965   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
966 }
967
968 static McQueue * McQueueCreate(void)
969 {
970   McQueue *queue;
971
972   queue = (McQueue *) malloc(sizeof(McQueue));
973   if(queue==(McQueue *)0) {
974     CmiError("Cannot Allocate Memory!\n");
975     abort();
976   }
977   queue->blk = McQueueAllocBlock(BLK_LEN);
978   queue->blk_len = BLK_LEN;
979   queue->first = 0;
980   queue->len = 0;
981   return queue;
982 }
983
984 int inside_comm = 0;
985
986 static void McQueueAddToBack(McQueue *queue, void *element)
987 {
988   inside_comm = 1;
989   if(queue->len==queue->blk_len) {
990     void **blk;
991
992     queue->blk_len *= 3;
993     blk = McQueueAllocBlock(queue->blk_len);
994     McQueueSpillBlock(queue->blk, blk, queue->first, queue->len);
995     free(queue->blk);
996     queue->blk = blk;
997     queue->first = 0;
998   }
999 #ifdef DEBUG
1000   CmiPrintf("[%d] Adding %x\n",_Cmi_mype,element);
1001 #endif
1002   queue->blk[(queue->first+queue->len++)%queue->blk_len] = element;
1003   inside_comm = 0;
1004 }
1005
1006 static void * McQueueRemoveFromBack(McQueue *queue)
1007 {
1008   void *element;
1009   element = (void *) 0;
1010   if(queue->len) {
1011     element = queue->blk[(queue->first+queue->len-1)%queue->blk_len];
1012     queue->len--;
1013   }
1014   return element;
1015 }
1016
1017 static void * McQueueRemoveFromFront(McQueue *queue)
1018 {
1019   void *element;
1020   element = (void *) 0;
1021   if(queue->len) {
1022     element = queue->blk[queue->first++];
1023     queue->first = (queue->first+queue->blk_len)%queue->blk_len;
1024     queue->len--;
1025   }
1026   return element;
1027 }
1028
1029 #if CMK_ARENA_MALLOC
1030
1031 static char *arena = NULL;
1032 static slotset *myss = NULL;
1033 static int slotsize = 1*1024;
1034
1035 typedef struct ArenaBlock {
1036       CmiInt8 slot;   /* First slot */
1037       CmiInt8 length; /* Length, in bytes*/
1038 } ArenaBlock;
1039
1040 /* Convert a heap block pointer to/from a CmiIsomallocBlock header */
1041 static void *block2pointer(ArenaBlock *blockHeader) {
1042         return (void *)(blockHeader+1);
1043 }
1044 static ArenaBlock *pointer2block(void *heapBlock) {
1045         return ((ArenaBlock *)heapBlock)-1;
1046 }
1047
1048 /* Return the number of slots in a block with n user data bytes */
1049 static int length2slots(int nBytes) {
1050         return (sizeof(ArenaBlock)+nBytes+slotsize-1)/slotsize;
1051 }
1052
1053 #define MAX_MEM    (64*1024*1024)        
1054
1055 void arena_init()
1056 {
1057   size_t maxmem = 0;
1058   int nslots;
1059   char *s;
1060 #if CMK_CRAYXT || CMK_CRAYXE
1061   if (s = getenv("XT_SYMMETRIC_HEAP_SIZE")) {
1062     size_t n=0;
1063     switch (s[strlen(s)-1]) {
1064     case 'G':  {
1065       sscanf(s, "%dG", &n);  n *= 1024*1024*1024; 
1066       break;
1067       }
1068     case 'M': {
1069       sscanf(s, "%dM", &n);  n *= 1024*1024; 
1070       break;
1071       }
1072     case 'K': {
1073       sscanf(s, "%dK", &n);  n *= 1024; 
1074       break;
1075       }
1076     default: {
1077       n =atoi(s);
1078       break;
1079       }
1080     }
1081     if (n>0) {
1082       n -= sizeof(long)*2*CmiNumPes();    /* for locks */
1083       n -= 2*1024*1024;                   /* less 2MB */
1084       if (n>0) maxmem = n;    /* a little less */
1085     }
1086   }
1087 #endif
1088   if (maxmem == 0) maxmem = MAX_MEM;
1089   if (CmiMyPe()==0) CmiPrintf("Charm++> Total of %dMB symmetric heap memory allocated.\n", maxmem/1024/1024);
1090   arena = shmalloc(maxmem);           /* global barrier */
1091   _MEMCHECK(arena);
1092   nslots = maxmem/slotsize;
1093   myss = new_slotset(0, nslots);
1094 }
1095
1096 void *arena_malloc(int size) 
1097 {   
1098         CmiInt8 s,n;
1099         ArenaBlock *blk;
1100         if (size==0) return NULL;
1101         n=length2slots(size);
1102         /*Always satisfy mallocs with local slots:*/
1103         s=get_slots(myss,n);
1104         if (s==-1) {
1105             CmiError("Not enough address space left in shmem on processor %d for %d bytes!\n", CmiMyPe(),size);
1106             CmiAbort("Out of symmetric heap space for arena_malloc");
1107         }
1108         grab_slots(myss,s,n);
1109         blk = (ArenaBlock*)(arena + s*slotsize);
1110         blk->slot=s;
1111         blk->length=size;
1112         return block2pointer(blk);
1113 }
1114
1115 void arena_free(void *blockPtr)
1116 {   
1117         ArenaBlock *blk;
1118         CmiInt8 s,n;
1119         if (blockPtr==NULL) return;
1120         blk = pointer2block(blockPtr);
1121         s=blk->slot;  
1122         n=length2slots(blk->length); 
1123         free_slots(myss, s, n);
1124 }
1125
1126 #endif
1127
1128 /*   Memory lock and unlock functions */
1129 /*      --- on T3E, no shared memory and quickthreads are used, so memory */
1130 /*          calls reentrant problem. these are only dummy functions */
1131
1132 static volatile int memflag;
1133 void CmiMemLock() { memflag=1; }
1134 void CmiMemUnlock() { memflag=0; }
1135
1136 int CmiBarrier()
1137 {
1138   return -1;
1139 }
1140
1141 int CmiBarrierZero()
1142 {
1143   return -1;
1144 }
1145
1146 /*@}*/