recovery from hard failures
[charm.git] / src / arch / elan / machine.c
1 /** @file
2  * Elan machine layer
3  * @ingroup Machine
4 */
5
6 /* Charm++ Machine Layer for ELAN network interface 
7 Developed by Sameer Kumar
8 */
9
10 #include <stdio.h>
11 #include <sys/time.h>
12 #include <assert.h>
13 #include <errno.h>
14 #include "converse.h"
15 #include <elan/elan.h>
16 /*#include <elan3/elan3.h>*/
17
18 /*Support for ++debug: */
19 #include <unistd.h> /*For getpid()*/
20 #include <stdlib.h> /*For sleep()*/
21
22 #include "machine.h"
23 #include "pcqueue.h"
24
25 #if CMK_PERSISTENT_COMM
26 #include "persist_impl.h"
27 #endif
28
29 /* copy from elan/version.h */
30 #ifndef QSNETLIBS_VERSION_CODE
31 #define QSNETLIBS_VERSION(a,b,c)        (((a) << 16) + ((b) << 8) + (c))
32 #define QSNETLIBS_VERSION_CODE          QSNETLIBS_VERSION(1,3,0)
33 #endif
34
35 #define MAX_QLEN 1000
36 #define MAX_BYTES 1000000
37
38 #define USE_SHM 0
39
40 /*
41   To reduce the buffer used in broadcast and distribute the load from 
42   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of 
43   spanning tree broadcast algorithm.
44   This will use the fourth short in message as an indicator of spanning tree
45   root.
46 */
47 #define CMK_BROADCAST_SPANNING_TREE    1
48 #define BROADCAST_SPANNING_FACTOR      4
49
50 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
51 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
52 #define CMI_MESSAGE_SIZE(msg)            ((CmiMsgHeaderBasic *)msg)->size
53
54 #if CMK_BROADCAST_SPANNING_TREE
55 #  define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
56 #else
57 #  define CMI_SET_BROADCAST_ROOT(msg, root)
58 #endif
59
60 ELAN_BASE     *elan_base;
61 ELAN_TPORT    *elan_port;
62 ELAN_QUEUE    *elan_q;
63
64 int enableGetBasedSend = 1;
65 int enableBufferPooling = 0;
66
67 int SMALL_MESSAGE_SIZE=4080;  /* Smallest message size queue 
68                                  used for receiving short messages */
69                                      
70 int MID_MESSAGE_SIZE=65536;     /* Queue for larger messages 
71                                    which need pre posted receives
72                                    Message sizes greater will be 
73                                    probe received adding 5us overhead*/
74 #define SYNC_MESSAGE_SIZE MID_MESSAGE_SIZE * 10
75                                /* Message sizes greater will be 
76                                   sent synchronously thus avoiding copying*/
77
78 #define NON_BLOCKING_MSG  4     /* Message sizes greater 
79                                     than this will be sent asynchronously*/
80 #define RECV_MSG_Q_SIZE  8   //Maximim queue size for short messages
81 #define MID_MSG_Q_SIZE   4   //Maximum queue size for mid-range messages
82
83 //Actual sizes, can also be set from the command line
84 int smallQSize = RECV_MSG_Q_SIZE;
85 int midQSize = MID_MSG_Q_SIZE;
86
87 ELAN_EVENT *esmall[RECV_MSG_Q_SIZE], *emid[MID_MSG_Q_SIZE], *elarge;
88
89 #define TAG_SMALL 0x1
90 #define TAG_LARGE_HEADER 0x3     /* Header that a large message is coming*/
91 #define TAG_GET_BASED_SEND 0x5     /* Header that a large message is coming
92                                     as a get request*/
93 #define TAG_MID   0x10
94 #define TAG_LARGE 0x100
95
96 /*Release sent messages status to check for*/
97 #define BASIC_SEND 0
98 #define GET_BASED_SEND 1
99 #define RECEIVE_GET   2
100 #define GET_FINISHED_RECEIVE 3
101
102 int               _Cmi_mynode;    /* Which address space am I */
103 int               _Cmi_mynodesize;/* Number of processors in my address space */
104 int               _Cmi_numnodes;  /* Total number of address spaces */
105 int               _Cmi_numpes;    /* Total number of processors */
106
107 static int        Cmi_nodestart; /* First processor in this address space */ 
108 CpvDeclare(void*, CmiLocalQueue);
109
110 #define BLK_LEN  512
111
112 static int MsgQueueLen=0;
113 static int MsgQueueBytes=0;
114 static int request_max;
115 static int request_bytes;
116
117 #include "pcqueue.h"
118 PCQueue localSmallBufferQueue;
119 PCQueue localMidBufferQueue;
120
121 int outstandingMsgs[3000];
122
123 int stretchFlag = 0;
124 int blockingReceiveFlag = 0;
125
126 static void ConverseRunPE(int everReturn);
127
128 typedef struct {
129     char header[CmiMsgHeaderSizeBytes];
130     int size;
131     char* src_addr;
132     char* flag_addr;
133 } GetHeader;
134
135 typedef struct msg_list {
136     ELAN_EVENT *e;
137     char *msg;
138     struct msg_list *next;
139     int size, destpe;
140     int sent;
141     
142     //Fields for get based send
143     int status;
144     long done;
145     long *flag_addr;
146     char *gmsg;
147     char *newmsg;       
148     int is_broadcast;
149 } SMSG_LIST;
150
151
152 static int Cmi_dim;
153
154 static SMSG_LIST *sent_msgs=0;
155 static SMSG_LIST *end_sent=0;
156 static SMSG_LIST *cur_unsent=0;
157
158 void ElanSendQueuedMessages();
159 static int CmiReleaseSentMessages();
160
161 void ElanGetBasedSend(SMSG_LIST *ptr);
162 void handleGetHeader(char *msg, int src);
163 void processGetEnv(SMSG_LIST *ptr);
164
165 #if NODE_0_IS_CONVHOST
166 int inside_comm = 0;
167 #endif
168
169 double starttimer;
170
171 void CmiAbort(const char *message);
172 static void PerrorExit(const char *msg);
173
174 void SendSpanningChildren(int size, char *msg);
175
176 typedef struct __elanChunkHeader {
177   int type;
178   int size;
179 } ElanChunkHeader;
180
181 typedef struct __chunkHeader {
182   ElanChunkHeader elan;
183   CmiChunkHeader conv;
184 } ChunkHeader;
185
186 #define TYPE_FIELD(buf)       (((ChunkHeader*)(buf))->elan.type)
187 #define SIZE_FIELD(buf)       (((ChunkHeader*)(buf))->elan.size)
188 #define CONV_SIZE_FIELD(buf)  (((ChunkHeader*)(buf))->conv.size)
189 #define REF_FIELD(buf)        (((ChunkHeader*)(buf))->conv.ref)
190
191 // CONV_BUF_START moves the res from pointing to the start of the elan chunk to the start of the converse chunk
192 #define CONV_BUF_START(res)     ((char*)(res) + sizeof(ElanChunkHeader))
193
194 // MACHINE_BUF_START moves the res from pointing to the start of the converse chunk to the start of the elan chunk
195 #define MACHINE_BUF_START(res)  ((char*)(res) - sizeof(ElanChunkHeader))
196
197 // USER_BUF_START moves the res from pointing to the start of the payload to the start of the elan chunk
198 #define USER_BUF_START(res)     ((char*)(res) - sizeof(ChunkHeader))
199
200 #define DYNAMIC_MESSAGE 0
201 #define STATIC_MESSAGE 1
202 #define ELAN_MESSAGE 3
203
204 static void PerrorExit(const char *msg)
205 {
206   perror(msg);
207   exit(1);
208 }
209
210 /**************************  TIMER FUNCTIONS **************************/
211
212 #if CMK_TIMER_USE_SPECIAL
213
214 #include <sys/timers.h>
215 void CmiTimerInit(char **argv)
216 {
217     starttimer =  elan_clock(elan_base->state); 
218 }
219
220 double CmiTimer(void)
221 {
222     return (elan_clock(elan_base->state) - starttimer)/1e9;
223 }
224
225 double CmiWallTimer(void)
226 {
227     return (elan_clock(elan_base->state) - starttimer)/1e9;
228 }
229
230 double CmiCpuTimer(void)
231 {
232     return (elan_clock(elan_base->state) - starttimer)/1e9;
233 }
234
235 #endif
236
237 static PCQueue   msgBuf;
238
239 /************************************************************
240  * 
241  * Processor state structure
242  *
243  ************************************************************/
244
245 /*****
246       SMP version Extend later, currently only NON SMP version 
247 ***************/
248
249 #include "machine-smp.c"
250
251 CsvDeclare(CmiNodeState, NodeState);
252
253 static struct CmiStateStruct Cmi_state;
254 int _Cmi_mype;
255 int _Cmi_myrank;
256
257 #include "immediate.c"
258
259 void CmiMemLock(void) {}
260 void CmiMemUnlock(void) {}
261
262 #define CmiGetState() (&Cmi_state)
263 #define CmiGetStateN(n) (&Cmi_state)
264
265 static void CmiStartThreads(char **argv)
266 {
267   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
268   _Cmi_mype = Cmi_nodestart;
269   _Cmi_myrank = 0;
270 }      
271
272 /*Add a message to this processor's receive queue */
273 void CmiPushPE(int pe,void *msg)
274 {
275   CmiState cs=CmiGetStateN(pe);
276   MACHSTATE1(2,"Pushing message into %d's queue",pe);
277 #if CMK_IMMEDIATE_MSG
278   if (CmiIsImmediate(msg)) {
279       /*CmiPrintf("[node %d] Immediate Message %d %d {{. \n", CmiMyNode(), CmiGetHandler(msg), _ImmediateMsgHandlerIdx);*/
280       /*CmiHandleMessage(msg);*/
281       CmiPushImmediateMsg(msg);
282       /*CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());*/
283       return;
284   }
285 #endif
286   CmiIdleLock_addMessage(&cs->idle); 
287   PCQueuePush(cs->recv,msg);
288 }
289
290 #ifndef CmiMyPe
291 int CmiMyPe(void)
292 {
293   return CmiGetState()->pe;
294 }
295 #endif
296
297 #ifndef CmiMyRank
298 int CmiMyRank(void)
299 {
300   return CmiGetState()->rank;
301 }
302 #endif
303
304 #ifndef CmiNodeFirst
305 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
306 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
307 #endif
308
309 #ifndef CmiNodeOf
310 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
311 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
312 #endif
313
314 int CmiAllAsyncMsgsSent(void)
315 {
316    SMSG_LIST *msg_tmp = NULL; 
317
318    int done;
319    
320    CmiReleaseSentMessages();
321
322    msg_tmp = sent_msgs;
323    while((msg_tmp != cur_unsent) && (msg_tmp->e != NULL)){
324        done = 0;
325     
326        if(elan_tportTxDone(msg_tmp->e))
327            done = 1;
328        else
329 #if USE_SHM 
330            elan_deviceCheck(elan_base->state);
331 #else 
332        ;
333 #endif
334
335        if(!done)
336            return 0;
337        msg_tmp = msg_tmp->next;
338        //    MsgQueueLen--;
339    }
340    return 1;
341 }
342
343 int CmiAsyncMsgSent(CmiCommHandle c) {
344      
345   SMSG_LIST *msg_tmp = sent_msgs;
346   int done;
347
348   while ((msg_tmp) && (msg_tmp ->e != NULL) && 
349          ((CmiCommHandle)(msg_tmp->e) != c))
350       msg_tmp = msg_tmp->next;
351   
352   if(msg_tmp) {
353     done = 0;
354     
355     if(elan_tportTxDone(msg_tmp->e))
356         done = 1;
357     else 
358 #if USE_SHM 
359            elan_deviceCheck(elan_base->state);
360 #else 
361        ;
362 #endif
363     
364     return ((done)?1:0);
365   } else {
366       return 1;
367   }
368 }
369
370 void CmiReleaseCommHandle(CmiCommHandle c)
371 {
372   return;
373 }
374
375 void release_pmsg_list();
376
377 #define MAX_RELEASE_POLL 4096
378
379 static int CmiReleaseSentMessages(void)
380 {
381     SMSG_LIST *msg_tmp=sent_msgs;
382     SMSG_LIST *prev=0;
383     SMSG_LIST *temp;
384     int done;
385     int locked = 0;
386     
387 #ifndef CMK_OPTIMIZE 
388     double rel_start_time = CmiWallTimer();
389 #endif
390
391 #if CMK_PERSISTENT_COMM
392   release_pmsg_list();
393 #endif
394
395   int ncheck = MAX_RELEASE_POLL;
396
397   while(msg_tmp != NULL && ncheck > 0){
398       if(msg_tmp->sent) {
399           done =0;
400
401           if(msg_tmp->status == BASIC_SEND) {
402               if(elan_tportTxDone(msg_tmp->e)) {
403                   elan_tportTxWait(msg_tmp->e);
404                   done = 1;
405               }          
406               else 
407 #if USE_SHM 
408                   elan_deviceCheck(elan_base->state);
409 #else 
410               ;
411 #endif
412           }
413           else {
414               processGetEnv(msg_tmp);
415               done = msg_tmp->done;
416           }
417           
418           if(done) {
419               MsgQueueLen--;
420               MsgQueueBytes -= msg_tmp->size;
421               
422               outstandingMsgs[msg_tmp->destpe] = 0;
423               
424               /* Release the message */
425               temp = msg_tmp->next;
426               if(prev==0)  /* first message */
427                   sent_msgs = temp;
428               else
429                   prev->next = temp;
430               
431               CmiFree(msg_tmp->msg);
432               CmiFree(msg_tmp);
433               msg_tmp = temp;
434           } else {
435               prev = msg_tmp;
436               msg_tmp = msg_tmp->next;
437               ncheck --;
438           }
439       }
440       else {
441           prev = msg_tmp;
442           msg_tmp = msg_tmp->next;
443       }
444   }
445   
446   //if(msg_tmp)
447   //  elan_deviceCheck(elan_base->state);
448
449   end_sent = prev;
450
451 #if CMK_PERSISTENT_COMM
452   release_pmsg_list();
453 #endif
454
455 #ifndef CMK_OPTIMIZE 
456 #if ! CMK_TRACE_IN_CHARM
457   {
458   double rel_end_time = CmiWallTimer();
459   if(rel_end_time > rel_start_time + 50/1e6)
460       traceUserBracketEvent(20, rel_start_time, rel_end_time);
461   }
462 #endif
463 #endif
464
465   //So messages not finished sending
466   if(msg_tmp != cur_unsent)
467       return 0;
468   else
469       //All messages sent
470       return 1;
471 }
472
473 /* retflag = 0, receive as many messages as can be 
474    and then post another receive
475    retflag = 1, receive the first message and return 
476    retflag =3 blocking receives
477
478    Pump Msgs posts a circular queue of receives. The main idea 
479    is that if a large number of receives are being posted only 
480    minimum receive events should be polled. 
481
482    So there are two indices, event_idx and post_idx. event_idx points
483    to the first posted receive. So if a large number of messages come
484    for a particular tag then this position in the queue will receive
485    the first message. The new receives should be posted from
486    post_idx. Notice that event_idx and post_idx are not the same. If a
487    large number of messages are received then receives should be
488    posted from the position of where first message was received.
489
490 */
491 int PumpMsgs(int retflag)
492 {
493
494     static char recv_small_done[RECV_MSG_Q_SIZE]; /*A list of flags which 
495                                                     tells if a particular 
496                                                     slot in the queue has 
497                                                     a receive posted or not. */
498     static char recv_mid_done[MID_MSG_Q_SIZE];  
499     static int recv_large_done = 0;
500     
501     static char *sbuf[RECV_MSG_Q_SIZE];    /* Buffer of pointer to the 
502                                               messages received */
503     static char *mbuf[MID_MSG_Q_SIZE];
504     static char *lbuf;
505     
506     static int event_idx = 0;             /* As defined earlier */
507     static int post_idx = 0;
508
509     static int event_m_idx = 0;
510     static int post_m_idx = 0;
511
512     static int nlarge_torecv = 0; /*this variable specifies how many
513                                     large messages need to be received
514                                     before we can block again.*/
515
516     static int step1 = 0;   /* Large message are received in two
517                                steps, first the envelope is probed by
518                                posting a receive and then memory is
519                                allocated for the message and the
520                                message is finally received */
521
522     int flg, res, rcount, mcount;
523     char *msg = 0;
524     
525     int recd=0;
526 #if QSNETLIBS_VERSION_CODE > QSNETLIBS_VERSION(1,4,0)
527     unsigned long size= 0;
528 #else
529     int size= 0;
530 #endif
531     int tag=0;
532     int src=-1;
533     
534 #ifndef CMK_OPTIMIZE 
535     double pmp_start_time = CmiWallTimer();
536 #endif
537
538     int ecount = 0, emcount = 0;
539
540 #if CMK_PERSISTENT_COMM
541     if (PumpPersistent()) return 1;
542 #endif
543         
544     while(1) {
545         msg = 0;
546         
547         ecount = 0;
548         for(rcount = 0; rcount < smallQSize; rcount ++){
549             ecount = (rcount + post_idx) % smallQSize;
550             if(!recv_small_done[ecount]) {
551                 sbuf[ecount] = (char *) CmiAlloc(SMALL_MESSAGE_SIZE);
552                 
553                 esmall[ecount] = elan_tportRxStart(elan_port, 0, 0, 0, 1, 
554                                                    TAG_SMALL, sbuf[ecount], 
555                                                    SMALL_MESSAGE_SIZE);
556                 recv_small_done[ecount] = 1;
557             }
558             else {
559                 ecount = (ecount + smallQSize - 1) % smallQSize;
560                 break;
561             }
562         }
563         post_idx = ecount + 1;
564
565         emcount = 0;
566         for(mcount = 0; mcount < midQSize; mcount ++){
567             emcount = (mcount + post_m_idx) % midQSize;
568             if(!recv_mid_done[emcount]) {
569                 mbuf[emcount] = (char *) CmiAlloc(MID_MESSAGE_SIZE);
570                                                 
571                 emid[emcount] = elan_tportRxStart(elan_port, 0, 0, 0, -1, 
572                                                   TAG_MID, mbuf[emcount], 
573                                                   MID_MESSAGE_SIZE);
574                 recv_mid_done[emcount] = 1;
575             }
576             else {
577                 emcount = (emcount + midQSize - 1) % midQSize;
578                 break;
579             }
580         }
581         post_m_idx = emcount + 1;        
582         
583         if(!recv_large_done) {
584             elarge = elan_tportRxStart(elan_port, ELAN_TPORT_RXPROBE, 0, 0, 
585                                        -1, TAG_LARGE, NULL, 0);
586             recv_large_done = 1;
587         }
588     
589         if(!step1 && elan_tportRxDone(elarge)) {
590             elan_tportRxWait(elarge, NULL, NULL, &size );
591       
592             lbuf = (char *) CmiAlloc(size);
593             elarge = elan_tportRxStart(elan_port, 0, 0, 0, -1, TAG_LARGE, 
594                                        lbuf,size);
595             step1 = 1;
596         }
597         
598         if(step1 && elan_tportRxDone(elarge)) {
599             elan_tportRxWait(elarge, NULL, NULL, &size);
600             
601             msg = lbuf;
602             recv_large_done = 0;
603             flg = 1;
604             
605             CmiPushPE(CMI_DEST_RANK(msg), msg);
606 #if CMK_BROADCAST_SPANNING_TREE
607             if (CMI_BROADCAST_ROOT(msg))
608                 SendSpanningChildren(size, msg);
609 #endif
610             step1 = 0;
611             
612             if(blockingReceiveFlag)
613                 nlarge_torecv --;
614         }
615
616         emcount = 0;
617         for(mcount = 0; mcount < midQSize; mcount ++){
618             emcount = (mcount + event_m_idx) % midQSize;
619             if(elan_tportRxDone(emid[emcount])) {
620                 elan_tportRxWait(emid[emcount], NULL, NULL, &size);
621                 
622                 msg = mbuf[emcount];
623                 mbuf[emcount] = NULL;
624                 
625                 recv_mid_done[emcount] = 0;
626                 flg = 1;
627                 
628                 CmiPushPE(CMI_DEST_RANK(msg), msg);
629                 
630 #if CMK_BROADCAST_SPANNING_TREE
631                 if (CMI_BROADCAST_ROOT(msg))
632                     SendSpanningChildren(size, msg);
633 #endif
634                 if(blockingReceiveFlag)
635                     nlarge_torecv --;
636             }
637             else {
638 #if USE_SHM 
639                 elan_deviceCheck(elan_base->state);
640 #else 
641                 ;
642 #endif
643                 emcount = (emcount + midQSize - 1) % midQSize;
644                 break;
645             }
646         }
647         event_m_idx = emcount + 1;
648         
649         ecount = 0;
650         for(rcount = 0; rcount < smallQSize; rcount ++){
651             ecount = (rcount + event_idx) % smallQSize;
652             if(elan_tportRxDone(esmall[ecount]) || 
653                (retflag == 3 && nlarge_torecv == 0 && !flg)) {
654                 elan_tportRxWait(esmall[ecount], &src, &tag, &size );
655                 
656                 msg = sbuf[ecount];
657                 sbuf[ecount] = NULL;
658                 
659                 recv_small_done[ecount] = 0;
660                     
661                 if(tag == TAG_SMALL) {
662                     flg = 1;
663                     CmiPushPE(CMI_DEST_RANK(msg), msg);                
664 #if CMK_BROADCAST_SPANNING_TREE
665                     if (CMI_BROADCAST_ROOT(msg))
666                         SendSpanningChildren(size, msg);
667 #endif
668                 }
669                 else if(tag == TAG_LARGE_HEADER) {
670                     //CmiPrintf("[%d] Received Header\n", CmiMyPe());
671                     nlarge_torecv ++;
672                     CmiFree(msg);
673                 }
674                 else if(tag == TAG_GET_BASED_SEND) {
675                     handleGetHeader(msg, src);
676                 }                    
677
678                 if(retflag == 3)
679                     retflag = 1;
680             }
681             else {
682 #if USE_SHM 
683                 elan_deviceCheck(elan_base->state);
684 #else 
685                 ;
686 #endif
687                 ecount = (ecount + smallQSize - 1) % smallQSize;
688                 break;
689             }
690         }
691         event_idx = ecount + 1;
692         
693 #if CMK_PERSISTENT_COMM
694         PumpPersistent();
695 #endif
696         
697         if(!flg) {
698 #ifndef CMK_OPTIMIZE 
699 #if ! CMK_TRACE_IN_CHARM
700
701             double pmp_end_time = CmiWallTimer();
702             if(pmp_end_time > pmp_start_time + 50/1e6)
703                 traceUserBracketEvent(10, pmp_start_time, pmp_end_time);
704 #endif
705 #endif
706 #if CMK_IMMEDIATE_MSG && !CMK_SMP
707             CmiHandleImmediate();
708 #endif
709             return recd;    
710         }
711         
712         if (retflag) {
713 #ifndef CMK_OPTIMIZE 
714 #if ! CMK_TRACE_IN_CHARM
715             double pmp_end_time = CmiWallTimer();
716             if(pmp_end_time > pmp_start_time + 50/1e6)
717                 traceUserBracketEvent(10, pmp_start_time, pmp_end_time);
718 #endif
719 #endif
720 #if CMK_IMMEDIATE_MSG && !CMK_SMP
721             CmiHandleImmediate();
722 #endif
723             return flg;
724         }
725         
726         recd = 1;
727         flg = 0;
728     }
729 #if CMK_IMMEDIATE_MSG && !CMK_SMP
730     CmiHandleImmediate();
731 #endif
732     return recd;
733 }
734
735 void *remote_get(void * srcptr, void *destptr, int size, int srcPE){
736     return (void *)elan_get(elan_base->state, srcptr, destptr, size, srcPE);
737 }
738
739 int remote_get_done(void *e){
740     ELAN_EVENT *evt = (ELAN_EVENT *)e;
741
742     int flag = elan_poll(evt, ELAN_POLL_EVENT);
743     return flag;
744 }
745
746 /*
747 void remote_get_wait_all(){
748     elan_getWaitAll(elan_base->state, ELAN_WAIT_EVENT);
749 }
750
751 void remote_put_wait_all(){
752     elan_putWaitAll(elan_base->state, ELAN_WAIT_EVENT);
753 }
754 */
755
756 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
757 void *CmiGetNonLocal(void)
758 {
759     register CmiState cs = CmiGetState();
760     register void *msg = NULL;
761     CmiIdleLock_checkMessage(&cs->idle);
762     
763     msg =  PCQueuePop(cs->recv); 
764     
765     if(msg) {
766         return msg;
767     }
768
769     //get new messages and flush receive buffers
770     PumpMsgs(1);           // PumpMsgs(0)
771     //we are idle do more work
772     CmiReleaseSentMessages();
773     ElanSendQueuedMessages();
774
775     msg =  PCQueuePop(cs->recv); 
776     return msg;
777 }
778
779 void CmiPing() {
780     CmiReleaseSentMessages();
781     PumpMsgs(0);
782     ElanSendQueuedMessages();
783 }
784
785
786 void enableBlockingReceives(){
787     blockingReceiveFlag = 1;
788 }
789
790 void disableBlockingReceives(){
791     blockingReceiveFlag = 0;
792 }
793
794 static int toggle = 0;  //Blocking receive posted only after all idle
795                         //handlers are called
796
797 void CmiNotifyIdle(void)
798 {
799     static int previousSleepTime = 0;
800     CmiReleaseSentMessages();
801     ElanSendQueuedMessages();
802
803     PumpMsgs(1);    
804     toggle = 0;
805 }
806
807 void CmiNotifyStillIdle(void)
808 {
809     static int previousSleepTime = 0;
810     CmiReleaseSentMessages();
811     ElanSendQueuedMessages();
812     
813     if(!PumpMsgs(1) && blockingReceiveFlag && toggle){
814         if (!PCQueueEmpty(CmiGetState()->recv)) return; 
815         if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
816         if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
817         if (sent_msgs) return;
818         if (cur_unsent) return;
819         PumpMsgs(3); 
820     }
821     toggle = 1;
822 }
823
824
825  
826 #if CMK_IMMEDIATE_MSG
827 void CmiProbeImmediateMsg()
828 {
829     PumpMsgs(0);
830 }
831 #endif
832 /********************* MESSAGE SEND FUNCTIONS ******************/
833
834 static void CmiSendSelf(char *msg)
835 {
836 #if CMK_IMMEDIATE_MSG
837     if (CmiIsImmediate(msg)) {
838       /* CmiBecomeNonImmediate(msg); */
839       CmiHandleImmediateMessage(msg);
840       return;
841     }
842 #endif
843     CQdCreate(CpvAccess(cQdState), 1);
844     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
845 }
846
847 void CmiSyncSendFn(int destPE, int size, char *msg)
848 {
849     CmiState cs = CmiGetState();
850     
851     char *dupmsg;
852     dupmsg = (char *) CmiAlloc(size);
853     memcpy(dupmsg, msg, size);
854     
855     //  CmiPrintf("Setting root to %d\n", 0);
856     CMI_SET_BROADCAST_ROOT(dupmsg, 0);
857     
858     if (cs->pe==destPE)
859         CmiSendSelf(dupmsg);
860     else
861         CmiAsyncSendFn(destPE, size, dupmsg);
862 }
863
864 void ElanBasicSendFn(SMSG_LIST * ptr){
865     int tag = 0, sync_mode = 0;
866     int tiny_msg = 0;
867     
868     ptr->status = BASIC_SEND;
869     
870     if (ptr->size <= SMALL_MESSAGE_SIZE)
871         tag = TAG_SMALL;
872     else if (ptr->size < MID_MESSAGE_SIZE)
873         tag = TAG_MID;
874     else {        
875         if(!ptr->is_broadcast && enableGetBasedSend) {
876             ElanGetBasedSend(ptr);
877             return;
878         }
879         
880         tag = TAG_LARGE;
881     }
882
883     //if(ptr->size > SYNC_MESSAGE_SIZE)
884     //  sync_mode = ELAN_TPORT_TXSYNC;
885     
886     tiny_msg = 0; //A sizeof(int) byte message 
887     //sent to wake up a blocked process
888     
889     //WAKE A PROCESS SLEEPING ON A BLOCKING RECEIVE UP,
890     //WITH A SMALL MESSAGE THAT MATCHES THE TAG OF A SMALL MESSAGE
891     //BUT IS ACTUALLY A MID OR LARGE MESSAGE
892     if(ptr->size > SMALL_MESSAGE_SIZE && blockingReceiveFlag) {
893         elan_tportTxWait(elan_tportTxStart(elan_port, 0, ptr->destpe, 
894                                            CmiMyPe(), TAG_LARGE_HEADER, 
895                                            &tiny_msg, sizeof(int)));
896     }
897
898     ptr->e = elan_tportTxStart(elan_port, sync_mode, ptr->destpe, CmiMyPe(),
899                                tag, ptr->msg, ptr->size);
900     ptr->sent = 1;
901     
902     MsgQueueLen++;
903     MsgQueueBytes += ptr->size;
904     
905     outstandingMsgs[ptr->destpe] = 1;
906 }
907
908 CmiCommHandle ElanSendFn(int destPE, int size, char *msg, int flag)
909 {
910     CmiState cs = CmiGetState();
911     SMSG_LIST *msg_tmp;
912     CmiUInt2  rank, node;
913     
914     if(destPE == cs->pe) {
915         char *dupmsg = (char *) CmiAlloc(size);
916         memcpy(dupmsg, msg, size);
917         CmiSendSelf(dupmsg);
918         return 0;
919     }
920     
921     CQdCreate(CpvAccess(cQdState), 1);
922 #if CMK_PERSISTENT_COMM
923     if (phs) {
924         CmiAssert(phsSize == 1);
925         CmiSendPersistentMsg(*phs, destPE, size, msg);
926         return NULL;
927     }
928 #endif
929     
930     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
931     msg_tmp->msg = msg;
932     msg_tmp->next = 0;
933     msg_tmp->size = size;
934     msg_tmp->sent = 0;
935     msg_tmp->e = NULL;
936     msg_tmp->destpe = destPE;
937     msg_tmp->is_broadcast = flag;
938
939     if ((MsgQueueLen > request_max || MsgQueueBytes > request_bytes) 
940         && (!flag)) {
941         CmiReleaseSentMessages();
942         PumpMsgs(1); //PumpMsgs(0) 
943     }
944
945     ElanSendQueuedMessages();
946
947     if(MsgQueueLen > request_max || MsgQueueBytes > request_bytes 
948        || outstandingMsgs[destPE]){
949         
950         if(sent_msgs==0)
951             sent_msgs = msg_tmp;
952         else
953             end_sent->next = msg_tmp;
954         end_sent = msg_tmp;
955         
956         if(cur_unsent == 0)
957             cur_unsent = msg_tmp;
958         
959     }
960     else{        
961         ElanBasicSendFn(msg_tmp);
962
963         if(sent_msgs==0)
964             sent_msgs = msg_tmp;
965         else
966             end_sent->next = msg_tmp;
967         end_sent = msg_tmp;
968         
969         return (CmiCommHandle) msg_tmp->e;
970     }
971     return NULL;
972 }
973
974 void ElanSendQueuedMessages() {
975     SMSG_LIST * ptr = cur_unsent, *new_unsent = NULL;
976     while (MsgQueueLen <= request_max && MsgQueueBytes <= request_bytes 
977            && ptr != NULL) {
978
979         if(!outstandingMsgs[ptr->destpe] && !ptr->sent)
980             ElanBasicSendFn(ptr);
981         else if ((!ptr->sent) && (new_unsent == NULL))
982             new_unsent = ptr;
983         
984         ptr = ptr->next;
985     }
986     
987     if(new_unsent)
988         cur_unsent = new_unsent;
989     else
990         cur_unsent = ptr;
991 }
992
993 void ElanGetBasedSend(SMSG_LIST *msg_tmp){
994     //CmiPrintf("using get based send\n");
995     GetHeader *gmsg = (GetHeader *) CmiAlloc(sizeof(GetHeader));
996
997     gmsg->src_addr = msg_tmp->msg;
998     gmsg->size = msg_tmp->size;
999     CMI_SET_BROADCAST_ROOT(gmsg, 0);
1000     
1001     msg_tmp->sent = 1;
1002     msg_tmp->e = NULL;
1003     msg_tmp->status = GET_BASED_SEND;
1004     msg_tmp->done = 0;
1005     
1006     msg_tmp->gmsg = (char *)gmsg;
1007     gmsg->flag_addr = (char *)&(msg_tmp->done);
1008     
1009     msg_tmp->e = elan_tportTxStart(elan_port, 0, msg_tmp->destpe, CmiMyPe(), 
1010                                    TAG_GET_BASED_SEND, gmsg, sizeof(GetHeader));
1011     
1012     MsgQueueLen++;
1013     MsgQueueBytes += msg_tmp->size;
1014     
1015     outstandingMsgs[msg_tmp->destpe] = 1;
1016 }
1017
1018 void handleGetHeader(char *msg, int src){
1019     GetHeader *gmsg = (GetHeader *) msg;
1020
1021     char *newmsg = CmiAlloc(gmsg->size);
1022     
1023     SMSG_LIST *msg_tmp = (SMSG_LIST *)CmiAlloc(sizeof(SMSG_LIST));
1024     msg_tmp->msg = msg;
1025     msg_tmp->next = 0;
1026     msg_tmp->size = gmsg->size;
1027     msg_tmp->sent = 1;
1028     msg_tmp->destpe = src;
1029     msg_tmp->status = RECEIVE_GET;
1030     msg_tmp->done = 0;
1031     msg_tmp->flag_addr = (long *)gmsg->flag_addr;
1032     msg_tmp->newmsg = newmsg;
1033     
1034     msg_tmp->e = elan_get(elan_base->state, gmsg->src_addr, msg_tmp->newmsg, gmsg->size, src);
1035     
1036     if(sent_msgs==0) {
1037         sent_msgs = msg_tmp;
1038         end_sent = msg_tmp;
1039     }
1040     else {
1041         msg_tmp->next = sent_msgs;
1042         sent_msgs = msg_tmp;
1043     }
1044 }
1045
1046 long trueFlag = 1;
1047 void processGetEnv(SMSG_LIST *ptr){
1048
1049     if(ptr->status == BASIC_SEND)
1050         return;
1051
1052     if(ptr->status == GET_BASED_SEND) {
1053         if(ptr->gmsg != NULL) {
1054             if(!elan_tportTxDone(ptr->e)) {
1055 #if USE_SHM 
1056                 elan_deviceCheck(elan_base->state);
1057 #else 
1058                 ;
1059 #endif
1060                 return;                    
1061             } 
1062             
1063             elan_tportTxWait(ptr->e);
1064             CmiFree(ptr->gmsg);
1065             ptr->gmsg =  NULL;
1066         }
1067         
1068         return;
1069     }        
1070
1071     //RECEIVE_GET or FINISHED_RECEIVE, a get or a put poll in either case
1072     int flag = elan_poll(ptr->e, ELAN_POLL_EVENT);
1073
1074     if(!flag)
1075         return;
1076     
1077     if(ptr->status == RECEIVE_GET){
1078         ptr->e = elan_put(elan_base->state, &trueFlag, ptr->flag_addr,
1079                           sizeof(long), ptr->destpe);
1080         ptr->status = GET_FINISHED_RECEIVE;
1081         CmiPushPE(0, ptr->newmsg);
1082         
1083         /*
1084           #if CMK_BROADCAST_SPANNING_TREE
1085           if (CMI_BROADCAST_ROOT(ptr->newmsg))
1086           SendSpanningChildren(ptr->size, ptr->newmsg);
1087           #endif
1088         */
1089         
1090         return;
1091     }
1092     
1093     if (ptr->status == GET_FINISHED_RECEIVE)
1094         ptr->done = 1;
1095 }
1096
1097 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg){
1098   return ElanSendFn(destPE, size, msg, 0);
1099 }
1100
1101 void CmiFreeSendFn(int destPE, int size, char *msg)
1102 {
1103 #ifndef CMK_OPTIMIZE 
1104     double snd_start_time = CmiWallTimer();
1105 #endif
1106     
1107     CmiState cs = CmiGetState();
1108     CMI_SET_BROADCAST_ROOT(msg, 0);
1109     
1110     if (cs->pe==destPE) {
1111         CmiSendSelf(msg);
1112     } 
1113     else { 
1114         if(size <= NON_BLOCKING_MSG) {
1115             CQdCreate(CpvAccess(cQdState), 1);
1116             (void)elan_tportTxWait(elan_tportTxStart(elan_port, 0, destPE, CmiMyPe(), TAG_SMALL, msg, size));
1117             CmiFree(msg);
1118         }
1119         else
1120             CmiAsyncSendFn(destPE, size, msg);
1121     }
1122     
1123 #ifndef CMK_OPTIMIZE 
1124 #if ! CMK_TRACE_IN_CHARM
1125     {
1126     double snd_end_time = CmiWallTimer();
1127     if(snd_end_time > snd_start_time + 5/1e6) 
1128         traceUserBracketEvent(30, snd_start_time, snd_end_time);
1129     if((snd_end_time > snd_start_time + 5/1e3) && stretchFlag)
1130         CmiPrintf("%d:Stretched Send to %d at %5.3lfs of %5.5lf ms\n", CmiMyPe(), destPE, snd_end_time, (snd_end_time - snd_start_time)*1e3);
1131     }
1132 #endif
1133 #endif
1134 }
1135
1136 static void registerElanEvents() {
1137 #ifndef CMK_OPTIMIZE
1138 #if ! CMK_TRACE_IN_CHARM
1139     traceRegisterUserEvent("Pump Messages", 10);
1140     traceRegisterUserEvent("Release Sent Messages", 20);
1141     traceRegisterUserEvent("ELAN Send", 30);
1142 #endif
1143 #endif
1144 }
1145
1146 /*********************** BROADCAST FUNCTIONS **********************/
1147
1148 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1149 void CmiSyncSendFn1(int destPE, int size, char *msg)
1150 {
1151     CmiState cs = CmiGetState();
1152     char *dupmsg = (char *) CmiAlloc(size);
1153     memcpy(dupmsg, msg, size);
1154     if (cs->pe==destPE) {
1155         CmiSendSelf(dupmsg);
1156     }
1157     else
1158         ElanSendFn(destPE, size, dupmsg, 1);
1159 }
1160
1161 /* send msg to its spanning children in broadcast. G. Zheng */
1162 void SendSpanningChildren(int size, char *msg)
1163 {
1164   CmiState cs = CmiGetState();
1165   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1166   int i;
1167   
1168   assert(startpe>=0 && startpe<_Cmi_numpes);
1169
1170   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1171     int p = cs->pe-startpe;
1172     if (p<0) p+=_Cmi_numpes;
1173     p = BROADCAST_SPANNING_FACTOR*p + i;
1174     if (p > _Cmi_numpes - 1) break;
1175     p += startpe;
1176     p = p%_Cmi_numpes;
1177     assert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1178     CmiSyncSendFn1(p, size, msg);
1179   }
1180 }
1181
1182 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1183 {
1184   CmiState cs = CmiGetState();
1185 #if CMK_BROADCAST_SPANNING_TREE
1186   CMI_SET_BROADCAST_ROOT(msg, CmiMyPe() + 1);
1187   SendSpanningChildren(size, msg);
1188 #else
1189   int i ;
1190      
1191   for ( i=cs->pe+1; i<_Cmi_numpes; i++ ) 
1192     CmiSyncSendFn(i, size,msg) ;
1193   for ( i=0; i<cs->pe; i++ ) 
1194     CmiSyncSendFn(i, size,msg) ;
1195 #endif
1196 }
1197
1198 /*  FIXME: luckily async is never used  G. Zheng */
1199 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)  
1200 {
1201   CmiState cs = CmiGetState();
1202   int i ;
1203
1204   for ( i=cs->pe+1; i<_Cmi_numpes; i++ ) 
1205     CmiAsyncSendFn(i,size,msg) ;
1206   for ( i=0; i<cs->pe; i++ ) 
1207     CmiAsyncSendFn(i,size,msg) ;
1208   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1209 }
1210
1211 void CmiFreeBroadcastFn(int size, char *msg)
1212 {
1213    CmiSyncBroadcastFn(size,msg);
1214    CmiFree(msg);
1215 }
1216  
1217 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1218 {
1219 #if CMK_BROADCAST_SPANNING_TREE
1220   CmiState cs = CmiGetState();
1221   CmiSyncSendFn(cs->pe, size,msg) ;
1222   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1223   SendSpanningChildren(size, msg);
1224 #else
1225   int i ;
1226   
1227   for ( i=0; i<_Cmi_numpes; i++ ) 
1228     CmiSyncSendFn(i,size,msg) ;
1229 #endif
1230 }
1231
1232 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)  
1233 {
1234   int i ;
1235
1236   for ( i=1; i<_Cmi_numpes; i++ ) 
1237     CmiAsyncSendFn(i,size,msg) ;
1238   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1239 }
1240
1241 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1242 {
1243 #if CMK_BROADCAST_SPANNING_TREE
1244   CmiState cs = CmiGetState();
1245   CmiSyncSendFn(cs->pe, size,msg) ;
1246   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1247   SendSpanningChildren(size, msg);
1248 #else
1249   int i ;
1250      
1251   for ( i=0; i<_Cmi_numpes; i++ ) 
1252     CmiSyncSendFn(i,size,msg) ;
1253 #endif
1254   CmiFree(msg) ;
1255 }
1256
1257 void ConverseExit(void)
1258 {  
1259     while(!CmiAllAsyncMsgsSent() || cur_unsent) {
1260         PumpMsgs(0);
1261         ElanSendQueuedMessages();
1262         CmiReleaseSentMessages();
1263     }   
1264     
1265     elan_gsync(elan_base->allGroup); 
1266     
1267     // register elan events before trace module destoried
1268     registerElanEvents();
1269     
1270     ConverseCommonExit();
1271     
1272 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1273     if (CmiMyPe() == 0){
1274         CmiPrintf("End of program\n");
1275     }
1276 #endif
1277     exit(0);
1278 }
1279
1280 static char     **Cmi_argv;
1281 static CmiStartFn Cmi_startfn;   /* The start function */
1282 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1283
1284 typedef struct {
1285   int sleepMs; /*Milliseconds to sleep while idle*/
1286   int nIdles; /*Number of times we've been idle in a row*/
1287   CmiState cs; /*Machine state*/
1288 } CmiIdleState;
1289
1290 static CmiIdleState *CmiNotifyGetState(void)
1291 {
1292   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1293   s->sleepMs=0;
1294   s->nIdles=0;
1295   s->cs=CmiGetState();
1296   return s;
1297 }
1298
1299 static void ConverseRunPE(int everReturn)
1300 {
1301   CmiIdleState *s=CmiNotifyGetState();
1302   CmiState cs;
1303   char** CmiMyArgv;
1304   CmiNodeBarrier();
1305   cs = CmiGetState();
1306   CpvInitialize(void *,CmiLocalQueue);
1307   CpvAccess(CmiLocalQueue) = cs->localqueue;
1308   /*  since elan version is not a SMP version */
1309   /*
1310   CmiMyArgv=CmiCopyArgs(Cmi_argv);
1311   */
1312   CmiMyArgv=Cmi_argv;
1313   CthInit(CmiMyArgv);
1314 #if MACHINE_DEBUG_LOG
1315   {
1316     char ln[200];
1317     sprintf(ln,"debugLog.%d",CmiMyPe());
1318     debugLog=fopen(ln,"w");
1319   }
1320 #endif
1321
1322   ConverseCommonInit(CmiMyArgv);
1323
1324   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1325   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1326   
1327   PumpMsgs(0);
1328   elan_gsync(elan_base->allGroup);
1329
1330   /* Converse initialization finishes, immediate messages can be processed.
1331      node barrier previously should take care of the node synchronization */
1332   _immediateReady = 1;
1333
1334   if (!everReturn) {
1335     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1336     if (Cmi_usrsched==0) CsdScheduler(-1);
1337     ConverseExit();
1338   }
1339 }
1340
1341 //fast low level global barrier
1342 void elan_barrier(){
1343     
1344     while(!CmiReleaseSentMessages() || cur_unsent) {
1345         PumpMsgs(0);
1346         ElanSendQueuedMessages();
1347     }   
1348     
1349     elan_gsync(elan_base->allGroup);
1350 }
1351
1352 //synchronous fast hardware broadcast
1353 //size = size of the message to be broadcast
1354 //buffer = address of the message
1355 //root = root and source of the broadcast
1356 void elan_machine_broadcast(int size, void *buffer, int root) {
1357     
1358     while(!CmiReleaseSentMessages() || cur_unsent) {
1359         PumpMsgs(0);
1360         ElanSendQueuedMessages();
1361     }   
1362     
1363     elan_bcast(elan_base->allGroup, buffer, size, root, 0); 
1364 }
1365
1366 typedef void (* ELAN_REDUCER)(void *in, void *inout, int *count, void *handle);
1367
1368 //nelem = number of elements
1369 //size = size of data type
1370 //data = data to be reduced
1371 //fn = function pointer of the function which will do the reduction
1372 //dest = destination buffer where data is finally stored on all processors
1373 void elan_machine_allreduce(int nelem, int size, void * data, void *dest, 
1374                             ELAN_REDUCER fn){
1375     
1376     while(!CmiReleaseSentMessages() || cur_unsent) {
1377         PumpMsgs(0);
1378         ElanSendQueuedMessages();
1379     }   
1380     
1381     //ELAN reduction call here
1382     elan_reduce (elan_base->allGroup, data, dest, size, nelem, fn, NULL, 0, 0, ELAN_REDUCE_COMMUTE | ELAN_RESULT_ALL | elan_base->group_flags, 0);
1383 }
1384
1385 //nelem = number of elements
1386 //size = size of data type
1387 //data = data to be reduced
1388 //fn = function pointer of the function which will do the reduction
1389 //dest = destination buffer where data is finally stored, NULL on all non root processors
1390 //root = root of the reduction where the data will be returned
1391 void elan_machine_reduce(int nelem, int size, void * data, void *dest, ELAN_REDUCER fn, int root){
1392     
1393     while(sent_msgs || cur_unsent) {
1394         CmiReleaseSentMessages();
1395         PumpMsgs(0);
1396         ElanSendQueuedMessages();
1397     }   
1398     
1399     printf("Machine Called Reduce %d\n", sent_msgs);
1400
1401     //ELAN reduction call here
1402     elan_reduce (elan_base->allGroup, data, dest, size, nelem, fn, NULL, 0, 0, ELAN_REDUCE_COMMUTE|elan_base->group_flags , root);
1403 }
1404
1405 // NOTE: The "size" parameter already includes the size of the CmiChunkHeader data structure.
1406 void *elan_CmiAlloc(int size){
1407     char *res = NULL;
1408     char *buf;
1409     
1410     int alloc_size = size;
1411     if(enableBufferPooling) {
1412
1413         if (size <= SMALL_MESSAGE_SIZE + sizeof(ElanChunkHeader)) {
1414            alloc_size = SMALL_MESSAGE_SIZE + sizeof(ChunkHeader);
1415            size = SMALL_MESSAGE_SIZE + sizeof(CmiChunkHeader);
1416
1417 #if CMK_PERSISTENT_COMM
1418             //Put a footer at the end the message of it. This footer only will be sent with the pers. message
1419             alloc_size += sizeof(int)*2;
1420 #endif
1421
1422             if(!PCQueueEmpty(localSmallBufferQueue))
1423                 buf = PCQueuePop(localSmallBufferQueue);
1424             else
1425                 buf = (char *)malloc_nomigrate(alloc_size);
1426         }
1427         /*
1428         else if (size < MID_MESSAGE_SIZE + sizeof(ElanChunkHeader)) {
1429             alloc_size = MID_MESSAGE_SIZE + sizeof(ChunkHeader);
1430             size = MID_MESSAGE_SIZE + sizeof(CmiChunkHeader);
1431
1432 #if CMK_PERSISTENT_COMM
1433             //Put a footer at the end the message of it. This footer will only be sent with the pers. message
1434             alloc_size += sizeof(int)*2;
1435 #endif
1436             
1437             if(!PCQueueEmpty(localMidBufferQueue))
1438                 buf = PCQueuePop(localMidBufferQueue);
1439             else
1440                 buf = (char *)malloc_nomigrate(alloc_size);
1441         }
1442         */
1443         else {
1444
1445             alloc_size = size + sizeof(ElanChunkHeader);
1446             
1447 #if CMK_PERSISTENT_COMM
1448             //Put a footer at the end the message of it. This footer will be sent with the pers. message
1449             alloc_size += sizeof(int)*2;
1450 #endif            
1451             
1452             buf =(char *)malloc_nomigrate(alloc_size);
1453         }
1454     }
1455     else {
1456
1457         alloc_size = size + sizeof(ElanChunkHeader);
1458 #if CMK_PERSISTENT_COMM
1459         //Put a footer at the end the message of it. This footer will be sent with the pers. message
1460         alloc_size += sizeof(int)*2;
1461 #endif
1462         buf =(char *)malloc_nomigrate(alloc_size);
1463     }
1464     
1465     TYPE_FIELD(buf) = DYNAMIC_MESSAGE;
1466     SIZE_FIELD(buf) = size; //size of user part of the buffer, excludes machine header and persistent footer
1467     res = CONV_BUF_START(buf);  //That is where the converse message starts
1468     return res;
1469 }
1470
1471 void elan_CmiFree(void *res){
1472
1473     char *buf = MACHINE_BUF_START(res);    
1474     int type = TYPE_FIELD(buf);
1475
1476     if(type == STATIC_MESSAGE)
1477         return;
1478     
1479     if(type == DYNAMIC_MESSAGE) {    
1480         
1481         if(enableBufferPooling) {
1482             //Called from Cmifree so we know the size and 
1483             //we dont hve to store it again
1484             int size = SIZE_FIELD(buf);
1485
1486             if (size == SMALL_MESSAGE_SIZE + sizeof(ElanChunkHeader))
1487
1488                 //I knew I allocated the SMALL_MESSSAGE_SIZE of user data, 
1489                 //so I can put it back to the pool
1490                 PCQueuePush(localSmallBufferQueue, buf);
1491             /*
1492               else if (size == MID_MESSAGE_SIZE + sizeof(ElanChunkHeader))
1493               PCQueuePush(localMidBufferQueue, buf);
1494             */
1495             else 
1496                 free_nomigrate(buf);
1497         }
1498         else
1499             free_nomigrate(buf);
1500         return;
1501     }
1502
1503     //ELAN_MESSAGE
1504     elan_freeElan(elan_base->state, 
1505                   elan_elan2sdram(elan_base->state, 
1506                                   elan_main2elan(elan_base->state, buf)));
1507 }
1508
1509 //Called from the application for static messages which 
1510 //should not be freed by the system.
1511 void *elan_CmiStaticAlloc(int size){
1512     char *res = NULL;
1513
1514     char *buf = (char*)malloc_nomigrate(size + sizeof(ChunkHeader));
1515
1516     TYPE_FIELD(buf) = STATIC_MESSAGE;
1517     SIZE_FIELD(buf) = size + sizeof(ChunkHeader);
1518     CONV_SIZE_FIELD(buf) = size;
1519     REF_FIELD(buf) = 0;    
1520
1521     res = buf + sizeof(ChunkHeader);
1522     return res;
1523 }
1524
1525 void elan_CmiStaticFree(void *res){
1526     char *buf = USER_BUF_START(res);
1527     if(TYPE_FIELD(buf) != STATIC_MESSAGE)
1528         return;
1529
1530     free_nomigrate(buf);
1531 }
1532
1533 /*
1534 //Called from the application for messages allocated in the NIC
1535 void *elan_CmiAllocElan(int size){
1536     char *res = NULL;
1537     char *buf = NULL;
1538
1539     void *elan_addr = elan_allocElan(elan_base->state, 8, len + sizeof(ChunkHeader));
1540
1541     if(elan_addr == 0)
1542         CmiPrintf("ELAN ALLOC FAILED\n");
1543     
1544     elan_buf = (char*)elan_elan2main(elan_base->state, elan_sdram2elan
1545                                      (elan_base->state, elan_addr));            
1546
1547     TYPE_FIELD(buf) = ELAN_MESSAGE;
1548     SIZE_FIELD(buf) = size + sizeof(ChunkHeader);
1549     CONV_SIZE_FIELD(buf) = size;
1550     REF_FIELD(buf) = 0;    
1551
1552     res = buf + sizeof(ChunkHeader);
1553     return res;
1554 }
1555 */
1556
1557 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1558 {
1559   int n,i ;
1560   int nslots;
1561
1562 #if CMK_USE_HP_MAIN_FIX
1563 #if FOR_CPLUS
1564   _main(argc,argv);
1565 #endif
1566 #endif
1567
1568 #if USE_SHM 
1569   putenv("LIBELAN_SHM_ENABLE=1");
1570 #else
1571   putenv("LIBELAN_SHM_ENABLE=0");
1572 #endif
1573
1574   localSmallBufferQueue = PCQueueCreate();
1575   localMidBufferQueue = PCQueueCreate();
1576
1577   if (!(elan_base = 
1578 #if QSNETLIBS_VERSION_CODE > QSNETLIBS_VERSION(1,4,0)
1579         elan_baseInit(0)
1580 #else
1581         elan_baseInit()
1582 #endif
1583      ))
1584   {
1585       perror("Failed elan_baseInit()");
1586       exit(1);
1587   }
1588
1589   elan_gsync(elan_base->allGroup);
1590   
1591   if ((elan_q = elan_gallocQueue(elan_base, elan_base->allGroup)) == NULL) {
1592     
1593     perror( "elan_gallocQueue failed" );
1594     exit (1);
1595   }
1596   
1597   nslots = elan_base->tport_nslots * 2;
1598   
1599   //if(nslots < elan_base->state->nvp)
1600   //  nslots = elan_base->state->nvp;
1601   //if(nslots > 256)
1602   //  nslots = 256;
1603   
1604   if (!(elan_port = elan_tportInit(elan_base->state,
1605                                    elan_q,
1606                                    nslots,
1607                                    //elan_base->tport_nslots, 
1608                                    elan_base->tport_smallmsg,
1609                                    //MID_MESSAGE_SIZE, 
1610                                    elan_base->tport_bigmsg,
1611 #if QSNETLIBS_VERSION_CODE > QSNETLIBS_VERSION(1,4,1)
1612                                    10000000, //elan_base->tport_stripemsg,
1613 #endif
1614                                    elan_base->waitType, elan_base->retryCount,
1615                                    &(elan_base->shm_key),
1616                                    elan_base->shm_fifodepth, 
1617                                    elan_base->shm_fragsize
1618 #if QSNETLIBS_VERSION_CODE > QSNETLIBS_VERSION(1,4,11)
1619                                    ,0
1620 #endif
1621 ))) {
1622     
1623     perror("Failed to to initialise TPORT");
1624     exit(1);
1625   }
1626   
1627   elan_gsync(elan_base->allGroup);
1628
1629   _Cmi_numnodes = elan_base->state->nvp;
1630   _Cmi_mynode =  elan_base->state->vp;
1631
1632   /* processor per node */
1633   _Cmi_mynodesize = 1;
1634   CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
1635
1636   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0) 
1637     CmiAbort("+ppn cannot be used in non SMP version!\n");
1638   
1639   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
1640   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
1641   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
1642   /* find dim = log2(numpes), to pretend we are a hypercube */
1643   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
1644     Cmi_dim++ ;
1645  /* CmiSpanTreeInit();*/
1646   i=0;
1647   request_max=MAX_QLEN;
1648   request_bytes = MAX_BYTES;
1649   
1650   CmiGetArgInt(argv,"+requestmax",&request_max);
1651   CmiGetArgInt(argv,"+requestbytes",&request_bytes);
1652
1653   /*printf("request max=%d\n", request_max);*/
1654   if (CmiGetArgFlag(argv,"++debug"))
1655   {   /*Pause so user has a chance to start and attach debugger*/
1656     printf("CHARMDEBUG> Processor %d has PID %d\n",CmiMyNode(),getpid());
1657     if (!CmiGetArgFlag(argv,"++debug-no-pause"))
1658       sleep(10);
1659   }
1660
1661   if (CmiGetArgFlag(argv,"+enableBlockingReceives")) {
1662       blockingReceiveFlag = 1;
1663   }
1664
1665   if (CmiGetArgFlag(argv,"+enableBufferPooling")) {
1666       enableBufferPooling = 1;
1667   }
1668
1669   CmiGetArgInt(argv,"+smallMessageSize", &SMALL_MESSAGE_SIZE);
1670   CmiGetArgInt(argv,"+midMessageSize", &MID_MESSAGE_SIZE);
1671
1672   CmiGetArgInt(argv,"+smallQSize", &smallQSize);
1673   CmiGetArgInt(argv,"+midQSize", &midQSize);
1674
1675   if(smallQSize > RECV_MSG_Q_SIZE) {
1676       CmiPrintf("Warning : resetting smallQSize to %d\n", RECV_MSG_Q_SIZE);
1677       smallQSize = RECV_MSG_Q_SIZE;
1678   }
1679   
1680   if(midQSize > MID_MSG_Q_SIZE) {
1681       CmiPrintf("Warning : resetting midQSize to %d\n", MID_MSG_Q_SIZE);
1682       midQSize = MID_MSG_Q_SIZE;
1683   }
1684
1685   enableGetBasedSend = CmiGetArgFlag(argv,"+enableGetBasedSend");
1686
1687   //CmiPrintf("SMALL_MESSAGE_SIZE = %d\n", SMALL_MESSAGE_SIZE);
1688
1689   CmiTimerInit(argv);
1690   msgBuf = PCQueueCreate();
1691
1692   CsvInitialize(CmiNodeState, NodeState);
1693   CmiNodeStateInit(&CsvAccess(NodeState));
1694
1695   int rms_nodes = 1, rms_procs = 1;
1696
1697   CmiStartThreads(argv);
1698   ConverseRunPE(initret);
1699
1700 }
1701
1702 /***********************************************************************
1703  *
1704  * Abort function:
1705  *
1706  ************************************************************************/
1707
1708 void CmiAbort(const char *message)
1709 {
1710   CmiError(message);
1711   *((int *)NULL) = 0;
1712   exit(1);
1713 }
1714
1715 void CmiSyncListSendFn(int npes, int *pes, int len, char *msg)
1716 {
1717   CmiError("ListSend not implemented.");
1718 }
1719
1720 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int len, char *msg)
1721 {
1722   CmiError("ListSend not implemented.");
1723   return (CmiCommHandle) 0;
1724 }
1725
1726 extern void CmiReference(void *blk);
1727 #if 1
1728 #define ELAN_BUF_SIZE MID_MESSAGE_SIZE
1729 #define USE_NIC_MULTICAST 0
1730
1731 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
1732 {  
1733     static int ppn = 0;
1734     char* elan_buf = NULL;
1735     char *msg_start;
1736     int rflag;
1737     int i;
1738
1739     if(ppn == 0) {        
1740         int rms_nodes = 1;
1741         int rms_procs = 1;
1742         if(getenv("RMS_NODES"))
1743             rms_nodes = atoi(getenv("RMS_NODES"));
1744         
1745         if(getenv("RMS_PROCS"))
1746             rms_procs = atoi(getenv("RMS_PROCS"));
1747
1748         ppn = rms_procs/rms_nodes;
1749         if(ppn == 0)
1750             ppn = 4;
1751
1752         //ppn = 1;
1753         //CmiPrintf("CmiListSyncSendAndFree PPN=%d\n", ppn);
1754     }    
1755
1756     for(i=0;i<npes;i++) 
1757         if(pes[i] == CmiMyPe()) {
1758             CmiSyncSend(pes[i], len, msg); 
1759         }
1760
1761     int nremote = 0;
1762     msg_start = USER_BUF_START(msg);
1763     rflag = REF_FIELD(msg_start); 
1764
1765     for(i=0;i<npes;i++) {
1766         if(pes[i] != CmiMyPe()) {
1767             if(pes[i]/ppn == CmiMyPe()/ppn) {
1768                 CmiReference(msg);               
1769                 //dest in my node, send right away
1770                 CmiSyncSendAndFree(pes[i], len, msg);  
1771             }
1772             else
1773                 nremote++;
1774         } 
1775     }
1776     
1777     if(nremote == 0) {
1778         CmiFree(msg);
1779         return;
1780     }
1781     
1782     REF_FIELD(msg_start) += nremote - 1;
1783
1784 #if USE_NIC_MULTICAST   
1785     //ULTIMATE HACK FOR PERFORMANCE, CLEAN UP LATER   
1786     if(len > 2048 && len < MID_MESSAGE_SIZE && nremote > 4) {
1787         //Attempt to speedup Namd multicast, copy the message to local
1788         //elan memory and then send it several times from there. Should
1789         //improve performance as bandwidth is increased due to lower DMA
1790         //contention.
1791         
1792         void *elan_addr = elan_allocElan(elan_base->state, 8, len + sizeof(ChunkHeader));
1793         if(elan_addr == 0) {
1794             CmiPrintf("ELAN ALLOC FAILED, sending data from main memory instead\n");
1795         }
1796         else {
1797             elan_buf = (char*)elan_elan2main(elan_base->state, 
1798                                              elan_sdram2elan
1799                                              (elan_base->state, 
1800                                               elan_addr));            
1801             
1802             int old_size_field = SIZE_FIELD(msg_start);
1803             int old_conv_size_field = CONV_SIZE_FIELD(msg_start);
1804             int old_ref_field = REF_FIELD(msg_start);
1805             
1806             TYPE_FIELD(msg_start) = ELAN_MESSAGE;
1807             SIZE_FIELD(msg_start) = len + sizeof(ChunkHeader);
1808             CONV_SIZE_FIELD(msg_start) = len;
1809             REF_FIELD(msg_start) = nremote; 
1810             
1811             elan_wait(elan_get(elan_base->state,
1812                                msg_start,
1813                                elan_buf,
1814                                len + sizeof(ChunkHeader),
1815                                CmiMyPe()
1816                               ),
1817                       ELAN_WAIT_EVENT
1818                      );
1819             //memcpy(elan_buf, msg_start, len + sizeof(ChunkHeader));
1820             
1821             REF_FIELD(msg_start) = old_ref_field; 
1822             TYPE_FIELD(msg_start) = DYNAMIC_MESSAGE;
1823             SIZE_FIELD(msg_start) = old_size_field ;
1824             CONV_SIZE_FIELD(msg_start) = old_conv_size_field;
1825             
1826             //Actually free the message        
1827             for(i=0;i<npes;i++)
1828                 if(pes[i] != CmiMyPe() && pes[i]/ppn != CmiMyPe()/ppn)
1829                     CmiFree(msg);                
1830             
1831             msg = elan_buf + sizeof(ChunkHeader);
1832         }
1833     }
1834 #endif  
1835     
1836     for(i=0;i<npes;i++) {
1837         if(pes[i] != CmiMyPe() && pes[i]/ppn != CmiMyPe()/ppn) {
1838             //dest not in my node
1839             CmiSyncSendAndFree(pes[i], len, msg);        
1840             //if(len > SMALL_MESSAGE_SIZE)
1841             //  elan_barrier();                
1842         }        
1843     }
1844 }
1845
1846 #else
1847
1848 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
1849 {
1850     int i;
1851     for(i=0;i<npes;i++) {
1852         CmiSyncSend(pes[i], len, msg);
1853     }
1854     CmiFree(msg);
1855 }
1856 #endif 
1857
1858 void CmiBarrier()
1859 {
1860     elan_gsync(elan_base->allGroup);
1861 }
1862
1863 /* a simple barrier - everyone sends a message to pe 0 and go on */
1864 /* it is ok here since we have real elan barrier */
1865 void CmiBarrierZero()
1866 {
1867     elan_gsync(elan_base->allGroup);
1868 }
1869
1870 #if CMK_PERSISTENT_COMM
1871 #include "persistent.c"
1872 #endif
1873