more code for dynamic smsg in Gemini
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <stdint.h>
16 #include <errno.h>
17 #include <malloc.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #if defined(_WIN32) && ! defined(__CYGWIN__)
26 #include <windows.h>
27 #include <wincon.h>
28 #include <sys/types.h>
29 #include <sys/timeb.h>
30
31 static void sleep(int secs) {
32     Sleep(1000*secs);
33 }
34 #else
35 #include <unistd.h> /*For getpid()*/
36 #endif
37
38
39 #define REMOTE_EVENT         0
40 #define USE_LRTS_MEMPOOL   1
41
42 #if USE_LRTS_MEMPOOL
43 static size_t _mempool_size = 1024ll*1024*32;
44 #endif
45
46 #define PRINT_SYH  0
47
48 #if PRINT_SYH
49 int         lrts_smsg_success = 0;
50 int         lrts_send_msg_id = 0;
51 int         lrts_send_rdma_id = 0;
52 int         lrts_send_rdma_success = 0;
53 int         lrts_received_msg = 0;
54 int         lrts_local_done_msg = 0;
55 #endif
56
57 #include "machine.h"
58
59 #include "pcqueue.h"
60
61 #if CMK_PERSISTENT_COMM
62 #include "machine-persistent.h"
63 #endif
64
65 //#define  USE_ONESIDED 1
66 #ifdef USE_ONESIDED
67 //onesided implementation is wrong, since no place to restore omdh
68 #include "onesided.h"
69 onesided_hnd_t   onesided_hnd;
70 onesided_md_t    omdh;
71 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
72
73 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
74
75 #else
76 uint8_t   onesided_hnd, omdh;
77 #if REMOTE_EVENT
78 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
79 #else
80 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
81 #endif
82 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
83 #endif
84
85 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
86 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
87
88 #define ALIGNBUF                64
89
90 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
91 /* If SMSG is not used */
92 #define FMA_PER_CORE  1024
93 #define FMA_BUFFER_SIZE 1024
94 /* If SMSG is used */
95 static int  SMSG_MAX_MSG;
96 //static int  log2_SMSG_MAX_MSG;
97 #define SMSG_MAX_CREDIT  36
98
99 #define MSGQ_MAXSIZE       2048
100 /* large message transfer with FMA or BTE */
101 #define LRTS_GNI_RDMA_THRESHOLD  2048
102
103 #define REMOTE_QUEUE_ENTRIES  2048 
104 #define LOCAL_QUEUE_ENTRIES   64 
105
106 #define PUT_DONE_TAG      0x29
107 #define ACK_TAG           0x30
108 /* SMSG is data message */
109 #define SMALL_DATA_TAG          0x31
110 /* SMSG is a control message to initialize a BTE */
111 #define MEDIUM_HEAD_TAG         0x32
112 #define MEDIUM_DATA_TAG         0x33
113 #define LMSG_INIT_TAG     0x39 
114
115 #define DEBUG
116 #ifdef GNI_RC_CHECK
117 #undef GNI_RC_CHECK
118 #endif
119 #ifdef DEBUG
120 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
121 #else
122 #define GNI_RC_CHECK(msg,rc)
123 #endif
124
125 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
126 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
127
128 static int Mempool_MaxSize = 1024*1024*128;
129 static int useDynamicSMSG   = 0;
130 static int useStaticMSGQ = 0;
131 static int useStaticFMA = 0;
132 static int mysize, myrank;
133 static gni_nic_handle_t      nic_hndl;
134
135 typedef struct {
136     gni_mem_handle_t mdh;
137     uint64_t addr;
138 } mdh_addr_t ;
139 // this is related to dynamic SMSG
140
141 typedef struct mdh_addr_list{
142     gni_mem_handle_t mdh;
143     uint64_t addr;
144     struct mdh_addr_list *next;
145 }mdh_addr_list_t;
146
147 static unsigned int         smsg_memlen;
148 #define     SMSG_CONN_SIZE     24
149 int     *smsg_connected_flag= 0;
150 char    *smsg_connection_addr = 0;
151 mdh_addr_t    *smsg_connection_vec = 0;
152 gni_mem_handle_t    smsg_connection_memhndl;
153 static int            smsg_expand_slots = 10;
154 static int            smsg_available_slot = 0;
155 static void           *smsg_mailbox_mempool = 0;
156 mdh_addr_list_t       *smsg_dynamic_list = 0;
157
158 static void             *smsg_mailbox_base;
159 gni_msgq_attr_t         msgq_attrs;
160 gni_msgq_handle_t       msgq_handle;
161 gni_msgq_ep_attr_t      msgq_ep_attrs;
162 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
163
164
165
166 /* preallocated DMA buffer */
167 int                     DMA_slots;
168 uint64_t                DMA_avail_tag = 0;
169 uint32_t                DMA_incoming_avail_tag = 0;
170 uint32_t                DMA_outgoing_avail_tag = 0;
171 void                    *DMA_incoming_base_addr;
172 void                    *DMA_outgoing_base_addr;
173 mdh_addr_t              DMA_buffer_base_mdh_addr;
174 mdh_addr_t              *DMA_buffer_base_mdh_addr_vec;
175 int                     DMA_buffer_size;
176 int                     DMA_max_single_msg = 131072;//524288 ;
177
178 #define                 DMA_SIZE_PER_SLOT       8192
179
180
181 typedef struct dma_msgid_map
182 {
183     uint64_t     msg_id;
184     int     msg_subid
185 } dma_msgid_map_t;
186
187 dma_msgid_map_t         *dma_map_list;
188
189 typedef struct msg_trace
190 {
191     uint64_t    msg_id;
192     int         done_num;
193 }msg_trace_t;
194
195 msg_trace_t             *pending_msg_list;
196 /* =====Beginning of Declarations of Machine Specific Variables===== */
197 static int cookie;
198 static int modes = 0;
199 static gni_cq_handle_t       smsg_rx_cqh = NULL;
200 static gni_cq_handle_t       smsg_tx_cqh = NULL;
201 static gni_cq_handle_t       post_rx_cqh = NULL;
202 static gni_cq_handle_t       post_tx_cqh = NULL;
203 static gni_ep_handle_t       *ep_hndl_array;
204
205 typedef    struct  pending_smg
206 {
207     int     inst_id;
208     struct  pending_smg *next;
209 } PENDING_GETNEXT;
210
211
212 typedef struct msg_list
213 {
214     uint32_t destNode;
215     uint32_t size;
216     void *msg;
217     struct msg_list *next;
218     struct msg_list *pehead_next;
219     uint8_t tag;
220 }MSG_LIST;
221
222 typedef struct medium_msg_list
223 {
224     uint32_t destNode;
225     uint32_t msg_id;
226     uint32_t msg_subid;
227     uint32_t remain_size;
228     void *msg;
229     struct medium_msg_list *next;
230 }MEDIUM_MSG_LIST;
231
232
233 typedef struct control_msg
234 {
235     uint64_t            source_addr;
236     int                 source;               /* source rank */
237     int                 length;
238     gni_mem_handle_t    source_mem_hndl;
239     struct control_msg *next;
240 }CONTROL_MSG;
241
242 typedef struct medium_msg_control
243 {
244     uint64_t            dma_offset;     //the dma_buffer for this block of msg
245     int                 msg_id;         //Id for the total index
246     int                 msg_subid;      //offset inside the message id 
247 }MEDIUM_MSG_CONTROL;
248
249 typedef struct  rmda_msg
250 {
251     int                   destNode;
252     gni_post_descriptor_t *pd;
253     struct  rmda_msg      *next;
254 }RDMA_REQUEST;
255
256 typedef struct  msg_list_index
257 {
258     int         next;
259     MSG_LIST    *head;
260     MSG_LIST    *tail;
261 } MSG_LIST_INDEX;
262
263 /* reuse PendingMsg memory */
264 static CONTROL_MSG          *control_freelist=0;
265 static MSG_LIST             *msglist_freelist=0;
266 static int                  smsg_head_index;
267 static MSG_LIST_INDEX       *smsg_msglist_index= 0;
268 static MSG_LIST             *smsg_free_head=0;
269 static MSG_LIST             *smsg_free_tail=0;
270
271 /*
272 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
273     if(free_head == 0)  free_head = free_tail = msg_head;    \
274     else   free_tail = free_tail->next;    \
275     if( msg_head->next == msg_tail) msg_head =0;   \
276     else msg_head= msg_head->next;    
277
278 #define MallocMsgList(d, msg_head, msg_tail, free_head, free_tail, msgsize) \
279     if(free_head == 0) {d= malloc(msgsize);  \
280         if(msg_head == 0)   msg_head =msg_tail = msg_head->next = msg_tail->next = d; \
281         else { msg_tail->next = d; d->next = msg_head; msg_tail=d;} \
282     }else {d = free_head; free_head = free_head->next; if(free_tail->next == free_head) free_head =0;} \
283 */
284
285 #define FreeMsgList(d)  \
286   (d)->next = msglist_freelist;\
287   msglist_freelist = d;
288
289
290
291 #define MallocMsgList(d) \
292   d = msglist_freelist;\
293   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
294              _MEMCHECK(d);\
295   } else msglist_freelist = d->next;
296
297
298 #define FreeControlMsg(d)       \
299   (d)->next = control_freelist;\
300   control_freelist = d;
301
302 #define MallocControlMsg(d) \
303   d = control_freelist;\
304   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
305              _MEMCHECK(d);\
306   } else control_freelist = d->next;
307
308
309 static RDMA_REQUEST         *rdma_freelist = NULL;
310
311 #define FreeControlMsg(d)       \
312   (d)->next = control_freelist;\
313   control_freelist = d;
314
315 #define MallocControlMsg(d) \
316   d = control_freelist;\
317   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
318              _MEMCHECK(d);\
319   } else control_freelist = d->next;
320
321 #define FreeMediumControlMsg(d)       \
322   (d)->next = medium_control_freelist;\
323   medium_control_freelist = d;
324
325
326 #define MallocMediumControlMsg(d) \
327     d = medium_control_freelist;\
328     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
329     _MEMCHECK(d);\
330 } else mediumcontrol_freelist = d->next;
331
332 #define FreeRdmaRequest(d)       \
333   (d)->next = rdma_freelist;\
334   rdma_freelist = d;
335
336 #define MallocRdmaRequest(d) \
337   d = rdma_freelist;\
338   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
339              _MEMCHECK(d);\
340   } else rdma_freelist = d->next;
341
342 /* reuse gni_post_descriptor_t */
343 static gni_post_descriptor_t *post_freelist=0;
344
345 #if 1
346 #define FreePostDesc(d)       \
347     (d)->next_descr = post_freelist;\
348     post_freelist = d;
349
350 #define MallocPostDesc(d) \
351   d = post_freelist;\
352   if (d==0) { \
353      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
354      _MEMCHECK(d);\
355   } else post_freelist = d->next_descr;
356 #else
357
358 #define FreePostDesc(d)     free(d);
359 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
360
361 #endif
362
363 static PENDING_GETNEXT     *pending_smsg_head = 0;
364 static PENDING_GETNEXT     *pending_smsg_tail = 0;
365
366 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
367 static int      buffered_smsg_counter = 0;
368
369 /* SmsgSend return success but message sent is not confirmed by remote side */
370
371 static RDMA_REQUEST  *pending_rdma_head = 0;
372 static RDMA_REQUEST  *pending_rdma_tail = 0;
373 static MSG_LIST *buffered_fma_head = 0;
374 static MSG_LIST *buffered_fma_tail = 0;
375
376 /* functions  */
377 #define IsFree(a,ind)  !( a& (1<<(ind) ))
378 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
379 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
380
381 #include "mempool.c"
382
383 /* get the upper bound of log 2 */
384 int mylog2(int size)
385 {
386     int op = size;
387     unsigned int ret=0;
388     unsigned int mask = 0;
389     int i;
390     while(op>0)
391     {
392         op = op >> 1;
393         ret++;
394
395     }
396     for(i=1; i<ret; i++)
397     {
398         mask = mask << 1;
399         mask +=1;
400     }
401
402     ret -= ((size &mask) ? 0:1);
403     return ret;
404 }
405
406 static void
407 allgather(void *in,void *out, int len)
408 {
409     static int *ivec_ptr=NULL,already_called=0,job_size=0;
410     int i,rc;
411     int my_rank;
412     char *tmp_buf,*out_ptr;
413
414     if(!already_called) {
415
416         rc = PMI_Get_size(&job_size);
417         CmiAssert(rc == PMI_SUCCESS);
418         rc = PMI_Get_rank(&my_rank);
419         CmiAssert(rc == PMI_SUCCESS);
420
421         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
422         CmiAssert(ivec_ptr != NULL);
423
424         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
425         CmiAssert(rc == PMI_SUCCESS);
426
427         already_called = 1;
428
429     }
430
431     tmp_buf = (char *)malloc(job_size * len);
432     CmiAssert(tmp_buf);
433
434     rc = PMI_Allgather(in,tmp_buf,len);
435     CmiAssert(rc == PMI_SUCCESS);
436
437     out_ptr = out;
438
439     for(i=0;i<job_size;i++) {
440
441         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
442
443     }
444
445     free(tmp_buf);
446 }
447 static void
448 allgather_2(void *in,void *out, int len)
449 {
450     //PMI_Allgather is out of order
451     int i,rc, extend_len;
452     int  rank_index;
453     char *out_ptr, *out_ref;
454     char *in2;
455
456     extend_len = sizeof(int) + len;
457     in2 = (char*)malloc(extend_len);
458
459     memcpy(in2, &myrank, sizeof(int));
460     memcpy(in2+sizeof(int), in, len);
461
462     out_ptr = (char*)malloc(mysize*extend_len);
463
464     rc = PMI_Allgather(in2, out_ptr, extend_len);
465     GNI_RC_CHECK("allgather", rc);
466
467     out_ref = out;
468
469     for(i=0;i<mysize;i++) {
470         //rank index 
471         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
472         //copy to the rank index slot
473         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
474     }
475
476     free(out_ptr);
477     free(in2);
478
479 }
480
481 static unsigned int get_gni_nic_address(int device_id)
482 {
483     unsigned int address, cpu_id;
484     gni_return_t status;
485     int i, alps_dev_id=-1,alps_address=-1;
486     char *token, *p_ptr;
487
488     p_ptr = getenv("PMI_GNI_DEV_ID");
489     if (!p_ptr) {
490         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
491        
492         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
493     } else {
494         while ((token = strtok(p_ptr,":")) != NULL) {
495             alps_dev_id = atoi(token);
496             if (alps_dev_id == device_id) {
497                 break;
498             }
499             p_ptr = NULL;
500         }
501         CmiAssert(alps_dev_id != -1);
502         p_ptr = getenv("PMI_GNI_LOC_ADDR");
503         CmiAssert(p_ptr != NULL);
504         i = 0;
505         while ((token = strtok(p_ptr,":")) != NULL) {
506             if (i == alps_dev_id) {
507                 alps_address = atoi(token);
508                 break;
509             }
510             p_ptr = NULL;
511             ++i;
512         }
513         CmiAssert(alps_address != -1);
514         address = alps_address;
515     }
516     return address;
517 }
518
519 static uint8_t get_ptag(void)
520 {
521     char *p_ptr, *token;
522     uint8_t ptag;
523
524     p_ptr = getenv("PMI_GNI_PTAG");
525     CmiAssert(p_ptr != NULL);
526     token = strtok(p_ptr, ":");
527     ptag = (uint8_t)atoi(token);
528     return ptag;
529         
530 }
531
532 static uint32_t get_cookie(void)
533 {
534     uint32_t cookie;
535     char *p_ptr, *token;
536
537     p_ptr = getenv("PMI_GNI_COOKIE");
538     CmiAssert(p_ptr != NULL);
539     token = strtok(p_ptr, ":");
540     cookie = (uint32_t)atoi(token);
541
542     return cookie;
543 }
544
545 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
546 /* TODO: add any that are related */
547 /* =====End of Definitions of Message-Corruption Related Macros=====*/
548
549
550 #include "machine-lrts.h"
551 #include "machine-common-core.c"
552
553 /* Network progress function is used to poll the network when for
554    messages. This flushes receive buffers on some  implementations*/
555 #if CMK_MACHINE_PROGRESS_DEFINED
556 void CmiMachineProgressImpl() {
557 }
558 #endif
559
560 inline
561 static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
562 {
563     MSG_LIST        *msg_tmp;
564     MallocMsgList(msg_tmp);
565     msg_tmp->destNode = destNode;
566     msg_tmp->size   = size;
567     msg_tmp->msg    = msg;
568     msg_tmp->tag    = tag;
569     msg_tmp->next   = 0;
570     if (smsg_msglist_index[destNode].head == 0 ) {
571         smsg_msglist_index[destNode].head = msg_tmp;
572         smsg_msglist_index[destNode].next = smsg_head_index;
573         smsg_head_index = destNode;
574     }
575     else {
576       (smsg_msglist_index[destNode].tail)->next    = msg_tmp;
577     }
578     smsg_msglist_index[destNode].tail          = msg_tmp;
579 #if PRINT_SYH
580     buffered_smsg_counter++;
581 #endif
582 }
583
584 inline 
585 static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
586 {
587     gni_return_t status = GNI_RC_NOT_DONE;
588     gni_smsg_attr_t      smsg_attr;
589     gni_post_descriptor_t *pd;
590     
591     mdh_addr_list_t  *new_entry = 0;
592     if(useDynamicSMSG == 1)
593     {
594         if(smsg_connected_flag[destNode] == 0)
595         {
596             if(smsg_available_slot == smsg_expand_slots)
597             {
598                 new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
599                 new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
600                 bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
601     
602                 status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
603                     smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
604                     GNI_MEM_READWRITE,   
605                     vmdh_index,
606                     &(new_entry->mdh));
607                 smsg_available_slot = 0; 
608                 new_entry->next = smsg_dynamic_list;
609                 smsg_dynamic_list = new_entry;
610             }
611             smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
612             smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
613             smsg_attr.msg_maxsize = SMSG_MAX_MSG;
614             smsg_attr.mbox_offset = smsg_available_slot * smsg_memlen;
615             smsg_attr.buff_size = smsg_memlen;
616             smsg_attr.msg_buffer = smsg_dynamic_list->addr ;
617             smsg_attr.mem_hndl = smsg_dynamic_list->mdh;
618             MallocPostDesc(pd);
619             pd->type            = GNI_POST_FMA_PUT;
620             pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
621             pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
622             pd->length          = ALIGN4(request_msg->length);
623             pd->local_addr      = (uint64_t) smsg_attr;
624             pd->remote_addr     = smsg_connection_vec[destNode].addr;
625             pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
626             status = GNI_PostFma(ep_hndl_array[destNode],  pd);
627             GNI_RC_CHECK("SMSG Dynamic link", status);
628
629         } else if (smsg_connected_flag[destNode] == 1)
630         {
631             //connection request sent
632         }
633     }
634     if(smsg_msglist_index[destNode].head == 0 || inbuff==1)
635     {
636         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], header, size_header, msg, size, 0, tag);
637         if(status == GNI_RC_SUCCESS)
638         {
639 #if PRINT_SYH
640             lrts_smsg_success++;
641             if(lrts_smsg_success == lrts_send_msg_id)
642                 CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
643             else
644                 CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
645 #endif
646             return status;
647         }
648     }else {
649         if(inbuff ==0)
650             delay_send_small_msg(msg, size, destNode, tag);
651     }
652     return status;
653 }
654
655 // Get first 0 in DMA_tags starting from index
656 static int get_first_avail_bit(uint64_t DMA_tags, int start_index)
657 {
658
659     uint64_t         mask = 0x1;
660     register    int     i=0;
661     while((DMA_tags & mask) && i<DMA_slots) {mask << 1; i++;}
662
663 }
664
665 static int send_medium_messages(int destNode, int size, char *msg)
666 {
667 #if 0
668     gni_return_t status = GNI_RC_SUCCESS;
669     int first_avail_bit=0;
670     uint64_t mask = 0x1;
671     MEDIUM_MSG_CONTROL  *medium_msg_control_tmp;
672     MEDIUM_MSG_LIST        *msg_tmp;
673     int blocksize, remain_size, pos;
674     int sub_id = 0;
675     remain_size = size;
676     pos = 0;  //offset before which data are sent
677     /* copy blocks of the message to DMA preallocated buffer and send SMSG */
678     //Check whether there is any available DMA buffer
679     
680     do{
681         while((DMA_avail_tag & mask) && first_avail_bit<DMA_slots) {mask << 1; first_avail_bit++;}
682         if(first_avail_bit == DMA_slots) //No available DMA, buffer this message
683         {
684             MallocMediumMsgList(msg_tmp);
685             msg_tmp->destNode = destNode;
686             msg_tmp->msg_id   = lrts_send_msg_id;
687             msg_tmp->msg_subid   = sub_id;
688             msg_tmp->size   = remain_size;
689             msg_tmp->msg    = msg+pos;
690             msg_tmp->next   = NULL;
691             break;
692         }else
693         {
694             //copy this part of the message into this DMA buffer
695             //TODO optimize here, some data can go with this SMSG
696             blocksize = (remain_size>DMA_SIZE_PER_SLOT)?DMA_SIZE_PER_SLOT: remain_size;
697             memcpy(DMA_buffer_base_mdh_addr.addr[first_avail_bit], msg+pos, blocksize);
698             pos += blocksize;
699             remain_size -= blocksize;
700             SET_BITS(DMA_avail_tag, first_avail_bit);
701            
702             MallocMediumControlMsg(medium_msg_control_tmp);
703             medium_msg_control_tmp->msg_id = lrts_send_msg_id;
704             medium_msg_control_tmp->msg_subid = sub_id;
705             if(status == GNI_RC_SUCCESS)
706             {
707                 if(sub_id==0)
708                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_HEAD_TAG);
709                 else
710                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_DATA_TAG);
711             }
712             //buffer this smsg
713             if(status != GNI_RC_SUCCESS)
714             {
715                 delay_send_small_msg(medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), destNode, MEDIUM_HEAD_TAG);
716             }
717             sub_id++;
718         }while(remain_size > 0 );
719
720         }
721     }
722 #endif
723 }
724
725 // Large message, send control to receiver, receiver register memory and do a GET 
726 inline
727 static void send_large_messages(int destNode, int size, char *msg)
728 {
729 #if     USE_LRTS_MEMPOOL
730     gni_return_t        status  =   GNI_RC_SUCCESS;
731     CONTROL_MSG         *control_msg_tmp;
732     uint32_t            vmdh_index  = -1;
733     /* construct a control message and send */
734     MallocControlMsg(control_msg_tmp);
735     control_msg_tmp->source_addr    = (uint64_t)msg;
736     control_msg_tmp->source         = myrank;
737     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
738     //memcpy( &(control_msg_tmp->source_mem_hndl), GetMemHndl(msg), sizeof(gni_mem_handle_t)) ;
739     control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
740 #if PRINT_SYH
741     lrts_send_msg_id++;
742     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
743 #endif
744     status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
745     if(status == GNI_RC_SUCCESS)
746     {
747         FreeControlMsg(control_msg_tmp);
748     }
749 // NOT use mempool, should slow 
750 #else
751     gni_return_t        status  =   GNI_RC_SUCCESS;
752     CONTROL_MSG         *control_msg_tmp;
753     uint32_t            vmdh_index  = -1;
754     /* construct a control message and send */
755     MallocControlMsg(control_msg_tmp);
756     control_msg_tmp->source_addr    = (uint64_t)msg;
757     control_msg_tmp->source         = myrank;
758     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
759     control_msg_tmp->source_mem_hndl.qword1 = 0;
760     control_msg_tmp->source_mem_hndl.qword2 = 0;
761 #if PRINT_SYH
762     lrts_send_msg_id++;
763     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
764 #endif
765     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
766     if(status == GNI_RC_SUCCESS)
767     {
768         status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
769         if(status == GNI_RC_SUCCESS)
770         {
771             FreeControlMsg(control_msg_tmp);
772         }
773     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
774     {
775         CmiAbort("Memory registor for large msg\n");
776     }else 
777     {
778         delay_send_small_msg(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
779     }
780 #endif
781 }
782
783 inline void LrtsPrepareEnvelope(char *msg, int size)
784 {
785     CmiSetMsgSize(msg, size);
786 }
787
788 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
789 {
790     gni_return_t        status  =   GNI_RC_SUCCESS;
791     LrtsPrepareEnvelope(msg, size);
792
793     if(size <= SMSG_MAX_MSG)
794     {
795 #if PRINT_SYH
796         lrts_send_msg_id++;
797         CmiPrintf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
798 #endif
799         status = send_smsg_message( destNode, 0, 0, msg, size, SMALL_DATA_TAG, 0);  
800         if(status == GNI_RC_SUCCESS)
801         {
802             CmiFree(msg);
803         }
804     }
805     else
806     {
807         send_large_messages(destNode, size, msg);
808     }
809     return 0;
810 }
811
812 static void LrtsPreCommonInit(int everReturn){}
813
814 /* Idle-state related functions: called in non-smp mode */
815 void CmiNotifyIdleForGemini(void) {
816     LrtsAdvanceCommunication();
817 }
818
819 static void LrtsPostCommonInit(int everReturn)
820 {
821 #if CMK_SMP
822     CmiIdleState *s=CmiNotifyGetState();
823     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
824     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
825 #else
826     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
827 #endif
828
829 }
830
831
832 void LrtsPostNonLocal(){}
833 /* pooling CQ to receive network message */
834 static void PumpNetworkRdmaMsgs()
835 {
836     gni_cq_entry_t      event_data;
837     gni_return_t        status;
838     while( (status = GNI_CqGetEvent(post_rx_cqh, &event_data)) == GNI_RC_SUCCESS);
839 }
840
841 static int SendRdmaMsg();
842 static void getLargeMsgRequest(void* header, uint64_t inst_id);
843 static void PumpNetworkSmsg()
844 {
845     uint64_t            inst_id;
846     PENDING_GETNEXT     *pending_next;
847     int                 ret;
848     gni_cq_entry_t      event_data;
849     gni_return_t        status;
850     void                *header;
851     uint8_t             msg_tag;
852     int                 msg_nbytes;
853     void                *msg_data;
854     gni_mem_handle_t    msg_mem_hndl;
855  
856
857     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
858     {
859         inst_id = GNI_CQ_GET_INST_ID(event_data);
860         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
861 #if PRINT_SYH
862         CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
863 #endif
864         msg_tag = GNI_SMSG_ANY_TAG;
865         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
866         {
867 #if PRINT_SYH
868             lrts_received_msg++;
869             CmiPrintf("+++[%d] PumpNetwork msg is received, messageid:%d tag=%d\n", myrank, lrts_received_msg, msg_tag);
870 #endif
871             /* copy msg out and then put into queue (small message) */
872             switch (msg_tag) {
873             case SMALL_DATA_TAG:
874             {
875                 msg_nbytes = CmiGetMsgSize(header);
876                 msg_data    = CmiAlloc(msg_nbytes);
877                 memcpy(msg_data, (char*)header, msg_nbytes);
878                 handleOneRecvedMsg(msg_nbytes, msg_data);
879                 break;
880             }
881             case LMSG_INIT_TAG:
882             {
883 #if PRINT_SYH
884                 CmiPrintf("+++[%d] from %d PumpNetwork Rdma Request msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
885 #endif
886                 getLargeMsgRequest(header, inst_id);
887                 break;
888             }
889             case ACK_TAG:
890             {
891                 /* Get is done, release message . Now put is not used yet*/
892 #if         !USE_LRTS_MEMPOOL
893                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
894 #endif
895                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
896                 SendRdmaMsg();
897                 break;
898             }
899             case PUT_DONE_TAG: //persistent message
900             {
901                 handleOneRecvedMsg(((CONTROL_MSG *) header)->length,((void*) (CONTROL_MSG *) header)); 
902                 break;
903             }
904             default: {
905                 CmiPrintf("weird tag problem\n");
906                 CmiAbort("Unknown tag\n");
907             }
908             }
909             GNI_SmsgRelease(ep_hndl_array[inst_id]);
910             msg_tag = GNI_SMSG_ANY_TAG;
911         } //endwhile getNext
912     }   //end while GetEvent
913     if(status == GNI_RC_ERROR_RESOURCE)
914     {
915         GNI_RC_CHECK("Smsg_rx_cq full", status);
916     }
917 }
918
919 static void getLargeMsgRequest(void* header, uint64_t inst_id)
920 {
921 #if     USE_LRTS_MEMPOOL
922     CONTROL_MSG         *request_msg;
923     gni_return_t        status;
924     void                *msg_data;
925     gni_post_descriptor_t *pd;
926     RDMA_REQUEST        *rdma_request_msg;
927     gni_mem_handle_t    msg_mem_hndl;
928     int source;
929     // initial a get to transfer data from the sender side */
930     request_msg = (CONTROL_MSG *) header;
931     source = request_msg->source;
932     msg_data = CmiAlloc(request_msg->length);
933     _MEMCHECK(msg_data);
934     //memcpy(&msg_mem_hndl, GetMemHndl(msg_data), sizeof(gni_mem_handle_t));
935     msg_mem_hndl = GetMemHndl(msg_data);
936
937     MallocPostDesc(pd);
938     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
939         pd->type            = GNI_POST_FMA_GET;
940     else
941         pd->type            = GNI_POST_RDMA_GET;
942 #if REMOTE_EVENT
943     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
944 #else
945     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
946 #endif
947     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
948     pd->length          = ALIGN4(request_msg->length);
949     pd->local_addr      = (uint64_t) msg_data;
950     pd->local_mem_hndl  = msg_mem_hndl;
951     pd->remote_addr     = request_msg->source_addr;
952     pd->remote_mem_hndl = request_msg->source_mem_hndl;
953     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
954     pd->rdma_mode       = 0;
955
956     if(pd->type == GNI_POST_RDMA_GET) 
957         status = GNI_PostRdma(ep_hndl_array[source], pd);
958     else
959         status = GNI_PostFma(ep_hndl_array[source],  pd);
960     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
961     {
962         MallocRdmaRequest(rdma_request_msg);
963         rdma_request_msg->next = 0;
964         rdma_request_msg->destNode = inst_id;
965         rdma_request_msg->pd = pd;
966         if(pending_rdma_head == 0)
967         {
968             pending_rdma_head = rdma_request_msg;
969         }else
970         {
971             pending_rdma_tail->next = rdma_request_msg;
972         }
973         pending_rdma_tail = rdma_request_msg;
974     }else
975         GNI_RC_CHECK("AFter posting", status);
976
977 #else
978     CONTROL_MSG         *request_msg;
979     gni_return_t        status;
980     void                *msg_data;
981     gni_post_descriptor_t *pd;
982     RDMA_REQUEST        *rdma_request_msg;
983     gni_mem_handle_t    msg_mem_hndl;
984     int source;
985     // initial a get to transfer data from the sender side */
986     request_msg = (CONTROL_MSG *) header;
987     source = request_msg->source;
988     msg_data = CmiAlloc(request_msg->length);
989     _MEMCHECK(msg_data);
990
991     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
992
993     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
994     {
995         GNI_RC_CHECK("Mem Register before post", status);
996     }
997
998     MallocPostDesc(pd);
999     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
1000         pd->type            = GNI_POST_FMA_GET;
1001     else
1002         pd->type            = GNI_POST_RDMA_GET;
1003 #if REMOTE_EVENT
1004     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1005 #else
1006     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1007 #endif
1008     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1009     pd->length          = ALIGN4(request_msg->length);
1010     pd->local_addr      = (uint64_t) msg_data;
1011     pd->remote_addr     = request_msg->source_addr;
1012     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1013     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1014     pd->rdma_mode       = 0;
1015
1016     //memory registration successful
1017     if(status == GNI_RC_SUCCESS)
1018     {
1019         pd->local_mem_hndl  = msg_mem_hndl;
1020         if(pd->type == GNI_POST_RDMA_GET) 
1021             status = GNI_PostRdma(ep_hndl_array[source], pd);
1022         else
1023             status = GNI_PostFma(ep_hndl_array[source],  pd);
1024     }else
1025     {
1026         pd->local_mem_hndl.qword1  = 0; 
1027         pd->local_mem_hndl.qword1  = 0; 
1028     }
1029     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1030     {
1031         MallocRdmaRequest(rdma_request_msg);
1032         rdma_request_msg->next = 0;
1033         rdma_request_msg->destNode = inst_id;
1034         rdma_request_msg->pd = pd;
1035         if(pending_rdma_head == 0)
1036         {
1037             pending_rdma_head = rdma_request_msg;
1038         }else
1039         {
1040             pending_rdma_tail->next = rdma_request_msg;
1041         }
1042         pending_rdma_tail = rdma_request_msg;
1043     }else
1044         GNI_RC_CHECK("AFter posting", status);
1045 #endif
1046 }
1047
1048 /* Check whether message send or get is confirmed by remote */
1049 static void PumpLocalSmsgTransactions()
1050 {
1051     gni_return_t            status;
1052     gni_cq_entry_t          ev;
1053     while ((status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1054     {
1055 #if PRINT_SYH
1056         lrts_local_done_msg++;
1057         //CmiPrintf("*[%d]  PumpLocalSmsgTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
1058 #endif
1059         if(GNI_CQ_OVERRUN(ev))
1060         {
1061             CmiPrintf("Overrun detected in local CQ");
1062             CmiAbort("Overrun in TX");
1063         }
1064     }
1065     if(status == GNI_RC_ERROR_RESOURCE)
1066     {
1067         GNI_RC_CHECK("Smsg_tx_cq full", status);
1068     }
1069 }
1070
1071 static void PumpLocalRdmaTransactions()
1072 {
1073     gni_cq_entry_t          ev;
1074     gni_return_t            status;
1075     uint64_t                type, inst_id;
1076     gni_post_descriptor_t   *tmp_pd;
1077     MSG_LIST                *ptr;
1078     CONTROL_MSG             *ack_msg_tmp;
1079     uint8_t             msg_tag;
1080
1081    // while ( (status = GNI_CqGetEvent(post_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1082     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1083     {
1084         type        = GNI_CQ_GET_TYPE(ev);
1085 #if PRINT_SYH
1086         //lrts_local_done_msg++;
1087         CmiPrintf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
1088 #endif
1089         if (type == GNI_CQ_EVENT_TYPE_POST)
1090         {
1091             inst_id     = GNI_CQ_GET_INST_ID(ev);
1092 #if PRINT_SYH
1093             CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1094 #endif
1095             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
1096             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1097             ////Message is sent, free message , put is not used now
1098             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
1099             {   //persistent message 
1100                 CmiFree((void *)tmp_pd->local_addr);
1101                 msg_tag = PUT_DONE_TAG;  
1102             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1103             {
1104                 msg_tag = ACK_TAG;  
1105             }
1106             MallocControlMsg(ack_msg_tmp);
1107             ack_msg_tmp->source             = myrank;
1108             ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
1109             ack_msg_tmp->length             = tmp_pd->length; 
1110             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1111 #if PRINT_SYH
1112             lrts_send_msg_id++;
1113             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
1114 #endif
1115             status = send_smsg_message(inst_id, 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0);  
1116             if(status == GNI_RC_SUCCESS)
1117             {
1118                 FreeControlMsg(ack_msg_tmp);
1119             }
1120 #if     !USE_LRTS_MEMPOOL
1121             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1122 #endif
1123             CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1124             handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1125             SendRdmaMsg(); 
1126             FreePostDesc(tmp_pd);
1127         }
1128     } //end while
1129 }
1130
1131 static int SendRdmaMsg()
1132 {
1133     gni_return_t            status = GNI_RC_SUCCESS;
1134     gni_mem_handle_t        msg_mem_hndl;
1135
1136     RDMA_REQUEST *ptr = pending_rdma_head;
1137     RDMA_REQUEST *prev = NULL;
1138
1139     while (ptr != NULL)
1140     {
1141         gni_post_descriptor_t *pd = ptr->pd;
1142         status = GNI_RC_SUCCESS;
1143         // register memory first
1144         if( pd->local_mem_hndl.qword1 == 0 && pd->local_mem_hndl.qword2 == 0)
1145         {
1146             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1147         }
1148         if(status == GNI_RC_SUCCESS)
1149         {
1150             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1151                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1152             else
1153                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1154             if(status == GNI_RC_SUCCESS)
1155             {
1156                 RDMA_REQUEST *tmp = ptr;
1157                 if (prev)
1158                   prev->next = ptr->next;
1159                 else
1160                   pending_rdma_head = ptr->next;
1161                 ptr = ptr->next;
1162                 FreeRdmaRequest(tmp);
1163                 continue;
1164             }
1165         }
1166         prev = ptr;
1167         ptr = ptr->next;
1168     } //end while
1169     return 0;
1170 }
1171
1172 // return 1 if all messages are sent
1173 static int SendBufferMsg()
1174 {
1175     MSG_LIST            *ptr, *previous_head, *current_head;
1176     CONTROL_MSG         *control_msg_tmp;
1177     gni_return_t        status;
1178     int done = 1;
1179     register    int     i;
1180     int                 index_previous = -1;
1181     int                 index = smsg_head_index;
1182     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {CmiPrintf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
1183     /* can add flow control here to control the number of messages sent before handle message */
1184     while(index != -1)
1185     {
1186         ptr = smsg_msglist_index[index].head;
1187        
1188         while(ptr!=0)
1189         {
1190             CmiAssert(ptr!=NULL);
1191             if(ptr->tag == SMALL_DATA_TAG)
1192             {
1193                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, ptr->size, SMALL_DATA_TAG, 1);  
1194                 if(status == GNI_RC_SUCCESS)
1195                 {
1196                     CmiFree(ptr->msg);
1197                 }
1198             }
1199             else if(ptr->tag == LMSG_INIT_TAG)
1200             {
1201                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1202 #if PRINT_SYH
1203                 CmiPrintf("[%d==>%d] LMSG buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1204 #endif
1205                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
1206                 {
1207                     MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
1208                     if(status != GNI_RC_SUCCESS) {
1209                         done = 0;
1210                         break;
1211                     }
1212                 }
1213                 
1214                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 1);  
1215                 if(status == GNI_RC_SUCCESS)
1216                 {
1217                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
1218                 }
1219             }else if (ptr->tag == ACK_TAG)
1220             {
1221                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), ACK_TAG, 1);  
1222                 if(status == GNI_RC_SUCCESS)
1223                 {
1224                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1225                 }
1226             }else
1227             {
1228                 CmiPrintf("Weird tag\n");
1229                 CmiAbort("should not happen\n");
1230             }
1231         } //end while 
1232         if(status == GNI_RC_SUCCESS)
1233         {
1234             smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
1235             FreeMsgList(ptr);
1236             ptr= smsg_msglist_index[index].head;
1237 #if PRINT_SYH
1238             buffered_smsg_counter--;
1239             if(lrts_smsg_success == lrts_send_msg_id)
1240                 CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1241             else
1242                 CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1243 #endif
1244         }else {
1245             done = 0;
1246             break;
1247         }
1248         if(ptr == 0)
1249         {
1250             if(index_previous != -1)
1251                 smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
1252             else
1253                 smsg_head_index = smsg_msglist_index[index].next;
1254         }else
1255         {
1256             index_previous = index;
1257         }
1258         index = smsg_msglist_index[index].next;
1259     }   // end pooling for all cores
1260 #if PRINT_SYH
1261     if(lrts_send_msg_id-lrts_smsg_success !=0)
1262         CmiPrintf("WRONG [%d buffered msg is empty(%p) (actually=%d)(buffercounter=%d)\n", myrank, smsg_msglist_head[i], (lrts_send_msg_id-lrts_smsg_success, buffered_smsg_counter));
1263 #endif
1264     return done;
1265 }
1266
1267 static void LrtsAdvanceCommunication()
1268 {
1269     /*  Receive Msg first */
1270     //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
1271     PumpNetworkSmsg();
1272     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
1273     //PumpNetworkRdmaMsgs();
1274     /* Release Sent Msg */
1275     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
1276     //PumpLocalSmsgTransactions();
1277     //CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", CmiMyPe());
1278     PumpLocalRdmaTransactions();
1279     //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
1280     /* Send buffered Message */
1281     SendBufferMsg();
1282     SendRdmaMsg();
1283 }
1284
1285 static void _init_dynamic_smsg()
1286 {
1287     gni_smsg_attr_t smsg_attr;
1288     mdh_addr_t current_addr;
1289     gni_return_t status;
1290     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
1291     memset(smsg_connected_flag, 0, mysize*sizeof(int));
1292
1293     current_addr.addr = (uint64_t)malloc(mysize * SMSG_CONN_SIZE);
1294     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, smsg_connection_addr, mysize * SMSG_CONN_SIZE,  &(current_addr.mdh), &omdh);
1295     
1296     smsg_connection_vec = (mdh_addr_t*) malloc(mysize*sizeof(mdh_addr_t)); 
1297     allgather(&current_addr, smsg_connection_vec, sizeof(mdh_addr_t));
1298     
1299     //pre-allocate some memory as mailbox for dynamic connection
1300     if(mysize <=4096)
1301     {
1302         SMSG_MAX_MSG = 1024;
1303     }else if (mysize > 4096 && mysize <= 16384)
1304     {
1305         SMSG_MAX_MSG = 512;
1306     }else {
1307         SMSG_MAX_MSG = 256;
1308     }
1309     
1310     smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1311     smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1312     smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1313     status = GNI_SmsgBufferSizeNeeded(&smsg_attr, &smsg_memlen);
1314     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1315     
1316     smsg_dynamic_list = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1317
1318     smsg_dynamic_list->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1319     bzero(smsg_dynamic_list->addr, smsg_memlen*smsg_expand_slots);
1320     
1321     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_dynamic_list->addr,
1322             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1323             GNI_MEM_READWRITE,   
1324             vmdh_index,
1325             &(smsg_dynamic_list->mdh));
1326    smsg_available_slot = 0;  
1327 }
1328
1329 static void _init_static_smsg()
1330 {
1331     gni_smsg_attr_t      *smsg_attr;
1332     gni_smsg_attr_t      remote_smsg_attr;
1333     gni_smsg_attr_t      *smsg_attr_vec;
1334     gni_mem_handle_t     my_smsg_mdh_mailbox;
1335     int      i;
1336     gni_return_t status;
1337     uint32_t              vmdh_index = -1;
1338     mdh_addr_t            base_infor;
1339     mdh_addr_t            *base_addr_vec;
1340     if(mysize <=4096)
1341     {
1342         SMSG_MAX_MSG = 1024;
1343         //log2_SMSG_MAX_MSG = 10;
1344     }else if (mysize > 4096 && mysize <= 16384)
1345     {
1346         SMSG_MAX_MSG = 512;
1347         //log2_SMSG_MAX_MSG = 9;
1348
1349     }else {
1350         SMSG_MAX_MSG = 256;
1351         //log2_SMSG_MAX_MSG = 8;
1352     }
1353     
1354     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
1355     
1356     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1357     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
1358     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
1359     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
1360     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1361     smsg_mailbox_base = memalign(64, smsg_memlen*(mysize));
1362     _MEMCHECK(smsg_mailbox_base);
1363     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
1364     if (myrank == 0) CmiPrintf("Charm++> allocates %.2fMB for SMSG. \n", smsg_memlen*mysize/1e6);
1365     
1366     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
1367             smsg_memlen*(mysize), smsg_rx_cqh,
1368             GNI_MEM_READWRITE,   
1369             vmdh_index,
1370             &my_smsg_mdh_mailbox);
1371
1372     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1373
1374 #if 1
1375     base_infor.addr =  (uint64_t)smsg_mailbox_base;
1376     base_infor.mdh =  my_smsg_mdh_mailbox;
1377     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
1378
1379     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
1380  
1381     for(i=0; i<mysize; i++)
1382     {
1383         if(i==myrank)
1384             continue;
1385         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1386         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1387         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1388         smsg_attr[i].mbox_offset = i*smsg_memlen;
1389         smsg_attr[i].buff_size = smsg_memlen;
1390         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
1391         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1392     }
1393
1394     for(i=0; i<mysize; i++)
1395     {
1396         if (myrank == i) continue;
1397
1398         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1399         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1400         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1401         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
1402         remote_smsg_attr.buff_size = smsg_memlen;
1403         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
1404         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
1405
1406         /* initialize the smsg channel */
1407         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
1408         GNI_RC_CHECK("SMSG Init", status);
1409     } //end initialization
1410
1411     free(base_addr_vec);
1412 #else
1413     for(i=0; i<mysize; i++)
1414     {
1415         if(i==myrank)
1416             continue;
1417         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1418         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1419         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1420         if(i<myrank)
1421             smsg_attr[i].mbox_offset = i*smsg_memlen;
1422         else
1423             smsg_attr[i].mbox_offset = (i-1)*smsg_memlen;
1424
1425         smsg_attr[i].msg_buffer = smsg_mailbox_base;
1426         smsg_attr[i].buff_size = smsg_memlen;
1427         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1428 #if 0 
1429         if(i<2)
1430         {
1431             CmiPrintf("[assign %d==%d]  offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, (smsg_attr[i]).mbox_offset, (smsg_attr[i]).msg_buffer, (smsg_attr[i]).mbox_maxcredit, (smsg_attr[i]).msg_maxsize, (smsg_attr[i]).buff_size ); 
1432 #endif 
1433         }
1434     }
1435     smsg_attr_vec = (gni_smsg_attr_t*)malloc(mysize * mysize * sizeof(gni_smsg_attr_t));
1436     _MEMCHECK(smsg_attr_vec);
1437  
1438     if(myrank==0)
1439         CmiPrintf("total size=%d\n", mysize*mysize*sizeof(gni_smsg_attr_t));
1440     allgather(smsg_attr, smsg_attr_vec,  mysize*sizeof(gni_smsg_attr_t));
1441     
1442     CmiBarrier();
1443     for(i=0; i<mysize; i++)
1444     {
1445         if (myrank == i) continue;
1446         /* initialize the smsg channel */
1447         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &smsg_attr_vec[i*mysize+myrank]);
1448 #if 0 
1449         if(i<2)
1450         {
1451             CmiPrintf("[%d==%d] SmsgInit rc=%s, offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, gni_err_str[status], (smsg_attr_vec[i*mysize+myrank]).mbox_offset, (smsg_attr_vec[i*mysize+myrank]).msg_buffer, (smsg_attr_vec[i*mysize+myrank]).mbox_maxcredit, (smsg_attr_vec[i*mysize+myrank]).msg_maxsize, (smsg_attr_vec[i*mysize+myrank]).buff_size ); 
1452         }
1453 #endif
1454         GNI_RC_CHECK("SMSG Init", status);
1455     } //end initialization
1456     CmiBarrier();
1457     free(smsg_attr_vec);
1458 #endif
1459     free(smsg_attr);
1460     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
1461     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
1462     smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
1463     for(i =0; i<mysize; i++)
1464     {
1465         smsg_msglist_index[i].next = -1;
1466         smsg_msglist_index[i].head = 0;
1467         smsg_msglist_index[i].tail = 0;
1468     }
1469     smsg_head_index = -1;
1470
1471
1472 static void _init_static_msgq()
1473 {
1474     gni_return_t status;
1475     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
1476     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
1477     msgq_attrs.smsg_q_sz = 1;
1478     msgq_attrs.rcv_pool_sz = 1;
1479     msgq_attrs.num_msgq_eps = 2;
1480     msgq_attrs.nloc_insts = 8;
1481     msgq_attrs.modes = 0;
1482     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
1483
1484     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
1485     GNI_RC_CHECK("MSGQ Init", status);
1486
1487
1488 }
1489
1490 static void _init_DMA_buffer()
1491 {
1492     gni_return_t            status = GNI_RC_SUCCESS;
1493     /*AUTO tuning */
1494     /* suppose max_smsg is 1024, DMA buffer is split into 2048, 4096, 8192, ... */
1495     /*  This method might be better for SMP, but it is bad for Nonsmp since msgs are sharing same slots */
1496     /*
1497      * DMA_slots = 19-log2_SMSG_MAX_MSG;
1498     DMA_incoming_avail_tag = malloc(DMA_slots);
1499     DMA_buffer_size = 2*(DMA_max_single_msg - SMSG_MAX_MSG); 
1500     DMA_incoming_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1501     DMA_outgoing_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1502     
1503     status = GNI_MemRegister(nic_hndl, (uint64_t)DMA_incoming_base_addr,
1504             DMA_buffer_size, smsg_rx_cqh,
1505             GNI_MEM_READWRITE ,   
1506             vmdh_index,
1507             &);
1508             */
1509     // one is reserved to avoid deadlock
1510     DMA_slots           = 17; // each one is 8K  16*8K + 1 slot reserved to avoid deadlock
1511     DMA_buffer_size     = DMA_max_single_msg + 8192;
1512     DMA_buffer_base_mdh_addr.addr = (uint64_t)memalign(ALIGNBUF, DMA_buffer_size);
1513     status = GNI_MemRegister(nic_hndl, DMA_buffer_base_mdh_addr.addr,
1514         DMA_buffer_size, smsg_rx_cqh,
1515         GNI_MEM_READWRITE ,   
1516         -1,
1517         &(DMA_buffer_base_mdh_addr.mdh));
1518     GNI_RC_CHECK("GNI_MemRegister", status);
1519     DMA_buffer_base_mdh_addr_vec = (mdh_addr_t*) malloc(sizeof(mdh_addr_t) * mysize);
1520
1521     allgather(&DMA_buffer_base_mdh_addr, DMA_buffer_base_mdh_addr_vec, sizeof(mdh_addr_t) );
1522 }
1523
1524 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
1525 {
1526     register int            i;
1527     int                     rc;
1528     int                     device_id = 0;
1529     unsigned int            remote_addr;
1530     gni_cdm_handle_t        cdm_hndl;
1531     gni_return_t            status = GNI_RC_SUCCESS;
1532     uint32_t                vmdh_index = -1;
1533     uint8_t                 ptag;
1534     unsigned int            local_addr, *MPID_UGNI_AllAddr;
1535     int                     first_spawned;
1536     int                     physicalID;
1537     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
1538     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
1539     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
1540    
1541     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
1542     useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
1543     if (myrank==0) 
1544     {
1545         if(useDynamicSMSG) 
1546             CmiPrintf("Charm++> use Dynamic SMSG\n"); 
1547         else 
1548             CmiPrintf("Charm++> use Static SMSG\n");
1549     };
1550     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
1551     
1552     status = PMI_Init(&first_spawned);
1553     GNI_RC_CHECK("PMI_Init", status);
1554
1555     status = PMI_Get_size(&mysize);
1556     GNI_RC_CHECK("PMI_Getsize", status);
1557
1558     status = PMI_Get_rank(&myrank);
1559     GNI_RC_CHECK("PMI_getrank", status);
1560
1561     //physicalID = CmiPhysicalNodeID(myrank);
1562     
1563     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
1564
1565     *myNodeID = myrank;
1566     *numNodes = mysize;
1567   
1568     if(myrank == 0)
1569     {
1570         printf("Charm++> Running on Gemini (GNI) using %d  cores\n", mysize);
1571     }
1572 #ifdef USE_ONESIDED
1573     onesided_init(NULL, &onesided_hnd);
1574
1575     // this is a GNI test, so use the libonesided bypass functionality
1576     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
1577     local_addr = gniGetNicAddress();
1578 #else
1579     ptag = get_ptag();
1580     cookie = get_cookie();
1581 #if 0
1582     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
1583 #endif
1584     //Create and attach to the communication  domain */
1585     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
1586     GNI_RC_CHECK("GNI_CdmCreate", status);
1587     //* device id The device id is the minor number for the device
1588     //that is assigned to the device by the system when the device is created.
1589     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
1590     //where X is the device number 0 default 
1591     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
1592     GNI_RC_CHECK("GNI_CdmAttach", status);
1593     local_addr = get_gni_nic_address(0);
1594 #endif
1595     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
1596     _MEMCHECK(MPID_UGNI_AllAddr);
1597     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
1598     /* create the local completion queue */
1599     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
1600     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
1601     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
1602     
1603     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
1604     //GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
1605     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
1606
1607     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
1608     GNI_RC_CHECK("Create CQ (rx)", status);
1609     
1610     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
1611     //GNI_RC_CHECK("Create Post CQ (rx)", status);
1612     
1613     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
1614     //GNI_RC_CHECK("Create BTE CQ", status);
1615
1616     /* create the endpoints. they need to be bound to allow later CQWrites to them */
1617     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
1618     _MEMCHECK(ep_hndl_array);
1619
1620     for (i=0; i<mysize; i++) {
1621         if(i == myrank) continue;
1622         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
1623         GNI_RC_CHECK("GNI_EpCreate ", status);   
1624         remote_addr = MPID_UGNI_AllAddr[i];
1625         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
1626         GNI_RC_CHECK("GNI_EpBind ", status);   
1627     }
1628     /* Depending on the number of cores in the job, decide different method */
1629     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
1630     if(mysize > 1)
1631     {
1632         if(useDynamicSMSG == 0)
1633         {
1634             _init_static_smsg();
1635         }else
1636         {
1637             _init_dynamic_smsg();
1638         }
1639     }
1640 #if     USE_LRTS_MEMPOOL
1641     CmiGetArgLong(*argv, "+useMemorypoolSize", &_mempool_size);
1642     if (myrank==0) CmiPrintf("Charm++> use memorypool size: %1.fMB\n", _mempool_size/1024.0/1024);
1643     init_mempool(_mempool_size);
1644     //init_mempool(Mempool_MaxSize);
1645 #endif
1646
1647     /* init DMA buffer for medium message */
1648
1649     //_init_DMA_buffer();
1650     
1651     free(MPID_UGNI_AllAddr);
1652 }
1653
1654
1655 void* LrtsAlloc(int n_bytes, int header)
1656 {
1657     void *ptr;
1658 #if 0
1659     CmiPrintf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d\n", CmiMyPe(), n_bytes, header);
1660 #endif
1661     if(n_bytes <= SMSG_MAX_MSG)
1662     {
1663         int totalsize = n_bytes+header;
1664         ptr = malloc(totalsize);
1665     }else 
1666     {
1667
1668         CmiAssert(header <= ALIGNBUF);
1669 #if     USE_LRTS_MEMPOOL
1670         n_bytes = ALIGN64(n_bytes);
1671         char *res = syh_mempool_malloc(ALIGNBUF+n_bytes);
1672 #else
1673         n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
1674         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1675 #endif
1676         ptr = res + ALIGNBUF - header;
1677     }
1678 #if 0 
1679     CmiPrintf("Done Alloc Lrts for bytes=%d, head=%d\n", n_bytes, header);
1680 #endif
1681     return ptr;
1682 }
1683
1684 void  LrtsFree(void *msg)
1685 {
1686     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1687     if (size <= SMSG_MAX_MSG)
1688       free(msg);
1689     else
1690     {
1691 #if 0
1692         CmiPrintf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1693 #endif
1694 #if     USE_LRTS_MEMPOOL
1695         syh_mempool_free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1696 #else
1697         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1698 #endif
1699     }
1700 #if 0 
1701     CmiPrintf("Done Free lrts for bytes=%d\n", size);
1702 #endif
1703 }
1704
1705 static void LrtsExit()
1706 {
1707     /* free memory ? */
1708 #if     USE_LRTS_MEMPOOL
1709     kill_allmempool();
1710 #endif
1711     PMI_Finalize();
1712     exit(0);
1713 }
1714
1715 static void LrtsDrainResources()
1716 {
1717     while (!SendBufferMsg()) {
1718         PumpNetworkSmsg();
1719         PumpNetworkRdmaMsgs();
1720         PumpLocalSmsgTransactions();
1721         PumpLocalRdmaTransactions();
1722     }
1723     PMI_Barrier();
1724 }
1725
1726 void CmiAbort(const char *message) {
1727
1728     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1729     PMI_Abort(-1, message);
1730 }
1731
1732 /**************************  TIMER FUNCTIONS **************************/
1733 #if CMK_TIMER_USE_SPECIAL
1734 /* MPI calls are not threadsafe, even the timer on some machines */
1735 static CmiNodeLock  timerLock = 0;
1736 static int _absoluteTime = 0;
1737 static int _is_global = 0;
1738 static struct timespec start_ns;
1739
1740 inline int CmiTimerIsSynchronized() {
1741     return 1;
1742 }
1743
1744 inline int CmiTimerAbsolute() {
1745     return _absoluteTime;
1746 }
1747
1748 double CmiStartTimer() {
1749     return 0.0;
1750 }
1751
1752 double CmiInitTime() {
1753     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
1754 }
1755
1756 void CmiTimerInit(char **argv) {
1757     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1758     if (_absoluteTime && CmiMyPe() == 0)
1759         printf("Charm++> absolute  timer is used\n");
1760     
1761     _is_global = CmiTimerIsSynchronized();
1762
1763
1764     if (_is_global) {
1765         if (CmiMyRank() == 0) {
1766             clock_gettime(CLOCK_MONOTONIC, &start_ts)
1767         }
1768     } else { /* we don't have a synchronous timer, set our own start time */
1769         CmiBarrier();
1770         CmiBarrier();
1771         CmiBarrier();
1772         clock_gettime(CLOCK_MONOTONIC, &start_ts)
1773     }
1774     CmiNodeAllBarrier();          /* for smp */
1775 }
1776
1777 /**
1778  * Since the timerLock is never created, and is
1779  * always NULL, then all the if-condition inside
1780  * the timer functions could be disabled right
1781  * now in the case of SMP.
1782  */
1783 double CmiTimer(void) {
1784     struct timespec now_ts;
1785     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1786     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1787         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1788 }
1789
1790 double CmiWallTimer(void) {
1791     struct timespec now_ts;
1792     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1793     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1794         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1795 }
1796
1797 double CmiCpuTimer(void) {
1798     struct timespec now_ts;
1799     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1800     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1801         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1802 }
1803
1804 #endif
1805 /************Barrier Related Functions****************/
1806
1807 int CmiBarrier()
1808 {
1809     int status;
1810     status = PMI_Barrier();
1811     return status;
1812
1813 }
1814
1815
1816 #if CMK_PERSISTENT_COMM
1817 #include "machine-persistent.c"
1818 #endif
1819
1820