c1d6395a3f992519ec6901c9f2eb203d13aad163
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <stdint.h>
16 #include <errno.h>
17 #include <malloc.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #if defined(_WIN32) && ! defined(__CYGWIN__)
26 #include <windows.h>
27 #include <wincon.h>
28 #include <sys/types.h>
29 #include <sys/timeb.h>
30
31 static void sleep(int secs) {
32     Sleep(1000*secs);
33 }
34 #else
35 #include <unistd.h> /*For getpid()*/
36 #endif
37
38 #define REMOTE_EVENT         0
39 #define USE_LRTS_MEMPOOL   1
40
41 #if USE_LRTS_MEMPOOL
42 static size_t _mempool_size = 1024ll*1024*32;
43 #endif
44
45 #define PRINT_SYH  0
46
47 #if PRINT_SYH
48 int         lrts_smsg_success = 0;
49 int         lrts_send_msg_id = 0;
50 int         lrts_send_rdma_id = 0;
51 int         lrts_send_rdma_success = 0;
52 int         lrts_received_msg = 0;
53 int         lrts_local_done_msg = 0;
54 #endif
55
56 #include "machine.h"
57
58 #include "pcqueue.h"
59
60 #if CMK_PERSISTENT_COMM
61 #include "machine-persistent.h"
62 #endif
63
64 //#define  USE_ONESIDED 1
65 #ifdef USE_ONESIDED
66 //onesided implementation is wrong, since no place to restore omdh
67 #include "onesided.h"
68 onesided_hnd_t   onesided_hnd;
69 onesided_md_t    omdh;
70 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
71
72 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
73
74 #else
75 uint8_t   onesided_hnd, omdh;
76 #if REMOTE_EVENT
77 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
78 #else
79 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
80 #endif
81 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
82 #endif
83
84 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
85 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
86
87 #define ALIGNBUF                64
88
89 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
90 /* If SMSG is not used */
91 #define FMA_PER_CORE  1024
92 #define FMA_BUFFER_SIZE 1024
93 /* If SMSG is used */
94 static int  SMSG_MAX_MSG;
95 //static int  log2_SMSG_MAX_MSG;
96 #define SMSG_MAX_CREDIT  36
97
98 #define MSGQ_MAXSIZE       2048
99 /* large message transfer with FMA or BTE */
100 #define LRTS_GNI_RDMA_THRESHOLD  2048
101
102 #define REMOTE_QUEUE_ENTRIES  2048 
103 #define LOCAL_QUEUE_ENTRIES   64 
104
105 #define PUT_DONE_TAG      0x29
106 #define ACK_TAG           0x30
107 /* SMSG is data message */
108 #define SMALL_DATA_TAG          0x31
109 /* SMSG is a control message to initialize a BTE */
110 #define MEDIUM_HEAD_TAG         0x32
111 #define MEDIUM_DATA_TAG         0x33
112 #define LMSG_INIT_TAG     0x39 
113
114 #define DEBUG
115 #ifdef GNI_RC_CHECK
116 #undef GNI_RC_CHECK
117 #endif
118 #ifdef DEBUG
119 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
120 #else
121 #define GNI_RC_CHECK(msg,rc)
122 #endif
123
124 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
125 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
126
127 static int Mempool_MaxSize = 1024*1024*128;
128 static int useStaticSMSG   = 1;
129 static int useStaticMSGQ = 0;
130 static int useStaticFMA = 0;
131 static int mysize, myrank;
132 static gni_nic_handle_t      nic_hndl;
133
134
135 static void             *smsg_mailbox_base;
136 gni_msgq_attr_t         msgq_attrs;
137 gni_msgq_handle_t       msgq_handle;
138 gni_msgq_ep_attr_t      msgq_ep_attrs;
139 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
140
141 /* preallocated memory buffer for FMA for short message and control message */
142 typedef struct {
143     gni_mem_handle_t mdh;
144     uint64_t addr;
145 } mdh_addr_t;
146
147
148 /* preallocated DMA buffer */
149 int                     DMA_slots;
150 uint64_t                DMA_avail_tag = 0;
151 uint32_t                DMA_incoming_avail_tag = 0;
152 uint32_t                DMA_outgoing_avail_tag = 0;
153 void                    *DMA_incoming_base_addr;
154 void                    *DMA_outgoing_base_addr;
155 mdh_addr_t              DMA_buffer_base_mdh_addr;
156 mdh_addr_t              *DMA_buffer_base_mdh_addr_vec;
157 int                     DMA_buffer_size;
158 int                     DMA_max_single_msg = 131072;//524288 ;
159
160 #define                 DMA_SIZE_PER_SLOT       8192
161
162
163 typedef struct dma_msgid_map
164 {
165     uint64_t     msg_id;
166     int     msg_subid
167 } dma_msgid_map_t;
168
169 dma_msgid_map_t         *dma_map_list;
170
171 typedef struct msg_trace
172 {
173     uint64_t    msg_id;
174     int         done_num;
175 }msg_trace_t;
176
177 msg_trace_t             *pending_msg_list;
178 /* =====Beginning of Declarations of Machine Specific Variables===== */
179 static int cookie;
180 static int modes = 0;
181 static gni_cq_handle_t       smsg_rx_cqh = NULL;
182 static gni_cq_handle_t       smsg_tx_cqh = NULL;
183 static gni_cq_handle_t       post_rx_cqh = NULL;
184 static gni_cq_handle_t       post_tx_cqh = NULL;
185 static gni_ep_handle_t       *ep_hndl_array;
186
187 typedef    struct  pending_smg
188 {
189     int     inst_id;
190     struct  pending_smg *next;
191 } PENDING_GETNEXT;
192
193
194 typedef struct msg_list
195 {
196     uint32_t destNode;
197     uint32_t size;
198     void *msg;
199     struct msg_list *next;
200     struct msg_list *pehead_next;
201     uint8_t tag;
202 }MSG_LIST;
203
204 typedef struct medium_msg_list
205 {
206     uint32_t destNode;
207     uint32_t msg_id;
208     uint32_t msg_subid;
209     uint32_t remain_size;
210     void *msg;
211     struct medium_msg_list *next;
212 }MEDIUM_MSG_LIST;
213
214
215 typedef struct control_msg
216 {
217     uint64_t            source_addr;
218     int                 source;               /* source rank */
219     int                 length;
220     gni_mem_handle_t    source_mem_hndl;
221     struct control_msg *next;
222 }CONTROL_MSG;
223
224 typedef struct medium_msg_control
225 {
226     uint64_t            dma_offset;     //the dma_buffer for this block of msg
227     int                 msg_id;         //Id for the total index
228     int                 msg_subid;      //offset inside the message id 
229 }MEDIUM_MSG_CONTROL;
230
231 typedef struct  rmda_msg
232 {
233     int                   destNode;
234     gni_post_descriptor_t *pd;
235     struct  rmda_msg      *next;
236 }RDMA_REQUEST;
237
238 typedef struct  msg_list_index
239 {
240     int         next;
241     MSG_LIST    *head;
242     MSG_LIST    *tail;
243 } MSG_LIST_INDEX;
244
245 /* reuse PendingMsg memory */
246 static CONTROL_MSG          *control_freelist=0;
247 static MSG_LIST             *msglist_freelist=0;
248 static int                  smsg_head_index;
249 static MSG_LIST_INDEX       *smsg_msglist_index= 0;
250 static MSG_LIST             *smsg_free_head=0;
251 static MSG_LIST             *smsg_free_tail=0;
252
253 /*
254 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
255     if(free_head == 0)  free_head = free_tail = msg_head;    \
256     else   free_tail = free_tail->next;    \
257     if( msg_head->next == msg_tail) msg_head =0;   \
258     else msg_head= msg_head->next;    
259
260 #define MallocMsgList(d, msg_head, msg_tail, free_head, free_tail, msgsize) \
261     if(free_head == 0) {d= malloc(msgsize);  \
262         if(msg_head == 0)   msg_head =msg_tail = msg_head->next = msg_tail->next = d; \
263         else { msg_tail->next = d; d->next = msg_head; msg_tail=d;} \
264     }else {d = free_head; free_head = free_head->next; if(free_tail->next == free_head) free_head =0;} \
265 */
266
267 #define FreeMsgList(d)  \
268   (d)->next = msglist_freelist;\
269   msglist_freelist = d;
270
271
272
273 #define MallocMsgList(d) \
274   d = msglist_freelist;\
275   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
276              _MEMCHECK(d);\
277   } else msglist_freelist = d->next;
278
279
280 #define FreeControlMsg(d)       \
281   (d)->next = control_freelist;\
282   control_freelist = d;
283
284 #define MallocControlMsg(d) \
285   d = control_freelist;\
286   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
287              _MEMCHECK(d);\
288   } else control_freelist = d->next;
289
290
291 static RDMA_REQUEST         *rdma_freelist = NULL;
292
293 #define FreeControlMsg(d)       \
294   (d)->next = control_freelist;\
295   control_freelist = d;
296
297 #define MallocControlMsg(d) \
298   d = control_freelist;\
299   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
300              _MEMCHECK(d);\
301   } else control_freelist = d->next;
302
303 #define FreeMediumControlMsg(d)       \
304   (d)->next = medium_control_freelist;\
305   medium_control_freelist = d;
306
307
308 #define MallocMediumControlMsg(d) \
309     d = medium_control_freelist;\
310     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
311     _MEMCHECK(d);\
312 } else mediumcontrol_freelist = d->next;
313
314 #define FreeRdmaRequest(d)       \
315   (d)->next = rdma_freelist;\
316   rdma_freelist = d;
317
318 #define MallocRdmaRequest(d) \
319   d = rdma_freelist;\
320   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
321              _MEMCHECK(d);\
322   } else rdma_freelist = d->next;
323
324 /* reuse gni_post_descriptor_t */
325 static gni_post_descriptor_t *post_freelist=0;
326
327 #if 1
328 #define FreePostDesc(d)       \
329     (d)->next_descr = post_freelist;\
330     post_freelist = d;
331
332 #define MallocPostDesc(d) \
333   d = post_freelist;\
334   if (d==0) { \
335      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
336      _MEMCHECK(d);\
337   } else post_freelist = d->next_descr;
338 #else
339
340 #define FreePostDesc(d)     free(d);
341 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
342
343 #endif
344
345 static PENDING_GETNEXT     *pending_smsg_head = 0;
346 static PENDING_GETNEXT     *pending_smsg_tail = 0;
347
348 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
349 static int      buffered_smsg_counter = 0;
350
351 /* SmsgSend return success but message sent is not confirmed by remote side */
352
353 static RDMA_REQUEST  *pending_rdma_head = 0;
354 static RDMA_REQUEST  *pending_rdma_tail = 0;
355 static MSG_LIST *buffered_fma_head = 0;
356 static MSG_LIST *buffered_fma_tail = 0;
357
358 /* functions  */
359 #define IsFree(a,ind)  !( a& (1<<(ind) ))
360 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
361 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
362
363 #include "mempool.c"
364
365 /* get the upper bound of log 2 */
366 int mylog2(int size)
367 {
368     int op = size;
369     unsigned int ret=0;
370     unsigned int mask = 0;
371     int i;
372     while(op>0)
373     {
374         op = op >> 1;
375         ret++;
376
377     }
378     for(i=1; i<ret; i++)
379     {
380         mask = mask << 1;
381         mask +=1;
382     }
383
384     ret -= ((size &mask) ? 0:1);
385     return ret;
386 }
387
388 static void
389 allgather(void *in,void *out, int len)
390 {
391     static int *ivec_ptr=NULL,already_called=0,job_size=0;
392     int i,rc;
393     int my_rank;
394     char *tmp_buf,*out_ptr;
395
396     if(!already_called) {
397
398         rc = PMI_Get_size(&job_size);
399         CmiAssert(rc == PMI_SUCCESS);
400         rc = PMI_Get_rank(&my_rank);
401         CmiAssert(rc == PMI_SUCCESS);
402
403         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
404         CmiAssert(ivec_ptr != NULL);
405
406         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
407         CmiAssert(rc == PMI_SUCCESS);
408
409         already_called = 1;
410
411     }
412
413     tmp_buf = (char *)malloc(job_size * len);
414     CmiAssert(tmp_buf);
415
416     rc = PMI_Allgather(in,tmp_buf,len);
417     CmiAssert(rc == PMI_SUCCESS);
418
419     out_ptr = out;
420
421     for(i=0;i<job_size;i++) {
422
423         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
424
425     }
426
427     free(tmp_buf);
428 }
429 static void
430 allgather_2(void *in,void *out, int len)
431 {
432     //PMI_Allgather is out of order
433     int i,rc, extend_len;
434     int  rank_index;
435     char *out_ptr, *out_ref;
436     char *in2;
437
438     extend_len = sizeof(int) + len;
439     in2 = (char*)malloc(extend_len);
440
441     memcpy(in2, &myrank, sizeof(int));
442     memcpy(in2+sizeof(int), in, len);
443
444     out_ptr = (char*)malloc(mysize*extend_len);
445
446     rc = PMI_Allgather(in2, out_ptr, extend_len);
447     GNI_RC_CHECK("allgather", rc);
448
449     out_ref = out;
450
451     for(i=0;i<mysize;i++) {
452         //rank index 
453         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
454         //copy to the rank index slot
455         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
456     }
457
458     free(out_ptr);
459     free(in2);
460
461 }
462
463 static unsigned int get_gni_nic_address(int device_id)
464 {
465     unsigned int address, cpu_id;
466     gni_return_t status;
467     int i, alps_dev_id=-1,alps_address=-1;
468     char *token, *p_ptr;
469
470     p_ptr = getenv("PMI_GNI_DEV_ID");
471     if (!p_ptr) {
472         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
473        
474         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
475     } else {
476         while ((token = strtok(p_ptr,":")) != NULL) {
477             alps_dev_id = atoi(token);
478             if (alps_dev_id == device_id) {
479                 break;
480             }
481             p_ptr = NULL;
482         }
483         CmiAssert(alps_dev_id != -1);
484         p_ptr = getenv("PMI_GNI_LOC_ADDR");
485         CmiAssert(p_ptr != NULL);
486         i = 0;
487         while ((token = strtok(p_ptr,":")) != NULL) {
488             if (i == alps_dev_id) {
489                 alps_address = atoi(token);
490                 break;
491             }
492             p_ptr = NULL;
493             ++i;
494         }
495         CmiAssert(alps_address != -1);
496         address = alps_address;
497     }
498     return address;
499 }
500
501 static uint8_t get_ptag(void)
502 {
503     char *p_ptr, *token;
504     uint8_t ptag;
505
506     p_ptr = getenv("PMI_GNI_PTAG");
507     CmiAssert(p_ptr != NULL);
508     token = strtok(p_ptr, ":");
509     ptag = (uint8_t)atoi(token);
510     return ptag;
511         
512 }
513
514 static uint32_t get_cookie(void)
515 {
516     uint32_t cookie;
517     char *p_ptr, *token;
518
519     p_ptr = getenv("PMI_GNI_COOKIE");
520     CmiAssert(p_ptr != NULL);
521     token = strtok(p_ptr, ":");
522     cookie = (uint32_t)atoi(token);
523
524     return cookie;
525 }
526
527 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
528 /* TODO: add any that are related */
529 /* =====End of Definitions of Message-Corruption Related Macros=====*/
530
531
532 #include "machine-lrts.h"
533 #include "machine-common-core.c"
534
535 /* Network progress function is used to poll the network when for
536    messages. This flushes receive buffers on some  implementations*/
537 #if CMK_MACHINE_PROGRESS_DEFINED
538 void CmiMachineProgressImpl() {
539 }
540 #endif
541
542 inline
543 static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
544 {
545     MSG_LIST        *msg_tmp;
546     MallocMsgList(msg_tmp);
547     msg_tmp->destNode = destNode;
548     msg_tmp->size   = size;
549     msg_tmp->msg    = msg;
550     msg_tmp->tag    = tag;
551     msg_tmp->next   = 0;
552     if (smsg_msglist_index[destNode].head == 0 ) {
553         smsg_msglist_index[destNode].head = msg_tmp;
554         smsg_msglist_index[destNode].next = smsg_head_index;
555         smsg_head_index = destNode;
556     }
557     else {
558       (smsg_msglist_index[destNode].tail)->next    = msg_tmp;
559     }
560     smsg_msglist_index[destNode].tail          = msg_tmp;
561 #if PRINT_SYH
562     buffered_smsg_counter++;
563 #endif
564 }
565
566 // messages smaller than max single SMSG
567 inline
568 static void send_small_messages(int destNode, int size, char *msg)
569 {
570     gni_return_t        status  =   GNI_RC_SUCCESS;
571     const uint8_t       tag_data    = SMALL_DATA_TAG;
572     
573     if(smsg_msglist_index[destNode].head == 0)
574     {
575         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, SMALL_DATA_TAG);
576         if (status == GNI_RC_SUCCESS)
577         {
578 #if PRINT_SYH
579             lrts_smsg_success++;
580             if(lrts_smsg_success == lrts_send_msg_id)
581                 CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
582             else
583                 CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
584 #endif
585             CmiFree(msg);
586             return;
587         }else if (status == GNI_RC_INVALID_PARAM)
588         {
589             CmiAbort("SmsgSen fails\n");
590         }
591     }
592     //buffer this message when buffer_msg_head!=0 or send fails
593     delay_send_small_msg(msg, size, destNode, SMALL_DATA_TAG);
594 }
595
596 // Get first 0 in DMA_tags starting from index
597 static int get_first_avail_bit(uint64_t DMA_tags, int start_index)
598 {
599
600     uint64_t         mask = 0x1;
601     register    int     i=0;
602     while((DMA_tags & mask) && i<DMA_slots) {mask << 1; i++;}
603
604 }
605
606 static int send_medium_messages(int destNode, int size, char *msg)
607 {
608 #if 0
609     gni_return_t status = GNI_RC_SUCCESS;
610     int first_avail_bit=0;
611     uint64_t mask = 0x1;
612     MEDIUM_MSG_CONTROL  *medium_msg_control_tmp;
613     MEDIUM_MSG_LIST        *msg_tmp;
614     int blocksize, remain_size, pos;
615     int sub_id = 0;
616     remain_size = size;
617     pos = 0;  //offset before which data are sent
618     /* copy blocks of the message to DMA preallocated buffer and send SMSG */
619     //Check whether there is any available DMA buffer
620     
621     do{
622         while((DMA_avail_tag & mask) && first_avail_bit<DMA_slots) {mask << 1; first_avail_bit++;}
623         if(first_avail_bit == DMA_slots) //No available DMA, buffer this message
624         {
625             MallocMediumMsgList(msg_tmp);
626             msg_tmp->destNode = destNode;
627             msg_tmp->msg_id   = lrts_send_msg_id;
628             msg_tmp->msg_subid   = sub_id;
629             msg_tmp->size   = remain_size;
630             msg_tmp->msg    = msg+pos;
631             msg_tmp->next   = NULL;
632             break;
633         }else
634         {
635             //copy this part of the message into this DMA buffer
636             //TODO optimize here, some data can go with this SMSG
637             blocksize = (remain_size>DMA_SIZE_PER_SLOT)?DMA_SIZE_PER_SLOT: remain_size;
638             memcpy(DMA_buffer_base_mdh_addr.addr[first_avail_bit], msg+pos, blocksize);
639             pos += blocksize;
640             remain_size -= blocksize;
641             SET_BITS(DMA_avail_tag, first_avail_bit);
642            
643             MallocMediumControlMsg(medium_msg_control_tmp);
644             medium_msg_control_tmp->msg_id = lrts_send_msg_id;
645             medium_msg_control_tmp->msg_subid = sub_id;
646             if(status == GNI_RC_SUCCESS)
647             {
648                 if(sub_id==0)
649                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_HEAD_TAG);
650                 else
651                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_DATA_TAG);
652             }
653             //buffer this smsg
654             if(status != GNI_RC_SUCCESS)
655             {
656                 delay_send_small_msg(medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), destNode, MEDIUM_HEAD_TAG);
657             }
658             sub_id++;
659         }while(remain_size > 0 );
660
661         }
662     }
663 #endif
664 }
665
666 // Large message, send control to receiver, receiver register memory and do a GET 
667 inline
668 static int send_large_messages(int destNode, int size, char *msg)
669 {
670 #if     USE_LRTS_MEMPOOL
671     gni_return_t        status  =   GNI_RC_SUCCESS;
672     CONTROL_MSG         *control_msg_tmp;
673     uint32_t            vmdh_index  = -1;
674     /* construct a control message and send */
675     MallocControlMsg(control_msg_tmp);
676     control_msg_tmp->source_addr    = (uint64_t)msg;
677     control_msg_tmp->source         = myrank;
678     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
679     //memcpy( &(control_msg_tmp->source_mem_hndl), GetMemHndl(msg), sizeof(gni_mem_handle_t)) ;
680     control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
681 #if PRINT_SYH
682     lrts_send_msg_id++;
683     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
684 #endif
685     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
686     if(status == GNI_RC_SUCCESS)
687     {
688 #if PRINT_SYH
689         lrts_smsg_success++;
690         if(lrts_smsg_success == lrts_send_msg_id)
691             CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
692         else
693             CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
694 #endif
695         FreeControlMsg(control_msg_tmp);
696         return 1;
697     }
698
699     delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
700     return 0;
701 // NOT use mempool, should slow 
702 #else
703     gni_return_t        status  =   GNI_RC_SUCCESS;
704     CONTROL_MSG         *control_msg_tmp;
705     uint32_t            vmdh_index  = -1;
706     /* construct a control message and send */
707     MallocControlMsg(control_msg_tmp);
708     control_msg_tmp->source_addr    = (uint64_t)msg;
709     control_msg_tmp->source         = myrank;
710     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
711     control_msg_tmp->source_mem_hndl.qword1 = 0;
712     control_msg_tmp->source_mem_hndl.qword2 = 0;
713 #if PRINT_SYH
714     lrts_send_msg_id++;
715     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
716 #endif
717     if(smsg_msglist_index[destNode].head == 0 )
718     {
719         status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
720         if(status == GNI_RC_SUCCESS)
721         {
722             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
723             if(status == GNI_RC_SUCCESS)
724             {
725 #if PRINT_SYH
726                 lrts_smsg_success++;
727                 if(lrts_smsg_success == lrts_send_msg_id)
728                     CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
729                 else
730                     CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
731 #endif
732                 FreeControlMsg(control_msg_tmp);
733                 return 1;
734             }
735
736         } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
737         {
738             CmiAbort("Memory registor for large msg\n");
739         }
740     }
741     delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
742     return 0;
743 #endif
744 }
745
746 inline void LrtsPrepareEnvelope(char *msg, int size)
747 {
748     CmiSetMsgSize(msg, size);
749 }
750
751 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
752 {
753     LrtsPrepareEnvelope(msg, size);
754
755     if(size <= SMSG_MAX_MSG)
756     {
757 #if PRINT_SYH
758         lrts_send_msg_id++;
759         CmiPrintf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
760 #endif
761         send_small_messages(destNode, size, msg);
762     }
763     else
764     {
765         send_large_messages(destNode, size, msg);
766     }
767     return 0;
768 }
769
770 static void LrtsPreCommonInit(int everReturn){}
771
772 /* Idle-state related functions: called in non-smp mode */
773 void CmiNotifyIdleForGemini(void) {
774     LrtsAdvanceCommunication();
775 }
776
777 static void LrtsPostCommonInit(int everReturn)
778 {
779 #if CMK_SMP
780     CmiIdleState *s=CmiNotifyGetState();
781     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
782     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
783 #else
784     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
785 #endif
786
787 }
788
789
790 void LrtsPostNonLocal(){}
791 /* pooling CQ to receive network message */
792 static void PumpNetworkRdmaMsgs()
793 {
794     gni_cq_entry_t      event_data;
795     gni_return_t        status;
796     while( (status = GNI_CqGetEvent(post_rx_cqh, &event_data)) == GNI_RC_SUCCESS);
797 }
798
799 static int SendRdmaMsg();
800 static void getLargeMsgRequest(void* header, uint64_t inst_id);
801 static void PumpNetworkSmsg()
802 {
803     uint64_t            inst_id;
804     PENDING_GETNEXT     *pending_next;
805     int                 ret;
806     gni_cq_entry_t      event_data;
807     gni_return_t        status;
808     void                *header;
809     uint8_t             msg_tag;
810     int                 msg_nbytes;
811     void                *msg_data;
812     gni_mem_handle_t    msg_mem_hndl;
813  
814
815     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
816     {
817         inst_id = GNI_CQ_GET_INST_ID(event_data);
818         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
819 #if PRINT_SYH
820         CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
821 #endif
822         msg_tag = GNI_SMSG_ANY_TAG;
823         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
824         {
825 #if PRINT_SYH
826             lrts_received_msg++;
827             CmiPrintf("+++[%d] PumpNetwork msg is received, messageid:%d tag=%d\n", myrank, lrts_received_msg, msg_tag);
828 #endif
829             /* copy msg out and then put into queue (small message) */
830             switch (msg_tag) {
831             case SMALL_DATA_TAG:
832             {
833                 msg_nbytes = CmiGetMsgSize(header);
834                 msg_data    = CmiAlloc(msg_nbytes);
835                 memcpy(msg_data, (char*)header, msg_nbytes);
836                 handleOneRecvedMsg(msg_nbytes, msg_data);
837                 break;
838             }
839             case LMSG_INIT_TAG:
840             {
841 #if PRINT_SYH
842                 CmiPrintf("+++[%d] from %d PumpNetwork Rdma Request msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
843 #endif
844                 getLargeMsgRequest(header, inst_id);
845                 break;
846             }
847             case ACK_TAG:
848             {
849                 /* Get is done, release message . Now put is not used yet*/
850 #if         !USE_LRTS_MEMPOOL
851                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
852 #endif
853                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
854                 SendRdmaMsg();
855                 break;
856             }
857             case PUT_DONE_TAG: //persistent message
858             {
859                 handleOneRecvedMsg(((CONTROL_MSG *) header)->length,((void*) (CONTROL_MSG *) header)); 
860                 break;
861             }
862             default: {
863                 CmiPrintf("weird tag problem\n");
864                 CmiAbort("Unknown tag\n");
865             }
866             }
867             GNI_SmsgRelease(ep_hndl_array[inst_id]);
868             msg_tag = GNI_SMSG_ANY_TAG;
869         } //endwhile getNext
870     }   //end while GetEvent
871     if(status == GNI_RC_ERROR_RESOURCE)
872     {
873         GNI_RC_CHECK("Smsg_rx_cq full", status);
874     }
875 }
876
877 static void getLargeMsgRequest(void* header, uint64_t inst_id)
878 {
879 #if     USE_LRTS_MEMPOOL
880     CONTROL_MSG         *request_msg;
881     gni_return_t        status;
882     void                *msg_data;
883     gni_post_descriptor_t *pd;
884     RDMA_REQUEST        *rdma_request_msg;
885     gni_mem_handle_t    msg_mem_hndl;
886     int source;
887     // initial a get to transfer data from the sender side */
888     request_msg = (CONTROL_MSG *) header;
889     source = request_msg->source;
890     msg_data = CmiAlloc(request_msg->length);
891     _MEMCHECK(msg_data);
892     //memcpy(&msg_mem_hndl, GetMemHndl(msg_data), sizeof(gni_mem_handle_t));
893     msg_mem_hndl = GetMemHndl(msg_data);
894
895     MallocPostDesc(pd);
896     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
897         pd->type            = GNI_POST_FMA_GET;
898     else
899         pd->type            = GNI_POST_RDMA_GET;
900 #if REMOTE_EVENT
901     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
902 #else
903     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
904 #endif
905     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
906     pd->length          = ALIGN4(request_msg->length);
907     pd->local_addr      = (uint64_t) msg_data;
908     pd->local_mem_hndl  = msg_mem_hndl;
909     pd->remote_addr     = request_msg->source_addr;
910     pd->remote_mem_hndl = request_msg->source_mem_hndl;
911     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
912     pd->rdma_mode       = 0;
913
914     if(pd->type == GNI_POST_RDMA_GET) 
915         status = GNI_PostRdma(ep_hndl_array[source], pd);
916     else
917         status = GNI_PostFma(ep_hndl_array[source],  pd);
918     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
919     {
920         MallocRdmaRequest(rdma_request_msg);
921         rdma_request_msg->next = 0;
922         rdma_request_msg->destNode = inst_id;
923         rdma_request_msg->pd = pd;
924         if(pending_rdma_head == 0)
925         {
926             pending_rdma_head = rdma_request_msg;
927         }else
928         {
929             pending_rdma_tail->next = rdma_request_msg;
930         }
931         pending_rdma_tail = rdma_request_msg;
932     }else
933         GNI_RC_CHECK("AFter posting", status);
934
935 #else
936     CONTROL_MSG         *request_msg;
937     gni_return_t        status;
938     void                *msg_data;
939     gni_post_descriptor_t *pd;
940     RDMA_REQUEST        *rdma_request_msg;
941     gni_mem_handle_t    msg_mem_hndl;
942     int source;
943     // initial a get to transfer data from the sender side */
944     request_msg = (CONTROL_MSG *) header;
945     source = request_msg->source;
946     msg_data = CmiAlloc(request_msg->length);
947     _MEMCHECK(msg_data);
948
949     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
950
951     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
952     {
953         GNI_RC_CHECK("Mem Register before post", status);
954     }
955
956     MallocPostDesc(pd);
957     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
958         pd->type            = GNI_POST_FMA_GET;
959     else
960         pd->type            = GNI_POST_RDMA_GET;
961 #if REMOTE_EVENT
962     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
963 #else
964     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
965 #endif
966     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
967     pd->length          = ALIGN4(request_msg->length);
968     pd->local_addr      = (uint64_t) msg_data;
969     pd->remote_addr     = request_msg->source_addr;
970     pd->remote_mem_hndl = request_msg->source_mem_hndl;
971     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
972     pd->rdma_mode       = 0;
973
974     //memory registration successful
975     if(status == GNI_RC_SUCCESS)
976     {
977         pd->local_mem_hndl  = msg_mem_hndl;
978         if(pd->type == GNI_POST_RDMA_GET) 
979             status = GNI_PostRdma(ep_hndl_array[source], pd);
980         else
981             status = GNI_PostFma(ep_hndl_array[source],  pd);
982     }else
983     {
984         pd->local_mem_hndl.qword1  = 0; 
985         pd->local_mem_hndl.qword1  = 0; 
986     }
987     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
988     {
989         MallocRdmaRequest(rdma_request_msg);
990         rdma_request_msg->next = 0;
991         rdma_request_msg->destNode = inst_id;
992         rdma_request_msg->pd = pd;
993         if(pending_rdma_head == 0)
994         {
995             pending_rdma_head = rdma_request_msg;
996         }else
997         {
998             pending_rdma_tail->next = rdma_request_msg;
999         }
1000         pending_rdma_tail = rdma_request_msg;
1001     }else
1002         GNI_RC_CHECK("AFter posting", status);
1003 #endif
1004 }
1005
1006 /* Check whether message send or get is confirmed by remote */
1007 static void PumpLocalSmsgTransactions()
1008 {
1009     gni_return_t            status;
1010     gni_cq_entry_t          ev;
1011     while ((status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1012     {
1013 #if PRINT_SYH
1014         lrts_local_done_msg++;
1015         //CmiPrintf("*[%d]  PumpLocalSmsgTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
1016 #endif
1017         if(GNI_CQ_OVERRUN(ev))
1018         {
1019             CmiPrintf("Overrun detected in local CQ");
1020             CmiAbort("Overrun in TX");
1021         }
1022     }
1023     if(status == GNI_RC_ERROR_RESOURCE)
1024     {
1025         GNI_RC_CHECK("Smsg_tx_cq full", status);
1026     }
1027 }
1028
1029 static void PumpLocalRdmaTransactions()
1030 {
1031     gni_cq_entry_t          ev;
1032     gni_return_t            status;
1033     uint64_t                type, inst_id;
1034     gni_post_descriptor_t   *tmp_pd;
1035     MSG_LIST                *ptr;
1036     CONTROL_MSG             *ack_msg_tmp;
1037     uint8_t             msg_tag;
1038
1039    // while ( (status = GNI_CqGetEvent(post_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1040     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1041     {
1042         type        = GNI_CQ_GET_TYPE(ev);
1043 #if PRINT_SYH
1044         //lrts_local_done_msg++;
1045         CmiPrintf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
1046 #endif
1047         /*if(type == GNI_CQ_EVENT_TYPE_SMSG)
1048         {
1049         }else */
1050         if (type == GNI_CQ_EVENT_TYPE_POST)
1051         {
1052             inst_id     = GNI_CQ_GET_INST_ID(ev);
1053 #if PRINT_SYH
1054             CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1055 #endif
1056             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
1057             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1058             //CmiPrintf("**[%d] SMSGPumpLocalTransactions local done(type=%d) length=%d, size=%d\n", myrank, type, tmp_pd->length, SIZEFIELD((void*)(tmp_pd->local_addr)) );
1059             ////Message is sent, free message , put is not used now
1060             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
1061             {   //persistent message 
1062                 CmiFree((void *)tmp_pd->local_addr);
1063                 msg_tag = PUT_DONE_TAG;  
1064             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1065             {
1066                 msg_tag = ACK_TAG;  
1067             }
1068             MallocControlMsg(ack_msg_tmp);
1069             ack_msg_tmp->source             = myrank;
1070             ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
1071             ack_msg_tmp->length             = tmp_pd->length; 
1072             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1073 #if PRINT_SYH
1074             lrts_send_msg_id++;
1075             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
1076 #endif
1077             if(smsg_msglist_index[inst_id].head != 0 )
1078             {
1079                 delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
1080             }else
1081             {
1082                 status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, msg_tag);
1083                 if(status == GNI_RC_SUCCESS)
1084                 {
1085 #if PRINT_SYH
1086                     lrts_smsg_success++;
1087                     CmiPrintf("[%d==>%d] sent ACK done%d\n", myrank, inst_id, lrts_smsg_success);
1088 #endif
1089                     FreeControlMsg(ack_msg_tmp);
1090                 }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
1091                 {
1092                     delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
1093                 }
1094                 else
1095                     GNI_RC_CHECK("GNI_SmsgSendWTag", status);
1096             }
1097 #if     !USE_LRTS_MEMPOOL
1098             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1099 #endif
1100             CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1101             handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1102             SendRdmaMsg(); 
1103             FreePostDesc(tmp_pd);
1104         }
1105     } //end while
1106 }
1107
1108 static int SendRdmaMsg()
1109 {
1110     gni_return_t            status = GNI_RC_SUCCESS;
1111     gni_mem_handle_t        msg_mem_hndl;
1112
1113     RDMA_REQUEST *ptr = pending_rdma_head;
1114     RDMA_REQUEST *prev = NULL;
1115
1116     while (ptr != NULL)
1117     {
1118         gni_post_descriptor_t *pd = ptr->pd;
1119         status = GNI_RC_SUCCESS;
1120         // register memory first
1121         if( pd->local_mem_hndl.qword1 == 0 && pd->local_mem_hndl.qword2 == 0)
1122         {
1123             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1124         }
1125         if(status == GNI_RC_SUCCESS)
1126         {
1127             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1128                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1129             else
1130                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1131             if(status == GNI_RC_SUCCESS)
1132             {
1133                 RDMA_REQUEST *tmp = ptr;
1134                 if (prev)
1135                   prev->next = ptr->next;
1136                 else
1137                   pending_rdma_head = ptr->next;
1138                 ptr = ptr->next;
1139                 FreeRdmaRequest(tmp);
1140                 continue;
1141             }
1142         }
1143         prev = ptr;
1144         ptr = ptr->next;
1145     } //end while
1146     return 0;
1147 }
1148
1149 // return 1 if all messages are sent
1150 static int SendBufferMsg()
1151 {
1152     MSG_LIST            *ptr, *previous_head, *current_head;
1153     CONTROL_MSG         *control_msg_tmp;
1154     gni_return_t        status;
1155     int done = 1;
1156     register    int     i;
1157     int                 index_previous = -1;
1158     int                 index = smsg_head_index;
1159     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {CmiPrintf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
1160     /* can add flow control here to control the number of messages sent before handle message */
1161     while(index != -1)
1162     {
1163         ptr = smsg_msglist_index[index].head;
1164        
1165         while(ptr!=0)
1166         {
1167         if(useStaticSMSG)
1168         {
1169             CmiAssert(ptr!=NULL);
1170             if(ptr->tag == SMALL_DATA_TAG)
1171             {
1172                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], NULL, 0, ptr->msg, ptr->size, 0, SMALL_DATA_TAG);
1173                // CmiPrintf("[%d==>%d] buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1174                 if(status == GNI_RC_SUCCESS) {
1175 #if PRINT_SYH
1176                     lrts_smsg_success++;
1177                     if(lrts_smsg_success == lrts_send_msg_id)
1178                         CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1179                     else
1180                         CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1181 #endif
1182                     CmiFree(ptr->msg);
1183                 }
1184             }
1185             else if(ptr->tag == LMSG_INIT_TAG)
1186             {
1187                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1188 #if PRINT_SYH
1189                 CmiPrintf("[%d==>%d] LMSG buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1190 #endif
1191                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
1192                 {
1193                     MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
1194                     if(status != GNI_RC_SUCCESS) {
1195                         done = 0;
1196                         break;
1197                     }
1198                 }
1199                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
1200                 if(status == GNI_RC_SUCCESS) {
1201 #if PRINT_SYH
1202                     lrts_smsg_success++;
1203                     CmiPrintf("[%d==>%d] sent LMSG done%d\n", myrank, ptr->destNode, lrts_smsg_success);
1204 #endif
1205                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
1206                 }
1207             }else if (ptr->tag == ACK_TAG)
1208             {
1209 #if PRINT_SYH
1210                 CmiPrintf("[%d==>%d] ACK buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1211 #endif
1212                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, ACK_TAG);
1213                 if(status == GNI_RC_SUCCESS) {
1214 #if PRINT_SYH
1215                     lrts_smsg_success++;
1216                     if(lrts_smsg_success == lrts_send_msg_id)
1217                         CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1218                     else
1219                         CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1220 #endif
1221                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1222                 }
1223             }else
1224             {
1225                 CmiPrintf("Weird tag\n");
1226                 CmiAbort("should not happen\n");
1227             }
1228         } 
1229         if(status == GNI_RC_SUCCESS)
1230         {
1231             smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
1232             FreeMsgList(ptr);
1233             ptr= smsg_msglist_index[index].head;
1234 #if PRINT_SYH
1235             buffered_smsg_counter--;
1236             if(lrts_smsg_success == lrts_send_msg_id)
1237                 CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1238             else
1239                 CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1240 #endif
1241         }else {
1242             done = 0;
1243             break;
1244         }
1245         } //end pooling this i-th core
1246         if(ptr == 0)
1247         {
1248             if(index_previous != -1)
1249                 smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
1250             else
1251                 smsg_head_index = smsg_msglist_index[index].next;
1252         }else
1253         {
1254             index_previous = index;
1255         }
1256         index = smsg_msglist_index[index].next;
1257     }   // end pooling for all cores
1258 #if PRINT_SYH
1259     if(lrts_send_msg_id-lrts_smsg_success !=0)
1260         CmiPrintf("WRONG [%d buffered msg is empty(%p) (actually=%d)(buffercounter=%d)\n", myrank, smsg_msglist_head[i], (lrts_send_msg_id-lrts_smsg_success, buffered_smsg_counter));
1261 #endif
1262     return done;
1263 }
1264
1265 static void LrtsAdvanceCommunication()
1266 {
1267     /*  Receive Msg first */
1268     //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
1269     PumpNetworkSmsg();
1270     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
1271     //PumpNetworkRdmaMsgs();
1272     /* Release Sent Msg */
1273     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
1274     //PumpLocalSmsgTransactions();
1275     //CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", CmiMyPe());
1276     PumpLocalRdmaTransactions();
1277     //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
1278     /* Send buffered Message */
1279     SendBufferMsg();
1280     SendRdmaMsg();
1281 }
1282
1283 static void _init_static_smsg()
1284 {
1285     gni_smsg_attr_t      *smsg_attr;
1286     gni_smsg_attr_t      remote_smsg_attr;
1287     gni_smsg_attr_t      *smsg_attr_vec;
1288     unsigned int         smsg_memlen;
1289     gni_mem_handle_t     my_smsg_mdh_mailbox;
1290     int      i;
1291     gni_return_t status;
1292     uint32_t              vmdh_index = -1;
1293     mdh_addr_t            base_infor;
1294     mdh_addr_t            *base_addr_vec;
1295     if(mysize <=4096)
1296     {
1297         SMSG_MAX_MSG = 1024;
1298         //log2_SMSG_MAX_MSG = 10;
1299     }else if (mysize > 4096 && mysize <= 16384)
1300     {
1301         SMSG_MAX_MSG = 512;
1302         //log2_SMSG_MAX_MSG = 9;
1303
1304     }else {
1305         SMSG_MAX_MSG = 256;
1306         //log2_SMSG_MAX_MSG = 8;
1307     }
1308     
1309     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
1310     
1311     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1312     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
1313     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
1314     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
1315     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1316     smsg_mailbox_base = memalign(64, smsg_memlen*(mysize));
1317     _MEMCHECK(smsg_mailbox_base);
1318     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
1319     if (myrank == 0) CmiPrintf("Charm++> allocates %.2fMB for SMSG. \n", smsg_memlen*mysize/1e6);
1320     
1321     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
1322             smsg_memlen*(mysize), smsg_rx_cqh,
1323             GNI_MEM_READWRITE,   
1324             vmdh_index,
1325             &my_smsg_mdh_mailbox);
1326
1327     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1328
1329 #if 1
1330     base_infor.addr =  (uint64_t)smsg_mailbox_base;
1331     base_infor.mdh =  my_smsg_mdh_mailbox;
1332     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
1333
1334     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
1335  
1336     for(i=0; i<mysize; i++)
1337     {
1338         if(i==myrank)
1339             continue;
1340         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1341         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1342         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1343         smsg_attr[i].mbox_offset = i*smsg_memlen;
1344         smsg_attr[i].buff_size = smsg_memlen;
1345         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
1346         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1347     }
1348
1349     for(i=0; i<mysize; i++)
1350     {
1351         if (myrank == i) continue;
1352
1353         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1354         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1355         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1356         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
1357         remote_smsg_attr.buff_size = smsg_memlen;
1358         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
1359         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
1360
1361         /* initialize the smsg channel */
1362         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
1363         GNI_RC_CHECK("SMSG Init", status);
1364     } //end initialization
1365
1366     free(base_addr_vec);
1367 #else
1368     for(i=0; i<mysize; i++)
1369     {
1370         if(i==myrank)
1371             continue;
1372         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1373         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1374         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1375         if(i<myrank)
1376             smsg_attr[i].mbox_offset = i*smsg_memlen;
1377         else
1378             smsg_attr[i].mbox_offset = (i-1)*smsg_memlen;
1379
1380         smsg_attr[i].msg_buffer = smsg_mailbox_base;
1381         smsg_attr[i].buff_size = smsg_memlen;
1382         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1383 #if 0 
1384         if(i<2)
1385         {
1386             CmiPrintf("[assign %d==%d]  offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, (smsg_attr[i]).mbox_offset, (smsg_attr[i]).msg_buffer, (smsg_attr[i]).mbox_maxcredit, (smsg_attr[i]).msg_maxsize, (smsg_attr[i]).buff_size ); 
1387 #endif 
1388         }
1389     }
1390     smsg_attr_vec = (gni_smsg_attr_t*)malloc(mysize * mysize * sizeof(gni_smsg_attr_t));
1391     _MEMCHECK(smsg_attr_vec);
1392  
1393     if(myrank==0)
1394         CmiPrintf("total size=%d\n", mysize*mysize*sizeof(gni_smsg_attr_t));
1395     allgather(smsg_attr, smsg_attr_vec,  mysize*sizeof(gni_smsg_attr_t));
1396     
1397     CmiBarrier();
1398     for(i=0; i<mysize; i++)
1399     {
1400         if (myrank == i) continue;
1401         /* initialize the smsg channel */
1402         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &smsg_attr_vec[i*mysize+myrank]);
1403 #if 0 
1404         if(i<2)
1405         {
1406             CmiPrintf("[%d==%d] SmsgInit rc=%s, offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, gni_err_str[status], (smsg_attr_vec[i*mysize+myrank]).mbox_offset, (smsg_attr_vec[i*mysize+myrank]).msg_buffer, (smsg_attr_vec[i*mysize+myrank]).mbox_maxcredit, (smsg_attr_vec[i*mysize+myrank]).msg_maxsize, (smsg_attr_vec[i*mysize+myrank]).buff_size ); 
1407         }
1408 #endif
1409         GNI_RC_CHECK("SMSG Init", status);
1410     } //end initialization
1411     CmiBarrier();
1412     free(smsg_attr_vec);
1413 #endif
1414     free(smsg_attr);
1415     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
1416     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
1417     smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
1418     for(i =0; i<mysize; i++)
1419     {
1420         smsg_msglist_index[i].next = -1;
1421         smsg_msglist_index[i].head = 0;
1422         smsg_msglist_index[i].tail = 0;
1423     }
1424     smsg_head_index = -1;
1425
1426
1427 static void _init_static_msgq()
1428 {
1429     gni_return_t status;
1430     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
1431     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
1432     msgq_attrs.smsg_q_sz = 1;
1433     msgq_attrs.rcv_pool_sz = 1;
1434     msgq_attrs.num_msgq_eps = 2;
1435     msgq_attrs.nloc_insts = 8;
1436     msgq_attrs.modes = 0;
1437     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
1438
1439     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
1440     GNI_RC_CHECK("MSGQ Init", status);
1441
1442
1443 }
1444
1445 static void _init_DMA_buffer()
1446 {
1447     gni_return_t            status = GNI_RC_SUCCESS;
1448     /*AUTO tuning */
1449     /* suppose max_smsg is 1024, DMA buffer is split into 2048, 4096, 8192, ... */
1450     /*  This method might be better for SMP, but it is bad for Nonsmp since msgs are sharing same slots */
1451     /*
1452      * DMA_slots = 19-log2_SMSG_MAX_MSG;
1453     DMA_incoming_avail_tag = malloc(DMA_slots);
1454     DMA_buffer_size = 2*(DMA_max_single_msg - SMSG_MAX_MSG); 
1455     DMA_incoming_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1456     DMA_outgoing_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1457     
1458     status = GNI_MemRegister(nic_hndl, (uint64_t)DMA_incoming_base_addr,
1459             DMA_buffer_size, smsg_rx_cqh,
1460             GNI_MEM_READWRITE ,   
1461             vmdh_index,
1462             &);
1463             */
1464     // one is reserved to avoid deadlock
1465     DMA_slots           = 17; // each one is 8K  16*8K + 1 slot reserved to avoid deadlock
1466     DMA_buffer_size     = DMA_max_single_msg + 8192;
1467     DMA_buffer_base_mdh_addr.addr = (uint64_t)memalign(ALIGNBUF, DMA_buffer_size);
1468     status = GNI_MemRegister(nic_hndl, DMA_buffer_base_mdh_addr.addr,
1469         DMA_buffer_size, smsg_rx_cqh,
1470         GNI_MEM_READWRITE ,   
1471         -1,
1472         &(DMA_buffer_base_mdh_addr.mdh));
1473     GNI_RC_CHECK("GNI_MemRegister", status);
1474     DMA_buffer_base_mdh_addr_vec = (mdh_addr_t*) malloc(sizeof(mdh_addr_t) * mysize);
1475
1476     allgather(&DMA_buffer_base_mdh_addr, DMA_buffer_base_mdh_addr_vec, sizeof(mdh_addr_t) );
1477 }
1478
1479 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
1480 {
1481     register int            i;
1482     int                     rc;
1483     int                     device_id = 0;
1484     unsigned int            remote_addr;
1485     gni_cdm_handle_t        cdm_hndl;
1486     gni_return_t            status = GNI_RC_SUCCESS;
1487     uint32_t                vmdh_index = -1;
1488     uint8_t                 ptag;
1489     unsigned int            local_addr, *MPID_UGNI_AllAddr;
1490     int                     first_spawned;
1491     int                     physicalID;
1492     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
1493     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
1494     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
1495    
1496     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
1497     //useStaticSMSG = CmiGetArgFlag(*argv, "+useStaticSmsg");
1498     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
1499     
1500     status = PMI_Init(&first_spawned);
1501     GNI_RC_CHECK("PMI_Init", status);
1502
1503     status = PMI_Get_size(&mysize);
1504     GNI_RC_CHECK("PMI_Getsize", status);
1505
1506     status = PMI_Get_rank(&myrank);
1507     GNI_RC_CHECK("PMI_getrank", status);
1508
1509     //physicalID = CmiPhysicalNodeID(myrank);
1510     
1511     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
1512
1513     *myNodeID = myrank;
1514     *numNodes = mysize;
1515   
1516     if(myrank == 0)
1517     {
1518         printf("Charm++> Running on Gemini (GNI) using %d  cores\n", mysize);
1519     }
1520 #ifdef USE_ONESIDED
1521     onesided_init(NULL, &onesided_hnd);
1522
1523     // this is a GNI test, so use the libonesided bypass functionality
1524     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
1525     local_addr = gniGetNicAddress();
1526 #else
1527     ptag = get_ptag();
1528     cookie = get_cookie();
1529 #if 0
1530     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
1531 #endif
1532     //Create and attach to the communication  domain */
1533     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
1534     GNI_RC_CHECK("GNI_CdmCreate", status);
1535     //* device id The device id is the minor number for the device
1536     //that is assigned to the device by the system when the device is created.
1537     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
1538     //where X is the device number 0 default 
1539     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
1540     GNI_RC_CHECK("GNI_CdmAttach", status);
1541     local_addr = get_gni_nic_address(0);
1542 #endif
1543     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
1544     _MEMCHECK(MPID_UGNI_AllAddr);
1545     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
1546     /* create the local completion queue */
1547     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
1548     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
1549     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
1550     
1551     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
1552     //GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
1553     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
1554
1555     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
1556     GNI_RC_CHECK("Create CQ (rx)", status);
1557     
1558     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
1559     //GNI_RC_CHECK("Create Post CQ (rx)", status);
1560     
1561     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
1562     //GNI_RC_CHECK("Create BTE CQ", status);
1563
1564     /* create the endpoints. they need to be bound to allow later CQWrites to them */
1565     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
1566     _MEMCHECK(ep_hndl_array);
1567
1568     for (i=0; i<mysize; i++) {
1569         if(i == myrank) continue;
1570         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
1571         GNI_RC_CHECK("GNI_EpCreate ", status);   
1572         remote_addr = MPID_UGNI_AllAddr[i];
1573         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
1574         GNI_RC_CHECK("GNI_EpBind ", status);   
1575     }
1576     /* Depending on the number of cores in the job, decide different method */
1577     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
1578     if(mysize > 1)
1579     {
1580         if(useStaticSMSG == 1)
1581         {
1582             _init_static_smsg(mysize);
1583         }else if(useStaticMSGQ == 1)
1584         {
1585             _init_static_msgq();
1586         }
1587     }
1588 #if     USE_LRTS_MEMPOOL
1589     CmiGetArgLong(*argv, "+useMemorypoolSize", &_mempool_size);
1590     if (myrank==0) CmiPrintf("Charm++> use memorypool size: %1.fMB\n", _mempool_size/1024.0/1024);
1591     init_mempool(_mempool_size);
1592     //init_mempool(Mempool_MaxSize);
1593 #endif
1594
1595     /* init DMA buffer for medium message */
1596
1597     //_init_DMA_buffer();
1598     
1599     free(MPID_UGNI_AllAddr);
1600 }
1601
1602
1603 void* LrtsAlloc(int n_bytes, int header)
1604 {
1605     void *ptr;
1606 #if 0
1607     CmiPrintf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d\n", CmiMyPe(), n_bytes, header);
1608 #endif
1609     if(n_bytes <= SMSG_MAX_MSG)
1610     {
1611         int totalsize = n_bytes+header;
1612         ptr = malloc(totalsize);
1613     }else 
1614     {
1615
1616         CmiAssert(header <= ALIGNBUF);
1617 #if     USE_LRTS_MEMPOOL
1618         n_bytes = ALIGN64(n_bytes);
1619         char *res = syh_mempool_malloc(ALIGNBUF+n_bytes);
1620 #else
1621         n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
1622         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1623 #endif
1624         ptr = res + ALIGNBUF - header;
1625     }
1626 #if 0 
1627     CmiPrintf("Done Alloc Lrts for bytes=%d, head=%d\n", n_bytes, header);
1628 #endif
1629     return ptr;
1630 }
1631
1632 void  LrtsFree(void *msg)
1633 {
1634     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1635     if (size <= SMSG_MAX_MSG)
1636       free(msg);
1637     else
1638     {
1639 #if 0
1640         CmiPrintf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1641 #endif
1642 #if     USE_LRTS_MEMPOOL
1643         syh_mempool_free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1644 #else
1645         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1646 #endif
1647     }
1648 #if 0 
1649     CmiPrintf("Done Free lrts for bytes=%d\n", size);
1650 #endif
1651 }
1652
1653 static void LrtsExit()
1654 {
1655     /* free memory ? */
1656 #if     USE_LRTS_MEMPOOL
1657     kill_allmempool();
1658 #endif
1659     PMI_Finalize();
1660     exit(0);
1661 }
1662
1663 static void LrtsDrainResources()
1664 {
1665     while (!SendBufferMsg()) {
1666         PumpNetworkSmsg();
1667         PumpNetworkRdmaMsgs();
1668         PumpLocalSmsgTransactions();
1669         PumpLocalRdmaTransactions();
1670     }
1671     PMI_Barrier();
1672 }
1673
1674 void CmiAbort(const char *message) {
1675
1676     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1677     PMI_Abort(-1, message);
1678 }
1679
1680 /**************************  TIMER FUNCTIONS **************************/
1681 #if CMK_TIMER_USE_SPECIAL
1682 /* MPI calls are not threadsafe, even the timer on some machines */
1683 static CmiNodeLock  timerLock = 0;
1684 static int _absoluteTime = 0;
1685 static int _is_global = 0;
1686 static struct timespec start_ns;
1687
1688 inline int CmiTimerIsSynchronized() {
1689     return 1;
1690 }
1691
1692 inline int CmiTimerAbsolute() {
1693     return _absoluteTime;
1694 }
1695
1696 double CmiStartTimer() {
1697     return 0.0;
1698 }
1699
1700 double CmiInitTime() {
1701     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
1702 }
1703
1704 void CmiTimerInit(char **argv) {
1705     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1706     if (_absoluteTime && CmiMyPe() == 0)
1707         printf("Charm++> absolute  timer is used\n");
1708     
1709     _is_global = CmiTimerIsSynchronized();
1710
1711
1712     if (_is_global) {
1713         if (CmiMyRank() == 0) {
1714             clock_gettime(CLOCK_MONOTONIC, &start_ts)
1715         }
1716     } else { /* we don't have a synchronous timer, set our own start time */
1717         CmiBarrier();
1718         CmiBarrier();
1719         CmiBarrier();
1720         clock_gettime(CLOCK_MONOTONIC, &start_ts)
1721     }
1722     CmiNodeAllBarrier();          /* for smp */
1723 }
1724
1725 /**
1726  * Since the timerLock is never created, and is
1727  * always NULL, then all the if-condition inside
1728  * the timer functions could be disabled right
1729  * now in the case of SMP.
1730  */
1731 double CmiTimer(void) {
1732     struct timespec now_ts;
1733     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1734     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1735         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1736 }
1737
1738 double CmiWallTimer(void) {
1739     struct timespec now_ts;
1740     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1741     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1742         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1743 }
1744
1745 double CmiCpuTimer(void) {
1746     struct timespec now_ts;
1747     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1748     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1749         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1750 }
1751
1752 #endif
1753 /************Barrier Related Functions****************/
1754
1755 int CmiBarrier()
1756 {
1757     int status;
1758     status = PMI_Barrier();
1759     return status;
1760
1761 }
1762
1763
1764 #if CMK_PERSISTENT_COMM
1765 #include "machine-persistent.c"
1766 #endif
1767
1768