Blue Gene/L optimizations to reduce the overhead of the charm++ stack. Two changes
[charm.git] / src / arch / bluegenel / bglmachine.C
1
2 #include <stdio.h>
3 #include <errno.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <math.h>
7 #include <string.h>
8 #include "machine.h"
9 #include "converse.h"
10 #include "pcqueue.h"
11 #include <stack>
12
13 #include "rts.h"
14 #include "bgml.h"
15
16 #if CMK_PERSISTENT_COMM
17 #include "persist_impl.h"
18 #endif
19
20 #if 0
21 #define BGL_DEBUG CmiPrintf
22 #else
23 #define BGL_DEBUG // CmiPrintf
24 #endif
25
26 extern "C"
27 void        BG2S_UnOrdered_Send     (BG2S_t               * request,
28                                      const BGML_Callback_t* cb_info,
29                                      const BGQuad         * msginfo,
30                                      const char           * sndbuf,
31                                      unsigned               sndlen,
32                                      unsigned               destrank);
33
34 int phscount;
35 int vnpeer = -1;
36
37 inline char *ALIGN_16(char *p){
38   return((char *)((((unsigned long)p)+0xf)&0xfffffff0));
39 }
40
41 PCQueue message_q;                   //queue to receive incoming messages
42 PCQueue broadcast_q;                 //queue to send broadcast messages
43
44 #define PROGRESS_PERIOD 8
45 #define PROGRESS_CYCLES 4000           //10k cycles
46
47 /*
48     To reduce the buffer used in broadcast and distribute the load from
49   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
50   spanning tree broadcast algorithm.
51     This will use the fourth short in message as an indicator of spanning tree
52   root.
53 */
54 #if CMK_SMP
55 #define CMK_BROADCAST_SPANNING_TREE    0
56 #else
57 #define CMK_BROADCAST_SPANNING_TREE    1
58 #define CMK_BROADCAST_HYPERCUBE        0
59 #endif /* CMK_SMP */
60
61 #define BROADCAST_SPANNING_FACTOR      4
62
63 #define MAX_OUTSTANDING  1024
64 #define MAX_POSTED 64
65 #define MAX_QLEN 1024
66 #define MAX_BYTES 1000000
67
68 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
69 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
70
71 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
72 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
73
74 /* FIXME: need a random number that everyone agrees ! */
75 #define CHARM_MAGIC_NUMBER               126
76
77 #if !CMK_OPTIMIZE
78 static int checksum_flag = 0;
79 extern "C" unsigned char computeCheckSum(unsigned char *data, int len);
80
81 #define CMI_SET_CHECKSUM(msg, len)      \
82         if (checksum_flag)  {   \
83           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
84           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
85         }
86
87 #define CMI_CHECK_CHECKSUM(msg, len)    \
88         if (checksum_flag)      \
89           if (computeCheckSum((unsigned char*)msg, len) != 0)  { \
90             printf("\n\n------------------------------\n\nReceiver %d size %d:", CmiMyPe(), len); \
91             for(int count = 0; count < len; count++) { \
92                 printf("%2x", msg[count]);                 \
93             }                                             \
94             printf("------------------------------\n\n"); \
95             CmiAbort("Fatal error: checksum doesn't agree!\n"); \
96           }
97 #else
98 #define CMI_SET_CHECKSUM(msg, len)
99 #define CMI_CHECK_CHECKSUM(msg, len)
100 #endif
101
102 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
103
104 #if CMK_BROADCAST_HYPERCUBE
105 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
106 #else
107 #  define CMI_SET_CYCLE(msg, cycle)
108 #endif
109
110 int               _Cmi_numpes;
111 int               _Cmi_mynode;    /* Which address space am I */
112 int               _Cmi_mynodesize;/* Number of processors in my address space */
113 int               _Cmi_numnodes;  /* Total number of address spaces */
114 static int        Cmi_nodestart; /* First processor in this address space */
115 CpvDeclare(void*, CmiLocalQueue);
116
117 int               idleblock = 0;
118
119 #include "machine-smp.c"
120 CsvDeclare(CmiNodeState, NodeState);
121 #include "immediate.c"
122
123 static inline void AdvanceCommunications(int max_out = MAX_OUTSTANDING);
124 static int outstanding_recvs = 0;
125
126
127 #if !CMK_SMP
128 /************ non SMP **************/
129 static struct CmiStateStruct Cmi_state;
130 int _Cmi_mype;
131 int _Cmi_myrank;
132
133 void CmiMemLock(void) {}
134 void CmiMemUnlock(void) {}
135
136 #define CmiGetState() (&Cmi_state)
137 #define CmiGetStateN(n) (&Cmi_state)
138
139 void CmiYield(void) { sleep(0); }
140
141 static void CmiStartThreads(char **argv)
142 {
143   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
144   _Cmi_mype = Cmi_nodestart;
145   _Cmi_myrank = 0;
146 }
147 #endif  /* !CMK_SMP */
148
149 int received_immediate = 0;
150
151 /*Add a message to this processor's receive queue, pe is a rank */
152 static void CmiPushPE(int pe,void *msg)
153 {
154   CmiState cs = CmiGetStateN(pe);
155   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
156 #if CMK_IMMEDIATE_MSG
157   if (CmiIsImmediate(msg)) {
158     /**(CmiUInt2 *)msg = pe;*/
159     received_immediate = 1;
160     CMI_DEST_RANK(msg) = pe;
161     CmiPushImmediateMsg(msg);
162     return;
163   }
164 #endif
165 #if CMK_SMP
166   CmiLock(procState[pe].recvLock);
167 #endif
168
169   PCQueuePush(cs->recv,(char *)msg);
170   //printf("%d: {%d} PCQueue length = %d, msg = %x\n", CmiMyPe(), bgx_in_interrupt, PCQueueLength(cs->recv), msg);
171   
172 #if CMK_SMP
173   CmiUnlock(procState[pe].recvLock);
174 #endif
175   CmiIdleLock_addMessage(&cs->idle);
176   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
177 }
178
179 #if CMK_NODE_QUEUE_AVAILABLE
180 /*Add a message to this processor's receive queue */
181 static void CmiPushNode(void *msg)
182 {
183   MACHSTATE(3,"Pushing message into NodeRecv queue");
184 #if CMK_IMMEDIATE_MSG
185   if (CmiIsImmediate(msg)) {
186     CMI_DEST_RANK(msg) = 0;
187     CmiPushImmediateMsg(msg);
188     return;
189   }
190 #endif
191   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
192   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
193   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
194   {
195   CmiState cs=CmiGetStateN(0);
196   CmiIdleLock_addMessage(&cs->idle);
197   }
198 }
199 #endif /* CMK_NODE_QUEUE_AVAILABLE */
200
201 #ifndef CmiMyPe
202 int CmiMyPe(void)
203 {
204   return CmiGetState()->pe;
205 }
206 #endif
207
208 #ifndef CmiMyRank
209 int CmiMyRank(void)
210 {
211   return CmiGetState()->rank;
212 }
213 #endif
214
215
216 static int msgQueueLen = 0;
217 static int msgQBytes = 0;
218 static int numPosted = 0;
219
220 static int request_max;
221 static int maxMessages;
222 static int maxBytes;
223 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
224 static int progress_cycles;
225
226 static int Cmi_dim;     /* hypercube dim of network */
227
228 static char     **Cmi_argv;
229 static char     **Cmi_argvcopy;
230 static CmiStartFn Cmi_startfn;   /* The start function */
231 static int        Cmi_usrsched;  /* Continue after start function finishes? */
232
233 extern "C" void ConverseCommonInit(char **argv);
234 extern "C" void ConverseCommonExit(void);
235 extern "C" void CthInit(char **argv);
236
237 static inline void SendMsgsUntil(int, int);
238 static void ConverseRunPE(int everReturn);
239 static void CommunicationServer(int sleepTime);
240 static void CommunicationServerThread(int sleepTime);
241
242 void SendSpanningChildren(int size, char *msg);
243 void SendHypercube(int size, char *msg);
244
245 typedef struct msg_list {
246   BG2S_t             send;
247   BGQuad             info;
248   BGML_Callback_t    cb;
249   char              *msg;
250   int                size;
251   int                destpe;
252   int               *pelist;
253
254 #if CMK_PERSISTENT_COMM
255   PersistentHandle phs;
256   int phscount;
257   int phsSize;
258 #endif
259 } SMSG_LIST;
260
261
262 typedef struct {
263   int sleepMs; /*Milliseconds to sleep while idle*/
264   int nIdles; /*Number of times we've been idle in a row*/
265   CmiState cs; /*Machine state*/
266 } CmiIdleState;
267
268 static CmiIdleState *CmiNotifyGetState(void)
269 {
270   CmiIdleState *s=(CmiIdleState *)CmiAlloc(sizeof(CmiIdleState));
271   s->sleepMs=0;
272   s->nIdles=0;
273   s->cs=CmiGetState();
274   return s;
275 }
276
277 typedef struct ProcState {
278 /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
279 CmiNodeLock  recvLock;              /* for cs->recv */
280 } ProcState;
281
282 static ProcState  *procState;
283
284 /* send done callback: sets the smsg entry to done */
285 static void send_done(void *data){
286   SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
287   CmiFree(msg_tmp->msg);
288   msgQBytes -= msg_tmp->size;
289   free(data);
290
291   msgQueueLen--;
292   numPosted --;
293 }
294
295 /* send done callback: sets the smsg entry to done */
296 static void send_multi_done(void *data){
297   SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
298   CmiFree(msg_tmp->msg);
299   free(msg_tmp->pelist);
300   free(data);
301
302   msgQBytes -= msg_tmp->size;
303   msgQueueLen --;
304   numPosted --;
305 }
306
307
308 //Called on receiving a persistent message
309 void persist_recv_done(void *clientdata) {
310
311   char *msg = (char *) clientdata;
312   int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
313
314   ///  printf("[%d] persistent receive of size %d\n", CmiMyPe(), sndlen);
315   
316   //Cannot broadcast Persistent message
317
318   CMI_CHECK_CHECKSUM(msg, sndlen);
319   if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
320     CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
321     return;
322   }
323   
324   CmiReference(msg);
325
326 #if CMK_NODE_QUEUE_AVAILABLE
327   if (CMI_DEST_RANK(msg) == DGRAM_NODEMESSAGE)
328     CmiPushNode(msg);
329   else
330 #endif
331     CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
332 }
333
334 /* recv done callback: push the recved msg to recv queue */
335 static void recv_done(void *clientdata){
336
337   char *msg = (char *) clientdata;
338   int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
339   
340   /* then we do what PumpMsgs used to do:
341    * push msg to recv queue */
342   
343   CMI_CHECK_CHECKSUM(msg, sndlen);
344   if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
345     CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
346     return;
347   }
348
349 #if CMK_NODE_QUEUE_AVAILABLE
350   if (CMI_DEST_RANK(msg) == DGRAM_NODEMESSAGE)
351     CmiPushNode(msg);
352   else
353 #endif
354     CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
355
356 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
357   if(CMI_BROADCAST_ROOT(msg) != 0) {
358     //printf ("%d: Receiving bcast message %d bytes\n", CmiMyPe(), sndlen);
359     PCQueuePush(broadcast_q, msg);
360   }
361 #endif
362
363
364   //rts_dcache_evict_normal();
365
366   outstanding_recvs --;
367 }
368
369
370 void     short_pkt_recv (const BGQuad     * info,
371                          unsigned           senderrank,
372                          const char       * buffer,
373                          const unsigned     sndlen)
374 {
375   outstanding_recvs ++;
376   int alloc_size = sndlen;
377   
378   //printf ("%d: Receiving short message %d bytes\n", CmiMyPe(), sndlen);
379   
380   char * new_buffer = (char *)CmiAlloc(alloc_size);
381   memcpy (new_buffer, buffer, sndlen);
382   recv_done (new_buffer);
383 }
384
385
386 BG2S_t * first_pkt_recv_done (const BGQuad      * info,
387                               unsigned            senderrank,
388                               const unsigned      sndlen,
389                               unsigned          * rcvlen,
390                               char             ** buffer,
391                               BGML_Callback_t   * cb 
392                               )
393 {
394   outstanding_recvs ++;
395   int alloc_size = sndlen + sizeof(BG2S_t) + 16;
396   
397   //printf ("%d: {%d} Receiving message %d bytes\n", CmiMyPe(), bgx_in_interrupt, sndlen);
398
399   /* printf ("Receiving %d bytes\n", sndlen); */
400   *rcvlen = sndlen>0?sndlen:1;  /* to avoid malloc(0) which might
401                                    return NULL */
402
403   *buffer = (char *)CmiAlloc(alloc_size);
404   cb->function = recv_done;
405   cb->clientdata = *buffer;
406
407   return (BG2S_t *) ALIGN_16(*buffer + sndlen);
408 }
409
410 void sendBroadcastMessages() __attribute__((noinline));
411
412 void sendBroadcastMessages() {
413   while(!PCQueueEmpty(broadcast_q)) {
414     char *msg = (char *) PCQueuePop(broadcast_q);
415     
416 #if CMK_BROADCAST_SPANNING_TREE
417     SendSpanningChildren(((CmiMsgHeaderBasic *) msg)->size, msg);
418 #elif CMK_BROADCAST_HYPERCUBE
419     SendHypercube(((CmiMsgHeaderBasic *) msg)->size, msg);
420 #endif
421     
422     //CMI_CHECK_CHECKSUM(msg, (((CmiMsgHeaderBasic *) msg)->size));
423   }
424 }
425
426 unsigned int *ranklist;
427
428 #include "bgltorus.h"
429 CpvDeclare(BGLTorusManager*, tmanager);
430
431 BGTsC_t        barrier;
432
433 CpvDeclare(unsigned, networkProgressCount);
434 int  networkProgressPeriod;
435
436 // -----------------------------------------
437 // Rectangular broadcast implementation 
438 // -----------------------------------------
439
440 typedef struct rectbcast_msg {
441   BGTsRC_t           request;
442   BGML_Callback_t    cb;
443   char              *msg;
444 } RectBcastInfo;
445
446
447 static void bcast_done (void *data) {  
448   RectBcastInfo *rinfo = (RectBcastInfo *) data;  
449   CmiFree (rinfo->msg);
450   free (rinfo);
451 }
452
453
454 typedef void * (*requestFnType) (unsigned);
455
456 requestFnType requestFn;
457
458 static  void *   getRectBcastRequest (unsigned comm) {
459   return requestFn (comm);
460 }
461
462
463 static  void *  bcast_recv     (unsigned               root,
464                                 unsigned               comm,
465                                 const unsigned         sndlen,
466                                 unsigned             * rcvlen,
467                                 char                ** rcvbuf,
468                                 BGML_Callback_t      * const cb) {
469   
470   //  printf ("%d: in bcast recv \n", CmiMyPe());
471
472   int alloc_size = sndlen + sizeof(BGTsRC_t) + 16;
473   
474   *rcvlen = sndlen>0?sndlen:1;  /* to avoid malloc(0) which might
475                                    return NULL */
476   
477   *rcvbuf       =  (char *)CmiAlloc(alloc_size);
478   cb->function  =   recv_done;
479   cb->clientdata = *rcvbuf;
480
481   BGTsRC_t *request = (BGTsRC_t *) ALIGN_16 (*rcvbuf + sndlen);
482   memset (request, 0, sizeof(BGTsRC_t));
483   return request;
484 }
485
486
487 extern "C"  
488 void bgl_machine_RectBcast (unsigned                 commid,
489                             char                   * sndbuf,
490                             unsigned                 sndlen) 
491 {
492   CMI_SET_BROADCAST_ROOT(sndbuf,0);  
493   CMI_MAGIC(sndbuf) = CHARM_MAGIC_NUMBER;
494   ((CmiMsgHeaderBasic *)sndbuf)->size = sndlen;  
495   CMI_SET_CHECKSUM(sndbuf, sndlen);
496   
497   RectBcastInfo *rinfo  =   (RectBcastInfo *) malloc (sizeof(RectBcastInfo)); 
498
499   rinfo->cb.function    =   bcast_done;
500   rinfo->cb.clientdata  =   rinfo;
501   rinfo->msg            =   sndbuf;
502   
503   memset (&rinfo->request, 0, sizeof(BGTsRC_t));
504
505   BGTsRC_AsyncBcast_start (commid, &rinfo->request, &rinfo->cb, sndbuf, sndlen);
506 }
507
508 extern "C"   
509 void    *         bgl_machine_RectBcastInit  (unsigned               commID,
510                                               const BGTsRC_Geometry_t* geometry) {
511   
512   BGTsRC_t *request =  (BGTsRC_t *) malloc (sizeof (BGTsRC_t));
513   memset(request, 0,  sizeof( BGTsRC_t));
514   BGTsRC_AsyncBcast_init  (request, commID,  geometry);
515   
516   return request;
517 }
518
519
520 extern "C" 
521 void  bgl_machine_RectBcastConfigure (requestFnType fn) {
522   requestFn = fn;
523 }
524
525
526
527 //--------------------------------------------------------------
528 //----- End Rectangular Broadcast Implementation ---------------
529 //--------------------------------------------------------------
530
531
532 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret){
533   int n, i;
534   
535   BGML_Messager_Init();
536   BG2S_Configure (short_pkt_recv, first_pkt_recv_done, NULL);
537   
538   //BGTr_Configure ( 1 /* use_coprocessor  */, 
539   //               1 /* protocolTreshold */, 
540   //               1 /* highPrecision    */); 
541   
542   BGTsC_Configure (NULL, getRectBcastRequest, bcast_recv);
543
544   _Cmi_numnodes = BGML_Messager_size();
545   _Cmi_mynode = BGML_Messager_rank();
546   
547   message_q = PCQueueCreate();
548   broadcast_q = PCQueueCreate();
549
550   unsigned rank = BGML_Messager_rank();
551   unsigned size = BGML_Messager_size();
552   
553   //vnpeer = BGML_Messager_vnpeerrank();
554   
555 #if 1
556   ranklist = (unsigned int *)malloc (size * sizeof(int));
557   for(int count = 0; count < size; count ++)
558     ranklist[count] = count;
559   
560   /* global barrier initialize */
561   BGTsC_Barrier_init (&barrier, 0, size, ranklist);
562 #endif
563   
564   CmiBarrier();    
565   CmiBarrier();    
566   CmiBarrier();    
567
568   /* processor per node */
569   _Cmi_mynodesize = 1;
570   CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
571 #if ! CMK_SMP
572   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
573     CmiAbort("+ppn cannot be used in non SMP version!\n");
574 #endif
575
576   idleblock = CmiGetArgFlag(argv, "+idleblocking");
577   if (idleblock && _Cmi_mynode == 0) {
578     CmiPrintf("Charm++: Running in idle blocking mode.\n");
579   }
580
581 #if CMK_NO_OUTSTANDING_SENDS
582   no_outstanding_sends=1;
583 #endif
584   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
585     no_outstanding_sends = 1;
586     if (_Cmi_mynode == 0)
587       CmiPrintf("Charm++: Will%s consume outstanding sends in scheduler loop\n",        no_outstanding_sends?"":" not");
588   }
589   
590   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
591   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
592   Cmi_argvcopy = CmiCopyArgs(argv);
593   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
594
595   CpvInitialize(BGLTorusManager*, tmanager);
596   CpvAccess(tmanager) = new BGLTorusManager();
597
598
599   /* find dim = log2(numpes), to pretend we are a hypercube */
600   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
601     Cmi_dim++ ;
602
603   request_max=MAX_POSTED;
604   CmiGetArgInt(argv,"+requestmax",&request_max);
605  
606   maxMessages = MAX_QLEN;
607   maxBytes = MAX_BYTES;
608
609   /* checksum flag */
610   if (CmiGetArgFlag(argv,"+checksum")) {
611 #if !CMK_OPTIMIZE
612     checksum_flag = 1;
613     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
614 #else
615     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
616 #endif
617   }
618
619   CsvInitialize(CmiNodeState, NodeState);
620   CmiNodeStateInit(&CsvAccess(NodeState));
621
622   procState = (ProcState *)CmiAlloc((_Cmi_mynodesize+1) * sizeof(ProcState));
623   for (i=0; i<_Cmi_mynodesize+1; i++) {
624 /*    procState[i].sendMsgBuf = PCQueueCreate();   */
625     procState[i].recvLock = CmiCreateLock();
626   }
627
628   /* Network progress function is used to poll the network when for
629      messages. This flushes receive buffers on some  implementations*/
630   networkProgressPeriod = PROGRESS_PERIOD;
631   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
632   
633   progress_cycles = PROGRESS_CYCLES;
634   CmiGetArgInt(argv, "+progressCycles", &progress_cycles);
635
636   CmiStartThreads(argv);
637   ConverseRunPE(initret);
638 }
639
640 static void ConverseRunPE(int everReturn)
641 {
642   CmiIdleState *s=CmiNotifyGetState();
643   CmiState cs;
644   char** CmiMyArgv;
645   CmiNodeAllBarrier();
646   cs = CmiGetState();
647   CpvInitialize(void *,CmiLocalQueue);
648   CpvAccess(CmiLocalQueue) = cs->localqueue;
649
650   if (CmiMyRank())
651     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
652   else   
653     CmiMyArgv=Cmi_argv;
654     
655   CthInit(CmiMyArgv);
656
657   ConverseCommonInit(CmiMyArgv);
658
659   /* initialize the network progress counter*/
660   /* Network progress function is used to poll the network when for
661      messages. This flushes receive buffers on some  implementations*/
662   CpvInitialize(int , networkProgressCount);
663   CpvAccess(networkProgressCount) = 0;
664
665 #if CMK_SMP
666   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
667   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
668 #else
669   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
670 #endif
671
672 #if MACHINE_DEBUG_LOG
673   if (CmiMyRank() == 0) {
674     char ln[200];
675     sprintf(ln,"debugLog.%d",CmiMyNode());
676     debugLog=fopen(ln,"w");
677   }
678 #endif
679
680   CmiBarrier();
681
682   /* Converse initialization finishes, immediate messages can be processed.
683      node barrier previously should take care of the node synchronization */
684   _immediateReady = 1;
685
686   /* communication thread */
687   if (CmiMyRank() == CmiMyNodeSize()) {
688     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
689     while (1) CommunicationServerThread(5);
690   }
691   else {  /* worker thread */
692     if (!everReturn) {
693       Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
694       if (Cmi_usrsched==0) CsdScheduler(-1);
695       ConverseExit();
696     }
697   }
698 }
699
700 #if CMK_SMP
701 static int inexit = 0;
702
703 static int MsgQueueEmpty()
704 {
705   int i;
706 #if 0
707   for (i=0; i<_Cmi_mynodesize; i++)
708     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
709 #else
710   return PCQueueEmpty(sendMsgBuf);
711 #endif
712   return 1;
713 }
714
715 static int SendMsgBuf();
716
717 /* test if all processors recv queues are empty */
718 static int RecvQueueEmpty()
719 {
720   int i;
721   for (i=0; i<_Cmi_mynodesize; i++) {
722     CmiState cs=CmiGetStateN(i);
723     if (!PCQueueEmpty(cs->recv)) return 0;
724   }
725   return 1;
726 }
727
728 static void CommunicationServer(int sleepTime){
729   int static count=0;
730
731   SendMsgBuf(); 
732   AdvanceCommunications();
733 /*
734   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
735 */
736   if (inexit == CmiMyNodeSize()) {
737     MACHSTATE(2, "CommunicationServer exiting {");
738 #if 0
739     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
740 #endif
741     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
742       SendMsgBuf(); 
743       AdvanceCommunications();
744     }
745 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
746     if (CmiMyNode() == 0){
747       CmiPrintf("End of program\n");
748     }
749 #endif
750     MACHSTATE(2, "} CommunicationServer EXIT");
751     exit(0);   
752   }
753 }
754 #endif /* CMK_SMP */
755
756 static void CommunicationServerThread(int sleepTime){
757 #if CMK_SMP
758   CommunicationServer(sleepTime);
759 #endif
760 #if CMK_IMMEDIATE_MSG
761   CmiHandleImmediate();
762 #endif
763 }
764
765 extern "C" void BGML_Messager_dumpTimers();
766
767 void ConverseExit(void){
768   /* #if ! CMK_SMP */
769
770   while(msgQueueLen > 0 || outstanding_recvs > 0) {
771     AdvanceCommunications();
772   }
773
774   CmiBarrier();
775
776   ConverseCommonExit();  
777
778   if(CmiMyPe()%101 == 0)
779     BGML_Messager_dumpTimers();
780   
781   CmiFree(procState);
782   
783 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
784   if (CmiMyPe() == 0){
785     CmiPrintf("End of program\n");
786   }
787 #endif
788   
789   exit(0);
790 }
791
792 /* exit() called on any node would abort the whole program */
793 void CmiAbort(const char * message){
794   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
795         "{snd:%d,rcv:%d} Reason: %s\n",CmiMyPe(),msgQueueLen,outstanding_recvs,message);
796   CmiPrintStackTrace(0);
797
798   while(msgQueueLen > 0 || outstanding_recvs > 0) {
799     AdvanceCommunications();
800   }
801   
802   //CmiBarrier(); 
803
804   assert (0);
805 }
806
807 void *CmiGetNonLocal(){
808   static int count=0;
809   CmiState cs = CmiGetState();
810
811   void *msg = NULL;
812   //CmiIdleLock_checkMessage(&cs->idle);
813   /* although it seems that lock is not needed, I found it crashes very often
814      on mpi-smp without lock */
815   
816   AdvanceCommunications();
817   
818   //CmiLock(procState[cs->rank].recvLock);
819   
820   if (PCQueueLength(cs->recv) > 0)
821     msg =  PCQueuePop(cs->recv); 
822
823   //CmiUnlock(procState[cs->rank].recvLock);
824   
825 #if !CMK_SMP
826 #if BLOCKING_COMM
827   if (no_outstanding_sends) {
828     SendMsgsUntil(0, 0);
829   }
830 #endif
831 #endif /* !CMK_SMP */
832   
833   return msg;
834 }
835
836 static void CmiSendSelf(char *msg){
837 #if CMK_IMMEDIATE_MSG
838     if (CmiIsImmediate(msg)) {
839       /* CmiBecomeNonImmediate(msg); */
840       CmiPushImmediateMsg(msg);
841       CmiHandleImmediate();
842       return;
843     }
844 #endif
845     CQdCreate(CpvAccess(cQdState), 1);
846     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
847 }
848
849
850 #if CMK_PERSISTENT_COMM
851 #include "persistent.C"
852 #endif
853
854 inline void machineSend(SMSG_LIST *msg_tmp) {
855   
856   CQdCreate(CpvAccess(cQdState), 1);
857   numPosted ++;
858
859   if(msg_tmp->destpe == CmiMyPe())
860     CmiAbort("Sending to self\n");
861   
862 #if CMK_PERSISTENT_COMM
863   if(msg_tmp->phs) {
864     if(machineSendPersistentMsg(msg_tmp))
865       return;
866   }
867 #endif
868   
869   CmiAssert(msg_tmp->destpe >= 0 && msg_tmp->destpe < CmiNumPes());
870   msg_tmp->cb.function     =   send_done;
871   msg_tmp->cb.clientdata   =   msg_tmp;
872
873   BG2S_UnOrdered_Send (&msg_tmp->send, &msg_tmp->cb, &msg_tmp->info, 
874                        msg_tmp->msg, msg_tmp->size, msg_tmp->destpe);    
875 }
876
877 void sendQueuedMessages() __attribute__((noinline));
878
879 void sendQueuedMessages() {
880   while(numPosted <= request_max && !PCQueueEmpty(message_q)) {
881     SMSG_LIST *msg_tmp = (SMSG_LIST *)PCQueuePop(message_q);    
882     machineSend(msg_tmp);
883   }
884 }
885
886
887 static int enableBatchSends = 1;
888
889 /* The general free send function
890  * Send is synchronous, and free msg after posted
891  */
892 inline void  CmiGeneralFreeSend(int destPE, int size, char* msg){
893   
894   CmiState cs = CmiGetState();
895   if(destPE==cs->pe){
896     CmiSendSelf(msg);
897     return;
898   }
899   
900   if(numPosted >= 16 /*|| !enableBatchSends*/)
901     SendMsgsUntil(maxMessages, maxBytes);
902   
903   SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
904   msg_tmp->destpe = destPE;
905   msg_tmp->size = size;
906   msg_tmp->msg = msg;
907   
908 #if CMK_PERSISTENT_COMM
909   msg_tmp->phs = phs;
910   msg_tmp->phscount = phscount;
911   msg_tmp->phsSize = phsSize;
912 #endif
913
914   //sendQueuedMessages();
915   while(numPosted <= request_max && !PCQueueEmpty(message_q)) {
916     SMSG_LIST *msg_tmp = (SMSG_LIST *)PCQueuePop(message_q);    
917     machineSend(msg_tmp);
918   }  
919   
920   if(numPosted > request_max) {
921     PCQueuePush(message_q, (char *)msg_tmp);
922   }    
923   else 
924     machineSend(msg_tmp);    
925   
926   msgQBytes += size;
927   msgQueueLen++;
928   
929 }
930
931 extern "C"
932 void BGML_Multicast (BG2S_t                 * sender,
933                      const BGML_Callback_t  * cb_info,
934                      const BGQuad           * msginfo,
935                      const char             * sndbuf,
936                      unsigned                 sndlen,
937                      unsigned               * destranks,
938                      unsigned                 nranks);
939
940
941 /* Multicast message to a list of destinations
942  */
943 inline void  machineMulticast(int npes, int *pelist, int size, char* msg){  
944   CQdCreate(CpvAccess(cQdState), npes);
945   
946   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
947   ((CmiMsgHeaderBasic *)msg)->size = size;  
948   CMI_SET_BROADCAST_ROOT(msg,0);
949   CMI_SET_CHECKSUM(msg, size);
950   
951   SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
952   memset (msg_tmp, 0, sizeof(SMSG_LIST));
953   
954   //  CmiState cs = CmiGetState();
955   
956   msg_tmp->destpe    = -1;      //multicast operation
957   msg_tmp->size      = size * npes;
958   msg_tmp->msg       = msg;
959   msg_tmp->pelist    = pelist;
960   
961   msgQBytes  += size * npes;   
962   msgQueueLen ++;
963   numPosted  ++;
964   
965   msg_tmp->cb.function    =   send_multi_done;
966   msg_tmp->cb.clientdata  =   msg_tmp;
967   
968   BGML_Multicast (&msg_tmp->send, &msg_tmp->cb, &msg_tmp->info, 
969                   msg_tmp->msg, size, (unsigned *)pelist, npes); 
970 }
971  
972
973 void CmiSyncSendFn(int destPE, int size, char *msg){
974   char *copymsg;
975   copymsg = (char *)CmiAlloc(size);
976   memcpy(copymsg,msg,size);
977   CmiFreeSendFn(destPE,size,copymsg);
978 }
979
980 void CmiFreeSendFn(int destPE, int size, char *msg){
981   CMI_SET_BROADCAST_ROOT(msg,0);
982   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
983   ((CmiMsgHeaderBasic *)msg)->size = size;  
984   CMI_SET_CHECKSUM(msg, size);
985
986   CmiGeneralFreeSend(destPE,size,msg);
987 }
988
989 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
990 void CmiSyncSendFn1(int destPE, int size, char *msg)
991 {
992   char *copymsg;
993   copymsg = (char *)CmiAlloc(size);
994   memcpy(copymsg, msg, size);
995
996   //  asm volatile("sync" ::: "memory");
997
998   CMI_MAGIC(copymsg) = CHARM_MAGIC_NUMBER;
999   ((CmiMsgHeaderBasic *)copymsg)->size = size;  
1000   CMI_SET_CHECKSUM(copymsg, size);
1001   
1002   CmiGeneralFreeSend(destPE,size,copymsg);
1003 }
1004
1005 /* send msg to its spanning children in broadcast. G. Zheng */
1006 void SendSpanningChildren(int size, char *msg)
1007 {
1008   CmiState cs = CmiGetState();
1009   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1010   int i;
1011
1012   CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1013   int dist = cs->pe-startpe;
1014   if(dist<0) dist+=_Cmi_numpes;
1015   for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1016     int p = BROADCAST_SPANNING_FACTOR*dist + i;
1017     if (p > _Cmi_numpes - 1) break;
1018     p += startpe;
1019     p = p%_Cmi_numpes;
1020     CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1021     CmiSyncSendFn1(p, size, msg);
1022   }
1023 }
1024
1025 /* send msg along the hypercube in broadcast. (Sameer) */
1026 void SendHypercube(int size, char *msg)
1027 {
1028   CmiState cs = CmiGetState();
1029   int curcycle = CMI_GET_CYCLE(msg);
1030   int i;
1031
1032   double logp = CmiNumPes();
1033   logp = log(logp)/log(2.0);
1034   logp = ceil(logp);
1035   
1036   /*  CmiPrintf("In hypercube\n"); */
1037   /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1038
1039   for (i = curcycle; i < logp; i++) {
1040     int p = cs->pe ^ (1 << i);
1041     /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1042     if(p < CmiNumPes()) {
1043       CMI_SET_CYCLE(msg, i + 1);
1044
1045       CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1046       CmiSyncSendFn1(p, size, msg);
1047     }
1048   }
1049 }
1050
1051 void CmiSyncBroadcastFn(int size, char *msg){
1052   char *copymsg;
1053   copymsg = (char *)CmiAlloc(size);
1054   memcpy(copymsg,msg,size);
1055   CmiFreeBroadcastFn(size,copymsg);
1056 }
1057
1058 void CmiFreeBroadcastFn(int size, char *msg){
1059   
1060   //printf("%d: Calling Broadcast %d\n", CmiMyPe(), size);
1061   /*
1062   printf("------------------------------\n\nSender %d :", CmiMyPe());
1063   
1064   for(int count = 0; count < size; count++) {
1065     printf("%2x", msg[count]);
1066   } 
1067   
1068   printf("------------------------------\n\n");
1069   */
1070
1071
1072   CmiState cs = CmiGetState();
1073 #if CMK_BROADCAST_SPANNING_TREE
1074   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1075   SendSpanningChildren(size, msg);
1076   CmiFree(msg);
1077 #elif CMK_BROADCAST_HYPERCUBE
1078   CMI_SET_CYCLE(msg, 0);
1079   SendHypercube(size, msg);
1080   CmiFree(msg);
1081 #else
1082   int i;
1083
1084   for ( i=cs->pe+1; i<_Cmi_numpes; i++ ) 
1085     CmiSyncSendFn(i,size,msg);
1086   
1087   for ( i=0; i<cs->pe; i++ ) 
1088     CmiSyncSendFn(i,size,msg);
1089
1090   CmiFree(msg);
1091 #endif
1092   //SendMsgsUntil(0,0);
1093 }
1094
1095 void CmiSyncBroadcastAllFn(int size, char *msg){
1096   char *copymsg;
1097   copymsg = (char *)CmiAlloc(size);
1098   memcpy(copymsg,msg,size);
1099   CmiFreeBroadcastAllFn(size,copymsg);
1100 }
1101
1102 void CmiFreeBroadcastAllFn(int size, char *msg){
1103   
1104   //printf("%d: Calling All Broadcast %d\n", CmiMyPe(), size);
1105
1106   CmiState cs = CmiGetState();
1107 #if CMK_BROADCAST_SPANNING_TREE
1108   CmiSyncSendFn(cs->pe,size,msg);
1109   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1110   SendSpanningChildren(size, msg);
1111   CmiFree(msg);
1112 #elif CMK_BROADCAST_HYPERCUBE
1113   CmiSyncSendFn(cs->pe,size,msg);
1114   CMI_SET_CYCLE(msg, 0);
1115   SendHypercube(size, msg);
1116   CmiFree(msg);
1117 #else
1118   int i ;
1119
1120   CmiSyncSendFn(CmiMyPe(), size, msg);
1121   
1122   for ( i=0; i<_Cmi_numpes; i++ ) {
1123     if(i== CmiMyPe())
1124       continue;
1125
1126     CmiSyncSendFn(i,size,msg);
1127   }
1128   CmiFree(msg);
1129 #endif
1130   
1131 }
1132
1133 static unsigned long long lastProgress = 0;
1134
1135 static inline void AdvanceCommunications(int max_out){
1136   
1137   if (PCQueueLength (broadcast_q) > 0)
1138     sendBroadcastMessages();
1139
1140 #if BLOCKING_COMM
1141   while(msgQueueLen > maxMessages && msgQBytes > maxBytes){
1142     while(BGML_Messager_advance()>0) ;
1143     sendQueuedMessages();
1144   }
1145   
1146   int target = outstanding_recvs - max_out;
1147   while(target > 0){
1148     while(BGML_Messager_advance()>0) ;
1149     target = outstanding_recvs - max_out;
1150   }
1151 #endif
1152   
1153   while(BGML_Messager_advance()>0);
1154
1155   if (PCQueueLength (broadcast_q) > 0)
1156     sendBroadcastMessages();
1157   
1158 #if CMK_IMMEDIATE_MSG
1159   if(received_immediate) {
1160     CmiHandleImmediate();
1161   }
1162   received_immediate = 0;
1163 #endif
1164   
1165   if (numPosted <= request_max) 
1166     sendQueuedMessages();
1167 }
1168
1169 static inline void SendMsgsUntil(int targetm, int targetb){
1170
1171   sendBroadcastMessages();
1172   
1173   while(msgQueueLen>targetm && msgQBytes > targetb){
1174     while(BGML_Messager_advance()>0) ;
1175     sendQueuedMessages();
1176   }
1177   
1178   while ( BGML_Messager_advance() > 0 );
1179   
1180   sendBroadcastMessages();
1181 }
1182
1183 void CmiNotifyIdle(){  
1184   AdvanceCommunications();
1185
1186   CmiState cs = CmiGetStateN(pe);
1187   //int length = PCQueueLength(cs->recv);
1188   //if(length != 0)
1189   //printf("%d: {%d} PCQueue length = %d\n", CmiMyPe(), bgx_in_interrupt, length);
1190 }
1191
1192 static void CmiNotifyBeginIdle(CmiIdleState *s)
1193 {
1194   s->sleepMs=0;
1195   s->nIdles=0;
1196 }
1197
1198 static void CmiNotifyStillIdle(CmiIdleState *s)
1199
1200 #if ! CMK_SMP
1201   AdvanceCommunications();
1202 #else
1203 /*  CmiYield();  */
1204 #endif
1205
1206 #if 1
1207   {
1208   int nSpins=20; /*Number of times to spin before sleeping*/
1209   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1210   s->nIdles++;
1211   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1212     s->sleepMs+=2;
1213     if (s->sleepMs>10) s->sleepMs=10;
1214   }
1215   /*Comm. thread will listen on sockets-- just sleep*/
1216   if (s->sleepMs>0) {
1217     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1218     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1219     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1220   }       
1221   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1222   }
1223 #endif
1224 }
1225
1226 /*==========================================================*/
1227 /*==========================================================*/
1228 /*==========================================================*/
1229
1230 /************ Recommended routines ***********************/
1231 /************ You dont have to implement these but they are supported
1232  in the converse syntax and some rare programs may crash. But most
1233  programs dont need them. *************/
1234
1235 CmiCommHandle CmiAsyncSendFn(int, int, char *){
1236   CmiAbort("CmiAsyncSendFn not implemented.");
1237   return (CmiCommHandle) 0;
1238 }
1239 CmiCommHandle CmiAsyncBroadcastFn(int, char *){
1240   CmiAbort("CmiAsyncBroadcastFn not implemented.");
1241   return (CmiCommHandle) 0;
1242 }
1243 CmiCommHandle CmiAsyncBroadcastAllFn(int, char *){
1244   CmiAbort("CmiAsyncBroadcastAllFn not implemented.");
1245   return (CmiCommHandle) 0;
1246 }
1247
1248 int           CmiAsyncMsgSent(CmiCommHandle handle){
1249   CmiAbort("CmiAsyncMsgSent not implemented.");
1250   return 0;
1251 }
1252 void          CmiReleaseCommHandle(CmiCommHandle handle){
1253   CmiAbort("CmiReleaseCommHandle not implemented.");
1254 }
1255
1256
1257 /*==========================================================*/
1258 /*==========================================================*/
1259 /*==========================================================*/
1260
1261 /* Optional routines which could use common code which is shared with
1262    other machine layer implementations. */
1263
1264 /* MULTICAST/VECTOR SENDING FUNCTIONS
1265
1266  * In relations to some flags, some other delivery functions may be needed.
1267  */
1268
1269 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1270
1271 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg){
1272   char *copymsg;
1273   copymsg = (char *)CmiAlloc(size);
1274   memcpy(copymsg,msg,size);
1275   CmiFreeListSendFn(npes, pes, size, msg);
1276 }
1277
1278 static int iteration_count = 0;
1279
1280 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
1281   CMI_SET_BROADCAST_ROOT(msg,0);  
1282   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1283   ((CmiMsgHeaderBasic *)msg)->size = size;  
1284   CMI_SET_CHECKSUM(msg, size);
1285   
1286   //printf("%d: In Free List Send Fn\n", CmiMyPe());
1287   
1288   //CmiBecomeImmediate(msg);
1289
1290   iteration_count ++;
1291 #if 0
1292   if(iteration_count > 700) {    
1293     char pbuf[10000];
1294     char tbuf[64];
1295     
1296     sprintf(pbuf, "PE %d sending %d messages of size %d\n", CmiMyPe(), npes, size);
1297     //for(int pcount = 0; pcount < npes; pcount++) {
1298     //sprintf(tbuf," %d,", pes[pcount]);
1299     //strcat(pbuf, tbuf);
1300     //}
1301     printf("\n%s\n", pbuf);
1302   }
1303 #endif
1304   
1305   int i, count = 0, my_loc = -1;
1306   for(i=0; i<npes; i++) {
1307     if(pes[i] == CmiMyPe() || pes[i] == vnpeer) {
1308       CmiSyncSend(pes[i], size, msg);
1309       my_loc = i;
1310     }
1311   }
1312   
1313 #if 0
1314   if (iteration_count > 500 && (npes > 1) && (CmiNumPes() >= 512)) {    
1315     int *newpelist = (int *) malloc (sizeof(int) * npes);
1316     int new_npes = 0;
1317     
1318     for(i=0; i<npes; i++) {
1319       if(pes[i] == CmiMyPe() || pes[i] == vnpeer) 
1320         continue;
1321       else
1322         newpelist[new_npes++] = pes[i];
1323     }
1324
1325     machineMulticast (new_npes, newpelist, size, msg);
1326     AdvanceCommunications();
1327     return;
1328   }
1329 #endif
1330   
1331   for(i=0;i<npes;i++) {
1332     if(pes[i] == CmiMyPe() || pes[i] == vnpeer);
1333     else if(i < npes - 1){
1334       CmiReference(msg);
1335       CmiGeneralFreeSend(pes[i], size, msg);
1336       
1337       //CmiSyncSend(pes[i], size, msg);
1338     }
1339     
1340 #if CMK_PERSISTENT_COMM
1341     if(phs) 
1342       phscount ++;
1343 #endif
1344   }
1345   
1346   if (npes  && (pes[npes-1] != CmiMyPe() && pes[npes-1] != vnpeer))
1347     CmiSyncSendAndFree(pes[npes-1], size, msg);
1348   else 
1349     CmiFree(msg);
1350   
1351   phscount = 0;
1352   
1353   AdvanceCommunications();
1354 }
1355
1356 #if 0
1357
1358 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
1359   CMI_SET_BROADCAST_ROOT(msg,0);  
1360   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1361   ((CmiMsgHeaderBasic *)msg)->size = size;  
1362   CMI_SET_CHECKSUM(msg, size);
1363   
1364   //printf("%d: In Free List Send Fn\n", CmiMyPe());
1365   
1366   CmiBecomeImmediate(msg);
1367   iteration_count ++;
1368   /*
1369   if(iteration_count == 9500) {
1370     
1371     char pbuf[10000];
1372     char tbuf[64];
1373
1374     sprintf(pbuf, "PE %d sending %d messages of size %d to :", CmiMyPe(), npes, size);
1375     for(int pcount = 0; pcount < npes; pcount++) {
1376       sprintf(tbuf," %d,", pes[pcount]);
1377       strcat(pbuf, tbuf);
1378     }
1379
1380     printf("\n%s\n", pbuf);
1381   }
1382   */
1383
1384   int new_npes = 0;
1385
1386   int i, count = 0, my_loc = -1;
1387   for(i=0; i<npes; i++) {
1388     if(pes[i] == CmiMyPe() || pes[i] == vnpeer) {
1389       CmiSyncSend(pes[i], size, msg);
1390       my_loc = i;
1391     }
1392   }
1393   
1394 #if 0
1395   int *pelist  = (int *) CmiAlloc (sizeof(int) * npes);
1396   if(iteration_count > 700) {
1397
1398     CmiAssert (vnpeer < 0);
1399     
1400     if(my_loc >= 0) {
1401       for(i=0; i<my_loc; i++) {
1402         int loc = (19*my_loc + 7 * (i+1))% (npes-1);
1403         pelist[loc] = pes[i];
1404       }
1405       
1406       for(i ++; i < npes; i ++) {
1407         int loc = (19*my_loc + 7 * i) % (npes-1);
1408         pelist[loc] = pes[i];
1409       }
1410
1411       new_npes = npes - 1;
1412     }
1413     else {
1414       for(i=0; i<npes; i++) {
1415         int loc = (19 + 7 * (i+1)) % npes;
1416         pelist[i] = pes[i];
1417       }
1418       new_npes = npes;
1419     }
1420     
1421     if(new_npes <= request_max) {
1422       machineMulticast (new_npes, pelist, size, msg);
1423       //SendMsgsUntil(0,0);
1424       AdvanceCommunications();
1425       return;
1426     }
1427   }
1428   CmiFree (pelist);
1429 #endif  
1430
1431
1432   for(i=0;i<npes;i++) {
1433     if(pes[i] == CmiMyPe() || pes[i] == vnpeer);
1434     else if(i < npes - 1){
1435       CmiReference(msg);
1436       CmiGeneralFreeSend(pes[i], size, msg);
1437       
1438       //CmiSyncSend(pes[i], size, msg);
1439     }
1440     
1441 #if CMK_PERSISTENT_COMM
1442     if(phs) 
1443       phscount ++;
1444 #endif
1445   }
1446   
1447   if (npes  && (pes[npes-1] != CmiMyPe() && pes[npes-1] != vnpeer))
1448     CmiSyncSendAndFree(pes[npes-1], size, msg);
1449   else 
1450     CmiFree(msg);
1451   
1452   phscount = 0;
1453   
1454   //  SendMsgsUntil(0,0);
1455   SendMsgsUntil(maxMessages, maxBytes);
1456   AdvanceCommunications();
1457 }
1458 #endif
1459
1460
1461 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg){
1462   CmiAbort("CmiAsyncListSendFn not implemented.");
1463   return (CmiCommHandle) 0;
1464 }
1465 #endif
1466
1467 /** NODE SENDING FUNCTIONS
1468
1469  * If there is a node queue, and we consider also nodes as entity (tipically in
1470  * SMP versions), these functions are needed.
1471  */
1472
1473 #if CMK_NODE_QUEUE_AVAILABLE
1474
1475 void          CmiSyncNodeSendFn(int, int, char *);
1476 CmiCommHandle CmiAsyncNodeSendFn(int, int, char *);
1477 void          CmiFreeNodeSendFn(int, int, char *);
1478
1479 void          CmiSyncNodeBroadcastFn(int, char *);
1480 CmiCommHandle CmiAsyncNodeBroadcastFn(int, char *);
1481 void          CmiFreeNodeBroadcastFn(int, char *);
1482
1483 void          CmiSyncNodeBroadcastAllFn(int, char *);
1484 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int, char *);
1485 void          CmiFreeNodeBroadcastAllFn(int, char *);
1486
1487 #endif
1488
1489
1490 #if CMK_SHARED_VARS_POSIX_THREADS_SMP /*Used by the net-*-smp versions*/
1491
1492 int _Cmi_numpes;
1493 int _Cmi_mynodesize;
1494 int _Cmi_mynode;
1495 int _Cmi_numnodes;
1496
1497 int CmiMyPe();
1498 int CmiMyRank();
1499 int CmiNodeFirst(int node);
1500 int CmiNodeSize(int node);
1501 int CmiNodeOf(int pe);
1502 int CmiRankOf(int pe);
1503
1504 /* optional, these functions are implemented in "machine-smp.c", so including
1505    this file avoid the necessity to reimplement them.
1506  */
1507 void CmiNodeBarrier(void);
1508 void CmiNodeAllBarrier(void);
1509 CmiNodeLock CmiCreateLock();
1510 void CmiDestroyLock(CmiNodeLock lock);
1511
1512 #endif
1513
1514 /** IMMEDIATE MESSAGES
1515
1516  * If immediate messages are supported, the following function is needed. There
1517  * is an exeption if the machine progress is also defined (see later for this).
1518
1519  * Moreover, the file "immediate.c" should be included, otherwise all its
1520  * functions and variables have to be redefined.
1521 */
1522
1523 #if CMK_CCS_AVAILABLE
1524
1525 #include "immediate.c"
1526
1527 #if ! CMK_MACHINE_PROGRESS_DEFINED /* Hack for some machines */
1528 void CmiProbeImmediateMsg();
1529 #endif
1530
1531 #endif
1532
1533
1534 /** MACHINE PROGRESS DEFINED
1535
1536  * Some machines (like BlueGene/L) do not have coprocessors, and messages need
1537  * to be pulled out of the network manually. For this reason the following
1538  * functions are needed. Notice that the function "CmiProbeImmediateMsg" must
1539  * not be defined anymore.
1540  */
1541
1542 #if CMK_MACHINE_PROGRESS_DEFINED
1543
1544
1545
1546 void CmiMachineProgressImpl()
1547 {
1548 #if 0
1549   unsigned long long new_time = rts_get_timebase();  
1550   if(new_time < lastProgress + progress_cycles) {
1551     return;
1552   }
1553   lastProgress = new_time;
1554 #endif
1555
1556 #if !CMK_SMP
1557   AdvanceCommunications();
1558 #else
1559   /*Not implemented yet. Communication server does not seem to be
1560     thread safe */
1561   /* CommunicationServerThread(0); */
1562 #endif  
1563 }
1564
1565 #endif
1566
1567 /* Dummy implementation */
1568 extern "C" void CmiBarrier()
1569 {
1570   BGTsC_Barrier(0);
1571   //BGTr_Barrier(1);
1572 }
1573