smsg dynamic connection fix
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <stdint.h>
16 #include <errno.h>
17 #include <malloc.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #if defined(_WIN32) && ! defined(__CYGWIN__)
26 #include <windows.h>
27 #include <wincon.h>
28 #include <sys/types.h>
29 #include <sys/timeb.h>
30
31 static void sleep(int secs) {
32     Sleep(1000*secs);
33 }
34 #else
35 #include <unistd.h> /*For getpid()*/
36 #endif
37
38
39 #define REMOTE_EVENT         0
40 #define USE_LRTS_MEMPOOL   1
41
42 #if USE_LRTS_MEMPOOL
43 static CmiInt8 _mempool_size = 1024ll*1024*32;
44 #endif
45
46 #define PRINT_SYH  0
47
48 int         rdma_id = 0;
49 #if PRINT_SYH
50 int         lrts_smsg_success = 0;
51 int         lrts_send_msg_id = 0;
52 int         lrts_send_rdma_success = 0;
53 int         lrts_received_msg = 0;
54 int         lrts_local_done_msg = 0;
55 #endif
56
57 #include "machine.h"
58
59 #include "pcqueue.h"
60
61 #if CMK_PERSISTENT_COMM
62 #include "machine-persistent.h"
63 #endif
64
65 //#define  USE_ONESIDED 1
66 #ifdef USE_ONESIDED
67 //onesided implementation is wrong, since no place to restore omdh
68 #include "onesided.h"
69 onesided_hnd_t   onesided_hnd;
70 onesided_md_t    omdh;
71 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
72
73 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
74
75 #else
76 uint8_t   onesided_hnd, omdh;
77 #if REMOTE_EVENT
78 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
79 #else
80 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
81 #endif
82 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
83 #endif
84
85 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
86 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
87
88 #define ALIGNBUF                64
89
90 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
91 /* If SMSG is not used */
92 #define FMA_PER_CORE  1024
93 #define FMA_BUFFER_SIZE 1024
94 /* If SMSG is used */
95 static int  SMSG_MAX_MSG;
96 //static int  log2_SMSG_MAX_MSG;
97 #define SMSG_MAX_CREDIT  36
98
99 #define MSGQ_MAXSIZE       2048
100 /* large message transfer with FMA or BTE */
101 #define LRTS_GNI_RDMA_THRESHOLD  2048
102
103 #define REMOTE_QUEUE_ENTRIES  2048 
104 #define LOCAL_QUEUE_ENTRIES   64 
105
106 #define PUT_DONE_TAG      0x29
107 #define ACK_TAG           0x30
108 /* SMSG is data message */
109 #define SMALL_DATA_TAG          0x31
110 /* SMSG is a control message to initialize a BTE */
111 #define MEDIUM_HEAD_TAG         0x32
112 #define MEDIUM_DATA_TAG         0x33
113 #define LMSG_INIT_TAG     0x39 
114
115 #define DEBUG
116 #ifdef GNI_RC_CHECK
117 #undef GNI_RC_CHECK
118 #endif
119 #ifdef DEBUG
120 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
121 #else
122 #define GNI_RC_CHECK(msg,rc)
123 #endif
124
125 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
126 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
127
128 static int Mempool_MaxSize = 1024*1024*128;
129 static int useDynamicSMSG   = 1;
130 static int useStaticMSGQ = 0;
131 static int useStaticFMA = 0;
132 static int mysize, myrank;
133 static gni_nic_handle_t      nic_hndl;
134
135 typedef struct {
136     gni_mem_handle_t mdh;
137     uint64_t addr;
138 } mdh_addr_t ;
139 // this is related to dynamic SMSG
140
141 typedef struct mdh_addr_list{
142     gni_mem_handle_t mdh;
143    void *addr;
144     struct mdh_addr_list *next;
145 }mdh_addr_list_t;
146
147 static unsigned int         smsg_memlen;
148 #define     SMSG_CONN_SIZE     sizeof(gni_smsg_attr_t)
149 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
150 int                 *smsg_connected_flag= 0;
151 char                *smsg_connection_addr = 0;
152 mdh_addr_t          setup_mem;
153 mdh_addr_t          *smsg_connection_vec = 0;
154 gni_mem_handle_t    smsg_connection_memhndl;
155 static int          smsg_expand_slots = 10;
156 static int          smsg_available_slot = 0;
157 static void         *smsg_mailbox_mempool = 0;
158 mdh_addr_list_t     *smsg_dynamic_list = 0;
159
160 static void             *smsg_mailbox_base;
161 gni_msgq_attr_t         msgq_attrs;
162 gni_msgq_handle_t       msgq_handle;
163 gni_msgq_ep_attr_t      msgq_ep_attrs;
164 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
165
166
167
168 /* preallocated DMA buffer */
169 int                     DMA_slots;
170 uint64_t                DMA_avail_tag = 0;
171 uint32_t                DMA_incoming_avail_tag = 0;
172 uint32_t                DMA_outgoing_avail_tag = 0;
173 void                    *DMA_incoming_base_addr;
174 void                    *DMA_outgoing_base_addr;
175 mdh_addr_t              DMA_buffer_base_mdh_addr;
176 mdh_addr_t              *DMA_buffer_base_mdh_addr_vec;
177 int                     DMA_buffer_size;
178 int                     DMA_max_single_msg = 131072;//524288 ;
179
180 #define                 DMA_SIZE_PER_SLOT       8192
181
182
183 typedef struct dma_msgid_map
184 {
185     uint64_t     msg_id;
186     int     msg_subid
187 } dma_msgid_map_t;
188
189 dma_msgid_map_t         *dma_map_list;
190
191 typedef struct msg_trace
192 {
193     uint64_t    msg_id;
194     int         done_num;
195 }msg_trace_t;
196
197 msg_trace_t             *pending_msg_list;
198 /* =====Beginning of Declarations of Machine Specific Variables===== */
199 static int cookie;
200 static int modes = 0;
201 static gni_cq_handle_t       smsg_rx_cqh = NULL;
202 static gni_cq_handle_t       smsg_tx_cqh = NULL;
203 static gni_cq_handle_t       post_rx_cqh = NULL;
204 static gni_cq_handle_t       post_tx_cqh = NULL;
205 static gni_ep_handle_t       *ep_hndl_array;
206
207 typedef    struct  pending_smg
208 {
209     int     inst_id;
210     struct  pending_smg *next;
211 } PENDING_GETNEXT;
212
213
214 typedef struct msg_list
215 {
216     uint32_t destNode;
217     uint32_t size;
218     void *msg;
219     struct msg_list *next;
220     struct msg_list *pehead_next;
221     uint8_t tag;
222 }MSG_LIST;
223
224 typedef struct medium_msg_list
225 {
226     uint32_t destNode;
227     uint32_t msg_id;
228     uint32_t msg_subid;
229     uint32_t remain_size;
230     void *msg;
231     struct medium_msg_list *next;
232 }MEDIUM_MSG_LIST;
233
234
235 typedef struct control_msg
236 {
237     uint64_t            source_addr;
238     int                 source;               /* source rank */
239     int                 length;
240     gni_mem_handle_t    source_mem_hndl;
241     struct control_msg *next;
242 }CONTROL_MSG;
243
244 typedef struct medium_msg_control
245 {
246     uint64_t            dma_offset;     //the dma_buffer for this block of msg
247     int                 msg_id;         //Id for the total index
248     int                 msg_subid;      //offset inside the message id 
249 }MEDIUM_MSG_CONTROL;
250
251 typedef struct  rmda_msg
252 {
253     int                   destNode;
254     gni_post_descriptor_t *pd;
255     struct  rmda_msg      *next;
256 }RDMA_REQUEST;
257
258 typedef struct  msg_list_index
259 {
260     int         next;
261     MSG_LIST    *head;
262     MSG_LIST    *tail;
263 } MSG_LIST_INDEX;
264
265 /* reuse PendingMsg memory */
266 static CONTROL_MSG          *control_freelist=0;
267 static MSG_LIST             *msglist_freelist=0;
268 static int                  smsg_head_index;
269 static MSG_LIST_INDEX       *smsg_msglist_index= 0;
270 static MSG_LIST             *smsg_free_head=0;
271 static MSG_LIST             *smsg_free_tail=0;
272
273 /*
274 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
275     if(free_head == 0)  free_head = free_tail = msg_head;    \
276     else   free_tail = free_tail->next;    \
277     if( msg_head->next == msg_tail) msg_head =0;   \
278     else msg_head= msg_head->next;    
279
280 #define MallocMsgList(d, msg_head, msg_tail, free_head, free_tail, msgsize) \
281     if(free_head == 0) {d= malloc(msgsize);  \
282         if(msg_head == 0)   msg_head =msg_tail = msg_head->next = msg_tail->next = d; \
283         else { msg_tail->next = d; d->next = msg_head; msg_tail=d;} \
284     }else {d = free_head; free_head = free_head->next; if(free_tail->next == free_head) free_head =0;} \
285 */
286
287 #define FreeMsgList(d)  \
288   (d)->next = msglist_freelist;\
289   msglist_freelist = d;
290
291
292
293 #define MallocMsgList(d) \
294   d = msglist_freelist;\
295   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
296              _MEMCHECK(d);\
297   } else msglist_freelist = d->next;
298
299
300 #define FreeControlMsg(d)       \
301   (d)->next = control_freelist;\
302   control_freelist = d;
303
304 #define MallocControlMsg(d) \
305   d = control_freelist;\
306   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
307              _MEMCHECK(d);\
308   } else control_freelist = d->next;
309
310
311 static RDMA_REQUEST         *rdma_freelist = NULL;
312
313 #define FreeControlMsg(d)       \
314   (d)->next = control_freelist;\
315   control_freelist = d;
316
317 #define MallocControlMsg(d) \
318   d = control_freelist;\
319   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
320              _MEMCHECK(d);\
321   } else control_freelist = d->next;
322
323 #define FreeMediumControlMsg(d)       \
324   (d)->next = medium_control_freelist;\
325   medium_control_freelist = d;
326
327
328 #define MallocMediumControlMsg(d) \
329     d = medium_control_freelist;\
330     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
331     _MEMCHECK(d);\
332 } else mediumcontrol_freelist = d->next;
333
334 #define FreeRdmaRequest(d)       \
335   (d)->next = rdma_freelist;\
336   rdma_freelist = d;
337
338 #define MallocRdmaRequest(d) \
339   d = rdma_freelist;\
340   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
341              _MEMCHECK(d);\
342   } else rdma_freelist = d->next;
343
344 /* reuse gni_post_descriptor_t */
345 static gni_post_descriptor_t *post_freelist=0;
346
347 #if 1
348 #define FreePostDesc(d)       \
349     (d)->next_descr = post_freelist;\
350     post_freelist = d;
351
352 #define MallocPostDesc(d) \
353   d = post_freelist;\
354   if (d==0) { \
355      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
356      _MEMCHECK(d);\
357   } else post_freelist = d->next_descr;
358 #else
359
360 #define FreePostDesc(d)     free(d);
361 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
362
363 #endif
364
365 static PENDING_GETNEXT     *pending_smsg_head = 0;
366 static PENDING_GETNEXT     *pending_smsg_tail = 0;
367
368 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
369 static int      buffered_smsg_counter = 0;
370
371 /* SmsgSend return success but message sent is not confirmed by remote side */
372
373 static RDMA_REQUEST  *pending_rdma_head = 0;
374 static RDMA_REQUEST  *pending_rdma_tail = 0;
375 static MSG_LIST *buffered_fma_head = 0;
376 static MSG_LIST *buffered_fma_tail = 0;
377
378 /* functions  */
379 #define IsFree(a,ind)  !( a& (1<<(ind) ))
380 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
381 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
382
383 #include "mempool.c"
384
385 /* get the upper bound of log 2 */
386 int mylog2(int size)
387 {
388     int op = size;
389     unsigned int ret=0;
390     unsigned int mask = 0;
391     int i;
392     while(op>0)
393     {
394         op = op >> 1;
395         ret++;
396
397     }
398     for(i=1; i<ret; i++)
399     {
400         mask = mask << 1;
401         mask +=1;
402     }
403
404     ret -= ((size &mask) ? 0:1);
405     return ret;
406 }
407
408 static void
409 allgather(void *in,void *out, int len)
410 {
411     static int *ivec_ptr=NULL,already_called=0,job_size=0;
412     int i,rc;
413     int my_rank;
414     char *tmp_buf,*out_ptr;
415
416     if(!already_called) {
417
418         rc = PMI_Get_size(&job_size);
419         CmiAssert(rc == PMI_SUCCESS);
420         rc = PMI_Get_rank(&my_rank);
421         CmiAssert(rc == PMI_SUCCESS);
422
423         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
424         CmiAssert(ivec_ptr != NULL);
425
426         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
427         CmiAssert(rc == PMI_SUCCESS);
428
429         already_called = 1;
430
431     }
432
433     tmp_buf = (char *)malloc(job_size * len);
434     CmiAssert(tmp_buf);
435
436     rc = PMI_Allgather(in,tmp_buf,len);
437     CmiAssert(rc == PMI_SUCCESS);
438
439     out_ptr = out;
440
441     for(i=0;i<job_size;i++) {
442
443         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
444
445     }
446
447     free(tmp_buf);
448 }
449 static void
450 allgather_2(void *in,void *out, int len)
451 {
452     //PMI_Allgather is out of order
453     int i,rc, extend_len;
454     int  rank_index;
455     char *out_ptr, *out_ref;
456     char *in2;
457
458     extend_len = sizeof(int) + len;
459     in2 = (char*)malloc(extend_len);
460
461     memcpy(in2, &myrank, sizeof(int));
462     memcpy(in2+sizeof(int), in, len);
463
464     out_ptr = (char*)malloc(mysize*extend_len);
465
466     rc = PMI_Allgather(in2, out_ptr, extend_len);
467     GNI_RC_CHECK("allgather", rc);
468
469     out_ref = out;
470
471     for(i=0;i<mysize;i++) {
472         //rank index 
473         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
474         //copy to the rank index slot
475         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
476     }
477
478     free(out_ptr);
479     free(in2);
480
481 }
482
483 static unsigned int get_gni_nic_address(int device_id)
484 {
485     unsigned int address, cpu_id;
486     gni_return_t status;
487     int i, alps_dev_id=-1,alps_address=-1;
488     char *token, *p_ptr;
489
490     p_ptr = getenv("PMI_GNI_DEV_ID");
491     if (!p_ptr) {
492         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
493        
494         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
495     } else {
496         while ((token = strtok(p_ptr,":")) != NULL) {
497             alps_dev_id = atoi(token);
498             if (alps_dev_id == device_id) {
499                 break;
500             }
501             p_ptr = NULL;
502         }
503         CmiAssert(alps_dev_id != -1);
504         p_ptr = getenv("PMI_GNI_LOC_ADDR");
505         CmiAssert(p_ptr != NULL);
506         i = 0;
507         while ((token = strtok(p_ptr,":")) != NULL) {
508             if (i == alps_dev_id) {
509                 alps_address = atoi(token);
510                 break;
511             }
512             p_ptr = NULL;
513             ++i;
514         }
515         CmiAssert(alps_address != -1);
516         address = alps_address;
517     }
518     return address;
519 }
520
521 static uint8_t get_ptag(void)
522 {
523     char *p_ptr, *token;
524     uint8_t ptag;
525
526     p_ptr = getenv("PMI_GNI_PTAG");
527     CmiAssert(p_ptr != NULL);
528     token = strtok(p_ptr, ":");
529     ptag = (uint8_t)atoi(token);
530     return ptag;
531         
532 }
533
534 static uint32_t get_cookie(void)
535 {
536     uint32_t cookie;
537     char *p_ptr, *token;
538
539     p_ptr = getenv("PMI_GNI_COOKIE");
540     CmiAssert(p_ptr != NULL);
541     token = strtok(p_ptr, ":");
542     cookie = (uint32_t)atoi(token);
543
544     return cookie;
545 }
546
547 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
548 /* TODO: add any that are related */
549 /* =====End of Definitions of Message-Corruption Related Macros=====*/
550
551
552 #include "machine-lrts.h"
553 #include "machine-common-core.c"
554
555 /* Network progress function is used to poll the network when for
556    messages. This flushes receive buffers on some  implementations*/
557 #if CMK_MACHINE_PROGRESS_DEFINED
558 void CmiMachineProgressImpl() {
559 }
560 #endif
561
562 inline
563 static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
564 {
565     MSG_LIST        *msg_tmp;
566     MallocMsgList(msg_tmp);
567     msg_tmp->destNode = destNode;
568     msg_tmp->size   = size;
569     msg_tmp->msg    = msg;
570     msg_tmp->tag    = tag;
571     msg_tmp->next   = 0;
572     if (smsg_msglist_index[destNode].head == 0 ) {
573         smsg_msglist_index[destNode].head = msg_tmp;
574         smsg_msglist_index[destNode].next = smsg_head_index;
575         smsg_head_index = destNode;
576     }
577     else {
578       (smsg_msglist_index[destNode].tail)->next    = msg_tmp;
579     }
580     smsg_msglist_index[destNode].tail          = msg_tmp;
581 #if PRINT_SYH
582     buffered_smsg_counter++;
583 #endif
584 }
585
586 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
587 {
588     //CmiPrintf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
589 }
590
591 inline
592 static void setup_smsg_connection(int destNode)
593 {
594     mdh_addr_list_t  *new_entry = 0;
595     gni_post_descriptor_t *pd;
596     gni_smsg_attr_t      *smsg_attr;
597     gni_return_t status = GNI_RC_NOT_DONE;
598     if(smsg_available_slot == smsg_expand_slots)
599     {
600         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
601         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
602         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
603
604         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
605             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
606             GNI_MEM_READWRITE,   
607             -1,
608             &(new_entry->mdh));
609         smsg_available_slot = 0; 
610         new_entry->next = smsg_dynamic_list;
611         smsg_dynamic_list = new_entry;
612     }
613     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
614     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
615     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
616     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
617     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
618     smsg_attr->buff_size = smsg_memlen;
619     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
620     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
621     smsg_local_attr_vec[destNode] = smsg_attr;
622     smsg_available_slot++;
623     MallocPostDesc(pd);
624     pd->type            = GNI_POST_FMA_PUT;
625     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
626 //pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
627     pd->length          = sizeof(gni_smsg_attr_t);
628     pd->local_addr      = (uint64_t) smsg_attr;
629     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
630     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
631     pd->src_cq_hndl     = 0;
632     pd->rdma_mode       = 0;
633     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
634     //CmiPrintf("[%d=%d] send post FMA successful\n", myrank, destNode);
635     print_smsg_attr(smsg_attr);
636     GNI_RC_CHECK("SMSG Dynamic link", status);
637 }
638
639 inline 
640 static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
641 {
642     gni_return_t status = GNI_RC_NOT_DONE;
643     gni_smsg_attr_t      *smsg_attr;
644     gni_post_descriptor_t *pd;
645   
646     if(useDynamicSMSG == 1)
647     {
648         if(smsg_connected_flag[destNode] == 0)
649         {
650             //CmiPrintf("[%d]Init smsg connection\n", CmiMyPe());
651             setup_smsg_connection(destNode);
652             delay_send_small_msg(msg, size, destNode, tag);
653             smsg_connected_flag[destNode] =10;
654             return status;
655         }
656         else  if(smsg_connected_flag[destNode] <20)
657         {
658             if(inbuff == 0)
659                 delay_send_small_msg(msg, size, destNode, tag);
660             return status;
661         }
662     }
663     //CmiPrintf("[%d] reach send\n", myrank);
664     if(smsg_msglist_index[destNode].head == 0 || inbuff==1)
665     {
666         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], header, size_header, msg, size, 0, tag);
667         if(status == GNI_RC_SUCCESS)
668         {
669 #if PRINT_SYH
670             lrts_smsg_success++;
671             if(lrts_smsg_success == lrts_send_msg_id)
672                 CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
673             else
674                 CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
675 #endif
676             return status;
677         }
678     }
679     if(inbuff ==0)
680         delay_send_small_msg(msg, size, destNode, tag);
681     return status;
682 }
683
684 // Get first 0 in DMA_tags starting from index
685 static int get_first_avail_bit(uint64_t DMA_tags, int start_index)
686 {
687
688     uint64_t         mask = 0x1;
689     register    int     i=0;
690     while((DMA_tags & mask) && i<DMA_slots) {mask << 1; i++;}
691
692 }
693
694 static int send_medium_messages(int destNode, int size, char *msg)
695 {
696 #if 0
697     gni_return_t status = GNI_RC_SUCCESS;
698     int first_avail_bit=0;
699     uint64_t mask = 0x1;
700     MEDIUM_MSG_CONTROL  *medium_msg_control_tmp;
701     MEDIUM_MSG_LIST        *msg_tmp;
702     int blocksize, remain_size, pos;
703     int sub_id = 0;
704     remain_size = size;
705     pos = 0;  //offset before which data are sent
706     /* copy blocks of the message to DMA preallocated buffer and send SMSG */
707     //Check whether there is any available DMA buffer
708     
709     do{
710         while((DMA_avail_tag & mask) && first_avail_bit<DMA_slots) {mask << 1; first_avail_bit++;}
711         if(first_avail_bit == DMA_slots) //No available DMA, buffer this message
712         {
713             MallocMediumMsgList(msg_tmp);
714             msg_tmp->destNode = destNode;
715             msg_tmp->msg_id   = lrts_send_msg_id;
716             msg_tmp->msg_subid   = sub_id;
717             msg_tmp->size   = remain_size;
718             msg_tmp->msg    = msg+pos;
719             msg_tmp->next   = NULL;
720             break;
721         }else
722         {
723             //copy this part of the message into this DMA buffer
724             //TODO optimize here, some data can go with this SMSG
725             blocksize = (remain_size>DMA_SIZE_PER_SLOT)?DMA_SIZE_PER_SLOT: remain_size;
726             memcpy(DMA_buffer_base_mdh_addr.addr[first_avail_bit], msg+pos, blocksize);
727             pos += blocksize;
728             remain_size -= blocksize;
729             SET_BITS(DMA_avail_tag, first_avail_bit);
730            
731             MallocMediumControlMsg(medium_msg_control_tmp);
732             medium_msg_control_tmp->msg_id = lrts_send_msg_id;
733             medium_msg_control_tmp->msg_subid = sub_id;
734             if(status == GNI_RC_SUCCESS)
735             {
736                 if(sub_id==0)
737                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_HEAD_TAG);
738                 else
739                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_DATA_TAG);
740             }
741             //buffer this smsg
742             if(status != GNI_RC_SUCCESS)
743             {
744                 delay_send_small_msg(medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), destNode, MEDIUM_HEAD_TAG);
745             }
746             sub_id++;
747         }while(remain_size > 0 );
748
749         }
750     }
751 #endif
752 }
753
754 // Large message, send control to receiver, receiver register memory and do a GET 
755 inline
756 static void send_large_messages(int destNode, int size, char *msg)
757 {
758 #if     USE_LRTS_MEMPOOL
759     gni_return_t        status  =   GNI_RC_SUCCESS;
760     CONTROL_MSG         *control_msg_tmp;
761     uint32_t            vmdh_index  = -1;
762     /* construct a control message and send */
763     MallocControlMsg(control_msg_tmp);
764     control_msg_tmp->source_addr    = (uint64_t)msg;
765     control_msg_tmp->source         = myrank;
766     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
767     //memcpy( &(control_msg_tmp->source_mem_hndl), GetMemHndl(msg), sizeof(gni_mem_handle_t)) ;
768     control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
769 #if PRINT_SYH
770     lrts_send_msg_id++;
771     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
772 #endif
773     status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
774     if(status == GNI_RC_SUCCESS)
775     {
776         FreeControlMsg(control_msg_tmp);
777     }
778 // NOT use mempool, should slow 
779 #else
780     gni_return_t        status  =   GNI_RC_SUCCESS;
781     CONTROL_MSG         *control_msg_tmp;
782     uint32_t            vmdh_index  = -1;
783     /* construct a control message and send */
784     MallocControlMsg(control_msg_tmp);
785     control_msg_tmp->source_addr    = (uint64_t)msg;
786     control_msg_tmp->source         = myrank;
787     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
788     control_msg_tmp->source_mem_hndl.qword1 = 0;
789     control_msg_tmp->source_mem_hndl.qword2 = 0;
790 #if PRINT_SYH
791     lrts_send_msg_id++;
792     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
793 #endif
794     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
795     if(status == GNI_RC_SUCCESS)
796     {
797         status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
798         if(status == GNI_RC_SUCCESS)
799         {
800             FreeControlMsg(control_msg_tmp);
801         }
802     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
803     {
804         CmiAbort("Memory registor for large msg\n");
805     }else 
806     {
807         delay_send_small_msg(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
808     }
809 #endif
810 }
811
812 inline void LrtsPrepareEnvelope(char *msg, int size)
813 {
814     CmiSetMsgSize(msg, size);
815 }
816
817 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
818 {
819     gni_return_t        status  =   GNI_RC_SUCCESS;
820     LrtsPrepareEnvelope(msg, size);
821
822     if(size <= SMSG_MAX_MSG)
823     {
824 #if PRINT_SYH
825         lrts_send_msg_id++;
826         CmiPrintf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
827 #endif
828         status = send_smsg_message( destNode, 0, 0, msg, size, SMALL_DATA_TAG, 0);  
829         if(status == GNI_RC_SUCCESS)
830         {
831             CmiFree(msg);
832         }
833     }
834     else
835     {
836         send_large_messages(destNode, size, msg);
837     }
838     return 0;
839 }
840
841 static void LrtsPreCommonInit(int everReturn){}
842
843 /* Idle-state related functions: called in non-smp mode */
844 void CmiNotifyIdleForGemini(void) {
845     LrtsAdvanceCommunication();
846 }
847
848 static void LrtsPostCommonInit(int everReturn)
849 {
850 #if CMK_SMP
851     CmiIdleState *s=CmiNotifyGetState();
852     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
853     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
854 #else
855     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
856 #endif
857
858 }
859
860
861 void LrtsPostNonLocal(){}
862 /* pooling CQ to receive network message */
863 static void PumpNetworkRdmaMsgs()
864 {
865     gni_cq_entry_t      event_data;
866     gni_return_t        status;
867     while( (status = GNI_CqGetEvent(post_rx_cqh, &event_data)) == GNI_RC_SUCCESS);
868 }
869
870 static int SendRdmaMsg();
871 static void getLargeMsgRequest(void* header, uint64_t inst_id);
872 static void PumpNetworkSmsg()
873 {
874     uint64_t            inst_id;
875     PENDING_GETNEXT     *pending_next;
876     int                 ret;
877     gni_cq_entry_t      event_data;
878     gni_return_t        status;
879     void                *header;
880     uint8_t             msg_tag;
881     int                 msg_nbytes;
882     void                *msg_data;
883     gni_mem_handle_t    msg_mem_hndl;
884     gni_smsg_attr_t     *smsg_attr;
885     gni_smsg_attr_t     *remote_smsg_attr;
886     int                 init_flag;
887     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
888     {
889         inst_id = GNI_CQ_GET_INST_ID(event_data);
890         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
891 #if PRINT_SYH
892         CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
893 #endif
894
895         rdma_id++;
896         if(useDynamicSMSG == 1)
897         {
898             init_flag = smsg_connected_flag[inst_id];
899             //CmiPrintf("[%d] initflag=%d\n", myrank, init_flag);
900             if(init_flag == 0 )
901             {
902                 CmiPrintf("[%d=%d]pump Init smsg connection id=%d\n", myrank, inst_id, rdma_id);
903                 smsg_connected_flag[inst_id] =20;
904                 setup_smsg_connection(inst_id);
905                 remote_smsg_attr = &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]);
906                 status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id],  remote_smsg_attr);
907                 GNI_RC_CHECK("no send SmsgInit", status);
908                 continue;
909             } else if (init_flag <20) 
910             {
911                 CmiPrintf("[%d==%d]pump setup smsg connection id=%d\n", myrank, inst_id, rdma_id);
912                 smsg_connected_flag[inst_id] = 20;
913                 remote_smsg_attr = &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]);
914                 status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id],  remote_smsg_attr);
915                 print_smsg_attr(remote_smsg_attr);
916                 GNI_RC_CHECK("send once SmsgInit", status);
917                 continue;
918             }
919         }
920         msg_tag = GNI_SMSG_ANY_TAG;
921         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
922         {
923 #if PRINT_SYH
924             lrts_received_msg++;
925             CmiPrintf("+++[%d] PumpNetwork msg is received, messageid:%d tag=%d\n", myrank, lrts_received_msg, msg_tag);
926 #endif
927             /* copy msg out and then put into queue (small message) */
928             switch (msg_tag) {
929             case SMALL_DATA_TAG:
930             {
931                 msg_nbytes = CmiGetMsgSize(header);
932                 msg_data    = CmiAlloc(msg_nbytes);
933                 memcpy(msg_data, (char*)header, msg_nbytes);
934                 handleOneRecvedMsg(msg_nbytes, msg_data);
935                 break;
936             }
937             case LMSG_INIT_TAG:
938             {
939 #if PRINT_SYH
940                 CmiPrintf("+++[%d] from %d PumpNetwork Rdma Request msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
941 #endif
942                 getLargeMsgRequest(header, inst_id);
943                 break;
944             }
945             case ACK_TAG:
946             {
947                 /* Get is done, release message . Now put is not used yet*/
948 #if         !USE_LRTS_MEMPOOL
949                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
950 #endif
951                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
952                 SendRdmaMsg();
953                 break;
954             }
955             case PUT_DONE_TAG: //persistent message
956             {
957                 handleOneRecvedMsg(((CONTROL_MSG *) header)->length,((void*) (CONTROL_MSG *) header)); 
958                 break;
959             }
960             default: {
961                 CmiPrintf("weird tag problem\n");
962                 CmiAbort("Unknown tag\n");
963             }
964             }
965             GNI_SmsgRelease(ep_hndl_array[inst_id]);
966             msg_tag = GNI_SMSG_ANY_TAG;
967         } //endwhile getNext
968     }   //end while GetEvent
969     if(status == GNI_RC_ERROR_RESOURCE)
970     {
971         GNI_RC_CHECK("Smsg_rx_cq full", status);
972     }
973 }
974
975 static void getLargeMsgRequest(void* header, uint64_t inst_id)
976 {
977 #if     USE_LRTS_MEMPOOL
978     CONTROL_MSG         *request_msg;
979     gni_return_t        status;
980     void                *msg_data;
981     gni_post_descriptor_t *pd;
982     RDMA_REQUEST        *rdma_request_msg;
983     gni_mem_handle_t    msg_mem_hndl;
984     int source;
985     // initial a get to transfer data from the sender side */
986     request_msg = (CONTROL_MSG *) header;
987     source = request_msg->source;
988     msg_data = CmiAlloc(request_msg->length);
989     _MEMCHECK(msg_data);
990     //memcpy(&msg_mem_hndl, GetMemHndl(msg_data), sizeof(gni_mem_handle_t));
991     msg_mem_hndl = GetMemHndl(msg_data);
992
993     MallocPostDesc(pd);
994     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
995         pd->type            = GNI_POST_FMA_GET;
996     else
997         pd->type            = GNI_POST_RDMA_GET;
998 #if REMOTE_EVENT
999     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1000 #else
1001     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1002 #endif
1003     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1004     pd->length          = ALIGN4(request_msg->length);
1005     pd->local_addr      = (uint64_t) msg_data;
1006     pd->local_mem_hndl  = msg_mem_hndl;
1007     pd->remote_addr     = request_msg->source_addr;
1008     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1009     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1010     pd->rdma_mode       = 0;
1011
1012     if(pd->type == GNI_POST_RDMA_GET) 
1013         status = GNI_PostRdma(ep_hndl_array[source], pd);
1014     else
1015         status = GNI_PostFma(ep_hndl_array[source],  pd);
1016     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1017     {
1018         MallocRdmaRequest(rdma_request_msg);
1019         rdma_request_msg->next = 0;
1020         rdma_request_msg->destNode = inst_id;
1021         rdma_request_msg->pd = pd;
1022         if(pending_rdma_head == 0)
1023         {
1024             pending_rdma_head = rdma_request_msg;
1025         }else
1026         {
1027             pending_rdma_tail->next = rdma_request_msg;
1028         }
1029         pending_rdma_tail = rdma_request_msg;
1030     }else
1031         GNI_RC_CHECK("AFter posting", status);
1032
1033 #else
1034     CONTROL_MSG         *request_msg;
1035     gni_return_t        status;
1036     void                *msg_data;
1037     gni_post_descriptor_t *pd;
1038     RDMA_REQUEST        *rdma_request_msg;
1039     gni_mem_handle_t    msg_mem_hndl;
1040     int source;
1041     // initial a get to transfer data from the sender side */
1042     request_msg = (CONTROL_MSG *) header;
1043     source = request_msg->source;
1044     msg_data = CmiAlloc(request_msg->length);
1045     _MEMCHECK(msg_data);
1046
1047     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
1048
1049     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1050     {
1051         GNI_RC_CHECK("Mem Register before post", status);
1052     }
1053
1054     MallocPostDesc(pd);
1055     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
1056         pd->type            = GNI_POST_FMA_GET;
1057     else
1058         pd->type            = GNI_POST_RDMA_GET;
1059 #if REMOTE_EVENT
1060     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1061 #else
1062     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1063 #endif
1064     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1065     pd->length          = ALIGN4(request_msg->length);
1066     pd->local_addr      = (uint64_t) msg_data;
1067     pd->remote_addr     = request_msg->source_addr;
1068     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1069     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1070     pd->rdma_mode       = 0;
1071
1072     //memory registration successful
1073     if(status == GNI_RC_SUCCESS)
1074     {
1075         pd->local_mem_hndl  = msg_mem_hndl;
1076         if(pd->type == GNI_POST_RDMA_GET) 
1077             status = GNI_PostRdma(ep_hndl_array[source], pd);
1078         else
1079             status = GNI_PostFma(ep_hndl_array[source],  pd);
1080     }else
1081     {
1082         pd->local_mem_hndl.qword1  = 0; 
1083         pd->local_mem_hndl.qword1  = 0; 
1084     }
1085     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1086     {
1087         MallocRdmaRequest(rdma_request_msg);
1088         rdma_request_msg->next = 0;
1089         rdma_request_msg->destNode = inst_id;
1090         rdma_request_msg->pd = pd;
1091         if(pending_rdma_head == 0)
1092         {
1093             pending_rdma_head = rdma_request_msg;
1094         }else
1095         {
1096             pending_rdma_tail->next = rdma_request_msg;
1097         }
1098         pending_rdma_tail = rdma_request_msg;
1099     }else
1100         GNI_RC_CHECK("AFter posting", status);
1101 #endif
1102 }
1103
1104 /* Check whether message send or get is confirmed by remote */
1105 static void PumpLocalSmsgTransactions()
1106 {
1107     gni_return_t            status;
1108     gni_cq_entry_t          ev;
1109     while ((status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1110     {
1111 #if PRINT_SYH
1112         lrts_local_done_msg++;
1113         //CmiPrintf("*[%d]  PumpLocalSmsgTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
1114 #endif
1115         if(GNI_CQ_OVERRUN(ev))
1116         {
1117             CmiPrintf("Overrun detected in local CQ");
1118             CmiAbort("Overrun in TX");
1119         }
1120     }
1121     if(status == GNI_RC_ERROR_RESOURCE)
1122     {
1123         GNI_RC_CHECK("Smsg_tx_cq full", status);
1124     }
1125 }
1126
1127 static void PumpLocalRdmaTransactions()
1128 {
1129     gni_cq_entry_t          ev;
1130     gni_return_t            status;
1131     uint64_t                type, inst_id;
1132     gni_post_descriptor_t   *tmp_pd;
1133     MSG_LIST                *ptr;
1134     CONTROL_MSG             *ack_msg_tmp;
1135     uint8_t             msg_tag;
1136
1137    // while ( (status = GNI_CqGetEvent(post_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1138     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1139     {
1140         type        = GNI_CQ_GET_TYPE(ev);
1141 #if PRINT_SYH
1142         //lrts_local_done_msg++;
1143         CmiPrintf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
1144 #endif
1145         if (type == GNI_CQ_EVENT_TYPE_POST)
1146         {
1147             inst_id     = GNI_CQ_GET_INST_ID(ev);
1148 #if PRINT_SYH
1149             CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d, %d\n", myrank,  lrts_local_done_msg, smsg_connected_flag[inst_id]);
1150 #endif
1151             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
1152             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1153             ////Message is sent, free message , put is not used now
1154             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
1155             {
1156                     continue;
1157                 //else  
1158                 {
1159                     //persistent message 
1160                     CmiFree((void *)tmp_pd->local_addr);
1161                     msg_tag = PUT_DONE_TAG; 
1162                 }
1163             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1164             {
1165                 msg_tag = ACK_TAG;  
1166             }
1167             MallocControlMsg(ack_msg_tmp);
1168             ack_msg_tmp->source             = myrank;
1169             ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
1170             ack_msg_tmp->length             = tmp_pd->length; 
1171             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1172 #if PRINT_SYH
1173             lrts_send_msg_id++;
1174             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
1175 #endif
1176             status = send_smsg_message(inst_id, 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0);  
1177             if(status == GNI_RC_SUCCESS)
1178             {
1179                 FreeControlMsg(ack_msg_tmp);
1180             }
1181 #if     !USE_LRTS_MEMPOOL
1182             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1183 #endif
1184             CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1185             handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1186             SendRdmaMsg(); 
1187             FreePostDesc(tmp_pd);
1188         }
1189     } //end while
1190 }
1191
1192 static int SendRdmaMsg()
1193 {
1194     gni_return_t            status = GNI_RC_SUCCESS;
1195     gni_mem_handle_t        msg_mem_hndl;
1196
1197     RDMA_REQUEST *ptr = pending_rdma_head;
1198     RDMA_REQUEST *prev = NULL;
1199
1200     while (ptr != NULL)
1201     {
1202         gni_post_descriptor_t *pd = ptr->pd;
1203         status = GNI_RC_SUCCESS;
1204         // register memory first
1205         if( pd->local_mem_hndl.qword1 == 0 && pd->local_mem_hndl.qword2 == 0)
1206         {
1207             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1208         }
1209         if(status == GNI_RC_SUCCESS)
1210         {
1211             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1212                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1213             else
1214                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1215             if(status == GNI_RC_SUCCESS)
1216             {
1217                 RDMA_REQUEST *tmp = ptr;
1218                 if (prev)
1219                   prev->next = ptr->next;
1220                 else
1221                   pending_rdma_head = ptr->next;
1222                 ptr = ptr->next;
1223                 FreeRdmaRequest(tmp);
1224                 continue;
1225             }
1226         }
1227         prev = ptr;
1228         ptr = ptr->next;
1229     } //end while
1230     return 0;
1231 }
1232
1233 // return 1 if all messages are sent
1234 static int SendBufferMsg()
1235 {
1236     MSG_LIST            *ptr, *previous_head, *current_head;
1237     CONTROL_MSG         *control_msg_tmp;
1238     gni_return_t        status;
1239     int done = 1;
1240     register    int     i;
1241     int                 index_previous = -1;
1242     int                 index = smsg_head_index;
1243     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {CmiPrintf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
1244     /* can add flow control here to control the number of messages sent before handle message */
1245     while(index != -1)
1246     {
1247         ptr = smsg_msglist_index[index].head;
1248        
1249         while(ptr!=0)
1250         {
1251             CmiAssert(ptr!=NULL);
1252             if(ptr->tag == SMALL_DATA_TAG)
1253             {
1254                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, ptr->size, SMALL_DATA_TAG, 1);  
1255                 if(status == GNI_RC_SUCCESS)
1256                 {
1257                     CmiFree(ptr->msg);
1258                 }
1259             }
1260             else if(ptr->tag == LMSG_INIT_TAG)
1261             {
1262                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1263 #if PRINT_SYH
1264                 CmiPrintf("[%d==>%d] LMSG buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1265 #endif
1266                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
1267                 {
1268                     MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
1269                     if(status != GNI_RC_SUCCESS) {
1270                         done = 0;
1271                         break;
1272                     }
1273                 }
1274                 
1275                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 1);  
1276                 if(status == GNI_RC_SUCCESS)
1277                 {
1278                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
1279                 }
1280             }else if (ptr->tag == ACK_TAG)
1281             {
1282                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), ACK_TAG, 1);  
1283                 if(status == GNI_RC_SUCCESS)
1284                 {
1285                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1286                 }
1287             }else
1288             {
1289                 CmiPrintf("Weird tag\n");
1290                 CmiAbort("should not happen\n");
1291             }
1292             if(status == GNI_RC_SUCCESS)
1293             {
1294 #if PRINT_SYH
1295                 buffered_smsg_counter--;
1296                 if(lrts_smsg_success == lrts_send_msg_id)
1297                     CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1298                 else
1299                     CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1300 #endif
1301                 smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
1302                 FreeMsgList(ptr);
1303                 ptr= smsg_msglist_index[index].head;
1304
1305             }else {
1306                 done = 0;
1307                 break;
1308             } 
1309         
1310         } //end while
1311         if(ptr == 0)
1312         {
1313             if(index_previous != -1)
1314                 smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
1315             else
1316                 smsg_head_index = smsg_msglist_index[index].next;
1317         }else
1318         {
1319             index_previous = index;
1320         }
1321         index = smsg_msglist_index[index].next;
1322     }   // end pooling for all cores
1323     return done;
1324 }
1325
1326 static void LrtsAdvanceCommunication()
1327 {
1328     /*  Receive Msg first */
1329 #if 0
1330     if(myrank == 0)
1331     CmiPrintf("Calling Lrts Pump Msg PE:%d\n", myrank);
1332 #endif
1333     PumpNetworkSmsg();
1334     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
1335     //PumpNetworkRdmaMsgs();
1336     /* Release Sent Msg */
1337     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
1338 #if 0
1339     ////PumpLocalSmsgTransactions();
1340     if(myrank == 0)
1341     CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", myrank);
1342 #endif
1343     PumpLocalRdmaTransactions();
1344 #if 0
1345     if(myrank == 0)
1346     CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", myrank);
1347 #endif
1348     /* Send buffered Message */
1349     SendBufferMsg();
1350 #if 0
1351     if(myrank == 0)
1352     CmiPrintf("Calling Lrts rdma PE:%d\n", myrank);
1353 #endif
1354     SendRdmaMsg();
1355 #if 0
1356     if(myrank == 0)
1357     CmiPrintf("done PE:%d\n", myrank);
1358 #endif
1359 }
1360
1361 static void _init_dynamic_smsg()
1362 {
1363     gni_smsg_attr_t smsg_attr;
1364     gni_return_t status;
1365     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
1366     memset(smsg_connected_flag, 0, mysize*sizeof(int));
1367
1368     smsg_local_attr_vec = (gni_smsg_attr_t**) malloc(sizeof(gni_smsg_attr_t*) *mysize);
1369     
1370     setup_mem.addr = (uint64_t)malloc(mysize * sizeof(gni_smsg_attr_t));
1371     status = GNI_MemRegister(nic_hndl, setup_mem.addr,  mysize * sizeof(gni_smsg_attr_t), smsg_rx_cqh,  GNI_MEM_READWRITE, -1,  &(setup_mem.mdh));
1372    
1373     GNI_RC_CHECK("Smsg dynamic allocation \n", status);
1374     smsg_connection_vec = (mdh_addr_t*) malloc(mysize*sizeof(mdh_addr_t)); 
1375     allgather(&setup_mem, smsg_connection_vec, sizeof(mdh_addr_t));
1376     
1377     //pre-allocate some memory as mailbox for dynamic connection
1378     if(mysize <=4096)
1379     {
1380         SMSG_MAX_MSG = 1024;
1381     }else if (mysize > 4096 && mysize <= 16384)
1382     {
1383         SMSG_MAX_MSG = 512;
1384     }else {
1385         SMSG_MAX_MSG = 256;
1386     }
1387     
1388     smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1389     smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1390     smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1391     status = GNI_SmsgBufferSizeNeeded(&smsg_attr, &smsg_memlen);
1392     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1393     
1394     smsg_dynamic_list = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1395
1396     smsg_dynamic_list->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1397     bzero(smsg_dynamic_list->addr, smsg_memlen*smsg_expand_slots);
1398     
1399     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_dynamic_list->addr,
1400             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1401             GNI_MEM_READWRITE,   
1402             -1,
1403             &(smsg_dynamic_list->mdh));
1404    smsg_available_slot = 0;  
1405 }
1406
1407 static void _init_static_smsg()
1408 {
1409     gni_smsg_attr_t      *smsg_attr;
1410     gni_smsg_attr_t      remote_smsg_attr;
1411     gni_smsg_attr_t      *smsg_attr_vec;
1412     gni_mem_handle_t     my_smsg_mdh_mailbox;
1413     int      i;
1414     gni_return_t status;
1415     uint32_t              vmdh_index = -1;
1416     mdh_addr_t            base_infor;
1417     mdh_addr_t            *base_addr_vec;
1418     if(mysize <=4096)
1419     {
1420         SMSG_MAX_MSG = 1024;
1421         //log2_SMSG_MAX_MSG = 10;
1422     }else if (mysize > 4096 && mysize <= 16384)
1423     {
1424         SMSG_MAX_MSG = 512;
1425         //log2_SMSG_MAX_MSG = 9;
1426
1427     }else {
1428         SMSG_MAX_MSG = 256;
1429         //log2_SMSG_MAX_MSG = 8;
1430     }
1431     
1432     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
1433     
1434     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1435     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
1436     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
1437     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
1438     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1439     smsg_mailbox_base = memalign(64, smsg_memlen*(mysize));
1440     _MEMCHECK(smsg_mailbox_base);
1441     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
1442     if (myrank == 0) CmiPrintf("Charm++> allocates %.2fMB for SMSG. \n", smsg_memlen*mysize/1e6);
1443     
1444     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
1445             smsg_memlen*(mysize), smsg_rx_cqh,
1446             GNI_MEM_READWRITE,   
1447             vmdh_index,
1448             &my_smsg_mdh_mailbox);
1449
1450     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1451
1452     base_infor.addr =  (uint64_t)smsg_mailbox_base;
1453     base_infor.mdh =  my_smsg_mdh_mailbox;
1454     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
1455
1456     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
1457  
1458     for(i=0; i<mysize; i++)
1459     {
1460         if(i==myrank)
1461             continue;
1462         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1463         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1464         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1465         smsg_attr[i].mbox_offset = i*smsg_memlen;
1466         smsg_attr[i].buff_size = smsg_memlen;
1467         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
1468         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1469     }
1470
1471     for(i=0; i<mysize; i++)
1472     {
1473         if (myrank == i) continue;
1474
1475         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1476         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1477         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1478         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
1479         remote_smsg_attr.buff_size = smsg_memlen;
1480         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
1481         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
1482
1483         /* initialize the smsg channel */
1484         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
1485         GNI_RC_CHECK("SMSG Init", status);
1486     } //end initialization
1487
1488     free(base_addr_vec);
1489
1490     free(smsg_attr);
1491     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
1492      GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
1493
1494
1495 inline
1496 static void _init_smsg()
1497 {
1498     int i;
1499
1500      smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
1501      for(i =0; i<mysize; i++)
1502      {
1503         smsg_msglist_index[i].next = -1;
1504         smsg_msglist_index[i].head = 0;
1505         smsg_msglist_index[i].tail = 0;
1506      }
1507      smsg_head_index = -1;
1508 }
1509
1510 static void _init_static_msgq()
1511 {
1512     gni_return_t status;
1513     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
1514     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
1515     msgq_attrs.smsg_q_sz = 1;
1516     msgq_attrs.rcv_pool_sz = 1;
1517     msgq_attrs.num_msgq_eps = 2;
1518     msgq_attrs.nloc_insts = 8;
1519     msgq_attrs.modes = 0;
1520     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
1521
1522     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
1523     GNI_RC_CHECK("MSGQ Init", status);
1524
1525
1526 }
1527
1528 static void _init_DMA_buffer()
1529 {
1530     gni_return_t            status = GNI_RC_SUCCESS;
1531     /*AUTO tuning */
1532     /* suppose max_smsg is 1024, DMA buffer is split into 2048, 4096, 8192, ... */
1533     /*  This method might be better for SMP, but it is bad for Nonsmp since msgs are sharing same slots */
1534     /*
1535      * DMA_slots = 19-log2_SMSG_MAX_MSG;
1536     DMA_incoming_avail_tag = malloc(DMA_slots);
1537     DMA_buffer_size = 2*(DMA_max_single_msg - SMSG_MAX_MSG); 
1538     DMA_incoming_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1539     DMA_outgoing_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1540     
1541     status = GNI_MemRegister(nic_hndl, (uint64_t)DMA_incoming_base_addr,
1542             DMA_buffer_size, smsg_rx_cqh,
1543             GNI_MEM_READWRITE ,   
1544             vmdh_index,
1545             &);
1546             */
1547     // one is reserved to avoid deadlock
1548     DMA_slots           = 17; // each one is 8K  16*8K + 1 slot reserved to avoid deadlock
1549     DMA_buffer_size     = DMA_max_single_msg + 8192;
1550     DMA_buffer_base_mdh_addr.addr = (uint64_t)memalign(ALIGNBUF, DMA_buffer_size);
1551     status = GNI_MemRegister(nic_hndl, DMA_buffer_base_mdh_addr.addr,
1552         DMA_buffer_size, smsg_rx_cqh,
1553         GNI_MEM_READWRITE ,   
1554         -1,
1555         &(DMA_buffer_base_mdh_addr.mdh));
1556     GNI_RC_CHECK("GNI_MemRegister", status);
1557     DMA_buffer_base_mdh_addr_vec = (mdh_addr_t*) malloc(sizeof(mdh_addr_t) * mysize);
1558
1559     allgather(&DMA_buffer_base_mdh_addr, DMA_buffer_base_mdh_addr_vec, sizeof(mdh_addr_t) );
1560 }
1561
1562 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
1563 {
1564     register int            i;
1565     int                     rc;
1566     int                     device_id = 0;
1567     unsigned int            remote_addr;
1568     gni_cdm_handle_t        cdm_hndl;
1569     gni_return_t            status = GNI_RC_SUCCESS;
1570     uint32_t                vmdh_index = -1;
1571     uint8_t                 ptag;
1572     unsigned int            local_addr, *MPID_UGNI_AllAddr;
1573     int                     first_spawned;
1574     int                     physicalID;
1575     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
1576     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
1577     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
1578    
1579     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
1580     useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
1581     if (myrank==0) 
1582     {
1583         if(useDynamicSMSG) 
1584             CmiPrintf("Charm++> use Dynamic SMSG\n"); 
1585         else 
1586             CmiPrintf("Charm++> use Static SMSG\n");
1587     };
1588     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
1589     
1590     status = PMI_Init(&first_spawned);
1591     GNI_RC_CHECK("PMI_Init", status);
1592
1593     status = PMI_Get_size(&mysize);
1594     GNI_RC_CHECK("PMI_Getsize", status);
1595
1596     status = PMI_Get_rank(&myrank);
1597     GNI_RC_CHECK("PMI_getrank", status);
1598
1599     //physicalID = CmiPhysicalNodeID(myrank);
1600     
1601     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
1602
1603     *myNodeID = myrank;
1604     *numNodes = mysize;
1605   
1606     if(myrank == 0)
1607     {
1608         printf("Charm++> Running on Gemini (GNI) using %d  cores\n", mysize);
1609     }
1610 #ifdef USE_ONESIDED
1611     onesided_init(NULL, &onesided_hnd);
1612
1613     // this is a GNI test, so use the libonesided bypass functionality
1614     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
1615     local_addr = gniGetNicAddress();
1616 #else
1617     ptag = get_ptag();
1618     cookie = get_cookie();
1619 #if 0
1620     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
1621 #endif
1622     //Create and attach to the communication  domain */
1623     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
1624     GNI_RC_CHECK("GNI_CdmCreate", status);
1625     //* device id The device id is the minor number for the device
1626     //that is assigned to the device by the system when the device is created.
1627     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
1628     //where X is the device number 0 default 
1629     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
1630     GNI_RC_CHECK("GNI_CdmAttach", status);
1631     local_addr = get_gni_nic_address(0);
1632 #endif
1633     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
1634     _MEMCHECK(MPID_UGNI_AllAddr);
1635     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
1636     /* create the local completion queue */
1637     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
1638     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
1639     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
1640     
1641     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
1642     //GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
1643     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
1644
1645     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
1646     GNI_RC_CHECK("Create CQ (rx)", status);
1647     
1648     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
1649     //GNI_RC_CHECK("Create Post CQ (rx)", status);
1650     
1651     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
1652     //GNI_RC_CHECK("Create BTE CQ", status);
1653
1654     /* create the endpoints. they need to be bound to allow later CQWrites to them */
1655     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
1656     _MEMCHECK(ep_hndl_array);
1657
1658     for (i=0; i<mysize; i++) {
1659         if(i == myrank) continue;
1660         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
1661         GNI_RC_CHECK("GNI_EpCreate ", status);   
1662         remote_addr = MPID_UGNI_AllAddr[i];
1663         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
1664         GNI_RC_CHECK("GNI_EpBind ", status);   
1665     }
1666     /* Depending on the number of cores in the job, decide different method */
1667     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
1668     if(mysize > 1)
1669     {
1670         if(useDynamicSMSG == 0)
1671         {
1672             _init_static_smsg();
1673         }else
1674         {
1675             _init_dynamic_smsg();
1676         }
1677
1678         _init_smsg();
1679     }
1680 #if     USE_LRTS_MEMPOOL
1681     CmiGetArgLong(*argv, "+useMemorypoolSize", &_mempool_size);
1682     if (myrank==0) CmiPrintf("Charm++> use memorypool size: %1.fMB\n", _mempool_size/1024.0/1024);
1683     init_mempool(_mempool_size);
1684     //init_mempool(Mempool_MaxSize);
1685 #endif
1686
1687     /* init DMA buffer for medium message */
1688
1689     //_init_DMA_buffer();
1690     
1691     free(MPID_UGNI_AllAddr);
1692 }
1693
1694
1695 void* LrtsAlloc(int n_bytes, int header)
1696 {
1697     void *ptr;
1698 #if 0
1699     CmiPrintf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d\n", CmiMyPe(), n_bytes, header);
1700 #endif
1701     if(n_bytes <= SMSG_MAX_MSG)
1702     {
1703         int totalsize = n_bytes+header;
1704         ptr = malloc(totalsize);
1705     }else 
1706     {
1707
1708         CmiAssert(header <= ALIGNBUF);
1709 #if     USE_LRTS_MEMPOOL
1710         n_bytes = ALIGN64(n_bytes);
1711         char *res = syh_mempool_malloc(ALIGNBUF+n_bytes);
1712 #else
1713         n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
1714         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1715 #endif
1716         ptr = res + ALIGNBUF - header;
1717     }
1718 #if 0 
1719     CmiPrintf("Done Alloc Lrts for bytes=%d, head=%d\n", n_bytes, header);
1720 #endif
1721     return ptr;
1722 }
1723
1724 void  LrtsFree(void *msg)
1725 {
1726     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1727     if (size <= SMSG_MAX_MSG)
1728       free(msg);
1729     else
1730     {
1731 #if 0
1732         CmiPrintf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1733 #endif
1734 #if     USE_LRTS_MEMPOOL
1735         syh_mempool_free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1736 #else
1737         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1738 #endif
1739     }
1740 #if 0 
1741     CmiPrintf("Done Free lrts for bytes=%d\n", size);
1742 #endif
1743 }
1744
1745 static void LrtsExit()
1746 {
1747     /* free memory ? */
1748 #if     USE_LRTS_MEMPOOL
1749     kill_allmempool();
1750 #endif
1751     PMI_Finalize();
1752     exit(0);
1753 }
1754
1755 static void LrtsDrainResources()
1756 {
1757     while (!SendBufferMsg()) {
1758         PumpNetworkSmsg();
1759         PumpNetworkRdmaMsgs();
1760         PumpLocalSmsgTransactions();
1761         PumpLocalRdmaTransactions();
1762     }
1763     PMI_Barrier();
1764 }
1765
1766 void CmiAbort(const char *message) {
1767
1768     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1769     PMI_Abort(-1, message);
1770 }
1771
1772 /**************************  TIMER FUNCTIONS **************************/
1773 #if CMK_TIMER_USE_SPECIAL
1774 /* MPI calls are not threadsafe, even the timer on some machines */
1775 static CmiNodeLock  timerLock = 0;
1776 static int _absoluteTime = 0;
1777 static int _is_global = 0;
1778 static struct timespec start_ns;
1779
1780 inline int CmiTimerIsSynchronized() {
1781     return 1;
1782 }
1783
1784 inline int CmiTimerAbsolute() {
1785     return _absoluteTime;
1786 }
1787
1788 double CmiStartTimer() {
1789     return 0.0;
1790 }
1791
1792 double CmiInitTime() {
1793     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
1794 }
1795
1796 void CmiTimerInit(char **argv) {
1797     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1798     if (_absoluteTime && CmiMyPe() == 0)
1799         printf("Charm++> absolute  timer is used\n");
1800     
1801     _is_global = CmiTimerIsSynchronized();
1802
1803
1804     if (_is_global) {
1805         if (CmiMyRank() == 0) {
1806             clock_gettime(CLOCK_MONOTONIC, &start_ts)
1807         }
1808     } else { /* we don't have a synchronous timer, set our own start time */
1809         CmiBarrier();
1810         CmiBarrier();
1811         CmiBarrier();
1812         clock_gettime(CLOCK_MONOTONIC, &start_ts)
1813     }
1814     CmiNodeAllBarrier();          /* for smp */
1815 }
1816
1817 /**
1818  * Since the timerLock is never created, and is
1819  * always NULL, then all the if-condition inside
1820  * the timer functions could be disabled right
1821  * now in the case of SMP.
1822  */
1823 double CmiTimer(void) {
1824     struct timespec now_ts;
1825     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1826     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1827         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1828 }
1829
1830 double CmiWallTimer(void) {
1831     struct timespec now_ts;
1832     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1833     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1834         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1835 }
1836
1837 double CmiCpuTimer(void) {
1838     struct timespec now_ts;
1839     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1840     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1841         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1842 }
1843
1844 #endif
1845 /************Barrier Related Functions****************/
1846
1847 int CmiBarrier()
1848 {
1849     int status;
1850     status = PMI_Barrier();
1851     return status;
1852
1853 }
1854
1855
1856 #if CMK_PERSISTENT_COMM
1857 #include "machine-persistent.c"
1858 #endif
1859
1860