8a623c4fe59334ce7309d22328cc68e6bf36d85b
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <stdint.h>
16 #include <errno.h>
17 #include <malloc.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #if defined(_WIN32) && ! defined(__CYGWIN__)
26 #include <windows.h>
27 #include <wincon.h>
28 #include <sys/types.h>
29 #include <sys/timeb.h>
30
31 static void sleep(int secs) {
32     Sleep(1000*secs);
33 }
34 #else
35 #include <unistd.h> /*For getpid()*/
36 #endif
37
38 #define USE_LRTS_MEMPOOL   1
39
40 #if USE_LRTS_MEMPOOL
41 static size_t _mempool_size = 1024ll*1024*8;
42 #endif
43
44 #define PRINT_SYH  0
45
46 #if PRINT_SYH
47 int         lrts_smsg_success = 0;
48 int         lrts_send_msg_id = 0;
49 int         lrts_send_rdma_id = 0;
50 int         lrts_send_rdma_success = 0;
51 int         lrts_received_msg = 0;
52 int         lrts_local_done_msg = 0;
53 #endif
54
55 #include "machine.h"
56
57 #include "pcqueue.h"
58
59 #if CMK_PERSISTENT_COMM
60 #include "machine-persistent.h"
61 #endif
62
63 //#define  USE_ONESIDED 1
64 #ifdef USE_ONESIDED
65 //onesided implementation is wrong, since no place to restore omdh
66 #include "onesided.h"
67 onesided_hnd_t   onesided_hnd;
68 onesided_md_t    omdh;
69 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
70
71 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
72
73 #else
74 uint8_t   onesided_hnd, omdh;
75 //#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
76 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
77
78 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
79 #endif
80
81 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
82 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
83
84 #define ALIGNBUF                64
85
86 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
87 /* If SMSG is not used */
88 #define FMA_PER_CORE  1024
89 #define FMA_BUFFER_SIZE 1024
90 /* If SMSG is used */
91 static int  SMSG_MAX_MSG;
92 //static int  log2_SMSG_MAX_MSG;
93 #define SMSG_MAX_CREDIT  36
94
95 #define MSGQ_MAXSIZE       2048
96 /* large message transfer with FMA or BTE */
97 #define LRTS_GNI_RDMA_THRESHOLD  2048
98
99 #define REMOTE_QUEUE_ENTRIES  1048576
100 #define LOCAL_QUEUE_ENTRIES   512
101
102 #define PUT_DONE_TAG      0x29
103 #define ACK_TAG           0x30
104 /* SMSG is data message */
105 #define SMALL_DATA_TAG          0x31
106 /* SMSG is a control message to initialize a BTE */
107 #define MEDIUM_HEAD_TAG         0x32
108 #define MEDIUM_DATA_TAG         0x33
109 #define LMSG_INIT_TAG     0x39 
110
111 #define DEBUG
112 #ifdef GNI_RC_CHECK
113 #undef GNI_RC_CHECK
114 #endif
115 #ifdef DEBUG
116 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
117 #else
118 #define GNI_RC_CHECK(msg,rc)
119 #endif
120
121 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
122 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
123
124 static int Mempool_MaxSize = 1024*1024*128;
125 static int useStaticSMSG   = 1;
126 static int useStaticMSGQ = 0;
127 static int useStaticFMA = 0;
128 static int mysize, myrank;
129 static gni_nic_handle_t      nic_hndl;
130
131
132 static void             *smsg_mailbox_base;
133 gni_msgq_attr_t         msgq_attrs;
134 gni_msgq_handle_t       msgq_handle;
135 gni_msgq_ep_attr_t      msgq_ep_attrs;
136 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
137
138 /* preallocated memory buffer for FMA for short message and control message */
139 typedef struct {
140     gni_mem_handle_t mdh;
141     uint64_t addr;
142 } mdh_addr_t;
143
144
145 /* preallocated DMA buffer */
146 int                     DMA_slots;
147 uint64_t                DMA_avail_tag = 0;
148 uint32_t                DMA_incoming_avail_tag = 0;
149 uint32_t                DMA_outgoing_avail_tag = 0;
150 void                    *DMA_incoming_base_addr;
151 void                    *DMA_outgoing_base_addr;
152 mdh_addr_t              DMA_buffer_base_mdh_addr;
153 mdh_addr_t              *DMA_buffer_base_mdh_addr_vec;
154 int                     DMA_buffer_size;
155 int                     DMA_max_single_msg = 131072;//524288 ;
156
157 #define                 DMA_SIZE_PER_SLOT       8192
158
159
160 typedef struct dma_msgid_map
161 {
162     uint64_t     msg_id;
163     int     msg_subid
164 } dma_msgid_map_t;
165
166 dma_msgid_map_t         *dma_map_list;
167
168 typedef struct msg_trace
169 {
170     uint64_t    msg_id;
171     int         done_num;
172 }msg_trace_t;
173
174 msg_trace_t             *pending_msg_list;
175 /* =====Beginning of Declarations of Machine Specific Variables===== */
176 static int cookie;
177 static int modes = 0;
178 static gni_cq_handle_t       smsg_rx_cqh = NULL;
179 static gni_cq_handle_t       smsg_tx_cqh = NULL;
180 static gni_cq_handle_t       post_rx_cqh = NULL;
181 static gni_cq_handle_t       post_tx_cqh = NULL;
182 static gni_ep_handle_t       *ep_hndl_array;
183
184 typedef    struct  pending_smg
185 {
186     int     inst_id;
187     struct  pending_smg *next;
188 } PENDING_GETNEXT;
189
190
191 typedef struct msg_list
192 {
193     uint32_t destNode;
194     uint32_t size;
195     void *msg;
196     struct msg_list *next;
197     struct msg_list *pehead_next;
198     uint8_t tag;
199 }MSG_LIST;
200
201 typedef struct medium_msg_list
202 {
203     uint32_t destNode;
204     uint32_t msg_id;
205     uint32_t msg_subid;
206     uint32_t remain_size;
207     void *msg;
208     struct medium_msg_list *next;
209 }MEDIUM_MSG_LIST;
210
211
212 typedef struct control_msg
213 {
214     uint64_t            source_addr;
215     int                 source;               /* source rank */
216     int                 length;
217     gni_mem_handle_t    source_mem_hndl;
218     struct control_msg *next;
219 }CONTROL_MSG;
220
221 typedef struct medium_msg_control
222 {
223     uint64_t            dma_offset;     //the dma_buffer for this block of msg
224     int                 msg_id;         //Id for the total index
225     int                 msg_subid;      //offset inside the message id 
226 }MEDIUM_MSG_CONTROL;
227
228 typedef struct  rmda_msg
229 {
230     int                   destNode;
231     gni_post_descriptor_t *pd;
232     struct  rmda_msg      *next;
233 }RDMA_REQUEST;
234
235 typedef struct  msg_list_index
236 {
237     int         next;
238     MSG_LIST    *head;
239     MSG_LIST    *tail;
240 } MSG_LIST_INDEX;
241
242 /* reuse PendingMsg memory */
243 static CONTROL_MSG          *control_freelist=0;
244 static MSG_LIST             *msglist_freelist=0;
245 static int                  smsg_head_index;
246 static MSG_LIST_INDEX       *smsg_msglist_index= 0;
247 static MSG_LIST             *smsg_free_head=0;
248 static MSG_LIST             *smsg_free_tail=0;
249
250 /*
251 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
252     if(free_head == 0)  free_head = free_tail = msg_head;    \
253     else   free_tail = free_tail->next;    \
254     if( msg_head->next == msg_tail) msg_head =0;   \
255     else msg_head= msg_head->next;    
256
257 #define MallocMsgList(d, msg_head, msg_tail, free_head, free_tail, msgsize) \
258     if(free_head == 0) {d= malloc(msgsize);  \
259         if(msg_head == 0)   msg_head =msg_tail = msg_head->next = msg_tail->next = d; \
260         else { msg_tail->next = d; d->next = msg_head; msg_tail=d;} \
261     }else {d = free_head; free_head = free_head->next; if(free_tail->next == free_head) free_head =0;} \
262 */
263
264 #define FreeMsgList(d)  \
265   (d)->next = msglist_freelist;\
266   msglist_freelist = d;
267
268
269
270 #define MallocMsgList(d) \
271   d = msglist_freelist;\
272   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
273              _MEMCHECK(d);\
274   } else msglist_freelist = d->next;
275
276
277 #define FreeControlMsg(d)       \
278   (d)->next = control_freelist;\
279   control_freelist = d;
280
281 #define MallocControlMsg(d) \
282   d = control_freelist;\
283   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
284              _MEMCHECK(d);\
285   } else control_freelist = d->next;
286
287
288 static RDMA_REQUEST         *rdma_freelist = NULL;
289
290 #define FreeControlMsg(d)       \
291   (d)->next = control_freelist;\
292   control_freelist = d;
293
294 #define MallocControlMsg(d) \
295   d = control_freelist;\
296   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
297              _MEMCHECK(d);\
298   } else control_freelist = d->next;
299
300 #define FreeMediumControlMsg(d)       \
301   (d)->next = medium_control_freelist;\
302   medium_control_freelist = d;
303
304
305 #define MallocMediumControlMsg(d) \
306     d = medium_control_freelist;\
307     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
308     _MEMCHECK(d);\
309 } else mediumcontrol_freelist = d->next;
310
311 #define FreeRdmaRequest(d)       \
312   (d)->next = rdma_freelist;\
313   rdma_freelist = d;
314
315 #define MallocRdmaRequest(d) \
316   d = rdma_freelist;\
317   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
318              _MEMCHECK(d);\
319   } else rdma_freelist = d->next;
320
321 /* reuse gni_post_descriptor_t */
322 static gni_post_descriptor_t *post_freelist=0;
323
324 #if 1
325 #define FreePostDesc(d)       \
326     (d)->next_descr = post_freelist;\
327     post_freelist = d;
328
329 #define MallocPostDesc(d) \
330   d = post_freelist;\
331   if (d==0) { \
332      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
333      _MEMCHECK(d);\
334   } else post_freelist = d->next_descr;
335 #else
336
337 #define FreePostDesc(d)     free(d);
338 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
339
340 #endif
341
342 static PENDING_GETNEXT     *pending_smsg_head = 0;
343 static PENDING_GETNEXT     *pending_smsg_tail = 0;
344
345 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
346 static int      buffered_smsg_counter = 0;
347
348 /* SmsgSend return success but message sent is not confirmed by remote side */
349
350 static RDMA_REQUEST  *pending_rdma_head = 0;
351 static RDMA_REQUEST  *pending_rdma_tail = 0;
352 static MSG_LIST *buffered_fma_head = 0;
353 static MSG_LIST *buffered_fma_tail = 0;
354
355 /* functions  */
356 #define IsFree(a,ind)  !( a& (1<<(ind) ))
357 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
358 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
359
360 #include "mempool.c"
361
362 /* get the upper bound of log 2 */
363 int mylog2(int size)
364 {
365     int op = size;
366     unsigned int ret=0;
367     unsigned int mask = 0;
368     int i;
369     while(op>0)
370     {
371         op = op >> 1;
372         ret++;
373
374     }
375     for(i=1; i<ret; i++)
376     {
377         mask = mask << 1;
378         mask +=1;
379     }
380
381     ret -= ((size &mask) ? 0:1);
382     return ret;
383 }
384
385 static void
386 allgather(void *in,void *out, int len)
387 {
388     static int *ivec_ptr=NULL,already_called=0,job_size=0;
389     int i,rc;
390     int my_rank;
391     char *tmp_buf,*out_ptr;
392
393     if(!already_called) {
394
395         rc = PMI_Get_size(&job_size);
396         CmiAssert(rc == PMI_SUCCESS);
397         rc = PMI_Get_rank(&my_rank);
398         CmiAssert(rc == PMI_SUCCESS);
399
400         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
401         CmiAssert(ivec_ptr != NULL);
402
403         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
404         CmiAssert(rc == PMI_SUCCESS);
405
406         already_called = 1;
407
408     }
409
410     tmp_buf = (char *)malloc(job_size * len);
411     CmiAssert(tmp_buf);
412
413     rc = PMI_Allgather(in,tmp_buf,len);
414     CmiAssert(rc == PMI_SUCCESS);
415
416     out_ptr = out;
417
418     for(i=0;i<job_size;i++) {
419
420         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
421
422     }
423
424     free(tmp_buf);
425 }
426 static void
427 allgather_2(void *in,void *out, int len)
428 {
429     //PMI_Allgather is out of order
430     int i,rc, extend_len;
431     int  rank_index;
432     char *out_ptr, *out_ref;
433     char *in2;
434
435     extend_len = sizeof(int) + len;
436     in2 = (char*)malloc(extend_len);
437
438     memcpy(in2, &myrank, sizeof(int));
439     memcpy(in2+sizeof(int), in, len);
440
441     out_ptr = (char*)malloc(mysize*extend_len);
442
443     rc = PMI_Allgather(in2, out_ptr, extend_len);
444     GNI_RC_CHECK("allgather", rc);
445
446     out_ref = out;
447
448     for(i=0;i<mysize;i++) {
449         //rank index 
450         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
451         //copy to the rank index slot
452         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
453     }
454
455     free(out_ptr);
456     free(in2);
457
458 }
459
460 static unsigned int get_gni_nic_address(int device_id)
461 {
462     unsigned int address, cpu_id;
463     gni_return_t status;
464     int i, alps_dev_id=-1,alps_address=-1;
465     char *token, *p_ptr;
466
467     p_ptr = getenv("PMI_GNI_DEV_ID");
468     if (!p_ptr) {
469         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
470        
471         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
472     } else {
473         while ((token = strtok(p_ptr,":")) != NULL) {
474             alps_dev_id = atoi(token);
475             if (alps_dev_id == device_id) {
476                 break;
477             }
478             p_ptr = NULL;
479         }
480         CmiAssert(alps_dev_id != -1);
481         p_ptr = getenv("PMI_GNI_LOC_ADDR");
482         CmiAssert(p_ptr != NULL);
483         i = 0;
484         while ((token = strtok(p_ptr,":")) != NULL) {
485             if (i == alps_dev_id) {
486                 alps_address = atoi(token);
487                 break;
488             }
489             p_ptr = NULL;
490             ++i;
491         }
492         CmiAssert(alps_address != -1);
493         address = alps_address;
494     }
495     return address;
496 }
497
498 static uint8_t get_ptag(void)
499 {
500     char *p_ptr, *token;
501     uint8_t ptag;
502
503     p_ptr = getenv("PMI_GNI_PTAG");
504     CmiAssert(p_ptr != NULL);
505     token = strtok(p_ptr, ":");
506     ptag = (uint8_t)atoi(token);
507     return ptag;
508         
509 }
510
511 static uint32_t get_cookie(void)
512 {
513     uint32_t cookie;
514     char *p_ptr, *token;
515
516     p_ptr = getenv("PMI_GNI_COOKIE");
517     CmiAssert(p_ptr != NULL);
518     token = strtok(p_ptr, ":");
519     cookie = (uint32_t)atoi(token);
520
521     return cookie;
522 }
523
524 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
525 /* TODO: add any that are related */
526 /* =====End of Definitions of Message-Corruption Related Macros=====*/
527
528
529 #include "machine-lrts.h"
530 #include "machine-common-core.c"
531
532 /* Network progress function is used to poll the network when for
533    messages. This flushes receive buffers on some  implementations*/
534 #if CMK_MACHINE_PROGRESS_DEFINED
535 void CmiMachineProgressImpl() {
536 }
537 #endif
538
539 inline
540 static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
541 {
542     MSG_LIST        *msg_tmp;
543     MallocMsgList(msg_tmp);
544     msg_tmp->destNode = destNode;
545     msg_tmp->size   = size;
546     msg_tmp->msg    = msg;
547     msg_tmp->tag    = tag;
548     msg_tmp->next   = 0;
549     if (smsg_msglist_index[destNode].head == 0 ) {
550         smsg_msglist_index[destNode].head = msg_tmp;
551         smsg_msglist_index[destNode].next = smsg_head_index;
552         smsg_head_index = destNode;
553     }
554     else {
555       (smsg_msglist_index[destNode].tail)->next    = msg_tmp;
556     }
557     smsg_msglist_index[destNode].tail          = msg_tmp;
558 #if PRINT_SYH
559     buffered_smsg_counter++;
560 #endif
561 }
562
563 // messages smaller than max single SMSG
564 inline
565 static void send_small_messages(int destNode, int size, char *msg)
566 {
567     gni_return_t        status  =   GNI_RC_SUCCESS;
568     const uint8_t       tag_data    = SMALL_DATA_TAG;
569     
570     if(smsg_msglist_index[destNode].head == 0)
571     {
572         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, SMALL_DATA_TAG);
573         if (status == GNI_RC_SUCCESS)
574         {
575 #if PRINT_SYH
576             lrts_smsg_success++;
577             if(lrts_smsg_success == lrts_send_msg_id)
578                 CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
579             else
580                 CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
581 #endif
582             CmiFree(msg);
583             return;
584         }else if (status == GNI_RC_INVALID_PARAM)
585         {
586             CmiAbort("SmsgSen fails\n");
587         }
588     }
589     //buffer this message when buffer_msg_head!=0 or send fails
590     delay_send_small_msg(msg, size, destNode, SMALL_DATA_TAG);
591 }
592
593 // Get first 0 in DMA_tags starting from index
594 static int get_first_avail_bit(uint64_t DMA_tags, int start_index)
595 {
596
597     uint64_t         mask = 0x1;
598     register    int     i=0;
599     while((DMA_tags & mask) && i<DMA_slots) {mask << 1; i++;}
600
601 }
602
603 static int send_medium_messages(int destNode, int size, char *msg)
604 {
605 #if 0
606     gni_return_t status = GNI_RC_SUCCESS;
607     int first_avail_bit=0;
608     uint64_t mask = 0x1;
609     MEDIUM_MSG_CONTROL  *medium_msg_control_tmp;
610     MEDIUM_MSG_LIST        *msg_tmp;
611     int blocksize, remain_size, pos;
612     int sub_id = 0;
613     remain_size = size;
614     pos = 0;  //offset before which data are sent
615     /* copy blocks of the message to DMA preallocated buffer and send SMSG */
616     //Check whether there is any available DMA buffer
617     
618     do{
619         while((DMA_avail_tag & mask) && first_avail_bit<DMA_slots) {mask << 1; first_avail_bit++;}
620         if(first_avail_bit == DMA_slots) //No available DMA, buffer this message
621         {
622             MallocMediumMsgList(msg_tmp);
623             msg_tmp->destNode = destNode;
624             msg_tmp->msg_id   = lrts_send_msg_id;
625             msg_tmp->msg_subid   = sub_id;
626             msg_tmp->size   = remain_size;
627             msg_tmp->msg    = msg+pos;
628             msg_tmp->next   = NULL;
629             break;
630         }else
631         {
632             //copy this part of the message into this DMA buffer
633             //TODO optimize here, some data can go with this SMSG
634             blocksize = (remain_size>DMA_SIZE_PER_SLOT)?DMA_SIZE_PER_SLOT: remain_size;
635             memcpy(DMA_buffer_base_mdh_addr.addr[first_avail_bit], msg+pos, blocksize);
636             pos += blocksize;
637             remain_size -= blocksize;
638             SET_BITS(DMA_avail_tag, first_avail_bit);
639            
640             MallocMediumControlMsg(medium_msg_control_tmp);
641             medium_msg_control_tmp->msg_id = lrts_send_msg_id;
642             medium_msg_control_tmp->msg_subid = sub_id;
643             if(status == GNI_RC_SUCCESS)
644             {
645                 if(sub_id==0)
646                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_HEAD_TAG);
647                 else
648                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_DATA_TAG);
649             }
650             //buffer this smsg
651             if(status != GNI_RC_SUCCESS)
652             {
653                 delay_send_small_msg(medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), destNode, MEDIUM_HEAD_TAG);
654             }
655             sub_id++;
656         }while(remain_size > 0 );
657
658         }
659     }
660 #endif
661 }
662
663 // Large message, send control to receiver, receiver register memory and do a GET 
664 inline
665 static int send_large_messages(int destNode, int size, char *msg)
666 {
667 #if     USE_LRTS_MEMPOOL
668     gni_return_t        status  =   GNI_RC_SUCCESS;
669     CONTROL_MSG         *control_msg_tmp;
670     uint32_t            vmdh_index  = -1;
671     /* construct a control message and send */
672     MallocControlMsg(control_msg_tmp);
673     control_msg_tmp->source_addr    = (uint64_t)msg;
674     control_msg_tmp->source         = myrank;
675     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
676     //memcpy( &(control_msg_tmp->source_mem_hndl), GetMemHndl(msg), sizeof(gni_mem_handle_t)) ;
677     control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
678 #if PRINT_SYH
679     lrts_send_msg_id++;
680     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
681 #endif
682     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
683     if(status == GNI_RC_SUCCESS)
684     {
685 #if PRINT_SYH
686         lrts_smsg_success++;
687         if(lrts_smsg_success == lrts_send_msg_id)
688             CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
689         else
690             CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
691 #endif
692         FreeControlMsg(control_msg_tmp);
693         return 1;
694     }
695
696     delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
697     return 0;
698 // NOT use mempool, should slow 
699 #else
700     gni_return_t        status  =   GNI_RC_SUCCESS;
701     CONTROL_MSG         *control_msg_tmp;
702     uint32_t            vmdh_index  = -1;
703     /* construct a control message and send */
704     MallocControlMsg(control_msg_tmp);
705     control_msg_tmp->source_addr    = (uint64_t)msg;
706     control_msg_tmp->source         = myrank;
707     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
708     control_msg_tmp->source_mem_hndl.qword1 = 0;
709     control_msg_tmp->source_mem_hndl.qword2 = 0;
710 #if PRINT_SYH
711     lrts_send_msg_id++;
712     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
713 #endif
714     if(smsg_msglist_index[destNode].head == 0 )
715     {
716         status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
717         if(status == GNI_RC_SUCCESS)
718         {
719             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
720             if(status == GNI_RC_SUCCESS)
721             {
722 #if PRINT_SYH
723                 lrts_smsg_success++;
724                 if(lrts_smsg_success == lrts_send_msg_id)
725                     CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
726                 else
727                     CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
728 #endif
729                 FreeControlMsg(control_msg_tmp);
730                 return 1;
731             }
732
733         } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
734         {
735             CmiAbort("Memory registor for large msg\n");
736         }
737     }
738     delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
739     return 0;
740 #endif
741 }
742
743 inline void LrtsPrepareEnvelope(char *msg, int size)
744 {
745     CmiSetMsgSize(msg, size);
746 }
747
748 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
749 {
750     LrtsPrepareEnvelope(msg, size);
751
752     if(size <= SMSG_MAX_MSG)
753     {
754 #if PRINT_SYH
755         lrts_send_msg_id++;
756         CmiPrintf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
757 #endif
758         send_small_messages(destNode, size, msg);
759     }
760     else
761     {
762         send_large_messages(destNode, size, msg);
763     }
764     return 0;
765 }
766
767 static void LrtsPreCommonInit(int everReturn){}
768
769 /* Idle-state related functions: called in non-smp mode */
770 void CmiNotifyIdleForGemini(void) {
771     LrtsAdvanceCommunication();
772 }
773
774 static void LrtsPostCommonInit(int everReturn)
775 {
776 #if CMK_SMP
777     CmiIdleState *s=CmiNotifyGetState();
778     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
779     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
780 #else
781     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
782 #endif
783
784 }
785
786
787 void LrtsPostNonLocal(){}
788 /* pooling CQ to receive network message */
789 static void PumpNetworkRdmaMsgs()
790 {
791     gni_cq_entry_t      event_data;
792     gni_return_t        status;
793     while( (status = GNI_CqGetEvent(post_rx_cqh, &event_data)) == GNI_RC_SUCCESS);
794 }
795
796 static int SendRdmaMsg();
797 static void getLargeMsgRequest(void* header, uint64_t inst_id);
798 static void PumpNetworkSmsg()
799 {
800     uint64_t            inst_id;
801     PENDING_GETNEXT     *pending_next;
802     int                 ret;
803     gni_cq_entry_t      event_data;
804     gni_return_t        status;
805     void                *header;
806     uint8_t             msg_tag;
807     int                 msg_nbytes;
808     void                *msg_data;
809     gni_mem_handle_t    msg_mem_hndl;
810  
811
812     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
813     {
814         inst_id = GNI_CQ_GET_INST_ID(event_data);
815         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
816 #if PRINT_SYH
817         CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
818 #endif
819         msg_tag = GNI_SMSG_ANY_TAG;
820         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
821         {
822 #if PRINT_SYH
823             lrts_received_msg++;
824             CmiPrintf("+++[%d] PumpNetwork msg is received, messageid:%d tag=%d\n", myrank, lrts_received_msg, msg_tag);
825 #endif
826             /* copy msg out and then put into queue (small message) */
827             switch (msg_tag) {
828             case SMALL_DATA_TAG:
829             {
830                 msg_nbytes = CmiGetMsgSize(header);
831                 msg_data    = CmiAlloc(msg_nbytes);
832                 memcpy(msg_data, (char*)header, msg_nbytes);
833                 handleOneRecvedMsg(msg_nbytes, msg_data);
834                 break;
835             }
836             case LMSG_INIT_TAG:
837             {
838 #if PRINT_SYH
839                 CmiPrintf("+++[%d] from %d PumpNetwork Rdma Request msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
840 #endif
841                 getLargeMsgRequest(header, inst_id);
842                 break;
843             }
844             case ACK_TAG:
845             {
846                 /* Get is done, release message . Now put is not used yet*/
847 #if         !USE_LRTS_MEMPOOL
848                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
849 #endif
850                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
851                 SendRdmaMsg();
852                 break;
853             }
854             case PUT_DONE_TAG: //persistent message
855             {
856                 handleOneRecvedMsg(((CONTROL_MSG *) header)->length,((void*) (CONTROL_MSG *) header)); 
857                 break;
858             }
859             default: {
860                 CmiPrintf("weird tag problem\n");
861                 CmiAbort("Unknown tag\n");
862             }
863             }
864             GNI_SmsgRelease(ep_hndl_array[inst_id]);
865             msg_tag = GNI_SMSG_ANY_TAG;
866         } //endwhile getNext
867     }   //end while GetEvent
868     if(status == GNI_RC_ERROR_RESOURCE)
869     {
870         GNI_RC_CHECK("Smsg_rx_cq full", status);
871     }
872 }
873
874 static void getLargeMsgRequest(void* header, uint64_t inst_id)
875 {
876 #if     USE_LRTS_MEMPOOL
877     CONTROL_MSG         *request_msg;
878     gni_return_t        status;
879     void                *msg_data;
880     gni_post_descriptor_t *pd;
881     RDMA_REQUEST        *rdma_request_msg;
882     gni_mem_handle_t    msg_mem_hndl;
883     int source;
884     // initial a get to transfer data from the sender side */
885     request_msg = (CONTROL_MSG *) header;
886     source = request_msg->source;
887     msg_data = CmiAlloc(request_msg->length);
888     _MEMCHECK(msg_data);
889     //memcpy(&msg_mem_hndl, GetMemHndl(msg_data), sizeof(gni_mem_handle_t));
890     msg_mem_hndl = GetMemHndl(msg_data);
891
892     MallocPostDesc(pd);
893     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
894         pd->type            = GNI_POST_FMA_GET;
895     else
896         pd->type            = GNI_POST_RDMA_GET;
897
898     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
899     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
900     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
901     pd->length          = ALIGN4(request_msg->length);
902     pd->local_addr      = (uint64_t) msg_data;
903     pd->local_mem_hndl  = msg_mem_hndl;
904     pd->remote_addr     = request_msg->source_addr;
905     pd->remote_mem_hndl = request_msg->source_mem_hndl;
906     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
907     pd->rdma_mode       = 0;
908
909     if(pd->type == GNI_POST_RDMA_GET) 
910         status = GNI_PostRdma(ep_hndl_array[source], pd);
911     else
912         status = GNI_PostFma(ep_hndl_array[source],  pd);
913     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
914     {
915         MallocRdmaRequest(rdma_request_msg);
916         rdma_request_msg->next = 0;
917         rdma_request_msg->destNode = inst_id;
918         rdma_request_msg->pd = pd;
919         if(pending_rdma_head == 0)
920         {
921             pending_rdma_head = rdma_request_msg;
922         }else
923         {
924             pending_rdma_tail->next = rdma_request_msg;
925         }
926         pending_rdma_tail = rdma_request_msg;
927     }else
928         GNI_RC_CHECK("AFter posting", status);
929
930 #else
931     CONTROL_MSG         *request_msg;
932     gni_return_t        status;
933     void                *msg_data;
934     gni_post_descriptor_t *pd;
935     RDMA_REQUEST        *rdma_request_msg;
936     gni_mem_handle_t    msg_mem_hndl;
937     int source;
938     // initial a get to transfer data from the sender side */
939     request_msg = (CONTROL_MSG *) header;
940     source = request_msg->source;
941     msg_data = CmiAlloc(request_msg->length);
942     _MEMCHECK(msg_data);
943
944     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
945
946     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
947     {
948         GNI_RC_CHECK("Mem Register before post", status);
949     }
950
951     MallocPostDesc(pd);
952     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
953         pd->type            = GNI_POST_FMA_GET;
954     else
955         pd->type            = GNI_POST_RDMA_GET;
956
957     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
958     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
959     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
960     pd->length          = ALIGN4(request_msg->length);
961     pd->local_addr      = (uint64_t) msg_data;
962     pd->remote_addr     = request_msg->source_addr;
963     pd->remote_mem_hndl = request_msg->source_mem_hndl;
964     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
965     pd->rdma_mode       = 0;
966
967     //memory registration successful
968     if(status == GNI_RC_SUCCESS)
969     {
970         pd->local_mem_hndl  = msg_mem_hndl;
971         if(pd->type == GNI_POST_RDMA_GET) 
972             status = GNI_PostRdma(ep_hndl_array[source], pd);
973         else
974             status = GNI_PostFma(ep_hndl_array[source],  pd);
975     }else
976     {
977         pd->local_mem_hndl.qword1  = 0; 
978         pd->local_mem_hndl.qword1  = 0; 
979     }
980     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
981     {
982         MallocRdmaRequest(rdma_request_msg);
983         rdma_request_msg->next = 0;
984         rdma_request_msg->destNode = inst_id;
985         rdma_request_msg->pd = pd;
986         if(pending_rdma_head == 0)
987         {
988             pending_rdma_head = rdma_request_msg;
989         }else
990         {
991             pending_rdma_tail->next = rdma_request_msg;
992         }
993         pending_rdma_tail = rdma_request_msg;
994     }else
995         GNI_RC_CHECK("AFter posting", status);
996 #endif
997 }
998
999 /* Check whether message send or get is confirmed by remote */
1000 static void PumpLocalSmsgTransactions()
1001 {
1002     gni_return_t            status;
1003     gni_cq_entry_t          ev;
1004     while ((status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1005     {
1006 #if PRINT_SYH
1007         lrts_local_done_msg++;
1008         //CmiPrintf("*[%d]  PumpLocalSmsgTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
1009 #endif
1010         if(GNI_CQ_OVERRUN(ev))
1011         {
1012             CmiPrintf("Overrun detected in local CQ");
1013             CmiAbort("Overrun in TX");
1014         }
1015     }
1016     if(status == GNI_RC_ERROR_RESOURCE)
1017     {
1018         GNI_RC_CHECK("Smsg_tx_cq full", status);
1019     }
1020 }
1021
1022 static void PumpLocalRdmaTransactions()
1023 {
1024     gni_cq_entry_t          ev;
1025     gni_return_t            status;
1026     uint64_t                type, inst_id;
1027     gni_post_descriptor_t   *tmp_pd;
1028     MSG_LIST                *ptr;
1029     CONTROL_MSG             *ack_msg_tmp;
1030     uint8_t             msg_tag;
1031
1032    // while ( (status = GNI_CqGetEvent(post_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1033     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1034     {
1035         type        = GNI_CQ_GET_TYPE(ev);
1036 #if PRINT_SYH
1037         //lrts_local_done_msg++;
1038         CmiPrintf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
1039 #endif
1040         /*if(type == GNI_CQ_EVENT_TYPE_SMSG)
1041         {
1042         }else */
1043         if (type == GNI_CQ_EVENT_TYPE_POST)
1044         {
1045             inst_id     = GNI_CQ_GET_INST_ID(ev);
1046 #if PRINT_SYH
1047             CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1048 #endif
1049             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
1050             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1051             //CmiPrintf("**[%d] SMSGPumpLocalTransactions local done(type=%d) length=%d, size=%d\n", myrank, type, tmp_pd->length, SIZEFIELD((void*)(tmp_pd->local_addr)) );
1052             ////Message is sent, free message , put is not used now
1053             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
1054             {   //persistent message 
1055                 CmiFree((void *)tmp_pd->local_addr);
1056                 msg_tag = PUT_DONE_TAG;  
1057             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1058             {
1059                 msg_tag = ACK_TAG;  
1060             }
1061             MallocControlMsg(ack_msg_tmp);
1062             ack_msg_tmp->source             = myrank;
1063             ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
1064             ack_msg_tmp->length             = tmp_pd->length; 
1065             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1066 #if PRINT_SYH
1067             lrts_send_msg_id++;
1068             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
1069 #endif
1070             if(smsg_msglist_index[inst_id].head != 0 )
1071             {
1072                 delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
1073             }else
1074             {
1075                 status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, msg_tag);
1076                 if(status == GNI_RC_SUCCESS)
1077                 {
1078 #if PRINT_SYH
1079                     lrts_smsg_success++;
1080                     CmiPrintf("[%d==>%d] sent ACK done%d\n", myrank, inst_id, lrts_smsg_success);
1081 #endif
1082                     FreeControlMsg(ack_msg_tmp);
1083                 }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
1084                 {
1085                     delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
1086                 }
1087                 else
1088                     GNI_RC_CHECK("GNI_SmsgSendWTag", status);
1089             }
1090 #if     !USE_LRTS_MEMPOOL
1091             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1092 #endif
1093             CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1094             handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1095             SendRdmaMsg(); 
1096             FreePostDesc(tmp_pd);
1097         }
1098     } //end while
1099 }
1100
1101 static int SendRdmaMsg()
1102 {
1103     gni_return_t            status = GNI_RC_SUCCESS;
1104     gni_mem_handle_t        msg_mem_hndl;
1105
1106     RDMA_REQUEST *ptr = pending_rdma_head;
1107     RDMA_REQUEST *prev = NULL;
1108
1109     while (ptr != NULL)
1110     {
1111         gni_post_descriptor_t *pd = ptr->pd;
1112         status = GNI_RC_SUCCESS;
1113         // register memory first
1114         if( pd->local_mem_hndl.qword1 == 0 && pd->local_mem_hndl.qword2 == 0)
1115         {
1116             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1117         }
1118         if(status == GNI_RC_SUCCESS)
1119         {
1120             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1121                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1122             else
1123                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1124             if(status == GNI_RC_SUCCESS)
1125             {
1126                 RDMA_REQUEST *tmp = ptr;
1127                 if (prev)
1128                   prev->next = ptr->next;
1129                 else
1130                   pending_rdma_head = ptr->next;
1131                 ptr = ptr->next;
1132                 FreeRdmaRequest(tmp);
1133                 continue;
1134             }
1135         }
1136         prev = ptr;
1137         ptr = ptr->next;
1138     } //end while
1139     return 0;
1140 }
1141
1142 // return 1 if all messages are sent
1143 static int SendBufferMsg()
1144 {
1145     MSG_LIST            *ptr, *previous_head, *current_head;
1146     CONTROL_MSG         *control_msg_tmp;
1147     gni_return_t        status;
1148     int done = 1;
1149     register    int     i;
1150     int                 index_previous = -1;
1151     int                 index = smsg_head_index;
1152     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {CmiPrintf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
1153     /* can add flow control here to control the number of messages sent before handle message */
1154     while(index != -1)
1155     {
1156         ptr = smsg_msglist_index[index].head;
1157        
1158         while(ptr!=0)
1159         {
1160         if(useStaticSMSG)
1161         {
1162             CmiAssert(ptr!=NULL);
1163             if(ptr->tag == SMALL_DATA_TAG)
1164             {
1165                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], NULL, 0, ptr->msg, ptr->size, 0, SMALL_DATA_TAG);
1166                // CmiPrintf("[%d==>%d] buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1167                 if(status == GNI_RC_SUCCESS) {
1168 #if PRINT_SYH
1169                     lrts_smsg_success++;
1170                     if(lrts_smsg_success == lrts_send_msg_id)
1171                         CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1172                     else
1173                         CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1174 #endif
1175                     CmiFree(ptr->msg);
1176                 }
1177             }
1178             else if(ptr->tag == LMSG_INIT_TAG)
1179             {
1180                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1181 #if PRINT_SYH
1182                 CmiPrintf("[%d==>%d] LMSG buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1183 #endif
1184                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
1185                 {
1186                     MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
1187                     if(status != GNI_RC_SUCCESS) {
1188                         done = 0;
1189                         break;
1190                     }
1191                 }
1192                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
1193                 if(status == GNI_RC_SUCCESS) {
1194 #if PRINT_SYH
1195                     lrts_smsg_success++;
1196                     CmiPrintf("[%d==>%d] sent LMSG done%d\n", myrank, ptr->destNode, lrts_smsg_success);
1197 #endif
1198                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
1199                 }
1200             }else if (ptr->tag == ACK_TAG)
1201             {
1202 #if PRINT_SYH
1203                 CmiPrintf("[%d==>%d] ACK buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1204 #endif
1205                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, ACK_TAG);
1206                 if(status == GNI_RC_SUCCESS) {
1207 #if PRINT_SYH
1208                     lrts_smsg_success++;
1209                     if(lrts_smsg_success == lrts_send_msg_id)
1210                         CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1211                     else
1212                         CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1213 #endif
1214                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1215                 }
1216             }else
1217             {
1218                 CmiPrintf("Weird tag\n");
1219                 CmiAbort("should not happen\n");
1220             }
1221         } 
1222         if(status == GNI_RC_SUCCESS)
1223         {
1224             smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
1225             FreeMsgList(ptr);
1226             ptr= smsg_msglist_index[index].head;
1227 #if PRINT_SYH
1228             buffered_smsg_counter--;
1229             if(lrts_smsg_success == lrts_send_msg_id)
1230                 CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1231             else
1232                 CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1233 #endif
1234         }else {
1235             done = 0;
1236             break;
1237         }
1238         } //end pooling this i-th core
1239         if(ptr == 0)
1240         {
1241             if(index_previous != -1)
1242                 smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
1243             else
1244                 smsg_head_index = smsg_msglist_index[index].next;
1245         }else
1246         {
1247             index_previous = index;
1248         }
1249         index = smsg_msglist_index[index].next;
1250     }   // end pooling for all cores
1251 #if PRINT_SYH
1252     if(lrts_send_msg_id-lrts_smsg_success !=0)
1253         CmiPrintf("WRONG [%d buffered msg is empty(%p) (actually=%d)(buffercounter=%d)\n", myrank, smsg_msglist_head[i], (lrts_send_msg_id-lrts_smsg_success, buffered_smsg_counter));
1254 #endif
1255     return done;
1256 }
1257
1258 static void LrtsAdvanceCommunication()
1259 {
1260     /*  Receive Msg first */
1261     //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
1262     PumpNetworkSmsg();
1263     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
1264     //PumpNetworkRdmaMsgs();
1265     /* Release Sent Msg */
1266     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
1267     //PumpLocalSmsgTransactions();
1268     //CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", CmiMyPe());
1269     PumpLocalRdmaTransactions();
1270     //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
1271     /* Send buffered Message */
1272     SendBufferMsg();
1273     SendRdmaMsg();
1274 }
1275
1276 static void _init_static_smsg()
1277 {
1278     gni_smsg_attr_t      *smsg_attr;
1279     gni_smsg_attr_t      remote_smsg_attr;
1280     gni_smsg_attr_t      *smsg_attr_vec;
1281     unsigned int         smsg_memlen;
1282     gni_mem_handle_t     my_smsg_mdh_mailbox;
1283     int      i;
1284     gni_return_t status;
1285     uint32_t              vmdh_index = -1;
1286     mdh_addr_t            base_infor;
1287     mdh_addr_t            *base_addr_vec;
1288     if(mysize <=4096)
1289     {
1290         SMSG_MAX_MSG = 1024;
1291         //log2_SMSG_MAX_MSG = 10;
1292     }else if (mysize > 4096 && mysize <= 16384)
1293     {
1294         SMSG_MAX_MSG = 512;
1295         //log2_SMSG_MAX_MSG = 9;
1296
1297     }else {
1298         SMSG_MAX_MSG = 256;
1299         //log2_SMSG_MAX_MSG = 8;
1300     }
1301     
1302     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
1303     
1304     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1305     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
1306     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
1307     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
1308     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1309     smsg_mailbox_base = memalign(64, smsg_memlen*(mysize));
1310     _MEMCHECK(smsg_mailbox_base);
1311     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
1312     if (myrank == 0) CmiPrintf("Charm++> allocates %.2fMB for SMSG. \n", smsg_memlen*mysize/1e6);
1313     
1314     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
1315             smsg_memlen*(mysize), smsg_rx_cqh,
1316             GNI_MEM_READWRITE,   
1317             vmdh_index,
1318             &my_smsg_mdh_mailbox);
1319
1320     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1321
1322 #if 1
1323     base_infor.addr =  (uint64_t)smsg_mailbox_base;
1324     base_infor.mdh =  my_smsg_mdh_mailbox;
1325     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
1326
1327     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
1328  
1329     for(i=0; i<mysize; i++)
1330     {
1331         if(i==myrank)
1332             continue;
1333         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1334         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1335         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1336         smsg_attr[i].mbox_offset = i*smsg_memlen;
1337         smsg_attr[i].buff_size = smsg_memlen;
1338         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
1339         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1340     }
1341
1342     for(i=0; i<mysize; i++)
1343     {
1344         if (myrank == i) continue;
1345
1346         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1347         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1348         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1349         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
1350         remote_smsg_attr.buff_size = smsg_memlen;
1351         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
1352         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
1353
1354         /* initialize the smsg channel */
1355         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
1356         GNI_RC_CHECK("SMSG Init", status);
1357     } //end initialization
1358
1359     free(base_addr_vec);
1360 #else
1361     for(i=0; i<mysize; i++)
1362     {
1363         if(i==myrank)
1364             continue;
1365         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1366         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1367         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1368         if(i<myrank)
1369             smsg_attr[i].mbox_offset = i*smsg_memlen;
1370         else
1371             smsg_attr[i].mbox_offset = (i-1)*smsg_memlen;
1372
1373         smsg_attr[i].msg_buffer = smsg_mailbox_base;
1374         smsg_attr[i].buff_size = smsg_memlen;
1375         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1376 #if 0 
1377         if(i<2)
1378         {
1379             CmiPrintf("[assign %d==%d]  offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, (smsg_attr[i]).mbox_offset, (smsg_attr[i]).msg_buffer, (smsg_attr[i]).mbox_maxcredit, (smsg_attr[i]).msg_maxsize, (smsg_attr[i]).buff_size ); 
1380 #endif 
1381         }
1382     }
1383     smsg_attr_vec = (gni_smsg_attr_t*)malloc(mysize * mysize * sizeof(gni_smsg_attr_t));
1384     _MEMCHECK(smsg_attr_vec);
1385  
1386     if(myrank==0)
1387         CmiPrintf("total size=%d\n", mysize*mysize*sizeof(gni_smsg_attr_t));
1388     allgather(smsg_attr, smsg_attr_vec,  mysize*sizeof(gni_smsg_attr_t));
1389     
1390     CmiBarrier();
1391     for(i=0; i<mysize; i++)
1392     {
1393         if (myrank == i) continue;
1394         /* initialize the smsg channel */
1395         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &smsg_attr_vec[i*mysize+myrank]);
1396 #if 0 
1397         if(i<2)
1398         {
1399             CmiPrintf("[%d==%d] SmsgInit rc=%s, offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, gni_err_str[status], (smsg_attr_vec[i*mysize+myrank]).mbox_offset, (smsg_attr_vec[i*mysize+myrank]).msg_buffer, (smsg_attr_vec[i*mysize+myrank]).mbox_maxcredit, (smsg_attr_vec[i*mysize+myrank]).msg_maxsize, (smsg_attr_vec[i*mysize+myrank]).buff_size ); 
1400         }
1401 #endif
1402         GNI_RC_CHECK("SMSG Init", status);
1403     } //end initialization
1404     CmiBarrier();
1405     free(smsg_attr_vec);
1406 #endif
1407     free(smsg_attr);
1408     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
1409     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
1410     smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
1411     for(i =0; i<mysize; i++)
1412     {
1413         smsg_msglist_index[i].next = -1;
1414         smsg_msglist_index[i].head = 0;
1415         smsg_msglist_index[i].tail = 0;
1416     }
1417     smsg_head_index = -1;
1418
1419
1420 static void _init_static_msgq()
1421 {
1422     gni_return_t status;
1423     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
1424     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
1425     msgq_attrs.smsg_q_sz = 1;
1426     msgq_attrs.rcv_pool_sz = 1;
1427     msgq_attrs.num_msgq_eps = 2;
1428     msgq_attrs.nloc_insts = 8;
1429     msgq_attrs.modes = 0;
1430     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
1431
1432     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
1433     GNI_RC_CHECK("MSGQ Init", status);
1434
1435
1436 }
1437
1438 static void _init_DMA_buffer()
1439 {
1440     gni_return_t            status = GNI_RC_SUCCESS;
1441     /*AUTO tuning */
1442     /* suppose max_smsg is 1024, DMA buffer is split into 2048, 4096, 8192, ... */
1443     /*  This method might be better for SMP, but it is bad for Nonsmp since msgs are sharing same slots */
1444     /*
1445      * DMA_slots = 19-log2_SMSG_MAX_MSG;
1446     DMA_incoming_avail_tag = malloc(DMA_slots);
1447     DMA_buffer_size = 2*(DMA_max_single_msg - SMSG_MAX_MSG); 
1448     DMA_incoming_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1449     DMA_outgoing_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1450     
1451     status = GNI_MemRegister(nic_hndl, (uint64_t)DMA_incoming_base_addr,
1452             DMA_buffer_size, smsg_rx_cqh,
1453             GNI_MEM_READWRITE ,   
1454             vmdh_index,
1455             &);
1456             */
1457     // one is reserved to avoid deadlock
1458     DMA_slots           = 17; // each one is 8K  16*8K + 1 slot reserved to avoid deadlock
1459     DMA_buffer_size     = DMA_max_single_msg + 8192;
1460     DMA_buffer_base_mdh_addr.addr = (uint64_t)memalign(ALIGNBUF, DMA_buffer_size);
1461     status = GNI_MemRegister(nic_hndl, DMA_buffer_base_mdh_addr.addr,
1462         DMA_buffer_size, smsg_rx_cqh,
1463         GNI_MEM_READWRITE ,   
1464         -1,
1465         &(DMA_buffer_base_mdh_addr.mdh));
1466     GNI_RC_CHECK("GNI_MemRegister", status);
1467     DMA_buffer_base_mdh_addr_vec = (mdh_addr_t*) malloc(sizeof(mdh_addr_t) * mysize);
1468
1469     allgather(&DMA_buffer_base_mdh_addr, DMA_buffer_base_mdh_addr_vec, sizeof(mdh_addr_t) );
1470 }
1471
1472 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
1473 {
1474     register int            i;
1475     int                     rc;
1476     int                     device_id = 0;
1477     unsigned int            remote_addr;
1478     gni_cdm_handle_t        cdm_hndl;
1479     gni_return_t            status = GNI_RC_SUCCESS;
1480     uint32_t                vmdh_index = -1;
1481     uint8_t                 ptag;
1482     unsigned int            local_addr, *MPID_UGNI_AllAddr;
1483     int                     first_spawned;
1484     int                     physicalID;
1485     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
1486     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
1487     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
1488    
1489     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
1490     //useStaticSMSG = CmiGetArgFlag(*argv, "+useStaticSmsg");
1491     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
1492     
1493     status = PMI_Init(&first_spawned);
1494     GNI_RC_CHECK("PMI_Init", status);
1495
1496     status = PMI_Get_size(&mysize);
1497     GNI_RC_CHECK("PMI_Getsize", status);
1498
1499     status = PMI_Get_rank(&myrank);
1500     GNI_RC_CHECK("PMI_getrank", status);
1501
1502     //physicalID = CmiPhysicalNodeID(myrank);
1503     
1504     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
1505
1506     *myNodeID = myrank;
1507     *numNodes = mysize;
1508   
1509     if(myrank == 0)
1510     {
1511         printf("Charm++> Running on Gemini (GNI) using %d  cores\n", mysize);
1512     }
1513 #ifdef USE_ONESIDED
1514     onesided_init(NULL, &onesided_hnd);
1515
1516     // this is a GNI test, so use the libonesided bypass functionality
1517     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
1518     local_addr = gniGetNicAddress();
1519 #else
1520     ptag = get_ptag();
1521     cookie = get_cookie();
1522 #if 0
1523     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
1524 #endif
1525     //Create and attach to the communication  domain */
1526     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
1527     GNI_RC_CHECK("GNI_CdmCreate", status);
1528     //* device id The device id is the minor number for the device
1529     //that is assigned to the device by the system when the device is created.
1530     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
1531     //where X is the device number 0 default 
1532     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
1533     GNI_RC_CHECK("GNI_CdmAttach", status);
1534     local_addr = get_gni_nic_address(0);
1535 #endif
1536     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
1537     _MEMCHECK(MPID_UGNI_AllAddr);
1538     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
1539     /* create the local completion queue */
1540     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
1541     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
1542     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
1543     
1544     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
1545     //GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
1546     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
1547
1548     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
1549     GNI_RC_CHECK("Create CQ (rx)", status);
1550     
1551     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
1552     //GNI_RC_CHECK("Create Post CQ (rx)", status);
1553     
1554     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
1555     //GNI_RC_CHECK("Create BTE CQ", status);
1556
1557     /* create the endpoints. they need to be bound to allow later CQWrites to them */
1558     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
1559     _MEMCHECK(ep_hndl_array);
1560
1561     for (i=0; i<mysize; i++) {
1562         if(i == myrank) continue;
1563         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
1564         GNI_RC_CHECK("GNI_EpCreate ", status);   
1565         remote_addr = MPID_UGNI_AllAddr[i];
1566         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
1567         GNI_RC_CHECK("GNI_EpBind ", status);   
1568     }
1569     /* Depending on the number of cores in the job, decide different method */
1570     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
1571     if(mysize > 1)
1572     {
1573         if(useStaticSMSG == 1)
1574         {
1575             _init_static_smsg(mysize);
1576         }else if(useStaticMSGQ == 1)
1577         {
1578             _init_static_msgq();
1579         }
1580     }
1581 #if     USE_LRTS_MEMPOOL
1582     CmiGetArgLong(*argv, "+useMemorypoolSize", &_mempool_size);
1583     if (myrank==0) CmiPrintf("Charm++> use memorypool size: %1.fMB\n", _mempool_size/1024.0/1024);
1584     init_mempool(_mempool_size);
1585     //init_mempool(Mempool_MaxSize);
1586 #endif
1587
1588     /* init DMA buffer for medium message */
1589
1590     //_init_DMA_buffer();
1591     
1592     free(MPID_UGNI_AllAddr);
1593 }
1594
1595
1596 void* LrtsAlloc(int n_bytes, int header)
1597 {
1598     void *ptr;
1599 #if 0
1600     CmiPrintf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d\n", CmiMyPe(), n_bytes, header);
1601 #endif
1602     if(n_bytes <= SMSG_MAX_MSG)
1603     {
1604         int totalsize = n_bytes+header;
1605         ptr = malloc(totalsize);
1606     }else 
1607     {
1608
1609         CmiAssert(header <= ALIGNBUF);
1610 #if     USE_LRTS_MEMPOOL
1611         n_bytes = ALIGN64(n_bytes);
1612         char *res = syh_mempool_malloc(ALIGNBUF+n_bytes);
1613 #else
1614         n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
1615         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1616 #endif
1617         ptr = res + ALIGNBUF - header;
1618     }
1619 #if 0 
1620     CmiPrintf("Done Alloc Lrts for bytes=%d, head=%d\n", n_bytes, header);
1621 #endif
1622     return ptr;
1623 }
1624
1625 void  LrtsFree(void *msg)
1626 {
1627     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1628     if (size <= SMSG_MAX_MSG)
1629       free(msg);
1630     else
1631     {
1632 #if 0
1633         CmiPrintf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1634 #endif
1635 #if     USE_LRTS_MEMPOOL
1636         syh_mempool_free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1637 #else
1638         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1639 #endif
1640     }
1641 #if 0 
1642     CmiPrintf("Done Free lrts for bytes=%d\n", size);
1643 #endif
1644 }
1645
1646 static void LrtsExit()
1647 {
1648     /* free memory ? */
1649 #if     USE_LRTS_MEMPOOL
1650     kill_allmempool();
1651 #endif
1652     PMI_Finalize();
1653     exit(0);
1654 }
1655
1656 static void LrtsDrainResources()
1657 {
1658     while (!SendBufferMsg()) {
1659         PumpNetworkSmsg();
1660         PumpNetworkRdmaMsgs();
1661         PumpLocalSmsgTransactions();
1662         PumpLocalRdmaTransactions();
1663     }
1664     PMI_Barrier();
1665 }
1666
1667 void CmiAbort(const char *message) {
1668
1669     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1670     PMI_Abort(-1, message);
1671 }
1672
1673 /**************************  TIMER FUNCTIONS **************************/
1674 #if CMK_TIMER_USE_SPECIAL
1675 /* MPI calls are not threadsafe, even the timer on some machines */
1676 static CmiNodeLock  timerLock = 0;
1677 static int _absoluteTime = 0;
1678 static int _is_global = 0;
1679 static struct timespec start_ns;
1680
1681 inline int CmiTimerIsSynchronized() {
1682     return 1;
1683 }
1684
1685 inline int CmiTimerAbsolute() {
1686     return _absoluteTime;
1687 }
1688
1689 double CmiStartTimer() {
1690     return 0.0;
1691 }
1692
1693 double CmiInitTime() {
1694     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
1695 }
1696
1697 void CmiTimerInit(char **argv) {
1698     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1699     if (_absoluteTime && CmiMyPe() == 0)
1700         printf("Charm++> absolute  timer is used\n");
1701     
1702     _is_global = CmiTimerIsSynchronized();
1703
1704
1705     if (_is_global) {
1706         if (CmiMyRank() == 0) {
1707             clock_gettime(CLOCK_MONOTONIC, &start_ts)
1708         }
1709     } else { /* we don't have a synchronous timer, set our own start time */
1710         CmiBarrier();
1711         CmiBarrier();
1712         CmiBarrier();
1713         clock_gettime(CLOCK_MONOTONIC, &start_ts)
1714     }
1715     CmiNodeAllBarrier();          /* for smp */
1716 }
1717
1718 /**
1719  * Since the timerLock is never created, and is
1720  * always NULL, then all the if-condition inside
1721  * the timer functions could be disabled right
1722  * now in the case of SMP.
1723  */
1724 double CmiTimer(void) {
1725     struct timespec now_ts;
1726     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1727     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1728         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1729 }
1730
1731 double CmiWallTimer(void) {
1732     struct timespec now_ts;
1733     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1734     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1735         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1736 }
1737
1738 double CmiCpuTimer(void) {
1739     struct timespec now_ts;
1740     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1741     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1742         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1743 }
1744
1745 #endif
1746 /************Barrier Related Functions****************/
1747
1748 int CmiBarrier()
1749 {
1750     int status;
1751     status = PMI_Barrier();
1752     return status;
1753
1754 }
1755
1756
1757 #if CMK_PERSISTENT_COMM
1758 #include "machine-persistent.c"
1759 #endif
1760
1761