fixed dynamic smsg problem
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <stdint.h>
16 #include <errno.h>
17 #include <malloc.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #if defined(_WIN32) && ! defined(__CYGWIN__)
26 #include <windows.h>
27 #include <wincon.h>
28 #include <sys/types.h>
29 #include <sys/timeb.h>
30
31 static void sleep(int secs) {
32     Sleep(1000*secs);
33 }
34 #else
35 #include <unistd.h> /*For getpid()*/
36 #endif
37
38
39 #define REMOTE_EVENT         0
40 #define USE_LRTS_MEMPOOL   1
41
42 #if USE_LRTS_MEMPOOL
43 static CmiInt8 _mempool_size = 1024ll*1024*32;
44 #endif
45
46 #define PRINT_SYH  1
47
48 #if PRINT_SYH
49 int         lrts_smsg_success = 0;
50 int         lrts_send_msg_id = 0;
51 int         lrts_send_rdma_id = 0;
52 int         lrts_send_rdma_success = 0;
53 int         lrts_received_msg = 0;
54 int         lrts_local_done_msg = 0;
55 #endif
56
57 #include "machine.h"
58
59 #include "pcqueue.h"
60
61 #if CMK_PERSISTENT_COMM
62 #include "machine-persistent.h"
63 #endif
64
65 //#define  USE_ONESIDED 1
66 #ifdef USE_ONESIDED
67 //onesided implementation is wrong, since no place to restore omdh
68 #include "onesided.h"
69 onesided_hnd_t   onesided_hnd;
70 onesided_md_t    omdh;
71 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
72
73 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
74
75 #else
76 uint8_t   onesided_hnd, omdh;
77 #if REMOTE_EVENT
78 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
79 #else
80 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
81 #endif
82 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
83 #endif
84
85 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
86 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
87
88 #define ALIGNBUF                64
89
90 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
91 /* If SMSG is not used */
92 #define FMA_PER_CORE  1024
93 #define FMA_BUFFER_SIZE 1024
94 /* If SMSG is used */
95 static int  SMSG_MAX_MSG;
96 //static int  log2_SMSG_MAX_MSG;
97 #define SMSG_MAX_CREDIT  36
98
99 #define MSGQ_MAXSIZE       2048
100 /* large message transfer with FMA or BTE */
101 #define LRTS_GNI_RDMA_THRESHOLD  2048
102
103 #define REMOTE_QUEUE_ENTRIES  2048 
104 #define LOCAL_QUEUE_ENTRIES   64 
105
106 #define PUT_DONE_TAG      0x29
107 #define ACK_TAG           0x30
108 /* SMSG is data message */
109 #define SMALL_DATA_TAG          0x31
110 /* SMSG is a control message to initialize a BTE */
111 #define MEDIUM_HEAD_TAG         0x32
112 #define MEDIUM_DATA_TAG         0x33
113 #define LMSG_INIT_TAG     0x39 
114
115 #define DEBUG
116 #ifdef GNI_RC_CHECK
117 #undef GNI_RC_CHECK
118 #endif
119 #ifdef DEBUG
120 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
121 #else
122 #define GNI_RC_CHECK(msg,rc)
123 #endif
124
125 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
126 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
127
128 static int Mempool_MaxSize = 1024*1024*128;
129 static int useDynamicSMSG   = 0;
130 static int useStaticMSGQ = 0;
131 static int useStaticFMA = 0;
132 static int mysize, myrank;
133 static gni_nic_handle_t      nic_hndl;
134
135 typedef struct {
136     gni_mem_handle_t mdh;
137     uint64_t addr;
138 } mdh_addr_t ;
139 // this is related to dynamic SMSG
140
141 typedef struct mdh_addr_list{
142     gni_mem_handle_t mdh;
143    void *addr;
144     struct mdh_addr_list *next;
145 }mdh_addr_list_t;
146
147 static unsigned int         smsg_memlen;
148 #define     SMSG_CONN_SIZE     sizeof(gni_smsg_attr_t)
149 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
150 int                 *smsg_connected_flag= 0;
151 char                *smsg_connection_addr = 0;
152 mdh_addr_t          setup_mem;
153 mdh_addr_t          *smsg_connection_vec = 0;
154 gni_mem_handle_t    smsg_connection_memhndl;
155 static int          smsg_expand_slots = 10;
156 static int          smsg_available_slot = 0;
157 static void         *smsg_mailbox_mempool = 0;
158 mdh_addr_list_t     *smsg_dynamic_list = 0;
159
160 static void             *smsg_mailbox_base;
161 gni_msgq_attr_t         msgq_attrs;
162 gni_msgq_handle_t       msgq_handle;
163 gni_msgq_ep_attr_t      msgq_ep_attrs;
164 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
165
166
167
168 /* preallocated DMA buffer */
169 int                     DMA_slots;
170 uint64_t                DMA_avail_tag = 0;
171 uint32_t                DMA_incoming_avail_tag = 0;
172 uint32_t                DMA_outgoing_avail_tag = 0;
173 void                    *DMA_incoming_base_addr;
174 void                    *DMA_outgoing_base_addr;
175 mdh_addr_t              DMA_buffer_base_mdh_addr;
176 mdh_addr_t              *DMA_buffer_base_mdh_addr_vec;
177 int                     DMA_buffer_size;
178 int                     DMA_max_single_msg = 131072;//524288 ;
179
180 #define                 DMA_SIZE_PER_SLOT       8192
181
182
183 typedef struct dma_msgid_map
184 {
185     uint64_t     msg_id;
186     int     msg_subid
187 } dma_msgid_map_t;
188
189 dma_msgid_map_t         *dma_map_list;
190
191 typedef struct msg_trace
192 {
193     uint64_t    msg_id;
194     int         done_num;
195 }msg_trace_t;
196
197 msg_trace_t             *pending_msg_list;
198 /* =====Beginning of Declarations of Machine Specific Variables===== */
199 static int cookie;
200 static int modes = 0;
201 static gni_cq_handle_t       smsg_rx_cqh = NULL;
202 static gni_cq_handle_t       smsg_tx_cqh = NULL;
203 static gni_cq_handle_t       post_rx_cqh = NULL;
204 static gni_cq_handle_t       post_tx_cqh = NULL;
205 static gni_ep_handle_t       *ep_hndl_array;
206
207 typedef    struct  pending_smg
208 {
209     int     inst_id;
210     struct  pending_smg *next;
211 } PENDING_GETNEXT;
212
213
214 typedef struct msg_list
215 {
216     uint32_t destNode;
217     uint32_t size;
218     void *msg;
219     struct msg_list *next;
220     struct msg_list *pehead_next;
221     uint8_t tag;
222 }MSG_LIST;
223
224 typedef struct medium_msg_list
225 {
226     uint32_t destNode;
227     uint32_t msg_id;
228     uint32_t msg_subid;
229     uint32_t remain_size;
230     void *msg;
231     struct medium_msg_list *next;
232 }MEDIUM_MSG_LIST;
233
234
235 typedef struct control_msg
236 {
237     uint64_t            source_addr;
238     int                 source;               /* source rank */
239     int                 length;
240     gni_mem_handle_t    source_mem_hndl;
241     struct control_msg *next;
242 }CONTROL_MSG;
243
244 typedef struct medium_msg_control
245 {
246     uint64_t            dma_offset;     //the dma_buffer for this block of msg
247     int                 msg_id;         //Id for the total index
248     int                 msg_subid;      //offset inside the message id 
249 }MEDIUM_MSG_CONTROL;
250
251 typedef struct  rmda_msg
252 {
253     int                   destNode;
254     gni_post_descriptor_t *pd;
255     struct  rmda_msg      *next;
256 }RDMA_REQUEST;
257
258 typedef struct  msg_list_index
259 {
260     int         next;
261     MSG_LIST    *head;
262     MSG_LIST    *tail;
263 } MSG_LIST_INDEX;
264
265 /* reuse PendingMsg memory */
266 static CONTROL_MSG          *control_freelist=0;
267 static MSG_LIST             *msglist_freelist=0;
268 static int                  smsg_head_index;
269 static MSG_LIST_INDEX       *smsg_msglist_index= 0;
270 static MSG_LIST             *smsg_free_head=0;
271 static MSG_LIST             *smsg_free_tail=0;
272
273 /*
274 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
275     if(free_head == 0)  free_head = free_tail = msg_head;    \
276     else   free_tail = free_tail->next;    \
277     if( msg_head->next == msg_tail) msg_head =0;   \
278     else msg_head= msg_head->next;    
279
280 #define MallocMsgList(d, msg_head, msg_tail, free_head, free_tail, msgsize) \
281     if(free_head == 0) {d= malloc(msgsize);  \
282         if(msg_head == 0)   msg_head =msg_tail = msg_head->next = msg_tail->next = d; \
283         else { msg_tail->next = d; d->next = msg_head; msg_tail=d;} \
284     }else {d = free_head; free_head = free_head->next; if(free_tail->next == free_head) free_head =0;} \
285 */
286
287 #define FreeMsgList(d)  \
288   (d)->next = msglist_freelist;\
289   msglist_freelist = d;
290
291
292
293 #define MallocMsgList(d) \
294   d = msglist_freelist;\
295   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
296              _MEMCHECK(d);\
297   } else msglist_freelist = d->next;
298
299
300 #define FreeControlMsg(d)       \
301   (d)->next = control_freelist;\
302   control_freelist = d;
303
304 #define MallocControlMsg(d) \
305   d = control_freelist;\
306   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
307              _MEMCHECK(d);\
308   } else control_freelist = d->next;
309
310
311 static RDMA_REQUEST         *rdma_freelist = NULL;
312
313 #define FreeControlMsg(d)       \
314   (d)->next = control_freelist;\
315   control_freelist = d;
316
317 #define MallocControlMsg(d) \
318   d = control_freelist;\
319   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
320              _MEMCHECK(d);\
321   } else control_freelist = d->next;
322
323 #define FreeMediumControlMsg(d)       \
324   (d)->next = medium_control_freelist;\
325   medium_control_freelist = d;
326
327
328 #define MallocMediumControlMsg(d) \
329     d = medium_control_freelist;\
330     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
331     _MEMCHECK(d);\
332 } else mediumcontrol_freelist = d->next;
333
334 #define FreeRdmaRequest(d)       \
335   (d)->next = rdma_freelist;\
336   rdma_freelist = d;
337
338 #define MallocRdmaRequest(d) \
339   d = rdma_freelist;\
340   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
341              _MEMCHECK(d);\
342   } else rdma_freelist = d->next;
343
344 /* reuse gni_post_descriptor_t */
345 static gni_post_descriptor_t *post_freelist=0;
346
347 #if 1
348 #define FreePostDesc(d)       \
349     (d)->next_descr = post_freelist;\
350     post_freelist = d;
351
352 #define MallocPostDesc(d) \
353   d = post_freelist;\
354   if (d==0) { \
355      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
356      _MEMCHECK(d);\
357   } else post_freelist = d->next_descr;
358 #else
359
360 #define FreePostDesc(d)     free(d);
361 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
362
363 #endif
364
365 static PENDING_GETNEXT     *pending_smsg_head = 0;
366 static PENDING_GETNEXT     *pending_smsg_tail = 0;
367
368 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
369 static int      buffered_smsg_counter = 0;
370
371 /* SmsgSend return success but message sent is not confirmed by remote side */
372
373 static RDMA_REQUEST  *pending_rdma_head = 0;
374 static RDMA_REQUEST  *pending_rdma_tail = 0;
375 static MSG_LIST *buffered_fma_head = 0;
376 static MSG_LIST *buffered_fma_tail = 0;
377
378 /* functions  */
379 #define IsFree(a,ind)  !( a& (1<<(ind) ))
380 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
381 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
382
383 #include "mempool.c"
384
385 /* get the upper bound of log 2 */
386 int mylog2(int size)
387 {
388     int op = size;
389     unsigned int ret=0;
390     unsigned int mask = 0;
391     int i;
392     while(op>0)
393     {
394         op = op >> 1;
395         ret++;
396
397     }
398     for(i=1; i<ret; i++)
399     {
400         mask = mask << 1;
401         mask +=1;
402     }
403
404     ret -= ((size &mask) ? 0:1);
405     return ret;
406 }
407
408 static void
409 allgather(void *in,void *out, int len)
410 {
411     static int *ivec_ptr=NULL,already_called=0,job_size=0;
412     int i,rc;
413     int my_rank;
414     char *tmp_buf,*out_ptr;
415
416     if(!already_called) {
417
418         rc = PMI_Get_size(&job_size);
419         CmiAssert(rc == PMI_SUCCESS);
420         rc = PMI_Get_rank(&my_rank);
421         CmiAssert(rc == PMI_SUCCESS);
422
423         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
424         CmiAssert(ivec_ptr != NULL);
425
426         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
427         CmiAssert(rc == PMI_SUCCESS);
428
429         already_called = 1;
430
431     }
432
433     tmp_buf = (char *)malloc(job_size * len);
434     CmiAssert(tmp_buf);
435
436     rc = PMI_Allgather(in,tmp_buf,len);
437     CmiAssert(rc == PMI_SUCCESS);
438
439     out_ptr = out;
440
441     for(i=0;i<job_size;i++) {
442
443         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
444
445     }
446
447     free(tmp_buf);
448 }
449 static void
450 allgather_2(void *in,void *out, int len)
451 {
452     //PMI_Allgather is out of order
453     int i,rc, extend_len;
454     int  rank_index;
455     char *out_ptr, *out_ref;
456     char *in2;
457
458     extend_len = sizeof(int) + len;
459     in2 = (char*)malloc(extend_len);
460
461     memcpy(in2, &myrank, sizeof(int));
462     memcpy(in2+sizeof(int), in, len);
463
464     out_ptr = (char*)malloc(mysize*extend_len);
465
466     rc = PMI_Allgather(in2, out_ptr, extend_len);
467     GNI_RC_CHECK("allgather", rc);
468
469     out_ref = out;
470
471     for(i=0;i<mysize;i++) {
472         //rank index 
473         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
474         //copy to the rank index slot
475         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
476     }
477
478     free(out_ptr);
479     free(in2);
480
481 }
482
483 static unsigned int get_gni_nic_address(int device_id)
484 {
485     unsigned int address, cpu_id;
486     gni_return_t status;
487     int i, alps_dev_id=-1,alps_address=-1;
488     char *token, *p_ptr;
489
490     p_ptr = getenv("PMI_GNI_DEV_ID");
491     if (!p_ptr) {
492         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
493        
494         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
495     } else {
496         while ((token = strtok(p_ptr,":")) != NULL) {
497             alps_dev_id = atoi(token);
498             if (alps_dev_id == device_id) {
499                 break;
500             }
501             p_ptr = NULL;
502         }
503         CmiAssert(alps_dev_id != -1);
504         p_ptr = getenv("PMI_GNI_LOC_ADDR");
505         CmiAssert(p_ptr != NULL);
506         i = 0;
507         while ((token = strtok(p_ptr,":")) != NULL) {
508             if (i == alps_dev_id) {
509                 alps_address = atoi(token);
510                 break;
511             }
512             p_ptr = NULL;
513             ++i;
514         }
515         CmiAssert(alps_address != -1);
516         address = alps_address;
517     }
518     return address;
519 }
520
521 static uint8_t get_ptag(void)
522 {
523     char *p_ptr, *token;
524     uint8_t ptag;
525
526     p_ptr = getenv("PMI_GNI_PTAG");
527     CmiAssert(p_ptr != NULL);
528     token = strtok(p_ptr, ":");
529     ptag = (uint8_t)atoi(token);
530     return ptag;
531         
532 }
533
534 static uint32_t get_cookie(void)
535 {
536     uint32_t cookie;
537     char *p_ptr, *token;
538
539     p_ptr = getenv("PMI_GNI_COOKIE");
540     CmiAssert(p_ptr != NULL);
541     token = strtok(p_ptr, ":");
542     cookie = (uint32_t)atoi(token);
543
544     return cookie;
545 }
546
547 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
548 /* TODO: add any that are related */
549 /* =====End of Definitions of Message-Corruption Related Macros=====*/
550
551
552 #include "machine-lrts.h"
553 #include "machine-common-core.c"
554
555 /* Network progress function is used to poll the network when for
556    messages. This flushes receive buffers on some  implementations*/
557 #if CMK_MACHINE_PROGRESS_DEFINED
558 void CmiMachineProgressImpl() {
559 }
560 #endif
561
562 inline
563 static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
564 {
565     MSG_LIST        *msg_tmp;
566     MallocMsgList(msg_tmp);
567     msg_tmp->destNode = destNode;
568     msg_tmp->size   = size;
569     msg_tmp->msg    = msg;
570     msg_tmp->tag    = tag;
571     msg_tmp->next   = 0;
572     if (smsg_msglist_index[destNode].head == 0 ) {
573         smsg_msglist_index[destNode].head = msg_tmp;
574         smsg_msglist_index[destNode].next = smsg_head_index;
575         smsg_head_index = destNode;
576     }
577     else {
578       (smsg_msglist_index[destNode].tail)->next    = msg_tmp;
579     }
580     smsg_msglist_index[destNode].tail          = msg_tmp;
581 #if PRINT_SYH
582     buffered_smsg_counter++;
583 #endif
584 }
585
586 inline
587 static void setup_smsg_connection(int destNode)
588 {
589     mdh_addr_list_t  *new_entry = 0;
590     gni_post_descriptor_t *pd;
591     gni_smsg_attr_t      *smsg_attr;
592     gni_return_t status = GNI_RC_NOT_DONE;
593     if(smsg_available_slot == smsg_expand_slots)
594     {
595         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
596         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
597         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
598
599         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
600             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
601             GNI_MEM_READWRITE,   
602             -1,
603             &(new_entry->mdh));
604         smsg_available_slot = 0; 
605         new_entry->next = smsg_dynamic_list;
606         smsg_dynamic_list = new_entry;
607     }
608     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
609     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
610     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
611     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
612     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
613     smsg_attr->buff_size = smsg_memlen;
614     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
615     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
616     smsg_local_attr_vec[destNode] = smsg_attr;
617     smsg_available_slot++;
618     MallocPostDesc(pd);
619     pd->type            = GNI_POST_FMA_PUT;
620     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
621 //pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
622     pd->length          = sizeof(gni_smsg_attr_t);
623     pd->local_addr      = (uint64_t) smsg_attr;
624     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
625     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
626     pd->src_cq_hndl     = 0;
627     pd->rdma_mode       = 0;
628     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
629     GNI_RC_CHECK("SMSG Dynamic link", status);
630     CmiPrintf("[%d=%d] send post FMA successful (%p) %s\n", myrank, destNode, (void*)pd->remote_addr, gni_err_str[status]);
631 }
632
633 inline 
634 static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
635 {
636     gni_return_t status = GNI_RC_NOT_DONE;
637     gni_smsg_attr_t      *smsg_attr;
638     gni_post_descriptor_t *pd;
639   
640     if(useDynamicSMSG == 1)
641     {
642         if(smsg_connected_flag[destNode] == 0)
643         {
644             CmiPrintf("[%d]Init smsg connection\n", CmiMyPe());
645             setup_smsg_connection(destNode);
646             delay_send_small_msg(msg, size, destNode, tag);
647             smsg_connected_flag[destNode] =1;
648             return status;
649         }
650         else  if(smsg_connected_flag[destNode] < 3)
651         {
652             if(inbuff == 0)
653                 delay_send_small_msg(msg, size, destNode, tag);
654             return status;
655         }
656     }
657     CmiPrintf("[%d] reach send\n", myrank);
658     if(smsg_msglist_index[destNode].head == 0 || inbuff==1)
659     {
660         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], header, size_header, msg, size, 0, tag);
661         if(status == GNI_RC_SUCCESS)
662         {
663 #if PRINT_SYH
664             lrts_smsg_success++;
665             if(lrts_smsg_success == lrts_send_msg_id)
666                 CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
667             else
668                 CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
669 #endif
670             return status;
671         }
672     }else {
673         if(inbuff ==0)
674             delay_send_small_msg(msg, size, destNode, tag);
675     }
676     return status;
677 }
678
679 // Get first 0 in DMA_tags starting from index
680 static int get_first_avail_bit(uint64_t DMA_tags, int start_index)
681 {
682
683     uint64_t         mask = 0x1;
684     register    int     i=0;
685     while((DMA_tags & mask) && i<DMA_slots) {mask << 1; i++;}
686
687 }
688
689 static int send_medium_messages(int destNode, int size, char *msg)
690 {
691 #if 0
692     gni_return_t status = GNI_RC_SUCCESS;
693     int first_avail_bit=0;
694     uint64_t mask = 0x1;
695     MEDIUM_MSG_CONTROL  *medium_msg_control_tmp;
696     MEDIUM_MSG_LIST        *msg_tmp;
697     int blocksize, remain_size, pos;
698     int sub_id = 0;
699     remain_size = size;
700     pos = 0;  //offset before which data are sent
701     /* copy blocks of the message to DMA preallocated buffer and send SMSG */
702     //Check whether there is any available DMA buffer
703     
704     do{
705         while((DMA_avail_tag & mask) && first_avail_bit<DMA_slots) {mask << 1; first_avail_bit++;}
706         if(first_avail_bit == DMA_slots) //No available DMA, buffer this message
707         {
708             MallocMediumMsgList(msg_tmp);
709             msg_tmp->destNode = destNode;
710             msg_tmp->msg_id   = lrts_send_msg_id;
711             msg_tmp->msg_subid   = sub_id;
712             msg_tmp->size   = remain_size;
713             msg_tmp->msg    = msg+pos;
714             msg_tmp->next   = NULL;
715             break;
716         }else
717         {
718             //copy this part of the message into this DMA buffer
719             //TODO optimize here, some data can go with this SMSG
720             blocksize = (remain_size>DMA_SIZE_PER_SLOT)?DMA_SIZE_PER_SLOT: remain_size;
721             memcpy(DMA_buffer_base_mdh_addr.addr[first_avail_bit], msg+pos, blocksize);
722             pos += blocksize;
723             remain_size -= blocksize;
724             SET_BITS(DMA_avail_tag, first_avail_bit);
725            
726             MallocMediumControlMsg(medium_msg_control_tmp);
727             medium_msg_control_tmp->msg_id = lrts_send_msg_id;
728             medium_msg_control_tmp->msg_subid = sub_id;
729             if(status == GNI_RC_SUCCESS)
730             {
731                 if(sub_id==0)
732                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_HEAD_TAG);
733                 else
734                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_DATA_TAG);
735             }
736             //buffer this smsg
737             if(status != GNI_RC_SUCCESS)
738             {
739                 delay_send_small_msg(medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), destNode, MEDIUM_HEAD_TAG);
740             }
741             sub_id++;
742         }while(remain_size > 0 );
743
744         }
745     }
746 #endif
747 }
748
749 // Large message, send control to receiver, receiver register memory and do a GET 
750 inline
751 static void send_large_messages(int destNode, int size, char *msg)
752 {
753 #if     USE_LRTS_MEMPOOL
754     gni_return_t        status  =   GNI_RC_SUCCESS;
755     CONTROL_MSG         *control_msg_tmp;
756     uint32_t            vmdh_index  = -1;
757     /* construct a control message and send */
758     MallocControlMsg(control_msg_tmp);
759     control_msg_tmp->source_addr    = (uint64_t)msg;
760     control_msg_tmp->source         = myrank;
761     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
762     //memcpy( &(control_msg_tmp->source_mem_hndl), GetMemHndl(msg), sizeof(gni_mem_handle_t)) ;
763     control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
764 #if PRINT_SYH
765     lrts_send_msg_id++;
766     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
767 #endif
768     status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
769     if(status == GNI_RC_SUCCESS)
770     {
771         FreeControlMsg(control_msg_tmp);
772     }
773 // NOT use mempool, should slow 
774 #else
775     gni_return_t        status  =   GNI_RC_SUCCESS;
776     CONTROL_MSG         *control_msg_tmp;
777     uint32_t            vmdh_index  = -1;
778     /* construct a control message and send */
779     MallocControlMsg(control_msg_tmp);
780     control_msg_tmp->source_addr    = (uint64_t)msg;
781     control_msg_tmp->source         = myrank;
782     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
783     control_msg_tmp->source_mem_hndl.qword1 = 0;
784     control_msg_tmp->source_mem_hndl.qword2 = 0;
785 #if PRINT_SYH
786     lrts_send_msg_id++;
787     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
788 #endif
789     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
790     if(status == GNI_RC_SUCCESS)
791     {
792         status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
793         if(status == GNI_RC_SUCCESS)
794         {
795             FreeControlMsg(control_msg_tmp);
796         }
797     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
798     {
799         CmiAbort("Memory registor for large msg\n");
800     }else 
801     {
802         delay_send_small_msg(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
803     }
804 #endif
805 }
806
807 inline void LrtsPrepareEnvelope(char *msg, int size)
808 {
809     CmiSetMsgSize(msg, size);
810 }
811
812 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
813 {
814     gni_return_t        status  =   GNI_RC_SUCCESS;
815     LrtsPrepareEnvelope(msg, size);
816
817     if(size <= SMSG_MAX_MSG)
818     {
819 #if PRINT_SYH
820         lrts_send_msg_id++;
821         CmiPrintf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
822 #endif
823         status = send_smsg_message( destNode, 0, 0, msg, size, SMALL_DATA_TAG, 0);  
824         if(status == GNI_RC_SUCCESS)
825         {
826             CmiFree(msg);
827         }
828     }
829     else
830     {
831         send_large_messages(destNode, size, msg);
832     }
833     return 0;
834 }
835
836 static void LrtsPreCommonInit(int everReturn){}
837
838 /* Idle-state related functions: called in non-smp mode */
839 void CmiNotifyIdleForGemini(void) {
840     LrtsAdvanceCommunication();
841 }
842
843 static void LrtsPostCommonInit(int everReturn)
844 {
845 #if CMK_SMP
846     CmiIdleState *s=CmiNotifyGetState();
847     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
848     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
849 #else
850     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
851 #endif
852
853 }
854
855
856 void LrtsPostNonLocal(){}
857 /* pooling CQ to receive network message */
858 static void PumpNetworkRdmaMsgs()
859 {
860     gni_cq_entry_t      event_data;
861     gni_return_t        status;
862     while( (status = GNI_CqGetEvent(post_rx_cqh, &event_data)) == GNI_RC_SUCCESS);
863 }
864
865 static int SendRdmaMsg();
866 static void getLargeMsgRequest(void* header, uint64_t inst_id);
867 static void PumpNetworkSmsg()
868 {
869     uint64_t            inst_id;
870     PENDING_GETNEXT     *pending_next;
871     int                 ret;
872     gni_cq_entry_t      event_data;
873     gni_return_t        status;
874     void                *header;
875     uint8_t             msg_tag;
876     int                 msg_nbytes;
877     void                *msg_data;
878     gni_mem_handle_t    msg_mem_hndl;
879     gni_smsg_attr_t     *smsg_attr;
880     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
881     {
882         inst_id = GNI_CQ_GET_INST_ID(event_data);
883         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
884 #if PRINT_SYH
885         CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
886 #endif
887
888         if(useDynamicSMSG == 1)
889         {
890             if(smsg_connected_flag[inst_id] == 0 )
891             {
892                 CmiPrintf("[%d]pump Init smsg connection\n", CmiMyPe());
893                 smsg_connected_flag[inst_id] =2;
894                 setup_smsg_connection(inst_id);
895                 status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id],  &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]));
896                 GNI_RC_CHECK("SmsgInit", status);
897                 continue;
898             } else if (smsg_connected_flag[inst_id] <3) 
899             {
900                 CmiPrintf("[%d]pump setup smsg connection\n", CmiMyPe());
901                 smsg_connected_flag[inst_id] += 1;
902                 status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id], &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]));
903                 GNI_RC_CHECK("SmsgInit", status);
904                 continue;
905             }
906         }
907         msg_tag = GNI_SMSG_ANY_TAG;
908         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
909         {
910 #if PRINT_SYH
911             lrts_received_msg++;
912             CmiPrintf("+++[%d] PumpNetwork msg is received, messageid:%d tag=%d\n", myrank, lrts_received_msg, msg_tag);
913 #endif
914             /* copy msg out and then put into queue (small message) */
915             switch (msg_tag) {
916             case SMALL_DATA_TAG:
917             {
918                 msg_nbytes = CmiGetMsgSize(header);
919                 msg_data    = CmiAlloc(msg_nbytes);
920                 memcpy(msg_data, (char*)header, msg_nbytes);
921                 handleOneRecvedMsg(msg_nbytes, msg_data);
922                 break;
923             }
924             case LMSG_INIT_TAG:
925             {
926 #if PRINT_SYH
927                 CmiPrintf("+++[%d] from %d PumpNetwork Rdma Request msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
928 #endif
929                 getLargeMsgRequest(header, inst_id);
930                 break;
931             }
932             case ACK_TAG:
933             {
934                 /* Get is done, release message . Now put is not used yet*/
935 #if         !USE_LRTS_MEMPOOL
936                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
937 #endif
938                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
939                 SendRdmaMsg();
940                 break;
941             }
942             case PUT_DONE_TAG: //persistent message
943             {
944                 handleOneRecvedMsg(((CONTROL_MSG *) header)->length,((void*) (CONTROL_MSG *) header)); 
945                 break;
946             }
947             default: {
948                 CmiPrintf("weird tag problem\n");
949                 CmiAbort("Unknown tag\n");
950             }
951             }
952             GNI_SmsgRelease(ep_hndl_array[inst_id]);
953             msg_tag = GNI_SMSG_ANY_TAG;
954         } //endwhile getNext
955     }   //end while GetEvent
956     if(status == GNI_RC_ERROR_RESOURCE)
957     {
958         GNI_RC_CHECK("Smsg_rx_cq full", status);
959     }
960 }
961
962 static void getLargeMsgRequest(void* header, uint64_t inst_id)
963 {
964 #if     USE_LRTS_MEMPOOL
965     CONTROL_MSG         *request_msg;
966     gni_return_t        status;
967     void                *msg_data;
968     gni_post_descriptor_t *pd;
969     RDMA_REQUEST        *rdma_request_msg;
970     gni_mem_handle_t    msg_mem_hndl;
971     int source;
972     // initial a get to transfer data from the sender side */
973     request_msg = (CONTROL_MSG *) header;
974     source = request_msg->source;
975     msg_data = CmiAlloc(request_msg->length);
976     _MEMCHECK(msg_data);
977     //memcpy(&msg_mem_hndl, GetMemHndl(msg_data), sizeof(gni_mem_handle_t));
978     msg_mem_hndl = GetMemHndl(msg_data);
979
980     MallocPostDesc(pd);
981     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
982         pd->type            = GNI_POST_FMA_GET;
983     else
984         pd->type            = GNI_POST_RDMA_GET;
985 #if REMOTE_EVENT
986     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
987 #else
988     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
989 #endif
990     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
991     pd->length          = ALIGN4(request_msg->length);
992     pd->local_addr      = (uint64_t) msg_data;
993     pd->local_mem_hndl  = msg_mem_hndl;
994     pd->remote_addr     = request_msg->source_addr;
995     pd->remote_mem_hndl = request_msg->source_mem_hndl;
996     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
997     pd->rdma_mode       = 0;
998
999     if(pd->type == GNI_POST_RDMA_GET) 
1000         status = GNI_PostRdma(ep_hndl_array[source], pd);
1001     else
1002         status = GNI_PostFma(ep_hndl_array[source],  pd);
1003     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1004     {
1005         MallocRdmaRequest(rdma_request_msg);
1006         rdma_request_msg->next = 0;
1007         rdma_request_msg->destNode = inst_id;
1008         rdma_request_msg->pd = pd;
1009         if(pending_rdma_head == 0)
1010         {
1011             pending_rdma_head = rdma_request_msg;
1012         }else
1013         {
1014             pending_rdma_tail->next = rdma_request_msg;
1015         }
1016         pending_rdma_tail = rdma_request_msg;
1017     }else
1018         GNI_RC_CHECK("AFter posting", status);
1019
1020 #else
1021     CONTROL_MSG         *request_msg;
1022     gni_return_t        status;
1023     void                *msg_data;
1024     gni_post_descriptor_t *pd;
1025     RDMA_REQUEST        *rdma_request_msg;
1026     gni_mem_handle_t    msg_mem_hndl;
1027     int source;
1028     // initial a get to transfer data from the sender side */
1029     request_msg = (CONTROL_MSG *) header;
1030     source = request_msg->source;
1031     msg_data = CmiAlloc(request_msg->length);
1032     _MEMCHECK(msg_data);
1033
1034     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
1035
1036     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1037     {
1038         GNI_RC_CHECK("Mem Register before post", status);
1039     }
1040
1041     MallocPostDesc(pd);
1042     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
1043         pd->type            = GNI_POST_FMA_GET;
1044     else
1045         pd->type            = GNI_POST_RDMA_GET;
1046 #if REMOTE_EVENT
1047     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1048 #else
1049     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1050 #endif
1051     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1052     pd->length          = ALIGN4(request_msg->length);
1053     pd->local_addr      = (uint64_t) msg_data;
1054     pd->remote_addr     = request_msg->source_addr;
1055     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1056     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1057     pd->rdma_mode       = 0;
1058
1059     //memory registration successful
1060     if(status == GNI_RC_SUCCESS)
1061     {
1062         pd->local_mem_hndl  = msg_mem_hndl;
1063         if(pd->type == GNI_POST_RDMA_GET) 
1064             status = GNI_PostRdma(ep_hndl_array[source], pd);
1065         else
1066             status = GNI_PostFma(ep_hndl_array[source],  pd);
1067     }else
1068     {
1069         pd->local_mem_hndl.qword1  = 0; 
1070         pd->local_mem_hndl.qword1  = 0; 
1071     }
1072     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1073     {
1074         MallocRdmaRequest(rdma_request_msg);
1075         rdma_request_msg->next = 0;
1076         rdma_request_msg->destNode = inst_id;
1077         rdma_request_msg->pd = pd;
1078         if(pending_rdma_head == 0)
1079         {
1080             pending_rdma_head = rdma_request_msg;
1081         }else
1082         {
1083             pending_rdma_tail->next = rdma_request_msg;
1084         }
1085         pending_rdma_tail = rdma_request_msg;
1086     }else
1087         GNI_RC_CHECK("AFter posting", status);
1088 #endif
1089 }
1090
1091 /* Check whether message send or get is confirmed by remote */
1092 static void PumpLocalSmsgTransactions()
1093 {
1094     gni_return_t            status;
1095     gni_cq_entry_t          ev;
1096     while ((status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1097     {
1098 #if PRINT_SYH
1099         lrts_local_done_msg++;
1100         //CmiPrintf("*[%d]  PumpLocalSmsgTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
1101 #endif
1102         if(GNI_CQ_OVERRUN(ev))
1103         {
1104             CmiPrintf("Overrun detected in local CQ");
1105             CmiAbort("Overrun in TX");
1106         }
1107     }
1108     if(status == GNI_RC_ERROR_RESOURCE)
1109     {
1110         GNI_RC_CHECK("Smsg_tx_cq full", status);
1111     }
1112 }
1113
1114 static void PumpLocalRdmaTransactions()
1115 {
1116     gni_cq_entry_t          ev;
1117     gni_return_t            status;
1118     uint64_t                type, inst_id;
1119     gni_post_descriptor_t   *tmp_pd;
1120     MSG_LIST                *ptr;
1121     CONTROL_MSG             *ack_msg_tmp;
1122     uint8_t             msg_tag;
1123
1124    // while ( (status = GNI_CqGetEvent(post_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1125     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1126     {
1127         type        = GNI_CQ_GET_TYPE(ev);
1128 #if PRINT_SYH
1129         //lrts_local_done_msg++;
1130         CmiPrintf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
1131 #endif
1132         if (type == GNI_CQ_EVENT_TYPE_POST)
1133         {
1134             inst_id     = GNI_CQ_GET_INST_ID(ev);
1135 #if PRINT_SYH
1136             CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d, %d\n", myrank,  lrts_local_done_msg, smsg_connected_flag[inst_id]);
1137 #endif
1138             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
1139             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1140             ////Message is sent, free message , put is not used now
1141             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
1142             {
1143                 if(smsg_connected_flag[inst_id] <3)
1144                 {
1145                     smsg_connected_flag[inst_id] += 1;
1146                     continue;
1147                 }
1148                 else  
1149                 {
1150                     //persistent message 
1151                     CmiFree((void *)tmp_pd->local_addr);
1152                     msg_tag = PUT_DONE_TAG; 
1153                 }
1154             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1155             {
1156                 msg_tag = ACK_TAG;  
1157             }
1158             MallocControlMsg(ack_msg_tmp);
1159             ack_msg_tmp->source             = myrank;
1160             ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
1161             ack_msg_tmp->length             = tmp_pd->length; 
1162             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1163 #if PRINT_SYH
1164             lrts_send_msg_id++;
1165             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
1166 #endif
1167             status = send_smsg_message(inst_id, 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0);  
1168             if(status == GNI_RC_SUCCESS)
1169             {
1170                 FreeControlMsg(ack_msg_tmp);
1171             }
1172 #if     !USE_LRTS_MEMPOOL
1173             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1174 #endif
1175             CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1176             handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1177             SendRdmaMsg(); 
1178             FreePostDesc(tmp_pd);
1179         }
1180     } //end while
1181 }
1182
1183 static int SendRdmaMsg()
1184 {
1185     gni_return_t            status = GNI_RC_SUCCESS;
1186     gni_mem_handle_t        msg_mem_hndl;
1187
1188     RDMA_REQUEST *ptr = pending_rdma_head;
1189     RDMA_REQUEST *prev = NULL;
1190
1191     while (ptr != NULL)
1192     {
1193         gni_post_descriptor_t *pd = ptr->pd;
1194         status = GNI_RC_SUCCESS;
1195         // register memory first
1196         if( pd->local_mem_hndl.qword1 == 0 && pd->local_mem_hndl.qword2 == 0)
1197         {
1198             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1199         }
1200         if(status == GNI_RC_SUCCESS)
1201         {
1202             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1203                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1204             else
1205                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1206             if(status == GNI_RC_SUCCESS)
1207             {
1208                 RDMA_REQUEST *tmp = ptr;
1209                 if (prev)
1210                   prev->next = ptr->next;
1211                 else
1212                   pending_rdma_head = ptr->next;
1213                 ptr = ptr->next;
1214                 FreeRdmaRequest(tmp);
1215                 continue;
1216             }
1217         }
1218         prev = ptr;
1219         ptr = ptr->next;
1220     } //end while
1221     return 0;
1222 }
1223
1224 // return 1 if all messages are sent
1225 static int SendBufferMsg()
1226 {
1227     MSG_LIST            *ptr, *previous_head, *current_head;
1228     CONTROL_MSG         *control_msg_tmp;
1229     gni_return_t        status;
1230     int done = 1;
1231     register    int     i;
1232     int                 index_previous = -1;
1233     int                 index = smsg_head_index;
1234     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {CmiPrintf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
1235     /* can add flow control here to control the number of messages sent before handle message */
1236     while(index != -1)
1237     {
1238         ptr = smsg_msglist_index[index].head;
1239        
1240         while(ptr!=0)
1241         {
1242             CmiAssert(ptr!=NULL);
1243             if(ptr->tag == SMALL_DATA_TAG)
1244             {
1245                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, ptr->size, SMALL_DATA_TAG, 1);  
1246                 if(status == GNI_RC_SUCCESS)
1247                 {
1248                     CmiFree(ptr->msg);
1249                 }
1250             }
1251             else if(ptr->tag == LMSG_INIT_TAG)
1252             {
1253                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1254 #if PRINT_SYH
1255                 CmiPrintf("[%d==>%d] LMSG buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1256 #endif
1257                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
1258                 {
1259                     MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
1260                     if(status != GNI_RC_SUCCESS) {
1261                         done = 0;
1262                         break;
1263                     }
1264                 }
1265                 
1266                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 1);  
1267                 if(status == GNI_RC_SUCCESS)
1268                 {
1269                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
1270                 }
1271             }else if (ptr->tag == ACK_TAG)
1272             {
1273                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), ACK_TAG, 1);  
1274                 if(status == GNI_RC_SUCCESS)
1275                 {
1276                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1277                 }
1278             }else
1279             {
1280                 CmiPrintf("Weird tag\n");
1281                 CmiAbort("should not happen\n");
1282             }
1283             if(status == GNI_RC_SUCCESS)
1284             {
1285                 smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
1286                 FreeMsgList(ptr);
1287                 ptr= smsg_msglist_index[index].head;
1288 #if PRINT_SYH
1289                 buffered_smsg_counter--;
1290                 if(lrts_smsg_success == lrts_send_msg_id)
1291                     CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1292                 else
1293                     CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1294 #endif
1295             }else {
1296                 done = 0;
1297                 break;
1298             } 
1299         
1300         } //end while
1301         if(ptr == 0)
1302         {
1303             if(index_previous != -1)
1304                 smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
1305             else
1306                 smsg_head_index = smsg_msglist_index[index].next;
1307         }else
1308         {
1309             index_previous = index;
1310         }
1311         index = smsg_msglist_index[index].next;
1312     }   // end pooling for all cores
1313     return done;
1314 }
1315
1316 static void LrtsAdvanceCommunication()
1317 {
1318     /*  Receive Msg first */
1319     if(myrank == 0)
1320     CmiPrintf("Calling Lrts Pump Msg PE:%d\n", myrank);
1321     PumpNetworkSmsg();
1322     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
1323     //PumpNetworkRdmaMsgs();
1324     /* Release Sent Msg */
1325     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
1326     ////PumpLocalSmsgTransactions();
1327     if(myrank == 0)
1328     CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", myrank);
1329     PumpLocalRdmaTransactions();
1330     if(myrank == 0)
1331     CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", myrank);
1332     /* Send buffered Message */
1333     SendBufferMsg();
1334     if(myrank == 0)
1335     CmiPrintf("Calling Lrts rdma PE:%d\n", myrank);
1336     SendRdmaMsg();
1337     if(myrank == 0)
1338     CmiPrintf("done PE:%d\n", myrank);
1339 }
1340
1341 static void _init_dynamic_smsg()
1342 {
1343     gni_smsg_attr_t smsg_attr;
1344     gni_return_t status;
1345     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
1346     memset(smsg_connected_flag, 0, mysize*sizeof(int));
1347
1348     CmiPrintf("start Dynamic init done %d\n", myrank);
1349     smsg_local_attr_vec = (gni_smsg_attr_t**) malloc(sizeof(gni_smsg_attr_t*) *mysize);
1350     
1351     setup_mem.addr = (uint64_t)malloc(mysize * sizeof(gni_smsg_attr_t));
1352     status = GNI_MemRegister(nic_hndl, setup_mem.addr,  mysize * SMSG_CONN_SIZE, smsg_rx_cqh,  GNI_MEM_READWRITE, -1,  &(setup_mem.mdh));
1353    
1354     GNI_RC_CHECK("Smsg dynamic allocation \n", status);
1355     CmiPrintf("[%d]Smsg dynamic allocation (%p)\n", myrank, (void*)setup_mem.addr);
1356     smsg_connection_vec = (mdh_addr_t*) malloc(mysize*sizeof(mdh_addr_t)); 
1357     allgather(&setup_mem, smsg_connection_vec, sizeof(mdh_addr_t));
1358     
1359     //pre-allocate some memory as mailbox for dynamic connection
1360     if(mysize <=4096)
1361     {
1362         SMSG_MAX_MSG = 1024;
1363     }else if (mysize > 4096 && mysize <= 16384)
1364     {
1365         SMSG_MAX_MSG = 512;
1366     }else {
1367         SMSG_MAX_MSG = 256;
1368     }
1369     
1370     smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1371     smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1372     smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1373     status = GNI_SmsgBufferSizeNeeded(&smsg_attr, &smsg_memlen);
1374     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1375     
1376     smsg_dynamic_list = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1377
1378     smsg_dynamic_list->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1379     bzero(smsg_dynamic_list->addr, smsg_memlen*smsg_expand_slots);
1380     
1381     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_dynamic_list->addr,
1382             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1383             GNI_MEM_READWRITE,   
1384             -1,
1385             &(smsg_dynamic_list->mdh));
1386    smsg_available_slot = 0;  
1387    CmiPrintf(" Dynamic init done %d\n", myrank);
1388 }
1389
1390 static void _init_static_smsg()
1391 {
1392     gni_smsg_attr_t      *smsg_attr;
1393     gni_smsg_attr_t      remote_smsg_attr;
1394     gni_smsg_attr_t      *smsg_attr_vec;
1395     gni_mem_handle_t     my_smsg_mdh_mailbox;
1396     int      i;
1397     gni_return_t status;
1398     uint32_t              vmdh_index = -1;
1399     mdh_addr_t            base_infor;
1400     mdh_addr_t            *base_addr_vec;
1401     if(mysize <=4096)
1402     {
1403         SMSG_MAX_MSG = 1024;
1404         //log2_SMSG_MAX_MSG = 10;
1405     }else if (mysize > 4096 && mysize <= 16384)
1406     {
1407         SMSG_MAX_MSG = 512;
1408         //log2_SMSG_MAX_MSG = 9;
1409
1410     }else {
1411         SMSG_MAX_MSG = 256;
1412         //log2_SMSG_MAX_MSG = 8;
1413     }
1414     
1415     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
1416     
1417     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1418     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
1419     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
1420     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
1421     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1422     smsg_mailbox_base = memalign(64, smsg_memlen*(mysize));
1423     _MEMCHECK(smsg_mailbox_base);
1424     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
1425     if (myrank == 0) CmiPrintf("Charm++> allocates %.2fMB for SMSG. \n", smsg_memlen*mysize/1e6);
1426     
1427     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
1428             smsg_memlen*(mysize), smsg_rx_cqh,
1429             GNI_MEM_READWRITE,   
1430             vmdh_index,
1431             &my_smsg_mdh_mailbox);
1432
1433     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1434
1435     base_infor.addr =  (uint64_t)smsg_mailbox_base;
1436     base_infor.mdh =  my_smsg_mdh_mailbox;
1437     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
1438
1439     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
1440  
1441     for(i=0; i<mysize; i++)
1442     {
1443         if(i==myrank)
1444             continue;
1445         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1446         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1447         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1448         smsg_attr[i].mbox_offset = i*smsg_memlen;
1449         smsg_attr[i].buff_size = smsg_memlen;
1450         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
1451         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1452     }
1453
1454     for(i=0; i<mysize; i++)
1455     {
1456         if (myrank == i) continue;
1457
1458         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1459         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1460         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1461         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
1462         remote_smsg_attr.buff_size = smsg_memlen;
1463         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
1464         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
1465
1466         /* initialize the smsg channel */
1467         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
1468         GNI_RC_CHECK("SMSG Init", status);
1469     } //end initialization
1470
1471     free(base_addr_vec);
1472
1473     free(smsg_attr);
1474     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
1475      GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
1476
1477
1478 inline
1479 static void _init_smsg()
1480 {
1481     int i;
1482
1483      smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
1484      for(i =0; i<mysize; i++)
1485      {
1486         smsg_msglist_index[i].next = -1;
1487         smsg_msglist_index[i].head = 0;
1488         smsg_msglist_index[i].tail = 0;
1489      }
1490      smsg_head_index = -1;
1491
1492 }
1493
1494 static void _init_static_msgq()
1495 {
1496     gni_return_t status;
1497     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
1498     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
1499     msgq_attrs.smsg_q_sz = 1;
1500     msgq_attrs.rcv_pool_sz = 1;
1501     msgq_attrs.num_msgq_eps = 2;
1502     msgq_attrs.nloc_insts = 8;
1503     msgq_attrs.modes = 0;
1504     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
1505
1506     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
1507     GNI_RC_CHECK("MSGQ Init", status);
1508
1509
1510 }
1511
1512 static void _init_DMA_buffer()
1513 {
1514     gni_return_t            status = GNI_RC_SUCCESS;
1515     /*AUTO tuning */
1516     /* suppose max_smsg is 1024, DMA buffer is split into 2048, 4096, 8192, ... */
1517     /*  This method might be better for SMP, but it is bad for Nonsmp since msgs are sharing same slots */
1518     /*
1519      * DMA_slots = 19-log2_SMSG_MAX_MSG;
1520     DMA_incoming_avail_tag = malloc(DMA_slots);
1521     DMA_buffer_size = 2*(DMA_max_single_msg - SMSG_MAX_MSG); 
1522     DMA_incoming_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1523     DMA_outgoing_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1524     
1525     status = GNI_MemRegister(nic_hndl, (uint64_t)DMA_incoming_base_addr,
1526             DMA_buffer_size, smsg_rx_cqh,
1527             GNI_MEM_READWRITE ,   
1528             vmdh_index,
1529             &);
1530             */
1531     // one is reserved to avoid deadlock
1532     DMA_slots           = 17; // each one is 8K  16*8K + 1 slot reserved to avoid deadlock
1533     DMA_buffer_size     = DMA_max_single_msg + 8192;
1534     DMA_buffer_base_mdh_addr.addr = (uint64_t)memalign(ALIGNBUF, DMA_buffer_size);
1535     status = GNI_MemRegister(nic_hndl, DMA_buffer_base_mdh_addr.addr,
1536         DMA_buffer_size, smsg_rx_cqh,
1537         GNI_MEM_READWRITE ,   
1538         -1,
1539         &(DMA_buffer_base_mdh_addr.mdh));
1540     GNI_RC_CHECK("GNI_MemRegister", status);
1541     DMA_buffer_base_mdh_addr_vec = (mdh_addr_t*) malloc(sizeof(mdh_addr_t) * mysize);
1542
1543     allgather(&DMA_buffer_base_mdh_addr, DMA_buffer_base_mdh_addr_vec, sizeof(mdh_addr_t) );
1544 }
1545
1546 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
1547 {
1548     register int            i;
1549     int                     rc;
1550     int                     device_id = 0;
1551     unsigned int            remote_addr;
1552     gni_cdm_handle_t        cdm_hndl;
1553     gni_return_t            status = GNI_RC_SUCCESS;
1554     uint32_t                vmdh_index = -1;
1555     uint8_t                 ptag;
1556     unsigned int            local_addr, *MPID_UGNI_AllAddr;
1557     int                     first_spawned;
1558     int                     physicalID;
1559     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
1560     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
1561     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
1562    
1563     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
1564     //useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
1565     if (myrank==0) 
1566     {
1567         if(useDynamicSMSG) 
1568             CmiPrintf("Charm++> use Dynamic SMSG\n"); 
1569         else 
1570             CmiPrintf("Charm++> use Static SMSG\n");
1571     };
1572     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
1573     
1574     status = PMI_Init(&first_spawned);
1575     GNI_RC_CHECK("PMI_Init", status);
1576
1577     status = PMI_Get_size(&mysize);
1578     GNI_RC_CHECK("PMI_Getsize", status);
1579
1580     status = PMI_Get_rank(&myrank);
1581     GNI_RC_CHECK("PMI_getrank", status);
1582
1583     //physicalID = CmiPhysicalNodeID(myrank);
1584     
1585     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
1586
1587     *myNodeID = myrank;
1588     *numNodes = mysize;
1589   
1590     if(myrank == 0)
1591     {
1592         printf("Charm++> Running on Gemini (GNI) using %d  cores\n", mysize);
1593     }
1594 #ifdef USE_ONESIDED
1595     onesided_init(NULL, &onesided_hnd);
1596
1597     // this is a GNI test, so use the libonesided bypass functionality
1598     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
1599     local_addr = gniGetNicAddress();
1600 #else
1601     ptag = get_ptag();
1602     cookie = get_cookie();
1603 #if 0
1604     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
1605 #endif
1606     //Create and attach to the communication  domain */
1607     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
1608     GNI_RC_CHECK("GNI_CdmCreate", status);
1609     //* device id The device id is the minor number for the device
1610     //that is assigned to the device by the system when the device is created.
1611     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
1612     //where X is the device number 0 default 
1613     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
1614     GNI_RC_CHECK("GNI_CdmAttach", status);
1615     local_addr = get_gni_nic_address(0);
1616 #endif
1617     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
1618     _MEMCHECK(MPID_UGNI_AllAddr);
1619     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
1620     /* create the local completion queue */
1621     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
1622     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
1623     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
1624     
1625     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
1626     //GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
1627     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
1628
1629     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
1630     GNI_RC_CHECK("Create CQ (rx)", status);
1631     
1632     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
1633     //GNI_RC_CHECK("Create Post CQ (rx)", status);
1634     
1635     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
1636     //GNI_RC_CHECK("Create BTE CQ", status);
1637
1638     /* create the endpoints. they need to be bound to allow later CQWrites to them */
1639     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
1640     _MEMCHECK(ep_hndl_array);
1641
1642     for (i=0; i<mysize; i++) {
1643         if(i == myrank) continue;
1644         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
1645         GNI_RC_CHECK("GNI_EpCreate ", status);   
1646         remote_addr = MPID_UGNI_AllAddr[i];
1647         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
1648         GNI_RC_CHECK("GNI_EpBind ", status);   
1649     }
1650     /* Depending on the number of cores in the job, decide different method */
1651     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
1652     if(mysize > 1)
1653     {
1654         if(useDynamicSMSG == 0)
1655         {
1656             _init_static_smsg();
1657         }else
1658         {
1659             _init_dynamic_smsg();
1660         }
1661
1662         _init_smsg();
1663     }
1664 #if     USE_LRTS_MEMPOOL
1665     CmiGetArgLong(*argv, "+useMemorypoolSize", &_mempool_size);
1666     if (myrank==0) CmiPrintf("Charm++> use memorypool size: %1.fMB\n", _mempool_size/1024.0/1024);
1667     init_mempool(_mempool_size);
1668     //init_mempool(Mempool_MaxSize);
1669 #endif
1670
1671     /* init DMA buffer for medium message */
1672
1673     //_init_DMA_buffer();
1674     
1675     free(MPID_UGNI_AllAddr);
1676 }
1677
1678
1679 void* LrtsAlloc(int n_bytes, int header)
1680 {
1681     void *ptr;
1682 #if 0
1683     CmiPrintf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d\n", CmiMyPe(), n_bytes, header);
1684 #endif
1685     if(n_bytes <= SMSG_MAX_MSG)
1686     {
1687         int totalsize = n_bytes+header;
1688         ptr = malloc(totalsize);
1689     }else 
1690     {
1691
1692         CmiAssert(header <= ALIGNBUF);
1693 #if     USE_LRTS_MEMPOOL
1694         n_bytes = ALIGN64(n_bytes);
1695         char *res = syh_mempool_malloc(ALIGNBUF+n_bytes);
1696 #else
1697         n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
1698         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1699 #endif
1700         ptr = res + ALIGNBUF - header;
1701     }
1702 #if 0 
1703     CmiPrintf("Done Alloc Lrts for bytes=%d, head=%d\n", n_bytes, header);
1704 #endif
1705     return ptr;
1706 }
1707
1708 void  LrtsFree(void *msg)
1709 {
1710     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1711     if (size <= SMSG_MAX_MSG)
1712       free(msg);
1713     else
1714     {
1715 #if 0
1716         CmiPrintf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1717 #endif
1718 #if     USE_LRTS_MEMPOOL
1719         syh_mempool_free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1720 #else
1721         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1722 #endif
1723     }
1724 #if 0 
1725     CmiPrintf("Done Free lrts for bytes=%d\n", size);
1726 #endif
1727 }
1728
1729 static void LrtsExit()
1730 {
1731     /* free memory ? */
1732 #if     USE_LRTS_MEMPOOL
1733     kill_allmempool();
1734 #endif
1735     PMI_Finalize();
1736     exit(0);
1737 }
1738
1739 static void LrtsDrainResources()
1740 {
1741     while (!SendBufferMsg()) {
1742         PumpNetworkSmsg();
1743         PumpNetworkRdmaMsgs();
1744         PumpLocalSmsgTransactions();
1745         PumpLocalRdmaTransactions();
1746     }
1747     PMI_Barrier();
1748 }
1749
1750 void CmiAbort(const char *message) {
1751
1752     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1753     PMI_Abort(-1, message);
1754 }
1755
1756 /**************************  TIMER FUNCTIONS **************************/
1757 #if CMK_TIMER_USE_SPECIAL
1758 /* MPI calls are not threadsafe, even the timer on some machines */
1759 static CmiNodeLock  timerLock = 0;
1760 static int _absoluteTime = 0;
1761 static int _is_global = 0;
1762 static struct timespec start_ns;
1763
1764 inline int CmiTimerIsSynchronized() {
1765     return 1;
1766 }
1767
1768 inline int CmiTimerAbsolute() {
1769     return _absoluteTime;
1770 }
1771
1772 double CmiStartTimer() {
1773     return 0.0;
1774 }
1775
1776 double CmiInitTime() {
1777     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
1778 }
1779
1780 void CmiTimerInit(char **argv) {
1781     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1782     if (_absoluteTime && CmiMyPe() == 0)
1783         printf("Charm++> absolute  timer is used\n");
1784     
1785     _is_global = CmiTimerIsSynchronized();
1786
1787
1788     if (_is_global) {
1789         if (CmiMyRank() == 0) {
1790             clock_gettime(CLOCK_MONOTONIC, &start_ts)
1791         }
1792     } else { /* we don't have a synchronous timer, set our own start time */
1793         CmiBarrier();
1794         CmiBarrier();
1795         CmiBarrier();
1796         clock_gettime(CLOCK_MONOTONIC, &start_ts)
1797     }
1798     CmiNodeAllBarrier();          /* for smp */
1799 }
1800
1801 /**
1802  * Since the timerLock is never created, and is
1803  * always NULL, then all the if-condition inside
1804  * the timer functions could be disabled right
1805  * now in the case of SMP.
1806  */
1807 double CmiTimer(void) {
1808     struct timespec now_ts;
1809     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1810     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1811         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1812 }
1813
1814 double CmiWallTimer(void) {
1815     struct timespec now_ts;
1816     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1817     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1818         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1819 }
1820
1821 double CmiCpuTimer(void) {
1822     struct timespec now_ts;
1823     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1824     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1825         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1826 }
1827
1828 #endif
1829 /************Barrier Related Functions****************/
1830
1831 int CmiBarrier()
1832 {
1833     int status;
1834     status = PMI_Barrier();
1835     return status;
1836
1837 }
1838
1839
1840 #if CMK_PERSISTENT_COMM
1841 #include "machine-persistent.c"
1842 #endif
1843
1844