Changed the interOperate to a global variable so that it is initialized from
[charm.git] / src / arch / shmem / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * Shared memory machine layer
10  * @ingroup Machine
11  * This is a complete port, but could be made considerably more efficient
12  * by handling asynchronous messages correctly, ie. without doing
13  * an extra copy and synchronous send
14  * @{
15  */
16
17 #include <stdlib.h>
18 #include <unistd.h>
19 #include <malloc.h>
20 #include "converse.h"
21
22 #include "mem-arena.h"
23
24 #include CMK_SHMEM_H
25
26 #define USE_SWAP                                       0
27
28 /*
29  * Some constants
30  */
31 enum boolean {false = 0, true = 1};
32 enum {list_empty = -1 };
33
34
35 /*
36  * Local declarations for Cmi, used by common code
37  */
38 CpvDeclare(void*, CmiLocalQueue);
39 int _Cmi_mype;
40 int _Cmi_numpes;
41 int _Cmi_myrank;
42
43 /*
44  * Local queue functions, used by common code to store messages 
45  * to my own node efficiently.  These are used when 
46  * CMK_CMIDELIVERS_USE_COMMON_CODE is true.
47  */
48 /*
49  * Distributed list declarations.  This linked list goes across machines,
50  * storing all the messages for this node until this processor copies them
51  * into local memory.
52  */
53 typedef struct McDistListS
54 {
55   int nxt_node;
56   int msg_sz;
57   struct McMsgHdrS *nxt_addr;
58 } McDistList;
59
60 typedef struct McMsgHdrS
61 {
62   McDistList list_node;
63   enum {Unknown, Message, BcastMessage } msg_type;
64   enum boolean received_f;
65   union
66   {
67     struct McMsgHdrS *ptr;
68     int count;
69   } bcast;
70   int bcast_msg_size;
71   int handler;
72 } McMsgHdr;
73
74
75 /*
76  * Mc functions, used in machine.c only.
77  */
78 static void McInit();
79 static void McInitList();
80 static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe);
81 static void McRetrieveRemote(void);
82 static void McCleanUpInTransit(void);
83
84 /*
85  * These declarations are for a local linked list to hold messages which
86  * have been copied into local memory.  It is a modified version of the
87  * Origin2000 code with the locks removed.
88  */
89 /* Allocation block size, to reduce num of mallocs */
90 #define BLK_LEN  512  
91
92 typedef struct McQueueS
93 {
94   void     **blk;
95   unsigned int blk_len;
96   unsigned int first;
97   unsigned int len;
98 } McQueue;
99
100 static McQueue *McQueueCreate(void);
101 static void McQueueAddToBack(McQueue *queue, void *element);
102 static void *McQueueRemoveFromFront(McQueue *queue);
103 static void *McQueueRemoveFromBack(McQueue *queue);
104
105 /*************************************************************
106  * static variable declarations
107  */
108 /*
109  *  Local queues used for mem management.
110  *
111  * These queues hold outgoing messages which will be picked up by
112  * receiver PEs.  Garbage collection works by scanning the 
113  * in_transit_queue for messages, freeing delivered ones, and moving
114  * others to in_transit_tmp_queue.  Then the pointers are swapped,
115  * so in_transit_queue contains all the undelivered messages, and
116  * in_transit_tmp_queue is empty.
117  */
118 static McQueue *in_transit_queue;
119 static McQueue *in_transit_tmp_queue;
120
121 /* tmp_queue is used to invert the order of incoming messages */
122 static McQueue *tmp_queue;  
123
124 /* received_queue holds all the messages which have been moved
125  * into local memory.  Messages are dequede from here.
126  */
127 static McQueue *received_queue;
128
129 /* received_token_queue saves incoming broadcast-message tokens,
130  * until McRetrieveRemote is done with them.
131  */
132 static McQueue *received_token_queue;
133
134 /* outgoing broadcast message queue, holds messages until all receivers have
135  * picked it up
136  */
137 static McQueue *broadcast_queue;
138 static McQueue *broadcast_tmp_queue;
139
140 /*
141  * head is the pointer to my next incoming message.
142  */
143 static McDistList head;
144
145 #if CMK_ARENA_MALLOC
146 /* lock allocated from symmetric heap */
147 static long *my_lock;
148 static long *head_lock;
149 static long *bcast_lock;
150 #else
151 /*
152  *  We require statically allocated variables for locks.  This defines
153  *  the max number of processors available.
154  */
155 #define MAX_PES 2048
156
157 /* Static variables are necessary for locks. */
158 static long *my_lock;
159 static long head_lock[MAX_PES];
160 static long bcast_lock[MAX_PES];
161 #endif
162
163 int McChecksum(char *msg, int size)
164 {
165   int chksm;
166   int i;
167
168   chksm=0xff;
169   for(i=0; i < size; i++)
170     chksm ^= *(msg+i);
171   return chksm;
172 }
173
174 void CmiPushPE(int pe,void *msg)
175 {
176   CmiAbort("Not implemented!");
177   /* McEnqueueRemote(msg,ALIGN8(size),pe); */
178 }
179
180 /**********************************************************************
181  *  CMI Functions START HERE
182  */
183
184
185 /**********************************************************************
186  * Cmi Message calls.  This implementation uses sync-type sends for
187  * everything.  An async interface would be efficient, and not difficult
188  * to add
189  */
190 void CmiSyncSendFn(int dest_pe, int size, char *msg)
191 {
192   McMsgHdr *dup_msg;
193
194   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
195   memcpy(dup_msg,msg,size);
196   dup_msg->msg_type = Message;
197
198   McRetrieveRemote();
199
200   if (dest_pe == _Cmi_mype)
201     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dup_msg);
202   else
203   {
204     McEnqueueRemote(dup_msg,ALIGN8(size),dest_pe); 
205   }
206   CQdCreate(CpvAccess(cQdState), 1);
207 }
208
209 CmiCommHandle CmiAsyncSendFn(int dest_pe, int size, char *msg)
210 {
211   CmiSyncSendFn(dest_pe, size, msg);
212   return 0;
213 }
214
215 void CmiFreeSendFn(int dest_pe, int size, char *msg)
216 {
217   /* No need to copy message, since we will immediately free it */
218   McRetrieveRemote();
219   ((McMsgHdr *)msg)->msg_type = Message;
220
221   if (dest_pe == _Cmi_mype)
222     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
223   else
224   {
225     McEnqueueRemote(msg,size,dest_pe); 
226   }
227   CQdCreate(CpvAccess(cQdState), 1);
228 }
229
230 void CmiSyncBroadcastFn(int size, char *msg)
231 {
232   int i;
233   McMsgHdr *dup_msg;
234   McMsgHdr bcast_msg_tok;
235   McMsgHdr *dup_tok;
236   int hdr_size;
237
238   /*
239    * Copy user's message, and set count to the correct number of recients
240    */
241   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
242   memcpy(dup_msg,msg,size);
243   dup_msg->bcast.count = _Cmi_numpes - 1;
244   /*
245   CmiPrintf("PE %d broadcast handler=%d\n",_Cmi_mype,dup_msg->handler);
246   */
247   /*
248    * Make the broadcast token point to the copied message
249    */
250   bcast_msg_tok.msg_type = BcastMessage;
251   bcast_msg_tok.bcast.ptr = dup_msg;
252   bcast_msg_tok.bcast_msg_size = size;
253
254   hdr_size = ALIGN8(sizeof(McMsgHdr));
255
256   /*
257    * Enqueue copies of the token message on other nodes.  This code should
258    * be similar to CmiSyncSend
259    */
260   for(i=0; i<_Cmi_numpes; i++)
261     if (i != _Cmi_mype)
262     {
263       dup_tok = (McMsgHdr *)CmiAlloc(ALIGN8(hdr_size));
264       memcpy(dup_tok,&bcast_msg_tok,hdr_size);
265       McEnqueueRemote(dup_tok,hdr_size,i); 
266     }
267   /*
268    * The token message will be deleted as a normal message,
269    * but the message being broadcast needs to be saved for future
270    * garbage collection.
271    */
272   McQueueAddToBack(broadcast_queue,dup_msg);
273   CQdCreate(CpvAccess(cQdState), _Cmi_numpes-1);
274 }
275
276 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
277 {
278   CmiSyncBroadcastFn(size,msg);
279   return 0;
280 }
281
282 void CmiFreeBroadcastFn(int size, char *msg)
283 {
284   CmiSyncBroadcastFn(size,msg);
285   CmiFree(msg);
286 }
287
288 void CmiSyncBroadcastAllFn(int size, char *msg)
289 {
290   int i;
291   CmiSyncBroadcastFn(size,msg);
292   CmiSyncSendFn(_Cmi_mype, size, msg);
293 }
294
295 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
296 {
297   CmiSyncBroadcastAllFn(size,msg);
298   return 0;
299 }
300
301 void CmiFreeBroadcastAllFn(int size, char *msg)
302 {
303   CmiSyncBroadcastAllFn(size,msg);
304   CmiFree(msg);
305 }
306
307
308 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg)
309 {
310   int i;
311   McMsgHdr *dup_msg;
312   McMsgHdr bcast_msg_tok;
313   McMsgHdr *dup_tok;
314   int hdr_size;
315   int n_remote_pes;
316
317   /*
318    * Count how many remote PEs, and send to the local PE if it is in the list
319    */
320   /*
321   CmiPrintf("CmiSyncListSendFn: size %d handler %d\n",
322             size,((McMsgHdr *)msg)->handler);
323   CmiPrintf("CmiSyncListSendFn: size %d npes %d\n",size,npes);
324   */
325   n_remote_pes = 0;
326   for (i=0; i < npes; i++)
327   {
328     if (pes[i] == _Cmi_mype)
329       CmiSyncSendFn(_Cmi_mype, size, msg);
330     else
331       n_remote_pes++;
332   }
333   if (n_remote_pes == 0)  // Nothing to do
334     return;
335   
336   /*
337    * Copy user's message, and set count to the correct number of recients
338    */
339   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
340   memcpy(dup_msg,msg,size);
341   dup_msg->bcast.count = n_remote_pes;
342   /*
343    * Make the broadcast token point to the copied message
344    */
345   bcast_msg_tok.msg_type = BcastMessage;
346   bcast_msg_tok.bcast.ptr = dup_msg;
347   bcast_msg_tok.bcast_msg_size = size;
348
349   hdr_size = ALIGN8(sizeof(McMsgHdr));
350
351   /*
352    * Enqueue copies of the token message on other nodes.  This code should
353    * be similar to CmiSyncSend
354    */
355   for(i=0; i<npes; i++)
356     if (pes[i] != _Cmi_mype)
357     {
358       dup_tok = (McMsgHdr *)CmiAlloc(ALIGN8(hdr_size));
359       memcpy(dup_tok,&bcast_msg_tok,hdr_size);
360       McEnqueueRemote(dup_tok,hdr_size,pes[i]); 
361       CQdCreate(CpvAccess(cQdState), 1);
362     }
363   /*
364    * The token message will be deleted as a normal message,
365    * but the message being broadcast needs to be saved for future
366    * garbage collection.
367    */
368   McQueueAddToBack(broadcast_queue,dup_msg);
369 }
370
371 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg)
372 {
373   CmiSyncListSendFn(npes, pes, size, msg);
374   return 0;
375 }
376
377 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg)
378 {
379   CmiSyncListSendFn(npes,pes,size,msg);
380   CmiFree(msg);
381 }
382
383 typedef struct {
384   char header[CmiMsgHeaderSizeBytes];
385   CmiGroup grp;
386   int size;
387   char *user_msg;
388 } McMultiMsg;
389
390 CpvDeclare(int,McMulticastWaitHandler);
391
392 void CmiSyncMulticastFn(CmiGroup grp, int size, char *msg)
393 {
394   int npes;
395   int *pes;
396   McMultiMsg multi_msg;
397   
398   /*
399   CmiPrintf("CmiSyncMulticastFn: size %d handler %d\n",
400             size,((McMsgHdr *)msg)->handler);
401    */
402   /*
403    *  Check for group, and busy-wait, if necessary, for group info
404    */
405   CmiLookupGroup(grp, &npes, &pes);
406   if (pes != 0)
407     CmiSyncListSendFn( npes, pes, size, msg);
408   else
409   {
410     multi_msg.grp = grp;
411     multi_msg.size = size;
412     multi_msg.user_msg = (char *) CmiAlloc(ALIGN8(size));
413     memcpy(multi_msg.user_msg,msg,size);
414
415     CmiSetHandler(&multi_msg,CpvAccess(McMulticastWaitHandler));
416     CmiSyncSendFn(CmiMyPe(),sizeof(McMultiMsg),(char *)&multi_msg);
417   }
418 }
419
420 void McMulticastWaitFn(McMultiMsg *msg)
421 {
422   CmiFreeMulticastFn(msg->grp,msg->size,msg->user_msg);
423 }
424
425 void CmiMulticastInit(void)
426 {
427   CpvInitialize(int,McMulticastWaitHandler);
428   CpvAccess(McMulticastWaitHandler) = CmiRegisterHandler(McMulticastWaitFn);
429 }
430
431 CmiCommHandle CmiAsyncMulticastFn(CmiGroup grp, int size, char *msg)
432 {
433   CmiSyncMulticastFn(grp, size, msg);
434   return 0;
435 }
436
437 void CmiFreeMulticastFn(CmiGroup grp, int size, char *msg)
438 {
439   CmiSyncMulticastFn(grp, size, msg);
440   CmiFree(msg);
441 }
442
443
444 /***********************************************************************
445  *
446  * Abort function:
447  *
448  ************************************************************************/
449
450 void CmiAbort(const char *message)
451 {
452   CmiError(message);
453   /* *(char*)NULL = 0; */
454   exit(1);
455 }
456
457 /**********************************************************************
458  * CMI utility functions for startup, shutdown, and other miscellaneous
459  * activities.
460  */
461
462 /*
463  * This port uses the common CmiDeliver code, so we only provide
464  * CmiGetNonLocal()
465  */
466 void *CmiGetNonLocal()
467 {
468   McRetrieveRemote();
469
470   return (void *)McQueueRemoveFromFront(received_queue);
471 }
472
473 static char     **Cmi_argv;
474 static char     **Cmi_argvcopy;
475 static CmiStartFn Cmi_startfn;   /* The start function */
476 static int        Cmi_usrsched;  /* Continue after start function finishes? */
477
478 static void CommunicationServerThread(int sleepTime)
479 {
480 #if CMK_SMP
481   CommunicationServer(sleepTime);
482 #endif
483 #if CMK_IMMEDIATE_MSG
484   CmiHandleImmediate();
485 #endif
486 }
487
488 void CmiNotifyIdle(void)
489 {
490   /* Use this opportunity to clean up the in_transit_queue */
491   McCleanUpInTransit();
492 }
493
494 static void CmiNotifyBeginIdle(void *s)
495 {
496   /* Use this opportunity to clean up the in_transit_queue */
497   McCleanUpInTransit();
498 }
499
500 static void CmiNotifyStillIdle(void *s)
501 {
502   McRetrieveRemote();
503 }
504
505 static void ConverseRunPE(int everReturn)
506 {
507   char** CmiMyArgv;
508
509   if (CmiMyRank())
510     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
511   else
512     CmiMyArgv=Cmi_argv;
513
514   CthInit(CmiMyArgv);
515
516   ConverseCommonInit(CmiMyArgv);
517
518   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
519       (CcdVoidFn) CmiNotifyBeginIdle, (void *) NULL);
520   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
521       (CcdVoidFn) CmiNotifyStillIdle, (void *) NULL);
522
523   /* communication thread */
524   if (CmiMyRank() == CmiMyNodeSize()) {
525     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
526     while (1) CommunicationServerThread(5);
527   }
528   else {  /* worker thread */
529   if (!everReturn) {
530     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
531     if (Cmi_usrsched==0) CsdScheduler(-1);
532     ConverseExit();
533   }
534   }
535 }
536
537 void arena_init();
538
539 void 
540 ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
541 {
542   Cmi_argvcopy = CmiCopyArgs(argv);
543   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
544
545   McInit();
546 #if CMK_ARENA_MALLOC
547   arena_init();
548 #endif
549
550   {
551   int debug = CmiGetArgFlag(argv,"++debug");
552   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
553   if (debug || debug_no_pause)
554   {   /*Pause so user has a chance to start and attach debugger*/
555 #if CMK_HAS_GETPID
556     printf("CHARMDEBUG> Processor %d has PID %d\n",CmiMyNode(),getpid());
557     fflush(stdout);
558     if (!debug_no_pause)
559       sleep(10);
560 #else
561     printf("++debug ignored.\n");
562 #endif
563   }
564   }
565
566   /* CmiStartThreads(argv); */
567   ConverseRunPE(initret);
568 }
569
570 void ConverseExit()
571 {
572 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
573   if (CmiMyPe() == 0){
574     CmiPrintf("End of program\n");
575   }
576 #endif
577   ConverseCommonExit();
578 #if CMK_CRAYXT || CMK_CRAYXE
579   shmem_finalize();
580 #endif
581   exit(0);
582 }
583
584 /* lock
585 */
586
587 #if CMK_SHMEM_LOCK
588
589 void set_lock(long *lock, int pe)
590 {
591   while (shmem_long_swap(lock, 1L, pe)) {
592     //shmem_long_wait(&tmp, 1);
593   }
594 //printf("set lock end: %ld. \n", *lock);
595 }
596
597 void clear_lock(long *lock, int pe)
598 {
599 //printf("[%d] clear lock on %d %ld. \n", _Cmi_mype, pe, *lock);
600   shmem_long_swap(lock, 0L, pe);
601 //  shmem_long_p(lock, 0L, pe);
602 //printf("clear lock end lock:%ld\n", *lock);
603 }
604
605 #else
606
607 #if 1
608 #define set_lock(lock, pe)  shmem_set_lock(lock)
609 #define clear_lock(lock, pe)  shmem_clear_lock(lock)
610 #else              /* for debugging */
611 void set_lock(long *lock, int pe)
612 {
613   //printf("[%d] set_lock %d %d\n", CmiMyPe(), pe, *lock);
614   shmem_set_lock(lock);
615   // while (shmem_test_lock(lock)) usleep(1);
616   //printf("[%d] after set_lock %d %d\n", CmiMyPe(), pe, *lock);
617 }
618 void clear_lock(long *lock, int pe)
619 {
620   //printf("[%d] free_lock %d %d\n", CmiMyPe(), pe, *lock);
621   shmem_clear_lock(lock);
622   //printf("[%d] after free_lock %d %d\n", CmiMyPe(), pe, *lock);
623 }
624 #endif
625 #endif
626
627 /**********************************************************************
628  * Mc Functions:
629  * Mc functions are used internally in machine.c only
630  */
631
632 static void McInit()
633 {
634   CMK_SHMEM_INIT;
635
636 #if CMK_CRAYXT || CMK_CRAYXE
637   _Cmi_mype = shmem_my_pe();
638   _Cmi_numpes = shmem_n_pes();
639 #else
640   _Cmi_mype = _my_pe();
641   _Cmi_numpes = _num_pes();
642 #endif
643   _Cmi_myrank = 0;
644
645   CpvInitialize(void *, CmiLocalQueue);
646   CpvAccess(CmiLocalQueue) = CdsFifo_Create();
647
648   McInitList();
649 }
650
651 static void McInitList()
652 {
653   int i;
654
655   received_queue = McQueueCreate();
656   tmp_queue = McQueueCreate();
657   received_token_queue = McQueueCreate();
658   broadcast_queue = McQueueCreate();
659   broadcast_tmp_queue = McQueueCreate();
660   in_transit_tmp_queue = McQueueCreate();
661   in_transit_queue = McQueueCreate();
662
663   head.nxt_node = list_empty;
664   head.nxt_addr = NULL;
665   head.msg_sz = 0;
666
667 #if CMK_ARENA_MALLOC
668   head_lock = shmalloc(sizeof(long)*_Cmi_numpes);
669   _MEMCHECK(head_lock);
670   bcast_lock = shmalloc(sizeof(long)*_Cmi_numpes);
671   _MEMCHECK(bcast_lock);
672 #else
673   if (_Cmi_numpes > MAX_PES)
674   {
675     CmiPrintf("Not enough processors allocated in machine.c.\n");
676     CmiPrintf("Change MAX_PES in machine.c to at least %d and recompile Converse\n",
677     _Cmi_numpes);
678   }
679 #endif
680   for(i=0; i < _Cmi_numpes; i++)
681   {
682     head_lock[i] = 0;
683     bcast_lock[i] = 0;
684   }
685   my_lock = &(head_lock[_Cmi_mype]);
686 /*
687   clear_lock(my_lock, _Cmi_mype);
688   clear_lock(&bcast_lock[_Cmi_mype], _Cmi_mype);
689 */
690   shmem_barrier_all();
691 }
692
693 static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe)
694 {
695  /*
696   * To enqueue on a remote node, we should:
697   * 0. Free any delivered messages from the message_in_transit list.
698   * 1. Add message in the "message_in_transit" list
699   * 2. Fill in the fields in the message header
700   * 3. Lock the head pointer on the remote node.
701   * 4. Swap the list pointer with that on the other node.
702   * 5. Release lock
703   */
704
705   McDistList tmp_link;
706   McDistList *msg_link;
707
708   /*  CmiPrintf("PE %d outgoing msg = %d msg_type = %d size = %d dst_pe=%d\n",
709             _Cmi_mype,msg,((McMsgHdr *)msg)->msg_type,msg_sz, dst_pe); */
710   /* 0. Free any delivered messages from the in_transit_queue list. */
711   McCleanUpInTransit();
712
713   /* 1. Add message in the "in_transit_queue" list */
714   McQueueAddToBack(in_transit_queue,msg);
715
716   /* 2. Fill in the fields in the message header */
717   msg_link = &(((McMsgHdr *)msg)->list_node);
718   ((McMsgHdr *)msg)->received_f = false;
719
720   /* Set list fields to point back to this processor, this message.  */
721   tmp_link.nxt_node = _Cmi_mype;
722   tmp_link.nxt_addr = msg;
723   tmp_link.msg_sz = msg_sz;
724
725   /* 3. Lock the head pointer on the remote node.
726      Acquire lock on the destination queue.  If locks turn oout to
727      be inefficient, use fetch and increment to imp. lock
728    */
729
730   set_lock(&head_lock[dst_pe], dst_pe);
731
732
733   /* 4. Swap the list pointer with that on the other node.
734    */
735   /* First, get current head pointer, and stick it in this 
736    * message data area.
737    */
738 #if !USE_SWAP
739   shmem_int_get((int*)msg_link, (int*)&head, sizeof(McDistList)/sizeof(int), dst_pe);
740   /* Next, write the new message into the top of the list */
741   shmem_int_put((int*)&head, (int*)&tmp_link, sizeof(McDistList)/sizeof(int),dst_pe);
742   shmem_quiet();
743 #else
744   {
745   int i, n = sizeof(McDistList)/sizeof(int);
746   int *dst = (int*)&head;
747   int *src = (int*)&tmp_link;
748   int *olddst = (int*)msg_link;
749   for (i=0; i<n; i++)  olddst[i] = shmem_int_swap(dst+i, src[i], dst_pe);
750   }
751 #endif
752
753 #ifdef DEBUG
754   printf("[%d] Adding Message to pe %d\n",_Cmi_mype,dst_pe);
755   printf("[%d]   nxt_node = %d\n",_Cmi_mype,tmp_link.nxt_node);
756   printf("[%d]   nxt_addr = %x\n",_Cmi_mype,tmp_link.nxt_addr);
757   printf("[%d]   msg_sz = %x\n",_Cmi_mype,tmp_link.msg_sz);
758   printf("[%d] Old Message is now at %x\n",_Cmi_mype,msg_link);
759   printf("[%d]   nxt_node = %d\n",_Cmi_mype,msg_link->nxt_node);
760   printf("[%d]   nxt_addr = %x\n",_Cmi_mype,msg_link->nxt_addr);
761   printf("[%d]   msg_sz = %x\n",_Cmi_mype,msg_link->msg_sz);
762 #endif
763
764   /* 5. Release lock */
765   clear_lock(&head_lock[dst_pe], dst_pe);
766 }
767
768 static void McRetrieveRemote(void)
769 {
770   /*
771    * The local host should retrieve messages from the distributed list
772    * and put them in local memory, in a messages queue.
773    * Steps:
774    * 0) Lock list pointer.
775    * 1) Replace list pointer with NULL and unlock list
776    * 2) Get each message into local memory
777    * 3) Enqueue list into local message queue, in reverse order
778    */
779
780   McDistList list_head;
781   McDistList *cur_node;
782   McMsgHdr *cur_msg; 
783   int received_f;
784   enum boolean bcast_msg;
785   McMsgHdr *bcast_ptr;
786
787   /* Get the head of the list */
788
789   if (head.nxt_node == list_empty)  /* apparently there are no messages */
790     return;
791
792   /* 0) Lock list pointer. */
793   set_lock(my_lock, _Cmi_mype);
794
795   /* 1) Replace list pointer with NULL and unlock list */
796   list_head = head;
797   head.nxt_node = list_empty;
798   head.nxt_addr = NULL;
799   head.msg_sz = 0;
800   clear_lock(my_lock, _Cmi_mype);
801
802   /* 2) Get each message into local memory
803    * Start copying the messages into local memory, putting messages into
804    * a local list for future reversing.
805    */
806   cur_node = &list_head;
807   received_f = true;
808
809   while (cur_node->nxt_node != list_empty)
810   {
811     cur_msg = (McMsgHdr *)CmiAlloc(ALIGN8(cur_node->msg_sz));
812     if (cur_msg ==NULL)
813     {
814       CmiError("%s:%d Cannot Allocate Memory\n",__FILE__,__LINE__);
815       exit(1);
816     }
817
818     shmem_get64((long*)cur_msg, (long*)cur_node->nxt_addr,
819               ALIGN8(cur_node->msg_sz)/8, cur_node->nxt_node);
820
821     /*    CmiPrintf("PE %d incoming msg = %d msg_type = %d, size = %d\n",
822               _Cmi_mype,cur_msg,cur_msg->msg_type,cur_node->msg_sz);*/
823
824     /* If it is a broadcast message, retrieve the actual message */
825     if (cur_msg->msg_type == BcastMessage)
826     {
827
828       bcast_msg = true;
829       bcast_ptr = (McMsgHdr *)CmiAlloc(ALIGN8(cur_msg->bcast_msg_size));
830       set_lock(&bcast_lock[cur_node->nxt_node], cur_node->nxt_node);
831
832       /*
833       CmiPrintf(
834         "PE %d getting message from node %d at addr %d to %d, size=%d\n",
835         _Cmi_mype,cur_node->nxt_node,cur_msg->bcast.ptr,bcast_ptr,
836         cur_msg->bcast_msg_size
837         );
838         */
839       /* Get the message */
840       shmem_get64((long*)bcast_ptr,(long*)cur_msg->bcast.ptr,
841                 ALIGN8(cur_msg->bcast_msg_size)/8,cur_node->nxt_node);
842       /* Decrement the count, and write it back to the original node. */
843       /*
844       CmiPrintf(
845       "PE %d received broadcast message count=%d size=%d handler=%d\n",
846       _Cmi_mype,bcast_ptr->bcast.count,
847       cur_msg->bcast_msg_size,bcast_ptr->handler
848       );
849       */
850
851       bcast_ptr->bcast.count--;
852
853       shmem_int_put(&(cur_msg->bcast.ptr->bcast.count),
854                 &bcast_ptr->bcast.count,1,cur_node->nxt_node);
855       shmem_quiet();
856       clear_lock(&bcast_lock[cur_node->nxt_node], cur_node->nxt_node);
857     }
858     else bcast_msg = false;
859
860     /* Mark the remote message for future deletion */
861     shmem_int_put(&(cur_node->nxt_addr->received_f),&received_f,
862               1, cur_node->nxt_node);
863     shmem_quiet();
864
865     /* Add to list for reversing */
866     if (bcast_msg)
867     {
868       McQueueAddToBack(received_token_queue,cur_msg);
869       McQueueAddToBack(tmp_queue,bcast_ptr);
870     }
871     else 
872       McQueueAddToBack(tmp_queue,cur_msg);
873
874     /* Move pointer to next message */
875     cur_node = &(cur_msg->list_node);
876   }
877
878   /* 3) Enqueue list into local message queue, in reverse order */
879   while ((cur_msg = McQueueRemoveFromBack(tmp_queue)) != NULL)  {
880     McQueueAddToBack(received_queue,cur_msg);
881   }
882
883   /* 4) Delete broadcast-message tokens */
884   while ((cur_msg = McQueueRemoveFromBack(received_token_queue)) != NULL)  {
885     CmiFree(cur_msg);
886   }
887   return;
888 }
889
890 static void McCleanUpInTransit(void)
891 {
892   McMsgHdr *msg;
893   McQueue *swap_ptr;
894
895   /* Check broadcast message queue, to see if messages have been retrieved
896    */
897   while ((msg = (McMsgHdr *)McQueueRemoveFromFront(broadcast_queue)) 
898          != NULL)
899   {
900     if (msg->bcast.count == 0)
901     {
902       /* 
903          CmiPrintf("PE %d freeing broadcast message at %d\n",_Cmi_mype,msg);
904        */
905       CmiFree(msg);
906     }
907     else
908     {
909       McQueueAddToBack(broadcast_tmp_queue,msg);
910     }
911   }
912   /*
913    * swap queues, so tmp_queue is now empty, and in_transit_queue has
914    * only non-received messages.
915    */
916   swap_ptr = broadcast_tmp_queue;
917   broadcast_tmp_queue = broadcast_queue;
918   broadcast_queue = swap_ptr;
919
920   /* 
921    * Free received messages, and move others to tmp_queue.  Similar to
922    * above
923    */
924   while ((msg = (McMsgHdr *)McQueueRemoveFromFront(in_transit_queue)) 
925          != NULL)
926   {
927     if (msg->received_f)
928     {
929       CmiFree(msg);
930     }
931     else
932     {
933       McQueueAddToBack(in_transit_tmp_queue,msg);
934     }
935   }
936   /*
937    * swap queues, so tmp_queue is now empty, and in_transit_queue has
938    * only non-received messages.
939    */
940   swap_ptr = in_transit_tmp_queue;
941   in_transit_tmp_queue = in_transit_queue;
942   in_transit_queue = swap_ptr;
943 #ifdef DEBUG
944   CmiPrintf("[%d] done in_transit_queue = %d, tmp_queue = %d\n",
945         _Cmi_mype,in_transit_queue->len,in_transit_tmp_queue->len);
946 #endif
947 }
948
949 /*******************************************************************
950  * The following internal functions implements FIFO queues for
951  * messages in the local address space.  This is used for the
952  * received_queue, the in_transit_queue, and tmp_queue.  Code
953  * originally comes from the Origin2000 port, with modifications.
954  */
955 static void **McQueueAllocBlock(unsigned int len)
956 {
957   void ** blk;
958
959   blk=(void **)malloc(len*sizeof(void *));
960   if(blk==(void **)0) {
961     CmiError("Cannot Allocate Memory!\n");
962     abort();
963   }
964   return blk;
965 }
966
967 static void 
968 McQueueSpillBlock(void **srcblk, void **destblk, 
969              unsigned int first, unsigned int len)
970 {
971   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
972   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
973 }
974
975 static McQueue * McQueueCreate(void)
976 {
977   McQueue *queue;
978
979   queue = (McQueue *) malloc(sizeof(McQueue));
980   if(queue==(McQueue *)0) {
981     CmiError("Cannot Allocate Memory!\n");
982     abort();
983   }
984   queue->blk = McQueueAllocBlock(BLK_LEN);
985   queue->blk_len = BLK_LEN;
986   queue->first = 0;
987   queue->len = 0;
988   return queue;
989 }
990
991 int inside_comm = 0;
992
993 static void McQueueAddToBack(McQueue *queue, void *element)
994 {
995   inside_comm = 1;
996   if(queue->len==queue->blk_len) {
997     void **blk;
998
999     queue->blk_len *= 3;
1000     blk = McQueueAllocBlock(queue->blk_len);
1001     McQueueSpillBlock(queue->blk, blk, queue->first, queue->len);
1002     free(queue->blk);
1003     queue->blk = blk;
1004     queue->first = 0;
1005   }
1006 #ifdef DEBUG
1007   CmiPrintf("[%d] Adding %x\n",_Cmi_mype,element);
1008 #endif
1009   queue->blk[(queue->first+queue->len++)%queue->blk_len] = element;
1010   inside_comm = 0;
1011 }
1012
1013 static void * McQueueRemoveFromBack(McQueue *queue)
1014 {
1015   void *element;
1016   element = (void *) 0;
1017   if(queue->len) {
1018     element = queue->blk[(queue->first+queue->len-1)%queue->blk_len];
1019     queue->len--;
1020   }
1021   return element;
1022 }
1023
1024 static void * McQueueRemoveFromFront(McQueue *queue)
1025 {
1026   void *element;
1027   element = (void *) 0;
1028   if(queue->len) {
1029     element = queue->blk[queue->first++];
1030     queue->first = (queue->first+queue->blk_len)%queue->blk_len;
1031     queue->len--;
1032   }
1033   return element;
1034 }
1035
1036 #if CMK_ARENA_MALLOC
1037
1038 static char *arena = NULL;
1039 static slotset *myss = NULL;
1040 static int slotsize = 1*1024;
1041
1042 typedef struct ArenaBlock {
1043       CmiInt8 slot;   /* First slot */
1044       CmiInt8 length; /* Length, in bytes*/
1045 } ArenaBlock;
1046
1047 /* Convert a heap block pointer to/from a CmiIsomallocBlock header */
1048 static void *block2pointer(ArenaBlock *blockHeader) {
1049         return (void *)(blockHeader+1);
1050 }
1051 static ArenaBlock *pointer2block(void *heapBlock) {
1052         return ((ArenaBlock *)heapBlock)-1;
1053 }
1054
1055 /* Return the number of slots in a block with n user data bytes */
1056 static int length2slots(int nBytes) {
1057         return (sizeof(ArenaBlock)+nBytes+slotsize-1)/slotsize;
1058 }
1059
1060 #define MAX_MEM    (64*1024*1024)        
1061
1062 void arena_init()
1063 {
1064   size_t maxmem = 0;
1065   int nslots;
1066   char *s;
1067 #if CMK_CRAYXT || CMK_CRAYXE
1068   if (s = getenv("XT_SYMMETRIC_HEAP_SIZE")) {
1069     size_t n=0;
1070     switch (s[strlen(s)-1]) {
1071     case 'G':  {
1072       sscanf(s, "%dG", &n);  n *= 1024*1024*1024; 
1073       break;
1074       }
1075     case 'M': {
1076       sscanf(s, "%dM", &n);  n *= 1024*1024; 
1077       break;
1078       }
1079     case 'K': {
1080       sscanf(s, "%dK", &n);  n *= 1024; 
1081       break;
1082       }
1083     default: {
1084       n =atoi(s);
1085       break;
1086       }
1087     }
1088     if (n>0) {
1089       n -= sizeof(long)*2*CmiNumPes();    /* for locks */
1090       n -= 2*1024*1024;                   /* less 2MB */
1091       if (n>0) maxmem = n;    /* a little less */
1092     }
1093   }
1094 #endif
1095   if (maxmem == 0) maxmem = MAX_MEM;
1096   if (CmiMyPe()==0) CmiPrintf("Charm++> Total of %dMB symmetric heap memory allocated.\n", maxmem/1024/1024);
1097   arena = shmalloc(maxmem);           /* global barrier */
1098   _MEMCHECK(arena);
1099   nslots = maxmem/slotsize;
1100   myss = new_slotset(0, nslots);
1101 }
1102
1103 void *arena_malloc(int size) 
1104 {   
1105         CmiInt8 s,n;
1106         ArenaBlock *blk;
1107         if (size==0) return NULL;
1108         n=length2slots(size);
1109         /*Always satisfy mallocs with local slots:*/
1110         s=get_slots(myss,n);
1111         if (s==-1) {
1112             CmiError("Not enough address space left in shmem on processor %d for %d bytes!\n", CmiMyPe(),size);
1113             CmiAbort("Out of symmetric heap space for arena_malloc");
1114         }
1115         grab_slots(myss,s,n);
1116         blk = (ArenaBlock*)(arena + s*slotsize);
1117         blk->slot=s;
1118         blk->length=size;
1119         return block2pointer(blk);
1120 }
1121
1122 void arena_free(void *blockPtr)
1123 {   
1124         ArenaBlock *blk;
1125         CmiInt8 s,n;
1126         if (blockPtr==NULL) return;
1127         blk = pointer2block(blockPtr);
1128         s=blk->slot;  
1129         n=length2slots(blk->length); 
1130         free_slots(myss, s, n);
1131 }
1132
1133 #endif
1134
1135 /*   Memory lock and unlock functions */
1136 /*      --- on T3E, no shared memory and quickthreads are used, so memory */
1137 /*          calls reentrant problem. these are only dummy functions */
1138
1139 static volatile int memflag;
1140 void CmiMemLock() { memflag=1; }
1141 void CmiMemUnlock() { memflag=0; }
1142
1143 int CmiBarrier()
1144 {
1145   return -1;
1146 }
1147
1148 int CmiBarrierZero()
1149 {
1150   return -1;
1151 }
1152
1153 /*@}*/