fixed a deadlock problem
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <stdint.h>
16 #include <errno.h>
17 #include <malloc.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #if defined(_WIN32) && ! defined(__CYGWIN__)
26 #include <windows.h>
27 #include <wincon.h>
28 #include <sys/types.h>
29 #include <sys/timeb.h>
30
31 static void sleep(int secs) {
32     Sleep(1000*secs);
33 }
34 #else
35 #include <unistd.h> /*For getpid()*/
36 #endif
37
38
39 #define REMOTE_EVENT         0
40 #define USE_LRTS_MEMPOOL   1
41
42 #if USE_LRTS_MEMPOOL
43 static CmiInt8 _mempool_size = 1024ll*1024*32;
44 #endif
45
46 #define PRINT_SYH  0
47
48 #if PRINT_SYH
49 int         lrts_smsg_success = 0;
50 int         lrts_send_msg_id = 0;
51 int         lrts_send_rdma_id = 0;
52 int         lrts_send_rdma_success = 0;
53 int         lrts_received_msg = 0;
54 int         lrts_local_done_msg = 0;
55 #endif
56
57 #include "machine.h"
58
59 #include "pcqueue.h"
60
61 #if CMK_PERSISTENT_COMM
62 #include "machine-persistent.h"
63 #endif
64
65 //#define  USE_ONESIDED 1
66 #ifdef USE_ONESIDED
67 //onesided implementation is wrong, since no place to restore omdh
68 #include "onesided.h"
69 onesided_hnd_t   onesided_hnd;
70 onesided_md_t    omdh;
71 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
72
73 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
74
75 #else
76 uint8_t   onesided_hnd, omdh;
77 #if REMOTE_EVENT
78 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
79 #else
80 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
81 #endif
82 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
83 #endif
84
85 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
86 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
87
88 #define ALIGNBUF                64
89
90 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
91 /* If SMSG is not used */
92 #define FMA_PER_CORE  1024
93 #define FMA_BUFFER_SIZE 1024
94 /* If SMSG is used */
95 static int  SMSG_MAX_MSG;
96 //static int  log2_SMSG_MAX_MSG;
97 #define SMSG_MAX_CREDIT  36
98
99 #define MSGQ_MAXSIZE       2048
100 /* large message transfer with FMA or BTE */
101 #define LRTS_GNI_RDMA_THRESHOLD  2048
102
103 #define REMOTE_QUEUE_ENTRIES  2048 
104 #define LOCAL_QUEUE_ENTRIES   64 
105
106 #define PUT_DONE_TAG      0x29
107 #define ACK_TAG           0x30
108 /* SMSG is data message */
109 #define SMALL_DATA_TAG          0x31
110 /* SMSG is a control message to initialize a BTE */
111 #define MEDIUM_HEAD_TAG         0x32
112 #define MEDIUM_DATA_TAG         0x33
113 #define LMSG_INIT_TAG     0x39 
114
115 #define DEBUG
116 #ifdef GNI_RC_CHECK
117 #undef GNI_RC_CHECK
118 #endif
119 #ifdef DEBUG
120 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
121 #else
122 #define GNI_RC_CHECK(msg,rc)
123 #endif
124
125 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
126 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
127
128 static int Mempool_MaxSize = 1024*1024*128;
129 static int useDynamicSMSG   = 1;
130 static int useStaticMSGQ = 0;
131 static int useStaticFMA = 0;
132 static int mysize, myrank;
133 static gni_nic_handle_t      nic_hndl;
134
135 typedef struct {
136     gni_mem_handle_t mdh;
137     uint64_t addr;
138 } mdh_addr_t ;
139 // this is related to dynamic SMSG
140
141 typedef struct mdh_addr_list{
142     gni_mem_handle_t mdh;
143    void *addr;
144     struct mdh_addr_list *next;
145 }mdh_addr_list_t;
146
147 static unsigned int         smsg_memlen;
148 #define     SMSG_CONN_SIZE     sizeof(gni_smsg_attr_t)
149 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
150 int                 *smsg_connected_flag= 0;
151 char                *smsg_connection_addr = 0;
152 mdh_addr_t          setup_mem;
153 mdh_addr_t          *smsg_connection_vec = 0;
154 gni_mem_handle_t    smsg_connection_memhndl;
155 static int          smsg_expand_slots = 10;
156 static int          smsg_available_slot = 0;
157 static void         *smsg_mailbox_mempool = 0;
158 mdh_addr_list_t     *smsg_dynamic_list = 0;
159
160 static void             *smsg_mailbox_base;
161 gni_msgq_attr_t         msgq_attrs;
162 gni_msgq_handle_t       msgq_handle;
163 gni_msgq_ep_attr_t      msgq_ep_attrs;
164 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
165
166
167
168 /* preallocated DMA buffer */
169 int                     DMA_slots;
170 uint64_t                DMA_avail_tag = 0;
171 uint32_t                DMA_incoming_avail_tag = 0;
172 uint32_t                DMA_outgoing_avail_tag = 0;
173 void                    *DMA_incoming_base_addr;
174 void                    *DMA_outgoing_base_addr;
175 mdh_addr_t              DMA_buffer_base_mdh_addr;
176 mdh_addr_t              *DMA_buffer_base_mdh_addr_vec;
177 int                     DMA_buffer_size;
178 int                     DMA_max_single_msg = 131072;//524288 ;
179
180 #define                 DMA_SIZE_PER_SLOT       8192
181
182
183 typedef struct dma_msgid_map
184 {
185     uint64_t     msg_id;
186     int     msg_subid
187 } dma_msgid_map_t;
188
189 dma_msgid_map_t         *dma_map_list;
190
191 typedef struct msg_trace
192 {
193     uint64_t    msg_id;
194     int         done_num;
195 }msg_trace_t;
196
197 msg_trace_t             *pending_msg_list;
198 /* =====Beginning of Declarations of Machine Specific Variables===== */
199 static int cookie;
200 static int modes = 0;
201 static gni_cq_handle_t       smsg_rx_cqh = NULL;
202 static gni_cq_handle_t       smsg_tx_cqh = NULL;
203 static gni_cq_handle_t       post_rx_cqh = NULL;
204 static gni_cq_handle_t       post_tx_cqh = NULL;
205 static gni_ep_handle_t       *ep_hndl_array;
206
207 typedef    struct  pending_smg
208 {
209     int     inst_id;
210     struct  pending_smg *next;
211 } PENDING_GETNEXT;
212
213
214 typedef struct msg_list
215 {
216     uint32_t destNode;
217     uint32_t size;
218     void *msg;
219     struct msg_list *next;
220     struct msg_list *pehead_next;
221     uint8_t tag;
222 }MSG_LIST;
223
224 typedef struct medium_msg_list
225 {
226     uint32_t destNode;
227     uint32_t msg_id;
228     uint32_t msg_subid;
229     uint32_t remain_size;
230     void *msg;
231     struct medium_msg_list *next;
232 }MEDIUM_MSG_LIST;
233
234
235 typedef struct control_msg
236 {
237     uint64_t            source_addr;
238     int                 source;               /* source rank */
239     int                 length;
240     gni_mem_handle_t    source_mem_hndl;
241     struct control_msg *next;
242 }CONTROL_MSG;
243
244 typedef struct medium_msg_control
245 {
246     uint64_t            dma_offset;     //the dma_buffer for this block of msg
247     int                 msg_id;         //Id for the total index
248     int                 msg_subid;      //offset inside the message id 
249 }MEDIUM_MSG_CONTROL;
250
251 typedef struct  rmda_msg
252 {
253     int                   destNode;
254     gni_post_descriptor_t *pd;
255     struct  rmda_msg      *next;
256 }RDMA_REQUEST;
257
258 typedef struct  msg_list_index
259 {
260     int         next;
261     MSG_LIST    *head;
262     MSG_LIST    *tail;
263 } MSG_LIST_INDEX;
264
265 /* reuse PendingMsg memory */
266 static CONTROL_MSG          *control_freelist=0;
267 static MSG_LIST             *msglist_freelist=0;
268 static int                  smsg_head_index;
269 static MSG_LIST_INDEX       *smsg_msglist_index= 0;
270 static MSG_LIST             *smsg_free_head=0;
271 static MSG_LIST             *smsg_free_tail=0;
272
273 /*
274 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
275     if(free_head == 0)  free_head = free_tail = msg_head;    \
276     else   free_tail = free_tail->next;    \
277     if( msg_head->next == msg_tail) msg_head =0;   \
278     else msg_head= msg_head->next;    
279
280 #define MallocMsgList(d, msg_head, msg_tail, free_head, free_tail, msgsize) \
281     if(free_head == 0) {d= malloc(msgsize);  \
282         if(msg_head == 0)   msg_head =msg_tail = msg_head->next = msg_tail->next = d; \
283         else { msg_tail->next = d; d->next = msg_head; msg_tail=d;} \
284     }else {d = free_head; free_head = free_head->next; if(free_tail->next == free_head) free_head =0;} \
285 */
286
287 #define FreeMsgList(d)  \
288   (d)->next = msglist_freelist;\
289   msglist_freelist = d;
290
291
292
293 #define MallocMsgList(d) \
294   d = msglist_freelist;\
295   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
296              _MEMCHECK(d);\
297   } else msglist_freelist = d->next;
298
299
300 #define FreeControlMsg(d)       \
301   (d)->next = control_freelist;\
302   control_freelist = d;
303
304 #define MallocControlMsg(d) \
305   d = control_freelist;\
306   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
307              _MEMCHECK(d);\
308   } else control_freelist = d->next;
309
310
311 static RDMA_REQUEST         *rdma_freelist = NULL;
312
313 #define FreeControlMsg(d)       \
314   (d)->next = control_freelist;\
315   control_freelist = d;
316
317 #define MallocControlMsg(d) \
318   d = control_freelist;\
319   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
320              _MEMCHECK(d);\
321   } else control_freelist = d->next;
322
323 #define FreeMediumControlMsg(d)       \
324   (d)->next = medium_control_freelist;\
325   medium_control_freelist = d;
326
327
328 #define MallocMediumControlMsg(d) \
329     d = medium_control_freelist;\
330     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
331     _MEMCHECK(d);\
332 } else mediumcontrol_freelist = d->next;
333
334 #define FreeRdmaRequest(d)       \
335   (d)->next = rdma_freelist;\
336   rdma_freelist = d;
337
338 #define MallocRdmaRequest(d) \
339   d = rdma_freelist;\
340   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
341              _MEMCHECK(d);\
342   } else rdma_freelist = d->next;
343
344 /* reuse gni_post_descriptor_t */
345 static gni_post_descriptor_t *post_freelist=0;
346
347 #if 1
348 #define FreePostDesc(d)       \
349     (d)->next_descr = post_freelist;\
350     post_freelist = d;
351
352 #define MallocPostDesc(d) \
353   d = post_freelist;\
354   if (d==0) { \
355      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
356      _MEMCHECK(d);\
357   } else post_freelist = d->next_descr;
358 #else
359
360 #define FreePostDesc(d)     free(d);
361 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
362
363 #endif
364
365 static PENDING_GETNEXT     *pending_smsg_head = 0;
366 static PENDING_GETNEXT     *pending_smsg_tail = 0;
367
368 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
369 static int      buffered_smsg_counter = 0;
370
371 /* SmsgSend return success but message sent is not confirmed by remote side */
372
373 static RDMA_REQUEST  *pending_rdma_head = 0;
374 static RDMA_REQUEST  *pending_rdma_tail = 0;
375 static MSG_LIST *buffered_fma_head = 0;
376 static MSG_LIST *buffered_fma_tail = 0;
377
378 /* functions  */
379 #define IsFree(a,ind)  !( a& (1<<(ind) ))
380 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
381 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
382
383 #include "mempool.c"
384
385 /* get the upper bound of log 2 */
386 int mylog2(int size)
387 {
388     int op = size;
389     unsigned int ret=0;
390     unsigned int mask = 0;
391     int i;
392     while(op>0)
393     {
394         op = op >> 1;
395         ret++;
396
397     }
398     for(i=1; i<ret; i++)
399     {
400         mask = mask << 1;
401         mask +=1;
402     }
403
404     ret -= ((size &mask) ? 0:1);
405     return ret;
406 }
407
408 static void
409 allgather(void *in,void *out, int len)
410 {
411     static int *ivec_ptr=NULL,already_called=0,job_size=0;
412     int i,rc;
413     int my_rank;
414     char *tmp_buf,*out_ptr;
415
416     if(!already_called) {
417
418         rc = PMI_Get_size(&job_size);
419         CmiAssert(rc == PMI_SUCCESS);
420         rc = PMI_Get_rank(&my_rank);
421         CmiAssert(rc == PMI_SUCCESS);
422
423         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
424         CmiAssert(ivec_ptr != NULL);
425
426         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
427         CmiAssert(rc == PMI_SUCCESS);
428
429         already_called = 1;
430
431     }
432
433     tmp_buf = (char *)malloc(job_size * len);
434     CmiAssert(tmp_buf);
435
436     rc = PMI_Allgather(in,tmp_buf,len);
437     CmiAssert(rc == PMI_SUCCESS);
438
439     out_ptr = out;
440
441     for(i=0;i<job_size;i++) {
442
443         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
444
445     }
446
447     free(tmp_buf);
448 }
449 static void
450 allgather_2(void *in,void *out, int len)
451 {
452     //PMI_Allgather is out of order
453     int i,rc, extend_len;
454     int  rank_index;
455     char *out_ptr, *out_ref;
456     char *in2;
457
458     extend_len = sizeof(int) + len;
459     in2 = (char*)malloc(extend_len);
460
461     memcpy(in2, &myrank, sizeof(int));
462     memcpy(in2+sizeof(int), in, len);
463
464     out_ptr = (char*)malloc(mysize*extend_len);
465
466     rc = PMI_Allgather(in2, out_ptr, extend_len);
467     GNI_RC_CHECK("allgather", rc);
468
469     out_ref = out;
470
471     for(i=0;i<mysize;i++) {
472         //rank index 
473         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
474         //copy to the rank index slot
475         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
476     }
477
478     free(out_ptr);
479     free(in2);
480
481 }
482
483 static unsigned int get_gni_nic_address(int device_id)
484 {
485     unsigned int address, cpu_id;
486     gni_return_t status;
487     int i, alps_dev_id=-1,alps_address=-1;
488     char *token, *p_ptr;
489
490     p_ptr = getenv("PMI_GNI_DEV_ID");
491     if (!p_ptr) {
492         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
493        
494         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
495     } else {
496         while ((token = strtok(p_ptr,":")) != NULL) {
497             alps_dev_id = atoi(token);
498             if (alps_dev_id == device_id) {
499                 break;
500             }
501             p_ptr = NULL;
502         }
503         CmiAssert(alps_dev_id != -1);
504         p_ptr = getenv("PMI_GNI_LOC_ADDR");
505         CmiAssert(p_ptr != NULL);
506         i = 0;
507         while ((token = strtok(p_ptr,":")) != NULL) {
508             if (i == alps_dev_id) {
509                 alps_address = atoi(token);
510                 break;
511             }
512             p_ptr = NULL;
513             ++i;
514         }
515         CmiAssert(alps_address != -1);
516         address = alps_address;
517     }
518     return address;
519 }
520
521 static uint8_t get_ptag(void)
522 {
523     char *p_ptr, *token;
524     uint8_t ptag;
525
526     p_ptr = getenv("PMI_GNI_PTAG");
527     CmiAssert(p_ptr != NULL);
528     token = strtok(p_ptr, ":");
529     ptag = (uint8_t)atoi(token);
530     return ptag;
531         
532 }
533
534 static uint32_t get_cookie(void)
535 {
536     uint32_t cookie;
537     char *p_ptr, *token;
538
539     p_ptr = getenv("PMI_GNI_COOKIE");
540     CmiAssert(p_ptr != NULL);
541     token = strtok(p_ptr, ":");
542     cookie = (uint32_t)atoi(token);
543
544     return cookie;
545 }
546
547 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
548 /* TODO: add any that are related */
549 /* =====End of Definitions of Message-Corruption Related Macros=====*/
550
551
552 #include "machine-lrts.h"
553 #include "machine-common-core.c"
554
555 /* Network progress function is used to poll the network when for
556    messages. This flushes receive buffers on some  implementations*/
557 #if CMK_MACHINE_PROGRESS_DEFINED
558 void CmiMachineProgressImpl() {
559 }
560 #endif
561
562 inline
563 static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
564 {
565     MSG_LIST        *msg_tmp;
566     MallocMsgList(msg_tmp);
567     msg_tmp->destNode = destNode;
568     msg_tmp->size   = size;
569     msg_tmp->msg    = msg;
570     msg_tmp->tag    = tag;
571     msg_tmp->next   = 0;
572     if (smsg_msglist_index[destNode].head == 0 ) {
573         smsg_msglist_index[destNode].head = msg_tmp;
574         smsg_msglist_index[destNode].next = smsg_head_index;
575         smsg_head_index = destNode;
576     }
577     else {
578       (smsg_msglist_index[destNode].tail)->next    = msg_tmp;
579     }
580     smsg_msglist_index[destNode].tail          = msg_tmp;
581 #if PRINT_SYH
582     buffered_smsg_counter++;
583 #endif
584 }
585
586 inline
587 static void setup_smsg_connection(int destNode)
588 {
589     mdh_addr_list_t  *new_entry = 0;
590     gni_post_descriptor_t *pd;
591     gni_smsg_attr_t      *smsg_attr;
592     gni_return_t status = GNI_RC_NOT_DONE;
593     if(smsg_available_slot == smsg_expand_slots)
594     {
595         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
596         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
597         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
598
599         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
600             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
601             GNI_MEM_READWRITE,   
602             -1,
603             &(new_entry->mdh));
604         smsg_available_slot = 0; 
605         new_entry->next = smsg_dynamic_list;
606         smsg_dynamic_list = new_entry;
607     }
608     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
609     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
610     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
611     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
612     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
613     smsg_attr->buff_size = smsg_memlen;
614     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
615     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
616     smsg_local_attr_vec[destNode] = smsg_attr;
617     smsg_available_slot++;
618     MallocPostDesc(pd);
619     pd->type            = GNI_POST_FMA_PUT;
620     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
621 //pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
622     pd->length          = sizeof(gni_smsg_attr_t);
623     pd->local_addr      = (uint64_t) smsg_attr;
624     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
625     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
626     pd->src_cq_hndl     = 0;
627     pd->rdma_mode       = 0;
628     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
629     GNI_RC_CHECK("SMSG Dynamic link", status);
630     //CmiPrintf("[%d=%d] send post FMA successful (%p) %s\n", myrank, destNode, (void*)pd->remote_addr, gni_err_str[status]);
631 }
632
633 inline 
634 static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
635 {
636     gni_return_t status = GNI_RC_NOT_DONE;
637     gni_smsg_attr_t      *smsg_attr;
638     gni_post_descriptor_t *pd;
639   
640     if(useDynamicSMSG == 1)
641     {
642         if(smsg_connected_flag[destNode] == 0)
643         {
644             //CmiPrintf("[%d]Init smsg connection\n", CmiMyPe());
645             setup_smsg_connection(destNode);
646             delay_send_small_msg(msg, size, destNode, tag);
647             smsg_connected_flag[destNode] =1;
648             return status;
649         }
650         else  if(smsg_connected_flag[destNode] < 3)
651         {
652             if(inbuff == 0)
653                 delay_send_small_msg(msg, size, destNode, tag);
654             return status;
655         }
656     }
657     //CmiPrintf("[%d] reach send\n", myrank);
658     if(smsg_msglist_index[destNode].head == 0 || inbuff==1)
659     {
660         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], header, size_header, msg, size, 0, tag);
661         if(status == GNI_RC_SUCCESS)
662         {
663 #if PRINT_SYH
664             lrts_smsg_success++;
665             if(lrts_smsg_success == lrts_send_msg_id)
666                 CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
667             else
668                 CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
669 #endif
670             return status;
671         }
672     }
673     if(inbuff ==0)
674         delay_send_small_msg(msg, size, destNode, tag);
675     return status;
676 }
677
678 // Get first 0 in DMA_tags starting from index
679 static int get_first_avail_bit(uint64_t DMA_tags, int start_index)
680 {
681
682     uint64_t         mask = 0x1;
683     register    int     i=0;
684     while((DMA_tags & mask) && i<DMA_slots) {mask << 1; i++;}
685
686 }
687
688 static int send_medium_messages(int destNode, int size, char *msg)
689 {
690 #if 0
691     gni_return_t status = GNI_RC_SUCCESS;
692     int first_avail_bit=0;
693     uint64_t mask = 0x1;
694     MEDIUM_MSG_CONTROL  *medium_msg_control_tmp;
695     MEDIUM_MSG_LIST        *msg_tmp;
696     int blocksize, remain_size, pos;
697     int sub_id = 0;
698     remain_size = size;
699     pos = 0;  //offset before which data are sent
700     /* copy blocks of the message to DMA preallocated buffer and send SMSG */
701     //Check whether there is any available DMA buffer
702     
703     do{
704         while((DMA_avail_tag & mask) && first_avail_bit<DMA_slots) {mask << 1; first_avail_bit++;}
705         if(first_avail_bit == DMA_slots) //No available DMA, buffer this message
706         {
707             MallocMediumMsgList(msg_tmp);
708             msg_tmp->destNode = destNode;
709             msg_tmp->msg_id   = lrts_send_msg_id;
710             msg_tmp->msg_subid   = sub_id;
711             msg_tmp->size   = remain_size;
712             msg_tmp->msg    = msg+pos;
713             msg_tmp->next   = NULL;
714             break;
715         }else
716         {
717             //copy this part of the message into this DMA buffer
718             //TODO optimize here, some data can go with this SMSG
719             blocksize = (remain_size>DMA_SIZE_PER_SLOT)?DMA_SIZE_PER_SLOT: remain_size;
720             memcpy(DMA_buffer_base_mdh_addr.addr[first_avail_bit], msg+pos, blocksize);
721             pos += blocksize;
722             remain_size -= blocksize;
723             SET_BITS(DMA_avail_tag, first_avail_bit);
724            
725             MallocMediumControlMsg(medium_msg_control_tmp);
726             medium_msg_control_tmp->msg_id = lrts_send_msg_id;
727             medium_msg_control_tmp->msg_subid = sub_id;
728             if(status == GNI_RC_SUCCESS)
729             {
730                 if(sub_id==0)
731                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_HEAD_TAG);
732                 else
733                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_DATA_TAG);
734             }
735             //buffer this smsg
736             if(status != GNI_RC_SUCCESS)
737             {
738                 delay_send_small_msg(medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), destNode, MEDIUM_HEAD_TAG);
739             }
740             sub_id++;
741         }while(remain_size > 0 );
742
743         }
744     }
745 #endif
746 }
747
748 // Large message, send control to receiver, receiver register memory and do a GET 
749 inline
750 static void send_large_messages(int destNode, int size, char *msg)
751 {
752 #if     USE_LRTS_MEMPOOL
753     gni_return_t        status  =   GNI_RC_SUCCESS;
754     CONTROL_MSG         *control_msg_tmp;
755     uint32_t            vmdh_index  = -1;
756     /* construct a control message and send */
757     MallocControlMsg(control_msg_tmp);
758     control_msg_tmp->source_addr    = (uint64_t)msg;
759     control_msg_tmp->source         = myrank;
760     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
761     //memcpy( &(control_msg_tmp->source_mem_hndl), GetMemHndl(msg), sizeof(gni_mem_handle_t)) ;
762     control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
763 #if PRINT_SYH
764     lrts_send_msg_id++;
765     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
766 #endif
767     status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
768     if(status == GNI_RC_SUCCESS)
769     {
770         FreeControlMsg(control_msg_tmp);
771     }
772 // NOT use mempool, should slow 
773 #else
774     gni_return_t        status  =   GNI_RC_SUCCESS;
775     CONTROL_MSG         *control_msg_tmp;
776     uint32_t            vmdh_index  = -1;
777     /* construct a control message and send */
778     MallocControlMsg(control_msg_tmp);
779     control_msg_tmp->source_addr    = (uint64_t)msg;
780     control_msg_tmp->source         = myrank;
781     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
782     control_msg_tmp->source_mem_hndl.qword1 = 0;
783     control_msg_tmp->source_mem_hndl.qword2 = 0;
784 #if PRINT_SYH
785     lrts_send_msg_id++;
786     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
787 #endif
788     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
789     if(status == GNI_RC_SUCCESS)
790     {
791         status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
792         if(status == GNI_RC_SUCCESS)
793         {
794             FreeControlMsg(control_msg_tmp);
795         }
796     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
797     {
798         CmiAbort("Memory registor for large msg\n");
799     }else 
800     {
801         delay_send_small_msg(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
802     }
803 #endif
804 }
805
806 inline void LrtsPrepareEnvelope(char *msg, int size)
807 {
808     CmiSetMsgSize(msg, size);
809 }
810
811 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
812 {
813     gni_return_t        status  =   GNI_RC_SUCCESS;
814     LrtsPrepareEnvelope(msg, size);
815
816     if(size <= SMSG_MAX_MSG)
817     {
818 #if PRINT_SYH
819         lrts_send_msg_id++;
820         CmiPrintf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
821 #endif
822         status = send_smsg_message( destNode, 0, 0, msg, size, SMALL_DATA_TAG, 0);  
823         if(status == GNI_RC_SUCCESS)
824         {
825             CmiFree(msg);
826         }
827     }
828     else
829     {
830         send_large_messages(destNode, size, msg);
831     }
832     return 0;
833 }
834
835 static void LrtsPreCommonInit(int everReturn){}
836
837 /* Idle-state related functions: called in non-smp mode */
838 void CmiNotifyIdleForGemini(void) {
839     LrtsAdvanceCommunication();
840 }
841
842 static void LrtsPostCommonInit(int everReturn)
843 {
844 #if CMK_SMP
845     CmiIdleState *s=CmiNotifyGetState();
846     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
847     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
848 #else
849     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
850 #endif
851
852 }
853
854
855 void LrtsPostNonLocal(){}
856 /* pooling CQ to receive network message */
857 static void PumpNetworkRdmaMsgs()
858 {
859     gni_cq_entry_t      event_data;
860     gni_return_t        status;
861     while( (status = GNI_CqGetEvent(post_rx_cqh, &event_data)) == GNI_RC_SUCCESS);
862 }
863
864 static int SendRdmaMsg();
865 static void getLargeMsgRequest(void* header, uint64_t inst_id);
866 static void PumpNetworkSmsg()
867 {
868     uint64_t            inst_id;
869     PENDING_GETNEXT     *pending_next;
870     int                 ret;
871     gni_cq_entry_t      event_data;
872     gni_return_t        status;
873     void                *header;
874     uint8_t             msg_tag;
875     int                 msg_nbytes;
876     void                *msg_data;
877     gni_mem_handle_t    msg_mem_hndl;
878     gni_smsg_attr_t     *smsg_attr;
879     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
880     {
881         inst_id = GNI_CQ_GET_INST_ID(event_data);
882         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
883 #if PRINT_SYH
884         CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
885 #endif
886
887         if(useDynamicSMSG == 1)
888         {
889             if(smsg_connected_flag[inst_id] == 0 )
890             {
891                 //CmiPrintf("[%d]pump Init smsg connection\n", CmiMyPe());
892                 smsg_connected_flag[inst_id] =2;
893                 setup_smsg_connection(inst_id);
894                 status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id],  &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]));
895                 GNI_RC_CHECK("SmsgInit", status);
896                 continue;
897             } else if (smsg_connected_flag[inst_id] <3) 
898             {
899                 //CmiPrintf("[%d]pump setup smsg connection\n", CmiMyPe());
900                 smsg_connected_flag[inst_id] += 1;
901                 status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id], &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]));
902                 GNI_RC_CHECK("SmsgInit", status);
903                 continue;
904             }
905         }
906         msg_tag = GNI_SMSG_ANY_TAG;
907         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
908         {
909 #if PRINT_SYH
910             lrts_received_msg++;
911             CmiPrintf("+++[%d] PumpNetwork msg is received, messageid:%d tag=%d\n", myrank, lrts_received_msg, msg_tag);
912 #endif
913             /* copy msg out and then put into queue (small message) */
914             switch (msg_tag) {
915             case SMALL_DATA_TAG:
916             {
917                 msg_nbytes = CmiGetMsgSize(header);
918                 msg_data    = CmiAlloc(msg_nbytes);
919                 memcpy(msg_data, (char*)header, msg_nbytes);
920                 handleOneRecvedMsg(msg_nbytes, msg_data);
921                 break;
922             }
923             case LMSG_INIT_TAG:
924             {
925 #if PRINT_SYH
926                 CmiPrintf("+++[%d] from %d PumpNetwork Rdma Request msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
927 #endif
928                 getLargeMsgRequest(header, inst_id);
929                 break;
930             }
931             case ACK_TAG:
932             {
933                 /* Get is done, release message . Now put is not used yet*/
934 #if         !USE_LRTS_MEMPOOL
935                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
936 #endif
937                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
938                 SendRdmaMsg();
939                 break;
940             }
941             case PUT_DONE_TAG: //persistent message
942             {
943                 handleOneRecvedMsg(((CONTROL_MSG *) header)->length,((void*) (CONTROL_MSG *) header)); 
944                 break;
945             }
946             default: {
947                 CmiPrintf("weird tag problem\n");
948                 CmiAbort("Unknown tag\n");
949             }
950             }
951             GNI_SmsgRelease(ep_hndl_array[inst_id]);
952             msg_tag = GNI_SMSG_ANY_TAG;
953         } //endwhile getNext
954     }   //end while GetEvent
955     if(status == GNI_RC_ERROR_RESOURCE)
956     {
957         GNI_RC_CHECK("Smsg_rx_cq full", status);
958     }
959 }
960
961 static void getLargeMsgRequest(void* header, uint64_t inst_id)
962 {
963 #if     USE_LRTS_MEMPOOL
964     CONTROL_MSG         *request_msg;
965     gni_return_t        status;
966     void                *msg_data;
967     gni_post_descriptor_t *pd;
968     RDMA_REQUEST        *rdma_request_msg;
969     gni_mem_handle_t    msg_mem_hndl;
970     int source;
971     // initial a get to transfer data from the sender side */
972     request_msg = (CONTROL_MSG *) header;
973     source = request_msg->source;
974     msg_data = CmiAlloc(request_msg->length);
975     _MEMCHECK(msg_data);
976     //memcpy(&msg_mem_hndl, GetMemHndl(msg_data), sizeof(gni_mem_handle_t));
977     msg_mem_hndl = GetMemHndl(msg_data);
978
979     MallocPostDesc(pd);
980     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
981         pd->type            = GNI_POST_FMA_GET;
982     else
983         pd->type            = GNI_POST_RDMA_GET;
984 #if REMOTE_EVENT
985     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
986 #else
987     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
988 #endif
989     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
990     pd->length          = ALIGN4(request_msg->length);
991     pd->local_addr      = (uint64_t) msg_data;
992     pd->local_mem_hndl  = msg_mem_hndl;
993     pd->remote_addr     = request_msg->source_addr;
994     pd->remote_mem_hndl = request_msg->source_mem_hndl;
995     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
996     pd->rdma_mode       = 0;
997
998     if(pd->type == GNI_POST_RDMA_GET) 
999         status = GNI_PostRdma(ep_hndl_array[source], pd);
1000     else
1001         status = GNI_PostFma(ep_hndl_array[source],  pd);
1002     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1003     {
1004         MallocRdmaRequest(rdma_request_msg);
1005         rdma_request_msg->next = 0;
1006         rdma_request_msg->destNode = inst_id;
1007         rdma_request_msg->pd = pd;
1008         if(pending_rdma_head == 0)
1009         {
1010             pending_rdma_head = rdma_request_msg;
1011         }else
1012         {
1013             pending_rdma_tail->next = rdma_request_msg;
1014         }
1015         pending_rdma_tail = rdma_request_msg;
1016     }else
1017         GNI_RC_CHECK("AFter posting", status);
1018
1019 #else
1020     CONTROL_MSG         *request_msg;
1021     gni_return_t        status;
1022     void                *msg_data;
1023     gni_post_descriptor_t *pd;
1024     RDMA_REQUEST        *rdma_request_msg;
1025     gni_mem_handle_t    msg_mem_hndl;
1026     int source;
1027     // initial a get to transfer data from the sender side */
1028     request_msg = (CONTROL_MSG *) header;
1029     source = request_msg->source;
1030     msg_data = CmiAlloc(request_msg->length);
1031     _MEMCHECK(msg_data);
1032
1033     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
1034
1035     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1036     {
1037         GNI_RC_CHECK("Mem Register before post", status);
1038     }
1039
1040     MallocPostDesc(pd);
1041     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
1042         pd->type            = GNI_POST_FMA_GET;
1043     else
1044         pd->type            = GNI_POST_RDMA_GET;
1045 #if REMOTE_EVENT
1046     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1047 #else
1048     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1049 #endif
1050     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1051     pd->length          = ALIGN4(request_msg->length);
1052     pd->local_addr      = (uint64_t) msg_data;
1053     pd->remote_addr     = request_msg->source_addr;
1054     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1055     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1056     pd->rdma_mode       = 0;
1057
1058     //memory registration successful
1059     if(status == GNI_RC_SUCCESS)
1060     {
1061         pd->local_mem_hndl  = msg_mem_hndl;
1062         if(pd->type == GNI_POST_RDMA_GET) 
1063             status = GNI_PostRdma(ep_hndl_array[source], pd);
1064         else
1065             status = GNI_PostFma(ep_hndl_array[source],  pd);
1066     }else
1067     {
1068         pd->local_mem_hndl.qword1  = 0; 
1069         pd->local_mem_hndl.qword1  = 0; 
1070     }
1071     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1072     {
1073         MallocRdmaRequest(rdma_request_msg);
1074         rdma_request_msg->next = 0;
1075         rdma_request_msg->destNode = inst_id;
1076         rdma_request_msg->pd = pd;
1077         if(pending_rdma_head == 0)
1078         {
1079             pending_rdma_head = rdma_request_msg;
1080         }else
1081         {
1082             pending_rdma_tail->next = rdma_request_msg;
1083         }
1084         pending_rdma_tail = rdma_request_msg;
1085     }else
1086         GNI_RC_CHECK("AFter posting", status);
1087 #endif
1088 }
1089
1090 /* Check whether message send or get is confirmed by remote */
1091 static void PumpLocalSmsgTransactions()
1092 {
1093     gni_return_t            status;
1094     gni_cq_entry_t          ev;
1095     while ((status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1096     {
1097 #if PRINT_SYH
1098         lrts_local_done_msg++;
1099         //CmiPrintf("*[%d]  PumpLocalSmsgTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
1100 #endif
1101         if(GNI_CQ_OVERRUN(ev))
1102         {
1103             CmiPrintf("Overrun detected in local CQ");
1104             CmiAbort("Overrun in TX");
1105         }
1106     }
1107     if(status == GNI_RC_ERROR_RESOURCE)
1108     {
1109         GNI_RC_CHECK("Smsg_tx_cq full", status);
1110     }
1111 }
1112
1113 static void PumpLocalRdmaTransactions()
1114 {
1115     gni_cq_entry_t          ev;
1116     gni_return_t            status;
1117     uint64_t                type, inst_id;
1118     gni_post_descriptor_t   *tmp_pd;
1119     MSG_LIST                *ptr;
1120     CONTROL_MSG             *ack_msg_tmp;
1121     uint8_t             msg_tag;
1122
1123    // while ( (status = GNI_CqGetEvent(post_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1124     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1125     {
1126         type        = GNI_CQ_GET_TYPE(ev);
1127 #if PRINT_SYH
1128         //lrts_local_done_msg++;
1129         CmiPrintf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
1130 #endif
1131         if (type == GNI_CQ_EVENT_TYPE_POST)
1132         {
1133             inst_id     = GNI_CQ_GET_INST_ID(ev);
1134 #if PRINT_SYH
1135             CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d, %d\n", myrank,  lrts_local_done_msg, smsg_connected_flag[inst_id]);
1136 #endif
1137             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
1138             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1139             ////Message is sent, free message , put is not used now
1140             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
1141             {
1142                 if(smsg_connected_flag[inst_id] <3)
1143                 {
1144                     smsg_connected_flag[inst_id] += 1;
1145                     continue;
1146                 }
1147                 else  
1148                 {
1149                     //persistent message 
1150                     CmiFree((void *)tmp_pd->local_addr);
1151                     msg_tag = PUT_DONE_TAG; 
1152                 }
1153             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1154             {
1155                 msg_tag = ACK_TAG;  
1156             }
1157             MallocControlMsg(ack_msg_tmp);
1158             ack_msg_tmp->source             = myrank;
1159             ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
1160             ack_msg_tmp->length             = tmp_pd->length; 
1161             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1162 #if PRINT_SYH
1163             lrts_send_msg_id++;
1164             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
1165 #endif
1166             status = send_smsg_message(inst_id, 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0);  
1167             if(status == GNI_RC_SUCCESS)
1168             {
1169                 FreeControlMsg(ack_msg_tmp);
1170             }
1171 #if     !USE_LRTS_MEMPOOL
1172             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1173 #endif
1174             CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1175             handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1176             SendRdmaMsg(); 
1177             FreePostDesc(tmp_pd);
1178         }
1179     } //end while
1180 }
1181
1182 static int SendRdmaMsg()
1183 {
1184     gni_return_t            status = GNI_RC_SUCCESS;
1185     gni_mem_handle_t        msg_mem_hndl;
1186
1187     RDMA_REQUEST *ptr = pending_rdma_head;
1188     RDMA_REQUEST *prev = NULL;
1189
1190     while (ptr != NULL)
1191     {
1192         gni_post_descriptor_t *pd = ptr->pd;
1193         status = GNI_RC_SUCCESS;
1194         // register memory first
1195         if( pd->local_mem_hndl.qword1 == 0 && pd->local_mem_hndl.qword2 == 0)
1196         {
1197             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1198         }
1199         if(status == GNI_RC_SUCCESS)
1200         {
1201             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1202                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1203             else
1204                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1205             if(status == GNI_RC_SUCCESS)
1206             {
1207                 RDMA_REQUEST *tmp = ptr;
1208                 if (prev)
1209                   prev->next = ptr->next;
1210                 else
1211                   pending_rdma_head = ptr->next;
1212                 ptr = ptr->next;
1213                 FreeRdmaRequest(tmp);
1214                 continue;
1215             }
1216         }
1217         prev = ptr;
1218         ptr = ptr->next;
1219     } //end while
1220     return 0;
1221 }
1222
1223 // return 1 if all messages are sent
1224 static int SendBufferMsg()
1225 {
1226     MSG_LIST            *ptr, *previous_head, *current_head;
1227     CONTROL_MSG         *control_msg_tmp;
1228     gni_return_t        status;
1229     int done = 1;
1230     register    int     i;
1231     int                 index_previous = -1;
1232     int                 index = smsg_head_index;
1233     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {CmiPrintf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
1234     /* can add flow control here to control the number of messages sent before handle message */
1235     while(index != -1)
1236     {
1237         ptr = smsg_msglist_index[index].head;
1238        
1239         while(ptr!=0)
1240         {
1241             CmiAssert(ptr!=NULL);
1242             if(ptr->tag == SMALL_DATA_TAG)
1243             {
1244                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, ptr->size, SMALL_DATA_TAG, 1);  
1245                 if(status == GNI_RC_SUCCESS)
1246                 {
1247                     CmiFree(ptr->msg);
1248                 }
1249             }
1250             else if(ptr->tag == LMSG_INIT_TAG)
1251             {
1252                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1253 #if PRINT_SYH
1254                 CmiPrintf("[%d==>%d] LMSG buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1255 #endif
1256                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
1257                 {
1258                     MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
1259                     if(status != GNI_RC_SUCCESS) {
1260                         done = 0;
1261                         break;
1262                     }
1263                 }
1264                 
1265                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 1);  
1266                 if(status == GNI_RC_SUCCESS)
1267                 {
1268                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
1269                 }
1270             }else if (ptr->tag == ACK_TAG)
1271             {
1272                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), ACK_TAG, 1);  
1273                 if(status == GNI_RC_SUCCESS)
1274                 {
1275                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1276                 }
1277             }else
1278             {
1279                 CmiPrintf("Weird tag\n");
1280                 CmiAbort("should not happen\n");
1281             }
1282             if(status == GNI_RC_SUCCESS)
1283             {
1284 #if PRINT_SYH
1285                 buffered_smsg_counter--;
1286                 if(lrts_smsg_success == lrts_send_msg_id)
1287                     CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1288                 else
1289                     CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1290 #endif
1291                 smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
1292                 FreeMsgList(ptr);
1293                 ptr= smsg_msglist_index[index].head;
1294
1295             }else {
1296                 done = 0;
1297                 break;
1298             } 
1299         
1300         } //end while
1301         if(ptr == 0)
1302         {
1303             if(index_previous != -1)
1304                 smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
1305             else
1306                 smsg_head_index = smsg_msglist_index[index].next;
1307         }else
1308         {
1309             index_previous = index;
1310         }
1311         index = smsg_msglist_index[index].next;
1312     }   // end pooling for all cores
1313     return done;
1314 }
1315
1316 static void LrtsAdvanceCommunication()
1317 {
1318     /*  Receive Msg first */
1319 #if 0
1320     if(myrank == 0)
1321     CmiPrintf("Calling Lrts Pump Msg PE:%d\n", myrank);
1322 #endif
1323     PumpNetworkSmsg();
1324     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
1325     //PumpNetworkRdmaMsgs();
1326     /* Release Sent Msg */
1327     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
1328 #if 0
1329     ////PumpLocalSmsgTransactions();
1330     if(myrank == 0)
1331     CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", myrank);
1332 #endif
1333     PumpLocalRdmaTransactions();
1334 #if 0
1335     if(myrank == 0)
1336     CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", myrank);
1337 #endif
1338     /* Send buffered Message */
1339     SendBufferMsg();
1340 #if 0
1341     if(myrank == 0)
1342     CmiPrintf("Calling Lrts rdma PE:%d\n", myrank);
1343 #endif
1344     SendRdmaMsg();
1345 #if 0
1346     if(myrank == 0)
1347     CmiPrintf("done PE:%d\n", myrank);
1348 #endif
1349 }
1350
1351 static void _init_dynamic_smsg()
1352 {
1353     gni_smsg_attr_t smsg_attr;
1354     gni_return_t status;
1355     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
1356     memset(smsg_connected_flag, 0, mysize*sizeof(int));
1357
1358     CmiPrintf("start Dynamic init done %d\n", myrank);
1359     smsg_local_attr_vec = (gni_smsg_attr_t**) malloc(sizeof(gni_smsg_attr_t*) *mysize);
1360     
1361     setup_mem.addr = (uint64_t)malloc(mysize * sizeof(gni_smsg_attr_t));
1362     status = GNI_MemRegister(nic_hndl, setup_mem.addr,  mysize * SMSG_CONN_SIZE, smsg_rx_cqh,  GNI_MEM_READWRITE, -1,  &(setup_mem.mdh));
1363    
1364     GNI_RC_CHECK("Smsg dynamic allocation \n", status);
1365     CmiPrintf("[%d]Smsg dynamic allocation (%p)\n", myrank, (void*)setup_mem.addr);
1366     smsg_connection_vec = (mdh_addr_t*) malloc(mysize*sizeof(mdh_addr_t)); 
1367     allgather(&setup_mem, smsg_connection_vec, sizeof(mdh_addr_t));
1368     
1369     //pre-allocate some memory as mailbox for dynamic connection
1370     if(mysize <=4096)
1371     {
1372         SMSG_MAX_MSG = 1024;
1373     }else if (mysize > 4096 && mysize <= 16384)
1374     {
1375         SMSG_MAX_MSG = 512;
1376     }else {
1377         SMSG_MAX_MSG = 256;
1378     }
1379     
1380     smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1381     smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1382     smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1383     status = GNI_SmsgBufferSizeNeeded(&smsg_attr, &smsg_memlen);
1384     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1385     
1386     smsg_dynamic_list = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1387
1388     smsg_dynamic_list->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1389     bzero(smsg_dynamic_list->addr, smsg_memlen*smsg_expand_slots);
1390     
1391     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_dynamic_list->addr,
1392             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1393             GNI_MEM_READWRITE,   
1394             -1,
1395             &(smsg_dynamic_list->mdh));
1396    smsg_available_slot = 0;  
1397    CmiPrintf(" Dynamic init done %d\n", myrank);
1398 }
1399
1400 static void _init_static_smsg()
1401 {
1402     gni_smsg_attr_t      *smsg_attr;
1403     gni_smsg_attr_t      remote_smsg_attr;
1404     gni_smsg_attr_t      *smsg_attr_vec;
1405     gni_mem_handle_t     my_smsg_mdh_mailbox;
1406     int      i;
1407     gni_return_t status;
1408     uint32_t              vmdh_index = -1;
1409     mdh_addr_t            base_infor;
1410     mdh_addr_t            *base_addr_vec;
1411     if(mysize <=4096)
1412     {
1413         SMSG_MAX_MSG = 1024;
1414         //log2_SMSG_MAX_MSG = 10;
1415     }else if (mysize > 4096 && mysize <= 16384)
1416     {
1417         SMSG_MAX_MSG = 512;
1418         //log2_SMSG_MAX_MSG = 9;
1419
1420     }else {
1421         SMSG_MAX_MSG = 256;
1422         //log2_SMSG_MAX_MSG = 8;
1423     }
1424     
1425     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
1426     
1427     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1428     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
1429     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
1430     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
1431     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1432     smsg_mailbox_base = memalign(64, smsg_memlen*(mysize));
1433     _MEMCHECK(smsg_mailbox_base);
1434     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
1435     if (myrank == 0) CmiPrintf("Charm++> allocates %.2fMB for SMSG. \n", smsg_memlen*mysize/1e6);
1436     
1437     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
1438             smsg_memlen*(mysize), smsg_rx_cqh,
1439             GNI_MEM_READWRITE,   
1440             vmdh_index,
1441             &my_smsg_mdh_mailbox);
1442
1443     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1444
1445     base_infor.addr =  (uint64_t)smsg_mailbox_base;
1446     base_infor.mdh =  my_smsg_mdh_mailbox;
1447     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
1448
1449     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
1450  
1451     for(i=0; i<mysize; i++)
1452     {
1453         if(i==myrank)
1454             continue;
1455         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1456         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1457         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1458         smsg_attr[i].mbox_offset = i*smsg_memlen;
1459         smsg_attr[i].buff_size = smsg_memlen;
1460         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
1461         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1462     }
1463
1464     for(i=0; i<mysize; i++)
1465     {
1466         if (myrank == i) continue;
1467
1468         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1469         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1470         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1471         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
1472         remote_smsg_attr.buff_size = smsg_memlen;
1473         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
1474         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
1475
1476         /* initialize the smsg channel */
1477         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
1478         GNI_RC_CHECK("SMSG Init", status);
1479     } //end initialization
1480
1481     free(base_addr_vec);
1482
1483     free(smsg_attr);
1484     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
1485      GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
1486
1487
1488 inline
1489 static void _init_smsg()
1490 {
1491     int i;
1492
1493      smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
1494      for(i =0; i<mysize; i++)
1495      {
1496         smsg_msglist_index[i].next = -1;
1497         smsg_msglist_index[i].head = 0;
1498         smsg_msglist_index[i].tail = 0;
1499      }
1500      smsg_head_index = -1;
1501 }
1502
1503 static void _init_static_msgq()
1504 {
1505     gni_return_t status;
1506     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
1507     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
1508     msgq_attrs.smsg_q_sz = 1;
1509     msgq_attrs.rcv_pool_sz = 1;
1510     msgq_attrs.num_msgq_eps = 2;
1511     msgq_attrs.nloc_insts = 8;
1512     msgq_attrs.modes = 0;
1513     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
1514
1515     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
1516     GNI_RC_CHECK("MSGQ Init", status);
1517
1518
1519 }
1520
1521 static void _init_DMA_buffer()
1522 {
1523     gni_return_t            status = GNI_RC_SUCCESS;
1524     /*AUTO tuning */
1525     /* suppose max_smsg is 1024, DMA buffer is split into 2048, 4096, 8192, ... */
1526     /*  This method might be better for SMP, but it is bad for Nonsmp since msgs are sharing same slots */
1527     /*
1528      * DMA_slots = 19-log2_SMSG_MAX_MSG;
1529     DMA_incoming_avail_tag = malloc(DMA_slots);
1530     DMA_buffer_size = 2*(DMA_max_single_msg - SMSG_MAX_MSG); 
1531     DMA_incoming_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1532     DMA_outgoing_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1533     
1534     status = GNI_MemRegister(nic_hndl, (uint64_t)DMA_incoming_base_addr,
1535             DMA_buffer_size, smsg_rx_cqh,
1536             GNI_MEM_READWRITE ,   
1537             vmdh_index,
1538             &);
1539             */
1540     // one is reserved to avoid deadlock
1541     DMA_slots           = 17; // each one is 8K  16*8K + 1 slot reserved to avoid deadlock
1542     DMA_buffer_size     = DMA_max_single_msg + 8192;
1543     DMA_buffer_base_mdh_addr.addr = (uint64_t)memalign(ALIGNBUF, DMA_buffer_size);
1544     status = GNI_MemRegister(nic_hndl, DMA_buffer_base_mdh_addr.addr,
1545         DMA_buffer_size, smsg_rx_cqh,
1546         GNI_MEM_READWRITE ,   
1547         -1,
1548         &(DMA_buffer_base_mdh_addr.mdh));
1549     GNI_RC_CHECK("GNI_MemRegister", status);
1550     DMA_buffer_base_mdh_addr_vec = (mdh_addr_t*) malloc(sizeof(mdh_addr_t) * mysize);
1551
1552     allgather(&DMA_buffer_base_mdh_addr, DMA_buffer_base_mdh_addr_vec, sizeof(mdh_addr_t) );
1553 }
1554
1555 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
1556 {
1557     register int            i;
1558     int                     rc;
1559     int                     device_id = 0;
1560     unsigned int            remote_addr;
1561     gni_cdm_handle_t        cdm_hndl;
1562     gni_return_t            status = GNI_RC_SUCCESS;
1563     uint32_t                vmdh_index = -1;
1564     uint8_t                 ptag;
1565     unsigned int            local_addr, *MPID_UGNI_AllAddr;
1566     int                     first_spawned;
1567     int                     physicalID;
1568     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
1569     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
1570     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
1571    
1572     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
1573     useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
1574     if (myrank==0) 
1575     {
1576         if(useDynamicSMSG) 
1577             CmiPrintf("Charm++> use Dynamic SMSG\n"); 
1578         else 
1579             CmiPrintf("Charm++> use Static SMSG\n");
1580     };
1581     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
1582     
1583     status = PMI_Init(&first_spawned);
1584     GNI_RC_CHECK("PMI_Init", status);
1585
1586     status = PMI_Get_size(&mysize);
1587     GNI_RC_CHECK("PMI_Getsize", status);
1588
1589     status = PMI_Get_rank(&myrank);
1590     GNI_RC_CHECK("PMI_getrank", status);
1591
1592     //physicalID = CmiPhysicalNodeID(myrank);
1593     
1594     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
1595
1596     *myNodeID = myrank;
1597     *numNodes = mysize;
1598   
1599     if(myrank == 0)
1600     {
1601         printf("Charm++> Running on Gemini (GNI) using %d  cores\n", mysize);
1602     }
1603 #ifdef USE_ONESIDED
1604     onesided_init(NULL, &onesided_hnd);
1605
1606     // this is a GNI test, so use the libonesided bypass functionality
1607     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
1608     local_addr = gniGetNicAddress();
1609 #else
1610     ptag = get_ptag();
1611     cookie = get_cookie();
1612 #if 0
1613     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
1614 #endif
1615     //Create and attach to the communication  domain */
1616     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
1617     GNI_RC_CHECK("GNI_CdmCreate", status);
1618     //* device id The device id is the minor number for the device
1619     //that is assigned to the device by the system when the device is created.
1620     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
1621     //where X is the device number 0 default 
1622     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
1623     GNI_RC_CHECK("GNI_CdmAttach", status);
1624     local_addr = get_gni_nic_address(0);
1625 #endif
1626     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
1627     _MEMCHECK(MPID_UGNI_AllAddr);
1628     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
1629     /* create the local completion queue */
1630     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
1631     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
1632     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
1633     
1634     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
1635     //GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
1636     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
1637
1638     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
1639     GNI_RC_CHECK("Create CQ (rx)", status);
1640     
1641     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
1642     //GNI_RC_CHECK("Create Post CQ (rx)", status);
1643     
1644     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
1645     //GNI_RC_CHECK("Create BTE CQ", status);
1646
1647     /* create the endpoints. they need to be bound to allow later CQWrites to them */
1648     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
1649     _MEMCHECK(ep_hndl_array);
1650
1651     for (i=0; i<mysize; i++) {
1652         if(i == myrank) continue;
1653         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
1654         GNI_RC_CHECK("GNI_EpCreate ", status);   
1655         remote_addr = MPID_UGNI_AllAddr[i];
1656         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
1657         GNI_RC_CHECK("GNI_EpBind ", status);   
1658     }
1659     /* Depending on the number of cores in the job, decide different method */
1660     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
1661     if(mysize > 1)
1662     {
1663         if(useDynamicSMSG == 0)
1664         {
1665             _init_static_smsg();
1666         }else
1667         {
1668             _init_dynamic_smsg();
1669         }
1670
1671         _init_smsg();
1672     }
1673 #if     USE_LRTS_MEMPOOL
1674     CmiGetArgLong(*argv, "+useMemorypoolSize", &_mempool_size);
1675     if (myrank==0) CmiPrintf("Charm++> use memorypool size: %1.fMB\n", _mempool_size/1024.0/1024);
1676     init_mempool(_mempool_size);
1677     //init_mempool(Mempool_MaxSize);
1678 #endif
1679
1680     /* init DMA buffer for medium message */
1681
1682     //_init_DMA_buffer();
1683     
1684     free(MPID_UGNI_AllAddr);
1685 }
1686
1687
1688 void* LrtsAlloc(int n_bytes, int header)
1689 {
1690     void *ptr;
1691 #if 0
1692     CmiPrintf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d\n", CmiMyPe(), n_bytes, header);
1693 #endif
1694     if(n_bytes <= SMSG_MAX_MSG)
1695     {
1696         int totalsize = n_bytes+header;
1697         ptr = malloc(totalsize);
1698     }else 
1699     {
1700
1701         CmiAssert(header <= ALIGNBUF);
1702 #if     USE_LRTS_MEMPOOL
1703         n_bytes = ALIGN64(n_bytes);
1704         char *res = syh_mempool_malloc(ALIGNBUF+n_bytes);
1705 #else
1706         n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
1707         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1708 #endif
1709         ptr = res + ALIGNBUF - header;
1710     }
1711 #if 0 
1712     CmiPrintf("Done Alloc Lrts for bytes=%d, head=%d\n", n_bytes, header);
1713 #endif
1714     return ptr;
1715 }
1716
1717 void  LrtsFree(void *msg)
1718 {
1719     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1720     if (size <= SMSG_MAX_MSG)
1721       free(msg);
1722     else
1723     {
1724 #if 0
1725         CmiPrintf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1726 #endif
1727 #if     USE_LRTS_MEMPOOL
1728         syh_mempool_free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1729 #else
1730         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1731 #endif
1732     }
1733 #if 0 
1734     CmiPrintf("Done Free lrts for bytes=%d\n", size);
1735 #endif
1736 }
1737
1738 static void LrtsExit()
1739 {
1740     /* free memory ? */
1741 #if     USE_LRTS_MEMPOOL
1742     kill_allmempool();
1743 #endif
1744     PMI_Finalize();
1745     exit(0);
1746 }
1747
1748 static void LrtsDrainResources()
1749 {
1750     while (!SendBufferMsg()) {
1751         PumpNetworkSmsg();
1752         PumpNetworkRdmaMsgs();
1753         PumpLocalSmsgTransactions();
1754         PumpLocalRdmaTransactions();
1755     }
1756     PMI_Barrier();
1757 }
1758
1759 void CmiAbort(const char *message) {
1760
1761     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1762     PMI_Abort(-1, message);
1763 }
1764
1765 /**************************  TIMER FUNCTIONS **************************/
1766 #if CMK_TIMER_USE_SPECIAL
1767 /* MPI calls are not threadsafe, even the timer on some machines */
1768 static CmiNodeLock  timerLock = 0;
1769 static int _absoluteTime = 0;
1770 static int _is_global = 0;
1771 static struct timespec start_ns;
1772
1773 inline int CmiTimerIsSynchronized() {
1774     return 1;
1775 }
1776
1777 inline int CmiTimerAbsolute() {
1778     return _absoluteTime;
1779 }
1780
1781 double CmiStartTimer() {
1782     return 0.0;
1783 }
1784
1785 double CmiInitTime() {
1786     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
1787 }
1788
1789 void CmiTimerInit(char **argv) {
1790     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1791     if (_absoluteTime && CmiMyPe() == 0)
1792         printf("Charm++> absolute  timer is used\n");
1793     
1794     _is_global = CmiTimerIsSynchronized();
1795
1796
1797     if (_is_global) {
1798         if (CmiMyRank() == 0) {
1799             clock_gettime(CLOCK_MONOTONIC, &start_ts)
1800         }
1801     } else { /* we don't have a synchronous timer, set our own start time */
1802         CmiBarrier();
1803         CmiBarrier();
1804         CmiBarrier();
1805         clock_gettime(CLOCK_MONOTONIC, &start_ts)
1806     }
1807     CmiNodeAllBarrier();          /* for smp */
1808 }
1809
1810 /**
1811  * Since the timerLock is never created, and is
1812  * always NULL, then all the if-condition inside
1813  * the timer functions could be disabled right
1814  * now in the case of SMP.
1815  */
1816 double CmiTimer(void) {
1817     struct timespec now_ts;
1818     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1819     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1820         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1821 }
1822
1823 double CmiWallTimer(void) {
1824     struct timespec now_ts;
1825     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1826     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1827         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1828 }
1829
1830 double CmiCpuTimer(void) {
1831     struct timespec now_ts;
1832     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1833     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1834         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1835 }
1836
1837 #endif
1838 /************Barrier Related Functions****************/
1839
1840 int CmiBarrier()
1841 {
1842     int status;
1843     status = PMI_Barrier();
1844     return status;
1845
1846 }
1847
1848
1849 #if CMK_PERSISTENT_COMM
1850 #include "machine-persistent.c"
1851 #endif
1852
1853