fixed dynamic bugs of trapping in sendbuf
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <stdint.h>
16 #include <errno.h>
17 #include <malloc.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #if defined(_WIN32) && ! defined(__CYGWIN__)
26 #include <windows.h>
27 #include <wincon.h>
28 #include <sys/types.h>
29 #include <sys/timeb.h>
30
31 static void sleep(int secs) {
32     Sleep(1000*secs);
33 }
34 #else
35 #include <unistd.h> /*For getpid()*/
36 #endif
37
38
39 #define REMOTE_EVENT         0
40 #define USE_LRTS_MEMPOOL   1
41
42 #if USE_LRTS_MEMPOOL
43 static CmiInt8 _mempool_size = 1024ll*1024*32;
44 #endif
45
46 #define PRINT_SYH  0
47
48 #if PRINT_SYH
49 int         lrts_smsg_success = 0;
50 int         lrts_send_msg_id = 0;
51 int         lrts_send_rdma_id = 0;
52 int         lrts_send_rdma_success = 0;
53 int         lrts_received_msg = 0;
54 int         lrts_local_done_msg = 0;
55 #endif
56
57 #include "machine.h"
58
59 #include "pcqueue.h"
60
61 #if CMK_PERSISTENT_COMM
62 #include "machine-persistent.h"
63 #endif
64
65 //#define  USE_ONESIDED 1
66 #ifdef USE_ONESIDED
67 //onesided implementation is wrong, since no place to restore omdh
68 #include "onesided.h"
69 onesided_hnd_t   onesided_hnd;
70 onesided_md_t    omdh;
71 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
72
73 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
74
75 #else
76 uint8_t   onesided_hnd, omdh;
77 #if REMOTE_EVENT
78 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
79 #else
80 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
81 #endif
82 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
83 #endif
84
85 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
86 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
87
88 #define ALIGNBUF                64
89
90 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
91 /* If SMSG is not used */
92 #define FMA_PER_CORE  1024
93 #define FMA_BUFFER_SIZE 1024
94 /* If SMSG is used */
95 static int  SMSG_MAX_MSG;
96 //static int  log2_SMSG_MAX_MSG;
97 #define SMSG_MAX_CREDIT  36
98
99 #define MSGQ_MAXSIZE       2048
100 /* large message transfer with FMA or BTE */
101 #define LRTS_GNI_RDMA_THRESHOLD  2048
102
103 #define REMOTE_QUEUE_ENTRIES  2048 
104 #define LOCAL_QUEUE_ENTRIES   64 
105
106 #define PUT_DONE_TAG      0x29
107 #define ACK_TAG           0x30
108 /* SMSG is data message */
109 #define SMALL_DATA_TAG          0x31
110 /* SMSG is a control message to initialize a BTE */
111 #define MEDIUM_HEAD_TAG         0x32
112 #define MEDIUM_DATA_TAG         0x33
113 #define LMSG_INIT_TAG     0x39 
114
115 #define DEBUG
116 #ifdef GNI_RC_CHECK
117 #undef GNI_RC_CHECK
118 #endif
119 #ifdef DEBUG
120 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
121 #else
122 #define GNI_RC_CHECK(msg,rc)
123 #endif
124
125 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
126 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
127
128 static int Mempool_MaxSize = 1024*1024*128;
129 static int useDynamicSMSG   = 1;
130 static int useStaticMSGQ = 0;
131 static int useStaticFMA = 0;
132 static int mysize, myrank;
133 static gni_nic_handle_t      nic_hndl;
134
135 typedef struct {
136     gni_mem_handle_t mdh;
137     uint64_t addr;
138 } mdh_addr_t ;
139 // this is related to dynamic SMSG
140
141 typedef struct mdh_addr_list{
142     gni_mem_handle_t mdh;
143    void *addr;
144     struct mdh_addr_list *next;
145 }mdh_addr_list_t;
146
147 static unsigned int         smsg_memlen;
148 #define     SMSG_CONN_SIZE     sizeof(gni_smsg_attr_t)
149 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
150 int                 *smsg_connected_flag= 0;
151 char                *smsg_connection_addr = 0;
152 mdh_addr_t          setup_mem;
153 mdh_addr_t          *smsg_connection_vec = 0;
154 gni_mem_handle_t    smsg_connection_memhndl;
155 static int          smsg_expand_slots = 10;
156 static int          smsg_available_slot = 0;
157 static void         *smsg_mailbox_mempool = 0;
158 mdh_addr_list_t     *smsg_dynamic_list = 0;
159
160 static void             *smsg_mailbox_base;
161 gni_msgq_attr_t         msgq_attrs;
162 gni_msgq_handle_t       msgq_handle;
163 gni_msgq_ep_attr_t      msgq_ep_attrs;
164 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
165
166
167
168 /* preallocated DMA buffer */
169 int                     DMA_slots;
170 uint64_t                DMA_avail_tag = 0;
171 uint32_t                DMA_incoming_avail_tag = 0;
172 uint32_t                DMA_outgoing_avail_tag = 0;
173 void                    *DMA_incoming_base_addr;
174 void                    *DMA_outgoing_base_addr;
175 mdh_addr_t              DMA_buffer_base_mdh_addr;
176 mdh_addr_t              *DMA_buffer_base_mdh_addr_vec;
177 int                     DMA_buffer_size;
178 int                     DMA_max_single_msg = 131072;//524288 ;
179
180 #define                 DMA_SIZE_PER_SLOT       8192
181
182
183 typedef struct dma_msgid_map
184 {
185     uint64_t     msg_id;
186     int     msg_subid
187 } dma_msgid_map_t;
188
189 dma_msgid_map_t         *dma_map_list;
190
191 typedef struct msg_trace
192 {
193     uint64_t    msg_id;
194     int         done_num;
195 }msg_trace_t;
196
197 msg_trace_t             *pending_msg_list;
198 /* =====Beginning of Declarations of Machine Specific Variables===== */
199 static int cookie;
200 static int modes = 0;
201 static gni_cq_handle_t       smsg_rx_cqh = NULL;
202 static gni_cq_handle_t       smsg_tx_cqh = NULL;
203 static gni_cq_handle_t       post_rx_cqh = NULL;
204 static gni_cq_handle_t       post_tx_cqh = NULL;
205 static gni_ep_handle_t       *ep_hndl_array;
206
207 typedef    struct  pending_smg
208 {
209     int     inst_id;
210     struct  pending_smg *next;
211 } PENDING_GETNEXT;
212
213
214 typedef struct msg_list
215 {
216     uint32_t destNode;
217     uint32_t size;
218     void *msg;
219     struct msg_list *next;
220     struct msg_list *pehead_next;
221     uint8_t tag;
222 }MSG_LIST;
223
224 typedef struct medium_msg_list
225 {
226     uint32_t destNode;
227     uint32_t msg_id;
228     uint32_t msg_subid;
229     uint32_t remain_size;
230     void *msg;
231     struct medium_msg_list *next;
232 }MEDIUM_MSG_LIST;
233
234
235 typedef struct control_msg
236 {
237     uint64_t            source_addr;
238     int                 source;               /* source rank */
239     int                 length;
240     gni_mem_handle_t    source_mem_hndl;
241     struct control_msg *next;
242 }CONTROL_MSG;
243
244 typedef struct medium_msg_control
245 {
246     uint64_t            dma_offset;     //the dma_buffer for this block of msg
247     int                 msg_id;         //Id for the total index
248     int                 msg_subid;      //offset inside the message id 
249 }MEDIUM_MSG_CONTROL;
250
251 typedef struct  rmda_msg
252 {
253     int                   destNode;
254     gni_post_descriptor_t *pd;
255     struct  rmda_msg      *next;
256 }RDMA_REQUEST;
257
258 typedef struct  msg_list_index
259 {
260     int         next;
261     MSG_LIST    *head;
262     MSG_LIST    *tail;
263 } MSG_LIST_INDEX;
264
265 /* reuse PendingMsg memory */
266 static CONTROL_MSG          *control_freelist=0;
267 static MSG_LIST             *msglist_freelist=0;
268 static int                  smsg_head_index;
269 static MSG_LIST_INDEX       *smsg_msglist_index= 0;
270 static MSG_LIST             *smsg_free_head=0;
271 static MSG_LIST             *smsg_free_tail=0;
272
273 /*
274 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
275     if(free_head == 0)  free_head = free_tail = msg_head;    \
276     else   free_tail = free_tail->next;    \
277     if( msg_head->next == msg_tail) msg_head =0;   \
278     else msg_head= msg_head->next;    
279
280 #define MallocMsgList(d, msg_head, msg_tail, free_head, free_tail, msgsize) \
281     if(free_head == 0) {d= malloc(msgsize);  \
282         if(msg_head == 0)   msg_head =msg_tail = msg_head->next = msg_tail->next = d; \
283         else { msg_tail->next = d; d->next = msg_head; msg_tail=d;} \
284     }else {d = free_head; free_head = free_head->next; if(free_tail->next == free_head) free_head =0;} \
285 */
286
287 #define FreeMsgList(d)  \
288   (d)->next = msglist_freelist;\
289   msglist_freelist = d;
290
291
292
293 #define MallocMsgList(d) \
294   d = msglist_freelist;\
295   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
296              _MEMCHECK(d);\
297   } else msglist_freelist = d->next;
298
299
300 #define FreeControlMsg(d)       \
301   (d)->next = control_freelist;\
302   control_freelist = d;
303
304 #define MallocControlMsg(d) \
305   d = control_freelist;\
306   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
307              _MEMCHECK(d);\
308   } else control_freelist = d->next;
309
310
311 static RDMA_REQUEST         *rdma_freelist = NULL;
312
313 #define FreeControlMsg(d)       \
314   (d)->next = control_freelist;\
315   control_freelist = d;
316
317 #define MallocControlMsg(d) \
318   d = control_freelist;\
319   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
320              _MEMCHECK(d);\
321   } else control_freelist = d->next;
322
323 #define FreeMediumControlMsg(d)       \
324   (d)->next = medium_control_freelist;\
325   medium_control_freelist = d;
326
327
328 #define MallocMediumControlMsg(d) \
329     d = medium_control_freelist;\
330     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
331     _MEMCHECK(d);\
332 } else mediumcontrol_freelist = d->next;
333
334 #define FreeRdmaRequest(d)       \
335   (d)->next = rdma_freelist;\
336   rdma_freelist = d;
337
338 #define MallocRdmaRequest(d) \
339   d = rdma_freelist;\
340   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
341              _MEMCHECK(d);\
342   } else rdma_freelist = d->next;
343
344 /* reuse gni_post_descriptor_t */
345 static gni_post_descriptor_t *post_freelist=0;
346
347 #if 1
348 #define FreePostDesc(d)       \
349     (d)->next_descr = post_freelist;\
350     post_freelist = d;
351
352 #define MallocPostDesc(d) \
353   d = post_freelist;\
354   if (d==0) { \
355      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
356      _MEMCHECK(d);\
357   } else post_freelist = d->next_descr;
358 #else
359
360 #define FreePostDesc(d)     free(d);
361 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
362
363 #endif
364
365 static PENDING_GETNEXT     *pending_smsg_head = 0;
366 static PENDING_GETNEXT     *pending_smsg_tail = 0;
367
368 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
369 static int      buffered_smsg_counter = 0;
370
371 /* SmsgSend return success but message sent is not confirmed by remote side */
372
373 static RDMA_REQUEST  *pending_rdma_head = 0;
374 static RDMA_REQUEST  *pending_rdma_tail = 0;
375 static MSG_LIST *buffered_fma_head = 0;
376 static MSG_LIST *buffered_fma_tail = 0;
377
378 /* functions  */
379 #define IsFree(a,ind)  !( a& (1<<(ind) ))
380 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
381 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
382
383 #include "mempool.c"
384
385 /* get the upper bound of log 2 */
386 int mylog2(int size)
387 {
388     int op = size;
389     unsigned int ret=0;
390     unsigned int mask = 0;
391     int i;
392     while(op>0)
393     {
394         op = op >> 1;
395         ret++;
396
397     }
398     for(i=1; i<ret; i++)
399     {
400         mask = mask << 1;
401         mask +=1;
402     }
403
404     ret -= ((size &mask) ? 0:1);
405     return ret;
406 }
407
408 static void
409 allgather(void *in,void *out, int len)
410 {
411     static int *ivec_ptr=NULL,already_called=0,job_size=0;
412     int i,rc;
413     int my_rank;
414     char *tmp_buf,*out_ptr;
415
416     if(!already_called) {
417
418         rc = PMI_Get_size(&job_size);
419         CmiAssert(rc == PMI_SUCCESS);
420         rc = PMI_Get_rank(&my_rank);
421         CmiAssert(rc == PMI_SUCCESS);
422
423         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
424         CmiAssert(ivec_ptr != NULL);
425
426         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
427         CmiAssert(rc == PMI_SUCCESS);
428
429         already_called = 1;
430
431     }
432
433     tmp_buf = (char *)malloc(job_size * len);
434     CmiAssert(tmp_buf);
435
436     rc = PMI_Allgather(in,tmp_buf,len);
437     CmiAssert(rc == PMI_SUCCESS);
438
439     out_ptr = out;
440
441     for(i=0;i<job_size;i++) {
442
443         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
444
445     }
446
447     free(tmp_buf);
448 }
449 static void
450 allgather_2(void *in,void *out, int len)
451 {
452     //PMI_Allgather is out of order
453     int i,rc, extend_len;
454     int  rank_index;
455     char *out_ptr, *out_ref;
456     char *in2;
457
458     extend_len = sizeof(int) + len;
459     in2 = (char*)malloc(extend_len);
460
461     memcpy(in2, &myrank, sizeof(int));
462     memcpy(in2+sizeof(int), in, len);
463
464     out_ptr = (char*)malloc(mysize*extend_len);
465
466     rc = PMI_Allgather(in2, out_ptr, extend_len);
467     GNI_RC_CHECK("allgather", rc);
468
469     out_ref = out;
470
471     for(i=0;i<mysize;i++) {
472         //rank index 
473         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
474         //copy to the rank index slot
475         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
476     }
477
478     free(out_ptr);
479     free(in2);
480
481 }
482
483 static unsigned int get_gni_nic_address(int device_id)
484 {
485     unsigned int address, cpu_id;
486     gni_return_t status;
487     int i, alps_dev_id=-1,alps_address=-1;
488     char *token, *p_ptr;
489
490     p_ptr = getenv("PMI_GNI_DEV_ID");
491     if (!p_ptr) {
492         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
493        
494         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
495     } else {
496         while ((token = strtok(p_ptr,":")) != NULL) {
497             alps_dev_id = atoi(token);
498             if (alps_dev_id == device_id) {
499                 break;
500             }
501             p_ptr = NULL;
502         }
503         CmiAssert(alps_dev_id != -1);
504         p_ptr = getenv("PMI_GNI_LOC_ADDR");
505         CmiAssert(p_ptr != NULL);
506         i = 0;
507         while ((token = strtok(p_ptr,":")) != NULL) {
508             if (i == alps_dev_id) {
509                 alps_address = atoi(token);
510                 break;
511             }
512             p_ptr = NULL;
513             ++i;
514         }
515         CmiAssert(alps_address != -1);
516         address = alps_address;
517     }
518     return address;
519 }
520
521 static uint8_t get_ptag(void)
522 {
523     char *p_ptr, *token;
524     uint8_t ptag;
525
526     p_ptr = getenv("PMI_GNI_PTAG");
527     CmiAssert(p_ptr != NULL);
528     token = strtok(p_ptr, ":");
529     ptag = (uint8_t)atoi(token);
530     return ptag;
531         
532 }
533
534 static uint32_t get_cookie(void)
535 {
536     uint32_t cookie;
537     char *p_ptr, *token;
538
539     p_ptr = getenv("PMI_GNI_COOKIE");
540     CmiAssert(p_ptr != NULL);
541     token = strtok(p_ptr, ":");
542     cookie = (uint32_t)atoi(token);
543
544     return cookie;
545 }
546
547 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
548 /* TODO: add any that are related */
549 /* =====End of Definitions of Message-Corruption Related Macros=====*/
550
551
552 #include "machine-lrts.h"
553 #include "machine-common-core.c"
554
555 /* Network progress function is used to poll the network when for
556    messages. This flushes receive buffers on some  implementations*/
557 #if CMK_MACHINE_PROGRESS_DEFINED
558 void CmiMachineProgressImpl() {
559 }
560 #endif
561
562 inline
563 static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
564 {
565     MSG_LIST        *msg_tmp;
566     MallocMsgList(msg_tmp);
567     msg_tmp->destNode = destNode;
568     msg_tmp->size   = size;
569     msg_tmp->msg    = msg;
570     msg_tmp->tag    = tag;
571     msg_tmp->next   = 0;
572     if (smsg_msglist_index[destNode].head == 0 ) {
573         smsg_msglist_index[destNode].head = msg_tmp;
574         smsg_msglist_index[destNode].next = smsg_head_index;
575         smsg_head_index = destNode;
576     }
577     else {
578       (smsg_msglist_index[destNode].tail)->next    = msg_tmp;
579     }
580     smsg_msglist_index[destNode].tail          = msg_tmp;
581 #if PRINT_SYH
582     buffered_smsg_counter++;
583 #endif
584 }
585
586 inline
587 static void setup_smsg_connection(int destNode)
588 {
589     mdh_addr_list_t  *new_entry = 0;
590     gni_post_descriptor_t *pd;
591     gni_smsg_attr_t      *smsg_attr;
592     gni_return_t status = GNI_RC_NOT_DONE;
593     if(smsg_available_slot == smsg_expand_slots)
594     {
595         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
596         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
597         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
598
599         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
600             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
601             GNI_MEM_READWRITE,   
602             -1,
603             &(new_entry->mdh));
604         smsg_available_slot = 0; 
605         new_entry->next = smsg_dynamic_list;
606         smsg_dynamic_list = new_entry;
607     }
608     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
609     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
610     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
611     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
612     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
613     smsg_attr->buff_size = smsg_memlen;
614     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
615     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
616     smsg_local_attr_vec[destNode] = smsg_attr;
617     smsg_available_slot++;
618     MallocPostDesc(pd);
619     pd->type            = GNI_POST_FMA_PUT;
620     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
621 //pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
622     pd->length          = sizeof(gni_smsg_attr_t);
623     pd->local_addr      = (uint64_t) smsg_attr;
624     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
625     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
626     pd->src_cq_hndl     = 0;
627     pd->rdma_mode       = 0;
628     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
629     GNI_RC_CHECK("SMSG Dynamic link", status);
630     //CmiPrintf("[%d=%d] send post FMA successful (%p) %s\n", myrank, destNode, (void*)pd->remote_addr, gni_err_str[status]);
631 }
632
633 inline 
634 static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
635 {
636     gni_return_t status = GNI_RC_NOT_DONE;
637     gni_smsg_attr_t      *smsg_attr;
638     gni_post_descriptor_t *pd;
639   
640     if(useDynamicSMSG == 1)
641     {
642         if(smsg_connected_flag[destNode] == 0)
643         {
644             //CmiPrintf("[%d]Init smsg connection\n", CmiMyPe());
645             setup_smsg_connection(destNode);
646             delay_send_small_msg(msg, size, destNode, tag);
647             smsg_connected_flag[destNode] =1;
648             return status;
649         }
650         else  if(smsg_connected_flag[destNode] < 3)
651         {
652             if(inbuff == 0)
653                 delay_send_small_msg(msg, size, destNode, tag);
654             return status;
655         }
656     }
657     //CmiPrintf("[%d] reach send\n", myrank);
658     if(smsg_msglist_index[destNode].head == 0 || inbuff==1)
659     {
660         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], header, size_header, msg, size, 0, tag);
661         if(status == GNI_RC_SUCCESS)
662         {
663 #if PRINT_SYH
664             lrts_smsg_success++;
665             if(lrts_smsg_success == lrts_send_msg_id)
666                 CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
667             else
668                 CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
669 #endif
670             return status;
671         }
672     }else {
673         if(inbuff ==0)
674             delay_send_small_msg(msg, size, destNode, tag);
675     }
676     return status;
677 }
678
679 // Get first 0 in DMA_tags starting from index
680 static int get_first_avail_bit(uint64_t DMA_tags, int start_index)
681 {
682
683     uint64_t         mask = 0x1;
684     register    int     i=0;
685     while((DMA_tags & mask) && i<DMA_slots) {mask << 1; i++;}
686
687 }
688
689 static int send_medium_messages(int destNode, int size, char *msg)
690 {
691 #if 0
692     gni_return_t status = GNI_RC_SUCCESS;
693     int first_avail_bit=0;
694     uint64_t mask = 0x1;
695     MEDIUM_MSG_CONTROL  *medium_msg_control_tmp;
696     MEDIUM_MSG_LIST        *msg_tmp;
697     int blocksize, remain_size, pos;
698     int sub_id = 0;
699     remain_size = size;
700     pos = 0;  //offset before which data are sent
701     /* copy blocks of the message to DMA preallocated buffer and send SMSG */
702     //Check whether there is any available DMA buffer
703     
704     do{
705         while((DMA_avail_tag & mask) && first_avail_bit<DMA_slots) {mask << 1; first_avail_bit++;}
706         if(first_avail_bit == DMA_slots) //No available DMA, buffer this message
707         {
708             MallocMediumMsgList(msg_tmp);
709             msg_tmp->destNode = destNode;
710             msg_tmp->msg_id   = lrts_send_msg_id;
711             msg_tmp->msg_subid   = sub_id;
712             msg_tmp->size   = remain_size;
713             msg_tmp->msg    = msg+pos;
714             msg_tmp->next   = NULL;
715             break;
716         }else
717         {
718             //copy this part of the message into this DMA buffer
719             //TODO optimize here, some data can go with this SMSG
720             blocksize = (remain_size>DMA_SIZE_PER_SLOT)?DMA_SIZE_PER_SLOT: remain_size;
721             memcpy(DMA_buffer_base_mdh_addr.addr[first_avail_bit], msg+pos, blocksize);
722             pos += blocksize;
723             remain_size -= blocksize;
724             SET_BITS(DMA_avail_tag, first_avail_bit);
725            
726             MallocMediumControlMsg(medium_msg_control_tmp);
727             medium_msg_control_tmp->msg_id = lrts_send_msg_id;
728             medium_msg_control_tmp->msg_subid = sub_id;
729             if(status == GNI_RC_SUCCESS)
730             {
731                 if(sub_id==0)
732                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_HEAD_TAG);
733                 else
734                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_DATA_TAG);
735             }
736             //buffer this smsg
737             if(status != GNI_RC_SUCCESS)
738             {
739                 delay_send_small_msg(medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), destNode, MEDIUM_HEAD_TAG);
740             }
741             sub_id++;
742         }while(remain_size > 0 );
743
744         }
745     }
746 #endif
747 }
748
749 // Large message, send control to receiver, receiver register memory and do a GET 
750 inline
751 static void send_large_messages(int destNode, int size, char *msg)
752 {
753 #if     USE_LRTS_MEMPOOL
754     gni_return_t        status  =   GNI_RC_SUCCESS;
755     CONTROL_MSG         *control_msg_tmp;
756     uint32_t            vmdh_index  = -1;
757     /* construct a control message and send */
758     MallocControlMsg(control_msg_tmp);
759     control_msg_tmp->source_addr    = (uint64_t)msg;
760     control_msg_tmp->source         = myrank;
761     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
762     //memcpy( &(control_msg_tmp->source_mem_hndl), GetMemHndl(msg), sizeof(gni_mem_handle_t)) ;
763     control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
764 #if PRINT_SYH
765     lrts_send_msg_id++;
766     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
767 #endif
768     status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
769     if(status == GNI_RC_SUCCESS)
770     {
771         FreeControlMsg(control_msg_tmp);
772     }
773 // NOT use mempool, should slow 
774 #else
775     gni_return_t        status  =   GNI_RC_SUCCESS;
776     CONTROL_MSG         *control_msg_tmp;
777     uint32_t            vmdh_index  = -1;
778     /* construct a control message and send */
779     MallocControlMsg(control_msg_tmp);
780     control_msg_tmp->source_addr    = (uint64_t)msg;
781     control_msg_tmp->source         = myrank;
782     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
783     control_msg_tmp->source_mem_hndl.qword1 = 0;
784     control_msg_tmp->source_mem_hndl.qword2 = 0;
785 #if PRINT_SYH
786     lrts_send_msg_id++;
787     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
788 #endif
789     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
790     if(status == GNI_RC_SUCCESS)
791     {
792         status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
793         if(status == GNI_RC_SUCCESS)
794         {
795             FreeControlMsg(control_msg_tmp);
796         }
797     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
798     {
799         CmiAbort("Memory registor for large msg\n");
800     }else 
801     {
802         delay_send_small_msg(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
803     }
804 #endif
805 }
806
807 inline void LrtsPrepareEnvelope(char *msg, int size)
808 {
809     CmiSetMsgSize(msg, size);
810 }
811
812 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
813 {
814     gni_return_t        status  =   GNI_RC_SUCCESS;
815     LrtsPrepareEnvelope(msg, size);
816
817     if(size <= SMSG_MAX_MSG)
818     {
819 #if PRINT_SYH
820         lrts_send_msg_id++;
821         CmiPrintf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
822 #endif
823         status = send_smsg_message( destNode, 0, 0, msg, size, SMALL_DATA_TAG, 0);  
824         if(status == GNI_RC_SUCCESS)
825         {
826             CmiFree(msg);
827         }
828     }
829     else
830     {
831         send_large_messages(destNode, size, msg);
832     }
833     return 0;
834 }
835
836 static void LrtsPreCommonInit(int everReturn){}
837
838 /* Idle-state related functions: called in non-smp mode */
839 void CmiNotifyIdleForGemini(void) {
840     LrtsAdvanceCommunication();
841 }
842
843 static void LrtsPostCommonInit(int everReturn)
844 {
845 #if CMK_SMP
846     CmiIdleState *s=CmiNotifyGetState();
847     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
848     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
849 #else
850     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
851 #endif
852
853 }
854
855
856 void LrtsPostNonLocal(){}
857 /* pooling CQ to receive network message */
858 static void PumpNetworkRdmaMsgs()
859 {
860     gni_cq_entry_t      event_data;
861     gni_return_t        status;
862     while( (status = GNI_CqGetEvent(post_rx_cqh, &event_data)) == GNI_RC_SUCCESS);
863 }
864
865 static int SendRdmaMsg();
866 static void getLargeMsgRequest(void* header, uint64_t inst_id);
867 static void PumpNetworkSmsg()
868 {
869     uint64_t            inst_id;
870     PENDING_GETNEXT     *pending_next;
871     int                 ret;
872     gni_cq_entry_t      event_data;
873     gni_return_t        status;
874     void                *header;
875     uint8_t             msg_tag;
876     int                 msg_nbytes;
877     void                *msg_data;
878     gni_mem_handle_t    msg_mem_hndl;
879     gni_smsg_attr_t     *smsg_attr;
880     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
881     {
882         inst_id = GNI_CQ_GET_INST_ID(event_data);
883         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
884 #if PRINT_SYH
885         CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
886 #endif
887
888         if(useDynamicSMSG == 1)
889         {
890             if(smsg_connected_flag[inst_id] == 0 )
891             {
892                 //CmiPrintf("[%d]pump Init smsg connection\n", CmiMyPe());
893                 smsg_connected_flag[inst_id] =2;
894                 setup_smsg_connection(inst_id);
895                 status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id],  &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]));
896                 GNI_RC_CHECK("SmsgInit", status);
897                 continue;
898             } else if (smsg_connected_flag[inst_id] <3) 
899             {
900                 //CmiPrintf("[%d]pump setup smsg connection\n", CmiMyPe());
901                 smsg_connected_flag[inst_id] += 1;
902                 status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id], &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]));
903                 GNI_RC_CHECK("SmsgInit", status);
904                 continue;
905             }
906         }
907         msg_tag = GNI_SMSG_ANY_TAG;
908         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
909         {
910 #if PRINT_SYH
911             lrts_received_msg++;
912             CmiPrintf("+++[%d] PumpNetwork msg is received, messageid:%d tag=%d\n", myrank, lrts_received_msg, msg_tag);
913 #endif
914             /* copy msg out and then put into queue (small message) */
915             switch (msg_tag) {
916             case SMALL_DATA_TAG:
917             {
918                 msg_nbytes = CmiGetMsgSize(header);
919                 msg_data    = CmiAlloc(msg_nbytes);
920                 memcpy(msg_data, (char*)header, msg_nbytes);
921                 handleOneRecvedMsg(msg_nbytes, msg_data);
922                 break;
923             }
924             case LMSG_INIT_TAG:
925             {
926 #if PRINT_SYH
927                 CmiPrintf("+++[%d] from %d PumpNetwork Rdma Request msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
928 #endif
929                 getLargeMsgRequest(header, inst_id);
930                 break;
931             }
932             case ACK_TAG:
933             {
934                 /* Get is done, release message . Now put is not used yet*/
935 #if         !USE_LRTS_MEMPOOL
936                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
937 #endif
938                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
939                 SendRdmaMsg();
940                 break;
941             }
942             case PUT_DONE_TAG: //persistent message
943             {
944                 handleOneRecvedMsg(((CONTROL_MSG *) header)->length,((void*) (CONTROL_MSG *) header)); 
945                 break;
946             }
947             default: {
948                 CmiPrintf("weird tag problem\n");
949                 CmiAbort("Unknown tag\n");
950             }
951             }
952             GNI_SmsgRelease(ep_hndl_array[inst_id]);
953             msg_tag = GNI_SMSG_ANY_TAG;
954         } //endwhile getNext
955     }   //end while GetEvent
956     if(status == GNI_RC_ERROR_RESOURCE)
957     {
958         GNI_RC_CHECK("Smsg_rx_cq full", status);
959     }
960 }
961
962 static void getLargeMsgRequest(void* header, uint64_t inst_id)
963 {
964 #if     USE_LRTS_MEMPOOL
965     CONTROL_MSG         *request_msg;
966     gni_return_t        status;
967     void                *msg_data;
968     gni_post_descriptor_t *pd;
969     RDMA_REQUEST        *rdma_request_msg;
970     gni_mem_handle_t    msg_mem_hndl;
971     int source;
972     // initial a get to transfer data from the sender side */
973     request_msg = (CONTROL_MSG *) header;
974     source = request_msg->source;
975     msg_data = CmiAlloc(request_msg->length);
976     _MEMCHECK(msg_data);
977     //memcpy(&msg_mem_hndl, GetMemHndl(msg_data), sizeof(gni_mem_handle_t));
978     msg_mem_hndl = GetMemHndl(msg_data);
979
980     MallocPostDesc(pd);
981     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
982         pd->type            = GNI_POST_FMA_GET;
983     else
984         pd->type            = GNI_POST_RDMA_GET;
985 #if REMOTE_EVENT
986     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
987 #else
988     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
989 #endif
990     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
991     pd->length          = ALIGN4(request_msg->length);
992     pd->local_addr      = (uint64_t) msg_data;
993     pd->local_mem_hndl  = msg_mem_hndl;
994     pd->remote_addr     = request_msg->source_addr;
995     pd->remote_mem_hndl = request_msg->source_mem_hndl;
996     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
997     pd->rdma_mode       = 0;
998
999     if(pd->type == GNI_POST_RDMA_GET) 
1000         status = GNI_PostRdma(ep_hndl_array[source], pd);
1001     else
1002         status = GNI_PostFma(ep_hndl_array[source],  pd);
1003     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1004     {
1005         MallocRdmaRequest(rdma_request_msg);
1006         rdma_request_msg->next = 0;
1007         rdma_request_msg->destNode = inst_id;
1008         rdma_request_msg->pd = pd;
1009         if(pending_rdma_head == 0)
1010         {
1011             pending_rdma_head = rdma_request_msg;
1012         }else
1013         {
1014             pending_rdma_tail->next = rdma_request_msg;
1015         }
1016         pending_rdma_tail = rdma_request_msg;
1017     }else
1018         GNI_RC_CHECK("AFter posting", status);
1019
1020 #else
1021     CONTROL_MSG         *request_msg;
1022     gni_return_t        status;
1023     void                *msg_data;
1024     gni_post_descriptor_t *pd;
1025     RDMA_REQUEST        *rdma_request_msg;
1026     gni_mem_handle_t    msg_mem_hndl;
1027     int source;
1028     // initial a get to transfer data from the sender side */
1029     request_msg = (CONTROL_MSG *) header;
1030     source = request_msg->source;
1031     msg_data = CmiAlloc(request_msg->length);
1032     _MEMCHECK(msg_data);
1033
1034     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
1035
1036     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1037     {
1038         GNI_RC_CHECK("Mem Register before post", status);
1039     }
1040
1041     MallocPostDesc(pd);
1042     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
1043         pd->type            = GNI_POST_FMA_GET;
1044     else
1045         pd->type            = GNI_POST_RDMA_GET;
1046 #if REMOTE_EVENT
1047     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1048 #else
1049     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1050 #endif
1051     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1052     pd->length          = ALIGN4(request_msg->length);
1053     pd->local_addr      = (uint64_t) msg_data;
1054     pd->remote_addr     = request_msg->source_addr;
1055     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1056     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1057     pd->rdma_mode       = 0;
1058
1059     //memory registration successful
1060     if(status == GNI_RC_SUCCESS)
1061     {
1062         pd->local_mem_hndl  = msg_mem_hndl;
1063         if(pd->type == GNI_POST_RDMA_GET) 
1064             status = GNI_PostRdma(ep_hndl_array[source], pd);
1065         else
1066             status = GNI_PostFma(ep_hndl_array[source],  pd);
1067     }else
1068     {
1069         pd->local_mem_hndl.qword1  = 0; 
1070         pd->local_mem_hndl.qword1  = 0; 
1071     }
1072     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1073     {
1074         MallocRdmaRequest(rdma_request_msg);
1075         rdma_request_msg->next = 0;
1076         rdma_request_msg->destNode = inst_id;
1077         rdma_request_msg->pd = pd;
1078         if(pending_rdma_head == 0)
1079         {
1080             pending_rdma_head = rdma_request_msg;
1081         }else
1082         {
1083             pending_rdma_tail->next = rdma_request_msg;
1084         }
1085         pending_rdma_tail = rdma_request_msg;
1086     }else
1087         GNI_RC_CHECK("AFter posting", status);
1088 #endif
1089 }
1090
1091 /* Check whether message send or get is confirmed by remote */
1092 static void PumpLocalSmsgTransactions()
1093 {
1094     gni_return_t            status;
1095     gni_cq_entry_t          ev;
1096     while ((status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1097     {
1098 #if PRINT_SYH
1099         lrts_local_done_msg++;
1100         //CmiPrintf("*[%d]  PumpLocalSmsgTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
1101 #endif
1102         if(GNI_CQ_OVERRUN(ev))
1103         {
1104             CmiPrintf("Overrun detected in local CQ");
1105             CmiAbort("Overrun in TX");
1106         }
1107     }
1108     if(status == GNI_RC_ERROR_RESOURCE)
1109     {
1110         GNI_RC_CHECK("Smsg_tx_cq full", status);
1111     }
1112 }
1113
1114 static void PumpLocalRdmaTransactions()
1115 {
1116     gni_cq_entry_t          ev;
1117     gni_return_t            status;
1118     uint64_t                type, inst_id;
1119     gni_post_descriptor_t   *tmp_pd;
1120     MSG_LIST                *ptr;
1121     CONTROL_MSG             *ack_msg_tmp;
1122     uint8_t             msg_tag;
1123
1124    // while ( (status = GNI_CqGetEvent(post_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1125     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1126     {
1127         type        = GNI_CQ_GET_TYPE(ev);
1128 #if PRINT_SYH
1129         //lrts_local_done_msg++;
1130         CmiPrintf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
1131 #endif
1132         if (type == GNI_CQ_EVENT_TYPE_POST)
1133         {
1134             inst_id     = GNI_CQ_GET_INST_ID(ev);
1135 #if PRINT_SYH
1136             CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d, %d\n", myrank,  lrts_local_done_msg, smsg_connected_flag[inst_id]);
1137 #endif
1138             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
1139             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1140             ////Message is sent, free message , put is not used now
1141             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
1142             {
1143                 if(smsg_connected_flag[inst_id] <3)
1144                 {
1145                     smsg_connected_flag[inst_id] += 1;
1146                     continue;
1147                 }
1148                 else  
1149                 {
1150                     //persistent message 
1151                     CmiFree((void *)tmp_pd->local_addr);
1152                     msg_tag = PUT_DONE_TAG; 
1153                 }
1154             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1155             {
1156                 msg_tag = ACK_TAG;  
1157             }
1158             MallocControlMsg(ack_msg_tmp);
1159             ack_msg_tmp->source             = myrank;
1160             ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
1161             ack_msg_tmp->length             = tmp_pd->length; 
1162             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1163 #if PRINT_SYH
1164             lrts_send_msg_id++;
1165             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
1166 #endif
1167             status = send_smsg_message(inst_id, 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0);  
1168             if(status == GNI_RC_SUCCESS)
1169             {
1170                 FreeControlMsg(ack_msg_tmp);
1171             }
1172 #if     !USE_LRTS_MEMPOOL
1173             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1174 #endif
1175             CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1176             handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1177             SendRdmaMsg(); 
1178             FreePostDesc(tmp_pd);
1179         }
1180     } //end while
1181 }
1182
1183 static int SendRdmaMsg()
1184 {
1185     gni_return_t            status = GNI_RC_SUCCESS;
1186     gni_mem_handle_t        msg_mem_hndl;
1187
1188     RDMA_REQUEST *ptr = pending_rdma_head;
1189     RDMA_REQUEST *prev = NULL;
1190
1191     while (ptr != NULL)
1192     {
1193         gni_post_descriptor_t *pd = ptr->pd;
1194         status = GNI_RC_SUCCESS;
1195         // register memory first
1196         if( pd->local_mem_hndl.qword1 == 0 && pd->local_mem_hndl.qword2 == 0)
1197         {
1198             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1199         }
1200         if(status == GNI_RC_SUCCESS)
1201         {
1202             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1203                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1204             else
1205                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1206             if(status == GNI_RC_SUCCESS)
1207             {
1208                 RDMA_REQUEST *tmp = ptr;
1209                 if (prev)
1210                   prev->next = ptr->next;
1211                 else
1212                   pending_rdma_head = ptr->next;
1213                 ptr = ptr->next;
1214                 FreeRdmaRequest(tmp);
1215                 continue;
1216             }
1217         }
1218         prev = ptr;
1219         ptr = ptr->next;
1220     } //end while
1221     return 0;
1222 }
1223
1224 // return 1 if all messages are sent
1225 static int SendBufferMsg()
1226 {
1227     MSG_LIST            *ptr, *previous_head, *current_head;
1228     CONTROL_MSG         *control_msg_tmp;
1229     gni_return_t        status;
1230     int done = 1;
1231     register    int     i;
1232     int                 index_previous = -1;
1233     int                 index = smsg_head_index;
1234     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {CmiPrintf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
1235     /* can add flow control here to control the number of messages sent before handle message */
1236     while(index != -1)
1237     {
1238         ptr = smsg_msglist_index[index].head;
1239        
1240         while(ptr!=0)
1241         {
1242             CmiAssert(ptr!=NULL);
1243             if(ptr->tag == SMALL_DATA_TAG)
1244             {
1245                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, ptr->size, SMALL_DATA_TAG, 1);  
1246                 if(status == GNI_RC_SUCCESS)
1247                 {
1248                     CmiFree(ptr->msg);
1249                 }
1250             }
1251             else if(ptr->tag == LMSG_INIT_TAG)
1252             {
1253                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1254 #if PRINT_SYH
1255                 CmiPrintf("[%d==>%d] LMSG buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1256 #endif
1257                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
1258                 {
1259                     MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
1260                     if(status != GNI_RC_SUCCESS) {
1261                         done = 0;
1262                         break;
1263                     }
1264                 }
1265                 
1266                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 1);  
1267                 if(status == GNI_RC_SUCCESS)
1268                 {
1269                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
1270                 }
1271             }else if (ptr->tag == ACK_TAG)
1272             {
1273                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), ACK_TAG, 1);  
1274                 if(status == GNI_RC_SUCCESS)
1275                 {
1276                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1277                 }
1278             }else
1279             {
1280                 CmiPrintf("Weird tag\n");
1281                 CmiAbort("should not happen\n");
1282             }
1283             if(status == GNI_RC_SUCCESS)
1284             {
1285 #if PRINT_SYH
1286                 buffered_smsg_counter--;
1287                 if(lrts_smsg_success == lrts_send_msg_id)
1288                     CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1289                 else
1290                     CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1291 #endif
1292                 smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
1293                 FreeMsgList(ptr);
1294                 ptr= smsg_msglist_index[index].head;
1295
1296             }else {
1297                 done = 0;
1298                 break;
1299             } 
1300         
1301         } //end while
1302         if(ptr == 0)
1303         {
1304             if(index_previous != -1)
1305                 smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
1306             else
1307                 smsg_head_index = smsg_msglist_index[index].next;
1308         }else
1309         {
1310             index_previous = index;
1311         }
1312         index = smsg_msglist_index[index].next;
1313     }   // end pooling for all cores
1314     return done;
1315 }
1316
1317 static void LrtsAdvanceCommunication()
1318 {
1319     /*  Receive Msg first */
1320 #if 0
1321     if(myrank == 0)
1322     CmiPrintf("Calling Lrts Pump Msg PE:%d\n", myrank);
1323 #endif
1324     PumpNetworkSmsg();
1325     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
1326     //PumpNetworkRdmaMsgs();
1327     /* Release Sent Msg */
1328     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
1329 #if 0
1330     ////PumpLocalSmsgTransactions();
1331     if(myrank == 0)
1332     CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", myrank);
1333 #endif
1334     PumpLocalRdmaTransactions();
1335 #if 0
1336     if(myrank == 0)
1337     CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", myrank);
1338 #endif
1339     /* Send buffered Message */
1340     SendBufferMsg();
1341 #if 0
1342     if(myrank == 0)
1343     CmiPrintf("Calling Lrts rdma PE:%d\n", myrank);
1344 #endif
1345     SendRdmaMsg();
1346 #if 0
1347     if(myrank == 0)
1348     CmiPrintf("done PE:%d\n", myrank);
1349 #endif
1350 }
1351
1352 static void _init_dynamic_smsg()
1353 {
1354     gni_smsg_attr_t smsg_attr;
1355     gni_return_t status;
1356     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
1357     memset(smsg_connected_flag, 0, mysize*sizeof(int));
1358
1359     CmiPrintf("start Dynamic init done %d\n", myrank);
1360     smsg_local_attr_vec = (gni_smsg_attr_t**) malloc(sizeof(gni_smsg_attr_t*) *mysize);
1361     
1362     setup_mem.addr = (uint64_t)malloc(mysize * sizeof(gni_smsg_attr_t));
1363     status = GNI_MemRegister(nic_hndl, setup_mem.addr,  mysize * SMSG_CONN_SIZE, smsg_rx_cqh,  GNI_MEM_READWRITE, -1,  &(setup_mem.mdh));
1364    
1365     GNI_RC_CHECK("Smsg dynamic allocation \n", status);
1366     CmiPrintf("[%d]Smsg dynamic allocation (%p)\n", myrank, (void*)setup_mem.addr);
1367     smsg_connection_vec = (mdh_addr_t*) malloc(mysize*sizeof(mdh_addr_t)); 
1368     allgather(&setup_mem, smsg_connection_vec, sizeof(mdh_addr_t));
1369     
1370     //pre-allocate some memory as mailbox for dynamic connection
1371     if(mysize <=4096)
1372     {
1373         SMSG_MAX_MSG = 1024;
1374     }else if (mysize > 4096 && mysize <= 16384)
1375     {
1376         SMSG_MAX_MSG = 512;
1377     }else {
1378         SMSG_MAX_MSG = 256;
1379     }
1380     
1381     smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1382     smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1383     smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1384     status = GNI_SmsgBufferSizeNeeded(&smsg_attr, &smsg_memlen);
1385     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1386     
1387     smsg_dynamic_list = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1388
1389     smsg_dynamic_list->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1390     bzero(smsg_dynamic_list->addr, smsg_memlen*smsg_expand_slots);
1391     
1392     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_dynamic_list->addr,
1393             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1394             GNI_MEM_READWRITE,   
1395             -1,
1396             &(smsg_dynamic_list->mdh));
1397    smsg_available_slot = 0;  
1398    CmiPrintf(" Dynamic init done %d\n", myrank);
1399 }
1400
1401 static void _init_static_smsg()
1402 {
1403     gni_smsg_attr_t      *smsg_attr;
1404     gni_smsg_attr_t      remote_smsg_attr;
1405     gni_smsg_attr_t      *smsg_attr_vec;
1406     gni_mem_handle_t     my_smsg_mdh_mailbox;
1407     int      i;
1408     gni_return_t status;
1409     uint32_t              vmdh_index = -1;
1410     mdh_addr_t            base_infor;
1411     mdh_addr_t            *base_addr_vec;
1412     if(mysize <=4096)
1413     {
1414         SMSG_MAX_MSG = 1024;
1415         //log2_SMSG_MAX_MSG = 10;
1416     }else if (mysize > 4096 && mysize <= 16384)
1417     {
1418         SMSG_MAX_MSG = 512;
1419         //log2_SMSG_MAX_MSG = 9;
1420
1421     }else {
1422         SMSG_MAX_MSG = 256;
1423         //log2_SMSG_MAX_MSG = 8;
1424     }
1425     
1426     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
1427     
1428     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1429     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
1430     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
1431     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
1432     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1433     smsg_mailbox_base = memalign(64, smsg_memlen*(mysize));
1434     _MEMCHECK(smsg_mailbox_base);
1435     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
1436     if (myrank == 0) CmiPrintf("Charm++> allocates %.2fMB for SMSG. \n", smsg_memlen*mysize/1e6);
1437     
1438     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
1439             smsg_memlen*(mysize), smsg_rx_cqh,
1440             GNI_MEM_READWRITE,   
1441             vmdh_index,
1442             &my_smsg_mdh_mailbox);
1443
1444     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1445
1446     base_infor.addr =  (uint64_t)smsg_mailbox_base;
1447     base_infor.mdh =  my_smsg_mdh_mailbox;
1448     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
1449
1450     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
1451  
1452     for(i=0; i<mysize; i++)
1453     {
1454         if(i==myrank)
1455             continue;
1456         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1457         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1458         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1459         smsg_attr[i].mbox_offset = i*smsg_memlen;
1460         smsg_attr[i].buff_size = smsg_memlen;
1461         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
1462         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1463     }
1464
1465     for(i=0; i<mysize; i++)
1466     {
1467         if (myrank == i) continue;
1468
1469         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1470         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1471         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1472         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
1473         remote_smsg_attr.buff_size = smsg_memlen;
1474         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
1475         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
1476
1477         /* initialize the smsg channel */
1478         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
1479         GNI_RC_CHECK("SMSG Init", status);
1480     } //end initialization
1481
1482     free(base_addr_vec);
1483
1484     free(smsg_attr);
1485     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
1486      GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
1487
1488
1489 inline
1490 static void _init_smsg()
1491 {
1492     int i;
1493
1494      smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
1495      for(i =0; i<mysize; i++)
1496      {
1497         smsg_msglist_index[i].next = -1;
1498         smsg_msglist_index[i].head = 0;
1499         smsg_msglist_index[i].tail = 0;
1500      }
1501      smsg_head_index = -1;
1502 }
1503
1504 static void _init_static_msgq()
1505 {
1506     gni_return_t status;
1507     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
1508     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
1509     msgq_attrs.smsg_q_sz = 1;
1510     msgq_attrs.rcv_pool_sz = 1;
1511     msgq_attrs.num_msgq_eps = 2;
1512     msgq_attrs.nloc_insts = 8;
1513     msgq_attrs.modes = 0;
1514     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
1515
1516     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
1517     GNI_RC_CHECK("MSGQ Init", status);
1518
1519
1520 }
1521
1522 static void _init_DMA_buffer()
1523 {
1524     gni_return_t            status = GNI_RC_SUCCESS;
1525     /*AUTO tuning */
1526     /* suppose max_smsg is 1024, DMA buffer is split into 2048, 4096, 8192, ... */
1527     /*  This method might be better for SMP, but it is bad for Nonsmp since msgs are sharing same slots */
1528     /*
1529      * DMA_slots = 19-log2_SMSG_MAX_MSG;
1530     DMA_incoming_avail_tag = malloc(DMA_slots);
1531     DMA_buffer_size = 2*(DMA_max_single_msg - SMSG_MAX_MSG); 
1532     DMA_incoming_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1533     DMA_outgoing_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1534     
1535     status = GNI_MemRegister(nic_hndl, (uint64_t)DMA_incoming_base_addr,
1536             DMA_buffer_size, smsg_rx_cqh,
1537             GNI_MEM_READWRITE ,   
1538             vmdh_index,
1539             &);
1540             */
1541     // one is reserved to avoid deadlock
1542     DMA_slots           = 17; // each one is 8K  16*8K + 1 slot reserved to avoid deadlock
1543     DMA_buffer_size     = DMA_max_single_msg + 8192;
1544     DMA_buffer_base_mdh_addr.addr = (uint64_t)memalign(ALIGNBUF, DMA_buffer_size);
1545     status = GNI_MemRegister(nic_hndl, DMA_buffer_base_mdh_addr.addr,
1546         DMA_buffer_size, smsg_rx_cqh,
1547         GNI_MEM_READWRITE ,   
1548         -1,
1549         &(DMA_buffer_base_mdh_addr.mdh));
1550     GNI_RC_CHECK("GNI_MemRegister", status);
1551     DMA_buffer_base_mdh_addr_vec = (mdh_addr_t*) malloc(sizeof(mdh_addr_t) * mysize);
1552
1553     allgather(&DMA_buffer_base_mdh_addr, DMA_buffer_base_mdh_addr_vec, sizeof(mdh_addr_t) );
1554 }
1555
1556 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
1557 {
1558     register int            i;
1559     int                     rc;
1560     int                     device_id = 0;
1561     unsigned int            remote_addr;
1562     gni_cdm_handle_t        cdm_hndl;
1563     gni_return_t            status = GNI_RC_SUCCESS;
1564     uint32_t                vmdh_index = -1;
1565     uint8_t                 ptag;
1566     unsigned int            local_addr, *MPID_UGNI_AllAddr;
1567     int                     first_spawned;
1568     int                     physicalID;
1569     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
1570     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
1571     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
1572    
1573     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
1574     useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
1575     if (myrank==0) 
1576     {
1577         if(useDynamicSMSG) 
1578             CmiPrintf("Charm++> use Dynamic SMSG\n"); 
1579         else 
1580             CmiPrintf("Charm++> use Static SMSG\n");
1581     };
1582     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
1583     
1584     status = PMI_Init(&first_spawned);
1585     GNI_RC_CHECK("PMI_Init", status);
1586
1587     status = PMI_Get_size(&mysize);
1588     GNI_RC_CHECK("PMI_Getsize", status);
1589
1590     status = PMI_Get_rank(&myrank);
1591     GNI_RC_CHECK("PMI_getrank", status);
1592
1593     //physicalID = CmiPhysicalNodeID(myrank);
1594     
1595     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
1596
1597     *myNodeID = myrank;
1598     *numNodes = mysize;
1599   
1600     if(myrank == 0)
1601     {
1602         printf("Charm++> Running on Gemini (GNI) using %d  cores\n", mysize);
1603     }
1604 #ifdef USE_ONESIDED
1605     onesided_init(NULL, &onesided_hnd);
1606
1607     // this is a GNI test, so use the libonesided bypass functionality
1608     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
1609     local_addr = gniGetNicAddress();
1610 #else
1611     ptag = get_ptag();
1612     cookie = get_cookie();
1613 #if 0
1614     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
1615 #endif
1616     //Create and attach to the communication  domain */
1617     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
1618     GNI_RC_CHECK("GNI_CdmCreate", status);
1619     //* device id The device id is the minor number for the device
1620     //that is assigned to the device by the system when the device is created.
1621     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
1622     //where X is the device number 0 default 
1623     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
1624     GNI_RC_CHECK("GNI_CdmAttach", status);
1625     local_addr = get_gni_nic_address(0);
1626 #endif
1627     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
1628     _MEMCHECK(MPID_UGNI_AllAddr);
1629     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
1630     /* create the local completion queue */
1631     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
1632     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
1633     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
1634     
1635     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
1636     //GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
1637     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
1638
1639     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
1640     GNI_RC_CHECK("Create CQ (rx)", status);
1641     
1642     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
1643     //GNI_RC_CHECK("Create Post CQ (rx)", status);
1644     
1645     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
1646     //GNI_RC_CHECK("Create BTE CQ", status);
1647
1648     /* create the endpoints. they need to be bound to allow later CQWrites to them */
1649     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
1650     _MEMCHECK(ep_hndl_array);
1651
1652     for (i=0; i<mysize; i++) {
1653         if(i == myrank) continue;
1654         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
1655         GNI_RC_CHECK("GNI_EpCreate ", status);   
1656         remote_addr = MPID_UGNI_AllAddr[i];
1657         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
1658         GNI_RC_CHECK("GNI_EpBind ", status);   
1659     }
1660     /* Depending on the number of cores in the job, decide different method */
1661     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
1662     if(mysize > 1)
1663     {
1664         if(useDynamicSMSG == 0)
1665         {
1666             _init_static_smsg();
1667         }else
1668         {
1669             _init_dynamic_smsg();
1670         }
1671
1672         _init_smsg();
1673     }
1674 #if     USE_LRTS_MEMPOOL
1675     CmiGetArgLong(*argv, "+useMemorypoolSize", &_mempool_size);
1676     if (myrank==0) CmiPrintf("Charm++> use memorypool size: %1.fMB\n", _mempool_size/1024.0/1024);
1677     init_mempool(_mempool_size);
1678     //init_mempool(Mempool_MaxSize);
1679 #endif
1680
1681     /* init DMA buffer for medium message */
1682
1683     //_init_DMA_buffer();
1684     
1685     free(MPID_UGNI_AllAddr);
1686 }
1687
1688
1689 void* LrtsAlloc(int n_bytes, int header)
1690 {
1691     void *ptr;
1692 #if 0
1693     CmiPrintf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d\n", CmiMyPe(), n_bytes, header);
1694 #endif
1695     if(n_bytes <= SMSG_MAX_MSG)
1696     {
1697         int totalsize = n_bytes+header;
1698         ptr = malloc(totalsize);
1699     }else 
1700     {
1701
1702         CmiAssert(header <= ALIGNBUF);
1703 #if     USE_LRTS_MEMPOOL
1704         n_bytes = ALIGN64(n_bytes);
1705         char *res = syh_mempool_malloc(ALIGNBUF+n_bytes);
1706 #else
1707         n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
1708         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1709 #endif
1710         ptr = res + ALIGNBUF - header;
1711     }
1712 #if 0 
1713     CmiPrintf("Done Alloc Lrts for bytes=%d, head=%d\n", n_bytes, header);
1714 #endif
1715     return ptr;
1716 }
1717
1718 void  LrtsFree(void *msg)
1719 {
1720     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1721     if (size <= SMSG_MAX_MSG)
1722       free(msg);
1723     else
1724     {
1725 #if 0
1726         CmiPrintf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1727 #endif
1728 #if     USE_LRTS_MEMPOOL
1729         syh_mempool_free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1730 #else
1731         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1732 #endif
1733     }
1734 #if 0 
1735     CmiPrintf("Done Free lrts for bytes=%d\n", size);
1736 #endif
1737 }
1738
1739 static void LrtsExit()
1740 {
1741     /* free memory ? */
1742 #if     USE_LRTS_MEMPOOL
1743     kill_allmempool();
1744 #endif
1745     PMI_Finalize();
1746     exit(0);
1747 }
1748
1749 static void LrtsDrainResources()
1750 {
1751     while (!SendBufferMsg()) {
1752         PumpNetworkSmsg();
1753         PumpNetworkRdmaMsgs();
1754         PumpLocalSmsgTransactions();
1755         PumpLocalRdmaTransactions();
1756     }
1757     PMI_Barrier();
1758 }
1759
1760 void CmiAbort(const char *message) {
1761
1762     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1763     PMI_Abort(-1, message);
1764 }
1765
1766 /**************************  TIMER FUNCTIONS **************************/
1767 #if CMK_TIMER_USE_SPECIAL
1768 /* MPI calls are not threadsafe, even the timer on some machines */
1769 static CmiNodeLock  timerLock = 0;
1770 static int _absoluteTime = 0;
1771 static int _is_global = 0;
1772 static struct timespec start_ns;
1773
1774 inline int CmiTimerIsSynchronized() {
1775     return 1;
1776 }
1777
1778 inline int CmiTimerAbsolute() {
1779     return _absoluteTime;
1780 }
1781
1782 double CmiStartTimer() {
1783     return 0.0;
1784 }
1785
1786 double CmiInitTime() {
1787     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
1788 }
1789
1790 void CmiTimerInit(char **argv) {
1791     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1792     if (_absoluteTime && CmiMyPe() == 0)
1793         printf("Charm++> absolute  timer is used\n");
1794     
1795     _is_global = CmiTimerIsSynchronized();
1796
1797
1798     if (_is_global) {
1799         if (CmiMyRank() == 0) {
1800             clock_gettime(CLOCK_MONOTONIC, &start_ts)
1801         }
1802     } else { /* we don't have a synchronous timer, set our own start time */
1803         CmiBarrier();
1804         CmiBarrier();
1805         CmiBarrier();
1806         clock_gettime(CLOCK_MONOTONIC, &start_ts)
1807     }
1808     CmiNodeAllBarrier();          /* for smp */
1809 }
1810
1811 /**
1812  * Since the timerLock is never created, and is
1813  * always NULL, then all the if-condition inside
1814  * the timer functions could be disabled right
1815  * now in the case of SMP.
1816  */
1817 double CmiTimer(void) {
1818     struct timespec now_ts;
1819     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1820     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1821         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1822 }
1823
1824 double CmiWallTimer(void) {
1825     struct timespec now_ts;
1826     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1827     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1828         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1829 }
1830
1831 double CmiCpuTimer(void) {
1832     struct timespec now_ts;
1833     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1834     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1835         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1836 }
1837
1838 #endif
1839 /************Barrier Related Functions****************/
1840
1841 int CmiBarrier()
1842 {
1843     int status;
1844     status = PMI_Barrier();
1845     return status;
1846
1847 }
1848
1849
1850 #if CMK_PERSISTENT_COMM
1851 #include "machine-persistent.c"
1852 #endif
1853
1854