added dynamic smsg connection
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <stdint.h>
16 #include <errno.h>
17 #include <malloc.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #if defined(_WIN32) && ! defined(__CYGWIN__)
26 #include <windows.h>
27 #include <wincon.h>
28 #include <sys/types.h>
29 #include <sys/timeb.h>
30
31 static void sleep(int secs) {
32     Sleep(1000*secs);
33 }
34 #else
35 #include <unistd.h> /*For getpid()*/
36 #endif
37
38
39 #define REMOTE_EVENT         0
40 #define USE_LRTS_MEMPOOL   1
41
42 #if USE_LRTS_MEMPOOL
43 static size_t _mempool_size = 1024ll*1024*32;
44 #endif
45
46 #define PRINT_SYH  0
47
48 #if PRINT_SYH
49 int         lrts_smsg_success = 0;
50 int         lrts_send_msg_id = 0;
51 int         lrts_send_rdma_id = 0;
52 int         lrts_send_rdma_success = 0;
53 int         lrts_received_msg = 0;
54 int         lrts_local_done_msg = 0;
55 #endif
56
57 #include "machine.h"
58
59 #include "pcqueue.h"
60
61 #if CMK_PERSISTENT_COMM
62 #include "machine-persistent.h"
63 #endif
64
65 //#define  USE_ONESIDED 1
66 #ifdef USE_ONESIDED
67 //onesided implementation is wrong, since no place to restore omdh
68 #include "onesided.h"
69 onesided_hnd_t   onesided_hnd;
70 onesided_md_t    omdh;
71 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
72
73 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
74
75 #else
76 uint8_t   onesided_hnd, omdh;
77 #if REMOTE_EVENT
78 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
79 #else
80 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
81 #endif
82 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
83 #endif
84
85 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
86 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
87
88 #define ALIGNBUF                64
89
90 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
91 /* If SMSG is not used */
92 #define FMA_PER_CORE  1024
93 #define FMA_BUFFER_SIZE 1024
94 /* If SMSG is used */
95 static int  SMSG_MAX_MSG;
96 //static int  log2_SMSG_MAX_MSG;
97 #define SMSG_MAX_CREDIT  36
98
99 #define MSGQ_MAXSIZE       2048
100 /* large message transfer with FMA or BTE */
101 #define LRTS_GNI_RDMA_THRESHOLD  2048
102
103 #define REMOTE_QUEUE_ENTRIES  2048 
104 #define LOCAL_QUEUE_ENTRIES   64 
105
106 #define PUT_DONE_TAG      0x29
107 #define ACK_TAG           0x30
108 /* SMSG is data message */
109 #define SMALL_DATA_TAG          0x31
110 /* SMSG is a control message to initialize a BTE */
111 #define MEDIUM_HEAD_TAG         0x32
112 #define MEDIUM_DATA_TAG         0x33
113 #define LMSG_INIT_TAG     0x39 
114
115 #define DEBUG
116 #ifdef GNI_RC_CHECK
117 #undef GNI_RC_CHECK
118 #endif
119 #ifdef DEBUG
120 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
121 #else
122 #define GNI_RC_CHECK(msg,rc)
123 #endif
124
125 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
126 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
127
128 static int Mempool_MaxSize = 1024*1024*128;
129 static int useDynamicSMSG   = 0;
130 static int useStaticMSGQ = 0;
131 static int useStaticFMA = 0;
132 static int mysize, myrank;
133 static gni_nic_handle_t      nic_hndl;
134
135 typedef struct {
136     gni_mem_handle_t mdh;
137     uint64_t addr;
138 } mdh_addr_t;
139 // this is related to dynamic SMSG
140
141 typedef struct mdh_addr_list{
142     gni_mem_handle_t mdh;
143     uint64_t addr;
144     struct mdh_addr_list *next;
145 }mdh_addr_list_t;
146
147 #define     SMSG_CONN_SIZE     24
148 int     *smsg_connected_flag= 0;
149 char    *smsg_connection_addr = 0;
150 mdh_addr_t    *smsg_connection_vec = 0;
151 gni_mem_handle_t    smsg_connection_memhndl;
152 static int            smsg_expand_slots = 10;
153 static int            smsg_available_slot = 0;
154 static void           *smsg_mailbox_mempool = 0;
155 mdh_addr_list_t       *smsg_dynamic_list = 0;
156
157 static void             *smsg_mailbox_base;
158 gni_msgq_attr_t         msgq_attrs;
159 gni_msgq_handle_t       msgq_handle;
160 gni_msgq_ep_attr_t      msgq_ep_attrs;
161 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
162
163
164
165 /* preallocated DMA buffer */
166 int                     DMA_slots;
167 uint64_t                DMA_avail_tag = 0;
168 uint32_t                DMA_incoming_avail_tag = 0;
169 uint32_t                DMA_outgoing_avail_tag = 0;
170 void                    *DMA_incoming_base_addr;
171 void                    *DMA_outgoing_base_addr;
172 mdh_addr_t              DMA_buffer_base_mdh_addr;
173 mdh_addr_t              *DMA_buffer_base_mdh_addr_vec;
174 int                     DMA_buffer_size;
175 int                     DMA_max_single_msg = 131072;//524288 ;
176
177 #define                 DMA_SIZE_PER_SLOT       8192
178
179
180 typedef struct dma_msgid_map
181 {
182     uint64_t     msg_id;
183     int     msg_subid
184 } dma_msgid_map_t;
185
186 dma_msgid_map_t         *dma_map_list;
187
188 typedef struct msg_trace
189 {
190     uint64_t    msg_id;
191     int         done_num;
192 }msg_trace_t;
193
194 msg_trace_t             *pending_msg_list;
195 /* =====Beginning of Declarations of Machine Specific Variables===== */
196 static int cookie;
197 static int modes = 0;
198 static gni_cq_handle_t       smsg_rx_cqh = NULL;
199 static gni_cq_handle_t       smsg_tx_cqh = NULL;
200 static gni_cq_handle_t       post_rx_cqh = NULL;
201 static gni_cq_handle_t       post_tx_cqh = NULL;
202 static gni_ep_handle_t       *ep_hndl_array;
203
204 typedef    struct  pending_smg
205 {
206     int     inst_id;
207     struct  pending_smg *next;
208 } PENDING_GETNEXT;
209
210
211 typedef struct msg_list
212 {
213     uint32_t destNode;
214     uint32_t size;
215     void *msg;
216     struct msg_list *next;
217     struct msg_list *pehead_next;
218     uint8_t tag;
219 }MSG_LIST;
220
221 typedef struct medium_msg_list
222 {
223     uint32_t destNode;
224     uint32_t msg_id;
225     uint32_t msg_subid;
226     uint32_t remain_size;
227     void *msg;
228     struct medium_msg_list *next;
229 }MEDIUM_MSG_LIST;
230
231
232 typedef struct control_msg
233 {
234     uint64_t            source_addr;
235     int                 source;               /* source rank */
236     int                 length;
237     gni_mem_handle_t    source_mem_hndl;
238     struct control_msg *next;
239 }CONTROL_MSG;
240
241 typedef struct medium_msg_control
242 {
243     uint64_t            dma_offset;     //the dma_buffer for this block of msg
244     int                 msg_id;         //Id for the total index
245     int                 msg_subid;      //offset inside the message id 
246 }MEDIUM_MSG_CONTROL;
247
248 typedef struct  rmda_msg
249 {
250     int                   destNode;
251     gni_post_descriptor_t *pd;
252     struct  rmda_msg      *next;
253 }RDMA_REQUEST;
254
255 typedef struct  msg_list_index
256 {
257     int         next;
258     MSG_LIST    *head;
259     MSG_LIST    *tail;
260 } MSG_LIST_INDEX;
261
262 /* reuse PendingMsg memory */
263 static CONTROL_MSG          *control_freelist=0;
264 static MSG_LIST             *msglist_freelist=0;
265 static int                  smsg_head_index;
266 static MSG_LIST_INDEX       *smsg_msglist_index= 0;
267 static MSG_LIST             *smsg_free_head=0;
268 static MSG_LIST             *smsg_free_tail=0;
269
270 /*
271 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
272     if(free_head == 0)  free_head = free_tail = msg_head;    \
273     else   free_tail = free_tail->next;    \
274     if( msg_head->next == msg_tail) msg_head =0;   \
275     else msg_head= msg_head->next;    
276
277 #define MallocMsgList(d, msg_head, msg_tail, free_head, free_tail, msgsize) \
278     if(free_head == 0) {d= malloc(msgsize);  \
279         if(msg_head == 0)   msg_head =msg_tail = msg_head->next = msg_tail->next = d; \
280         else { msg_tail->next = d; d->next = msg_head; msg_tail=d;} \
281     }else {d = free_head; free_head = free_head->next; if(free_tail->next == free_head) free_head =0;} \
282 */
283
284 #define FreeMsgList(d)  \
285   (d)->next = msglist_freelist;\
286   msglist_freelist = d;
287
288
289
290 #define MallocMsgList(d) \
291   d = msglist_freelist;\
292   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
293              _MEMCHECK(d);\
294   } else msglist_freelist = d->next;
295
296
297 #define FreeControlMsg(d)       \
298   (d)->next = control_freelist;\
299   control_freelist = d;
300
301 #define MallocControlMsg(d) \
302   d = control_freelist;\
303   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
304              _MEMCHECK(d);\
305   } else control_freelist = d->next;
306
307
308 static RDMA_REQUEST         *rdma_freelist = NULL;
309
310 #define FreeControlMsg(d)       \
311   (d)->next = control_freelist;\
312   control_freelist = d;
313
314 #define MallocControlMsg(d) \
315   d = control_freelist;\
316   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
317              _MEMCHECK(d);\
318   } else control_freelist = d->next;
319
320 #define FreeMediumControlMsg(d)       \
321   (d)->next = medium_control_freelist;\
322   medium_control_freelist = d;
323
324
325 #define MallocMediumControlMsg(d) \
326     d = medium_control_freelist;\
327     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
328     _MEMCHECK(d);\
329 } else mediumcontrol_freelist = d->next;
330
331 #define FreeRdmaRequest(d)       \
332   (d)->next = rdma_freelist;\
333   rdma_freelist = d;
334
335 #define MallocRdmaRequest(d) \
336   d = rdma_freelist;\
337   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
338              _MEMCHECK(d);\
339   } else rdma_freelist = d->next;
340
341 /* reuse gni_post_descriptor_t */
342 static gni_post_descriptor_t *post_freelist=0;
343
344 #if 1
345 #define FreePostDesc(d)       \
346     (d)->next_descr = post_freelist;\
347     post_freelist = d;
348
349 #define MallocPostDesc(d) \
350   d = post_freelist;\
351   if (d==0) { \
352      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
353      _MEMCHECK(d);\
354   } else post_freelist = d->next_descr;
355 #else
356
357 #define FreePostDesc(d)     free(d);
358 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
359
360 #endif
361
362 static PENDING_GETNEXT     *pending_smsg_head = 0;
363 static PENDING_GETNEXT     *pending_smsg_tail = 0;
364
365 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
366 static int      buffered_smsg_counter = 0;
367
368 /* SmsgSend return success but message sent is not confirmed by remote side */
369
370 static RDMA_REQUEST  *pending_rdma_head = 0;
371 static RDMA_REQUEST  *pending_rdma_tail = 0;
372 static MSG_LIST *buffered_fma_head = 0;
373 static MSG_LIST *buffered_fma_tail = 0;
374
375 /* functions  */
376 #define IsFree(a,ind)  !( a& (1<<(ind) ))
377 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
378 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
379
380 #include "mempool.c"
381
382 /* get the upper bound of log 2 */
383 int mylog2(int size)
384 {
385     int op = size;
386     unsigned int ret=0;
387     unsigned int mask = 0;
388     int i;
389     while(op>0)
390     {
391         op = op >> 1;
392         ret++;
393
394     }
395     for(i=1; i<ret; i++)
396     {
397         mask = mask << 1;
398         mask +=1;
399     }
400
401     ret -= ((size &mask) ? 0:1);
402     return ret;
403 }
404
405 static void
406 allgather(void *in,void *out, int len)
407 {
408     static int *ivec_ptr=NULL,already_called=0,job_size=0;
409     int i,rc;
410     int my_rank;
411     char *tmp_buf,*out_ptr;
412
413     if(!already_called) {
414
415         rc = PMI_Get_size(&job_size);
416         CmiAssert(rc == PMI_SUCCESS);
417         rc = PMI_Get_rank(&my_rank);
418         CmiAssert(rc == PMI_SUCCESS);
419
420         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
421         CmiAssert(ivec_ptr != NULL);
422
423         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
424         CmiAssert(rc == PMI_SUCCESS);
425
426         already_called = 1;
427
428     }
429
430     tmp_buf = (char *)malloc(job_size * len);
431     CmiAssert(tmp_buf);
432
433     rc = PMI_Allgather(in,tmp_buf,len);
434     CmiAssert(rc == PMI_SUCCESS);
435
436     out_ptr = out;
437
438     for(i=0;i<job_size;i++) {
439
440         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
441
442     }
443
444     free(tmp_buf);
445 }
446 static void
447 allgather_2(void *in,void *out, int len)
448 {
449     //PMI_Allgather is out of order
450     int i,rc, extend_len;
451     int  rank_index;
452     char *out_ptr, *out_ref;
453     char *in2;
454
455     extend_len = sizeof(int) + len;
456     in2 = (char*)malloc(extend_len);
457
458     memcpy(in2, &myrank, sizeof(int));
459     memcpy(in2+sizeof(int), in, len);
460
461     out_ptr = (char*)malloc(mysize*extend_len);
462
463     rc = PMI_Allgather(in2, out_ptr, extend_len);
464     GNI_RC_CHECK("allgather", rc);
465
466     out_ref = out;
467
468     for(i=0;i<mysize;i++) {
469         //rank index 
470         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
471         //copy to the rank index slot
472         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
473     }
474
475     free(out_ptr);
476     free(in2);
477
478 }
479
480 static unsigned int get_gni_nic_address(int device_id)
481 {
482     unsigned int address, cpu_id;
483     gni_return_t status;
484     int i, alps_dev_id=-1,alps_address=-1;
485     char *token, *p_ptr;
486
487     p_ptr = getenv("PMI_GNI_DEV_ID");
488     if (!p_ptr) {
489         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
490        
491         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
492     } else {
493         while ((token = strtok(p_ptr,":")) != NULL) {
494             alps_dev_id = atoi(token);
495             if (alps_dev_id == device_id) {
496                 break;
497             }
498             p_ptr = NULL;
499         }
500         CmiAssert(alps_dev_id != -1);
501         p_ptr = getenv("PMI_GNI_LOC_ADDR");
502         CmiAssert(p_ptr != NULL);
503         i = 0;
504         while ((token = strtok(p_ptr,":")) != NULL) {
505             if (i == alps_dev_id) {
506                 alps_address = atoi(token);
507                 break;
508             }
509             p_ptr = NULL;
510             ++i;
511         }
512         CmiAssert(alps_address != -1);
513         address = alps_address;
514     }
515     return address;
516 }
517
518 static uint8_t get_ptag(void)
519 {
520     char *p_ptr, *token;
521     uint8_t ptag;
522
523     p_ptr = getenv("PMI_GNI_PTAG");
524     CmiAssert(p_ptr != NULL);
525     token = strtok(p_ptr, ":");
526     ptag = (uint8_t)atoi(token);
527     return ptag;
528         
529 }
530
531 static uint32_t get_cookie(void)
532 {
533     uint32_t cookie;
534     char *p_ptr, *token;
535
536     p_ptr = getenv("PMI_GNI_COOKIE");
537     CmiAssert(p_ptr != NULL);
538     token = strtok(p_ptr, ":");
539     cookie = (uint32_t)atoi(token);
540
541     return cookie;
542 }
543
544 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
545 /* TODO: add any that are related */
546 /* =====End of Definitions of Message-Corruption Related Macros=====*/
547
548
549 #include "machine-lrts.h"
550 #include "machine-common-core.c"
551
552 /* Network progress function is used to poll the network when for
553    messages. This flushes receive buffers on some  implementations*/
554 #if CMK_MACHINE_PROGRESS_DEFINED
555 void CmiMachineProgressImpl() {
556 }
557 #endif
558
559 inline
560 static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
561 {
562     MSG_LIST        *msg_tmp;
563     MallocMsgList(msg_tmp);
564     msg_tmp->destNode = destNode;
565     msg_tmp->size   = size;
566     msg_tmp->msg    = msg;
567     msg_tmp->tag    = tag;
568     msg_tmp->next   = 0;
569     if (smsg_msglist_index[destNode].head == 0 ) {
570         smsg_msglist_index[destNode].head = msg_tmp;
571         smsg_msglist_index[destNode].next = smsg_head_index;
572         smsg_head_index = destNode;
573     }
574     else {
575       (smsg_msglist_index[destNode].tail)->next    = msg_tmp;
576     }
577     smsg_msglist_index[destNode].tail          = msg_tmp;
578 #if PRINT_SYH
579     buffered_smsg_counter++;
580 #endif
581 }
582
583 inline 
584 static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
585 {
586     gni_return_t status = GNI_RC_NOT_DONE;
587     
588     if(useDynamicSMSG == 1)
589     {
590         if(smsg_connected_flag[destNode] == 0)
591         {
592
593             //allocate mailbox 
594             //FMA_PUT to set up
595            //register mailbox for destNode 
596         } else if (smsg_connected_flag[destNode] == 1)
597         {
598             //connection request sent
599         }
600     }
601     if(smsg_msglist_index[destNode].head == 0 || inbuff==1)
602     {
603         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], header, size_header, msg, size, 0, tag);
604         if(status == GNI_RC_SUCCESS)
605         {
606 #if PRINT_SYH
607             lrts_smsg_success++;
608             if(lrts_smsg_success == lrts_send_msg_id)
609                 CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
610             else
611                 CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
612 #endif
613             return status;
614         }
615     }else {
616         if(inbuff ==0)
617             delay_send_small_msg(msg, size, destNode, tag);
618     }
619     return status;
620 }
621
622 // Get first 0 in DMA_tags starting from index
623 static int get_first_avail_bit(uint64_t DMA_tags, int start_index)
624 {
625
626     uint64_t         mask = 0x1;
627     register    int     i=0;
628     while((DMA_tags & mask) && i<DMA_slots) {mask << 1; i++;}
629
630 }
631
632 static int send_medium_messages(int destNode, int size, char *msg)
633 {
634 #if 0
635     gni_return_t status = GNI_RC_SUCCESS;
636     int first_avail_bit=0;
637     uint64_t mask = 0x1;
638     MEDIUM_MSG_CONTROL  *medium_msg_control_tmp;
639     MEDIUM_MSG_LIST        *msg_tmp;
640     int blocksize, remain_size, pos;
641     int sub_id = 0;
642     remain_size = size;
643     pos = 0;  //offset before which data are sent
644     /* copy blocks of the message to DMA preallocated buffer and send SMSG */
645     //Check whether there is any available DMA buffer
646     
647     do{
648         while((DMA_avail_tag & mask) && first_avail_bit<DMA_slots) {mask << 1; first_avail_bit++;}
649         if(first_avail_bit == DMA_slots) //No available DMA, buffer this message
650         {
651             MallocMediumMsgList(msg_tmp);
652             msg_tmp->destNode = destNode;
653             msg_tmp->msg_id   = lrts_send_msg_id;
654             msg_tmp->msg_subid   = sub_id;
655             msg_tmp->size   = remain_size;
656             msg_tmp->msg    = msg+pos;
657             msg_tmp->next   = NULL;
658             break;
659         }else
660         {
661             //copy this part of the message into this DMA buffer
662             //TODO optimize here, some data can go with this SMSG
663             blocksize = (remain_size>DMA_SIZE_PER_SLOT)?DMA_SIZE_PER_SLOT: remain_size;
664             memcpy(DMA_buffer_base_mdh_addr.addr[first_avail_bit], msg+pos, blocksize);
665             pos += blocksize;
666             remain_size -= blocksize;
667             SET_BITS(DMA_avail_tag, first_avail_bit);
668            
669             MallocMediumControlMsg(medium_msg_control_tmp);
670             medium_msg_control_tmp->msg_id = lrts_send_msg_id;
671             medium_msg_control_tmp->msg_subid = sub_id;
672             if(status == GNI_RC_SUCCESS)
673             {
674                 if(sub_id==0)
675                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_HEAD_TAG);
676                 else
677                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_DATA_TAG);
678             }
679             //buffer this smsg
680             if(status != GNI_RC_SUCCESS)
681             {
682                 delay_send_small_msg(medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), destNode, MEDIUM_HEAD_TAG);
683             }
684             sub_id++;
685         }while(remain_size > 0 );
686
687         }
688     }
689 #endif
690 }
691
692 // Large message, send control to receiver, receiver register memory and do a GET 
693 inline
694 static void send_large_messages(int destNode, int size, char *msg)
695 {
696 #if     USE_LRTS_MEMPOOL
697     gni_return_t        status  =   GNI_RC_SUCCESS;
698     CONTROL_MSG         *control_msg_tmp;
699     uint32_t            vmdh_index  = -1;
700     /* construct a control message and send */
701     MallocControlMsg(control_msg_tmp);
702     control_msg_tmp->source_addr    = (uint64_t)msg;
703     control_msg_tmp->source         = myrank;
704     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
705     //memcpy( &(control_msg_tmp->source_mem_hndl), GetMemHndl(msg), sizeof(gni_mem_handle_t)) ;
706     control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
707 #if PRINT_SYH
708     lrts_send_msg_id++;
709     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
710 #endif
711     status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
712     if(status == GNI_RC_SUCCESS)
713     {
714         FreeControlMsg(control_msg_tmp);
715     }
716 // NOT use mempool, should slow 
717 #else
718     gni_return_t        status  =   GNI_RC_SUCCESS;
719     CONTROL_MSG         *control_msg_tmp;
720     uint32_t            vmdh_index  = -1;
721     /* construct a control message and send */
722     MallocControlMsg(control_msg_tmp);
723     control_msg_tmp->source_addr    = (uint64_t)msg;
724     control_msg_tmp->source         = myrank;
725     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
726     control_msg_tmp->source_mem_hndl.qword1 = 0;
727     control_msg_tmp->source_mem_hndl.qword2 = 0;
728 #if PRINT_SYH
729     lrts_send_msg_id++;
730     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
731 #endif
732     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
733     if(status == GNI_RC_SUCCESS)
734     {
735         status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
736         if(status == GNI_RC_SUCCESS)
737         {
738             FreeControlMsg(control_msg_tmp);
739         }
740     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
741     {
742         CmiAbort("Memory registor for large msg\n");
743     }else 
744     {
745         delay_send_small_msg(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
746     }
747 #endif
748 }
749
750 inline void LrtsPrepareEnvelope(char *msg, int size)
751 {
752     CmiSetMsgSize(msg, size);
753 }
754
755 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
756 {
757     gni_return_t        status  =   GNI_RC_SUCCESS;
758     LrtsPrepareEnvelope(msg, size);
759
760     if(size <= SMSG_MAX_MSG)
761     {
762 #if PRINT_SYH
763         lrts_send_msg_id++;
764         CmiPrintf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
765 #endif
766         status = send_smsg_message( destNode, 0, 0, msg, size, SMALL_DATA_TAG, 0);  
767         if(status == GNI_RC_SUCCESS)
768         {
769             CmiFree(msg);
770         }
771     }
772     else
773     {
774         send_large_messages(destNode, size, msg);
775     }
776     return 0;
777 }
778
779 static void LrtsPreCommonInit(int everReturn){}
780
781 /* Idle-state related functions: called in non-smp mode */
782 void CmiNotifyIdleForGemini(void) {
783     LrtsAdvanceCommunication();
784 }
785
786 static void LrtsPostCommonInit(int everReturn)
787 {
788 #if CMK_SMP
789     CmiIdleState *s=CmiNotifyGetState();
790     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
791     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
792 #else
793     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
794 #endif
795
796 }
797
798
799 void LrtsPostNonLocal(){}
800 /* pooling CQ to receive network message */
801 static void PumpNetworkRdmaMsgs()
802 {
803     gni_cq_entry_t      event_data;
804     gni_return_t        status;
805     while( (status = GNI_CqGetEvent(post_rx_cqh, &event_data)) == GNI_RC_SUCCESS);
806 }
807
808 static int SendRdmaMsg();
809 static void getLargeMsgRequest(void* header, uint64_t inst_id);
810 static void PumpNetworkSmsg()
811 {
812     uint64_t            inst_id;
813     PENDING_GETNEXT     *pending_next;
814     int                 ret;
815     gni_cq_entry_t      event_data;
816     gni_return_t        status;
817     void                *header;
818     uint8_t             msg_tag;
819     int                 msg_nbytes;
820     void                *msg_data;
821     gni_mem_handle_t    msg_mem_hndl;
822  
823
824     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
825     {
826         inst_id = GNI_CQ_GET_INST_ID(event_data);
827         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
828 #if PRINT_SYH
829         CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
830 #endif
831         msg_tag = GNI_SMSG_ANY_TAG;
832         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
833         {
834 #if PRINT_SYH
835             lrts_received_msg++;
836             CmiPrintf("+++[%d] PumpNetwork msg is received, messageid:%d tag=%d\n", myrank, lrts_received_msg, msg_tag);
837 #endif
838             /* copy msg out and then put into queue (small message) */
839             switch (msg_tag) {
840             case SMALL_DATA_TAG:
841             {
842                 msg_nbytes = CmiGetMsgSize(header);
843                 msg_data    = CmiAlloc(msg_nbytes);
844                 memcpy(msg_data, (char*)header, msg_nbytes);
845                 handleOneRecvedMsg(msg_nbytes, msg_data);
846                 break;
847             }
848             case LMSG_INIT_TAG:
849             {
850 #if PRINT_SYH
851                 CmiPrintf("+++[%d] from %d PumpNetwork Rdma Request msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
852 #endif
853                 getLargeMsgRequest(header, inst_id);
854                 break;
855             }
856             case ACK_TAG:
857             {
858                 /* Get is done, release message . Now put is not used yet*/
859 #if         !USE_LRTS_MEMPOOL
860                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
861 #endif
862                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
863                 SendRdmaMsg();
864                 break;
865             }
866             case PUT_DONE_TAG: //persistent message
867             {
868                 handleOneRecvedMsg(((CONTROL_MSG *) header)->length,((void*) (CONTROL_MSG *) header)); 
869                 break;
870             }
871             default: {
872                 CmiPrintf("weird tag problem\n");
873                 CmiAbort("Unknown tag\n");
874             }
875             }
876             GNI_SmsgRelease(ep_hndl_array[inst_id]);
877             msg_tag = GNI_SMSG_ANY_TAG;
878         } //endwhile getNext
879     }   //end while GetEvent
880     if(status == GNI_RC_ERROR_RESOURCE)
881     {
882         GNI_RC_CHECK("Smsg_rx_cq full", status);
883     }
884 }
885
886 static void getLargeMsgRequest(void* header, uint64_t inst_id)
887 {
888 #if     USE_LRTS_MEMPOOL
889     CONTROL_MSG         *request_msg;
890     gni_return_t        status;
891     void                *msg_data;
892     gni_post_descriptor_t *pd;
893     RDMA_REQUEST        *rdma_request_msg;
894     gni_mem_handle_t    msg_mem_hndl;
895     int source;
896     // initial a get to transfer data from the sender side */
897     request_msg = (CONTROL_MSG *) header;
898     source = request_msg->source;
899     msg_data = CmiAlloc(request_msg->length);
900     _MEMCHECK(msg_data);
901     //memcpy(&msg_mem_hndl, GetMemHndl(msg_data), sizeof(gni_mem_handle_t));
902     msg_mem_hndl = GetMemHndl(msg_data);
903
904     MallocPostDesc(pd);
905     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
906         pd->type            = GNI_POST_FMA_GET;
907     else
908         pd->type            = GNI_POST_RDMA_GET;
909 #if REMOTE_EVENT
910     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
911 #else
912     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
913 #endif
914     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
915     pd->length          = ALIGN4(request_msg->length);
916     pd->local_addr      = (uint64_t) msg_data;
917     pd->local_mem_hndl  = msg_mem_hndl;
918     pd->remote_addr     = request_msg->source_addr;
919     pd->remote_mem_hndl = request_msg->source_mem_hndl;
920     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
921     pd->rdma_mode       = 0;
922
923     if(pd->type == GNI_POST_RDMA_GET) 
924         status = GNI_PostRdma(ep_hndl_array[source], pd);
925     else
926         status = GNI_PostFma(ep_hndl_array[source],  pd);
927     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
928     {
929         MallocRdmaRequest(rdma_request_msg);
930         rdma_request_msg->next = 0;
931         rdma_request_msg->destNode = inst_id;
932         rdma_request_msg->pd = pd;
933         if(pending_rdma_head == 0)
934         {
935             pending_rdma_head = rdma_request_msg;
936         }else
937         {
938             pending_rdma_tail->next = rdma_request_msg;
939         }
940         pending_rdma_tail = rdma_request_msg;
941     }else
942         GNI_RC_CHECK("AFter posting", status);
943
944 #else
945     CONTROL_MSG         *request_msg;
946     gni_return_t        status;
947     void                *msg_data;
948     gni_post_descriptor_t *pd;
949     RDMA_REQUEST        *rdma_request_msg;
950     gni_mem_handle_t    msg_mem_hndl;
951     int source;
952     // initial a get to transfer data from the sender side */
953     request_msg = (CONTROL_MSG *) header;
954     source = request_msg->source;
955     msg_data = CmiAlloc(request_msg->length);
956     _MEMCHECK(msg_data);
957
958     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
959
960     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
961     {
962         GNI_RC_CHECK("Mem Register before post", status);
963     }
964
965     MallocPostDesc(pd);
966     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
967         pd->type            = GNI_POST_FMA_GET;
968     else
969         pd->type            = GNI_POST_RDMA_GET;
970 #if REMOTE_EVENT
971     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
972 #else
973     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
974 #endif
975     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
976     pd->length          = ALIGN4(request_msg->length);
977     pd->local_addr      = (uint64_t) msg_data;
978     pd->remote_addr     = request_msg->source_addr;
979     pd->remote_mem_hndl = request_msg->source_mem_hndl;
980     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
981     pd->rdma_mode       = 0;
982
983     //memory registration successful
984     if(status == GNI_RC_SUCCESS)
985     {
986         pd->local_mem_hndl  = msg_mem_hndl;
987         if(pd->type == GNI_POST_RDMA_GET) 
988             status = GNI_PostRdma(ep_hndl_array[source], pd);
989         else
990             status = GNI_PostFma(ep_hndl_array[source],  pd);
991     }else
992     {
993         pd->local_mem_hndl.qword1  = 0; 
994         pd->local_mem_hndl.qword1  = 0; 
995     }
996     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
997     {
998         MallocRdmaRequest(rdma_request_msg);
999         rdma_request_msg->next = 0;
1000         rdma_request_msg->destNode = inst_id;
1001         rdma_request_msg->pd = pd;
1002         if(pending_rdma_head == 0)
1003         {
1004             pending_rdma_head = rdma_request_msg;
1005         }else
1006         {
1007             pending_rdma_tail->next = rdma_request_msg;
1008         }
1009         pending_rdma_tail = rdma_request_msg;
1010     }else
1011         GNI_RC_CHECK("AFter posting", status);
1012 #endif
1013 }
1014
1015 /* Check whether message send or get is confirmed by remote */
1016 static void PumpLocalSmsgTransactions()
1017 {
1018     gni_return_t            status;
1019     gni_cq_entry_t          ev;
1020     while ((status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1021     {
1022 #if PRINT_SYH
1023         lrts_local_done_msg++;
1024         //CmiPrintf("*[%d]  PumpLocalSmsgTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
1025 #endif
1026         if(GNI_CQ_OVERRUN(ev))
1027         {
1028             CmiPrintf("Overrun detected in local CQ");
1029             CmiAbort("Overrun in TX");
1030         }
1031     }
1032     if(status == GNI_RC_ERROR_RESOURCE)
1033     {
1034         GNI_RC_CHECK("Smsg_tx_cq full", status);
1035     }
1036 }
1037
1038 static void PumpLocalRdmaTransactions()
1039 {
1040     gni_cq_entry_t          ev;
1041     gni_return_t            status;
1042     uint64_t                type, inst_id;
1043     gni_post_descriptor_t   *tmp_pd;
1044     MSG_LIST                *ptr;
1045     CONTROL_MSG             *ack_msg_tmp;
1046     uint8_t             msg_tag;
1047
1048    // while ( (status = GNI_CqGetEvent(post_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1049     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1050     {
1051         type        = GNI_CQ_GET_TYPE(ev);
1052 #if PRINT_SYH
1053         //lrts_local_done_msg++;
1054         CmiPrintf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
1055 #endif
1056         if (type == GNI_CQ_EVENT_TYPE_POST)
1057         {
1058             inst_id     = GNI_CQ_GET_INST_ID(ev);
1059 #if PRINT_SYH
1060             CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1061 #endif
1062             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
1063             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1064             ////Message is sent, free message , put is not used now
1065             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
1066             {   //persistent message 
1067                 CmiFree((void *)tmp_pd->local_addr);
1068                 msg_tag = PUT_DONE_TAG;  
1069             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1070             {
1071                 msg_tag = ACK_TAG;  
1072             }
1073             MallocControlMsg(ack_msg_tmp);
1074             ack_msg_tmp->source             = myrank;
1075             ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
1076             ack_msg_tmp->length             = tmp_pd->length; 
1077             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1078 #if PRINT_SYH
1079             lrts_send_msg_id++;
1080             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
1081 #endif
1082             status = send_smsg_message(inst_id, 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0);  
1083             if(status == GNI_RC_SUCCESS)
1084             {
1085                 FreeControlMsg(ack_msg_tmp);
1086             }
1087 #if     !USE_LRTS_MEMPOOL
1088             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1089 #endif
1090             CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1091             handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1092             SendRdmaMsg(); 
1093             FreePostDesc(tmp_pd);
1094         }
1095     } //end while
1096 }
1097
1098 static int SendRdmaMsg()
1099 {
1100     gni_return_t            status = GNI_RC_SUCCESS;
1101     gni_mem_handle_t        msg_mem_hndl;
1102
1103     RDMA_REQUEST *ptr = pending_rdma_head;
1104     RDMA_REQUEST *prev = NULL;
1105
1106     while (ptr != NULL)
1107     {
1108         gni_post_descriptor_t *pd = ptr->pd;
1109         status = GNI_RC_SUCCESS;
1110         // register memory first
1111         if( pd->local_mem_hndl.qword1 == 0 && pd->local_mem_hndl.qword2 == 0)
1112         {
1113             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1114         }
1115         if(status == GNI_RC_SUCCESS)
1116         {
1117             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1118                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1119             else
1120                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1121             if(status == GNI_RC_SUCCESS)
1122             {
1123                 RDMA_REQUEST *tmp = ptr;
1124                 if (prev)
1125                   prev->next = ptr->next;
1126                 else
1127                   pending_rdma_head = ptr->next;
1128                 ptr = ptr->next;
1129                 FreeRdmaRequest(tmp);
1130                 continue;
1131             }
1132         }
1133         prev = ptr;
1134         ptr = ptr->next;
1135     } //end while
1136     return 0;
1137 }
1138
1139 // return 1 if all messages are sent
1140 static int SendBufferMsg()
1141 {
1142     MSG_LIST            *ptr, *previous_head, *current_head;
1143     CONTROL_MSG         *control_msg_tmp;
1144     gni_return_t        status;
1145     int done = 1;
1146     register    int     i;
1147     int                 index_previous = -1;
1148     int                 index = smsg_head_index;
1149     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {CmiPrintf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
1150     /* can add flow control here to control the number of messages sent before handle message */
1151     while(index != -1)
1152     {
1153         ptr = smsg_msglist_index[index].head;
1154        
1155         while(ptr!=0)
1156         {
1157             CmiAssert(ptr!=NULL);
1158             if(ptr->tag == SMALL_DATA_TAG)
1159             {
1160                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, ptr->size, SMALL_DATA_TAG, 1);  
1161                 if(status == GNI_RC_SUCCESS)
1162                 {
1163                     CmiFree(ptr->msg);
1164                 }
1165             }
1166             else if(ptr->tag == LMSG_INIT_TAG)
1167             {
1168                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1169 #if PRINT_SYH
1170                 CmiPrintf("[%d==>%d] LMSG buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1171 #endif
1172                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
1173                 {
1174                     MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
1175                     if(status != GNI_RC_SUCCESS) {
1176                         done = 0;
1177                         break;
1178                     }
1179                 }
1180                 
1181                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 1);  
1182                 if(status == GNI_RC_SUCCESS)
1183                 {
1184                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
1185                 }
1186             }else if (ptr->tag == ACK_TAG)
1187             {
1188                 status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), ACK_TAG, 1);  
1189                 if(status == GNI_RC_SUCCESS)
1190                 {
1191                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1192                 }
1193             }else
1194             {
1195                 CmiPrintf("Weird tag\n");
1196                 CmiAbort("should not happen\n");
1197             }
1198         } //end while 
1199         if(status == GNI_RC_SUCCESS)
1200         {
1201             smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
1202             FreeMsgList(ptr);
1203             ptr= smsg_msglist_index[index].head;
1204 #if PRINT_SYH
1205             buffered_smsg_counter--;
1206             if(lrts_smsg_success == lrts_send_msg_id)
1207                 CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1208             else
1209                 CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1210 #endif
1211         }else {
1212             done = 0;
1213             break;
1214         }
1215         if(ptr == 0)
1216         {
1217             if(index_previous != -1)
1218                 smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
1219             else
1220                 smsg_head_index = smsg_msglist_index[index].next;
1221         }else
1222         {
1223             index_previous = index;
1224         }
1225         index = smsg_msglist_index[index].next;
1226     }   // end pooling for all cores
1227 #if PRINT_SYH
1228     if(lrts_send_msg_id-lrts_smsg_success !=0)
1229         CmiPrintf("WRONG [%d buffered msg is empty(%p) (actually=%d)(buffercounter=%d)\n", myrank, smsg_msglist_head[i], (lrts_send_msg_id-lrts_smsg_success, buffered_smsg_counter));
1230 #endif
1231     return done;
1232 }
1233
1234 static void LrtsAdvanceCommunication()
1235 {
1236     /*  Receive Msg first */
1237     //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
1238     PumpNetworkSmsg();
1239     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
1240     //PumpNetworkRdmaMsgs();
1241     /* Release Sent Msg */
1242     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
1243     //PumpLocalSmsgTransactions();
1244     //CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", CmiMyPe());
1245     PumpLocalRdmaTransactions();
1246     //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
1247     /* Send buffered Message */
1248     SendBufferMsg();
1249     SendRdmaMsg();
1250 }
1251
1252 static void _init_dynamic_smsg()
1253 {
1254     gni_smsg_attr_t smsg_attr;
1255     mdh_addr_t current_addr;
1256     gni_return_t status;
1257     unsigned int         smsg_memlen;
1258     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
1259     memset(smsg_connected_flag, 0, mysize*sizeof(int));
1260
1261     current_addr.addr = (uint64_t)malloc(mysize * SMSG_CONN_SIZE);
1262     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, smsg_connection_addr, mysize * SMSG_CONN_SIZE,  &(current_addr.mdh), &omdh);
1263     
1264     smsg_connection_vec = (mdh_addr_t*) malloc(mysize*sizeof(mdh_addr_t)); 
1265     allgather(&current_addr, smsg_connection_vec, sizeof(mdh_addr_t));
1266     
1267     //pre-allocate some memory as mailbox for dynamic connection
1268     if(mysize <=4096)
1269     {
1270         SMSG_MAX_MSG = 1024;
1271     }else if (mysize > 4096 && mysize <= 16384)
1272     {
1273         SMSG_MAX_MSG = 512;
1274     }else {
1275         SMSG_MAX_MSG = 256;
1276     }
1277     
1278     smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1279     smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1280     smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1281     status = GNI_SmsgBufferSizeNeeded(&smsg_attr, &smsg_memlen);
1282     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1283     
1284     smsg_dynamic_list = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1285
1286     smsg_dynamic_list->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1287     bzero(smsg_dynamic_list->addr, smsg_memlen*smsg_expand_slots);
1288     
1289     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_dynamic_list->addr,
1290             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1291             GNI_MEM_READWRITE,   
1292             vmdh_index,
1293             &(smsg_dynamic_list->mdh));
1294    smsg_available_slot = 0;  
1295 }
1296
1297 static void _init_static_smsg()
1298 {
1299     gni_smsg_attr_t      *smsg_attr;
1300     gni_smsg_attr_t      remote_smsg_attr;
1301     gni_smsg_attr_t      *smsg_attr_vec;
1302     unsigned int         smsg_memlen;
1303     gni_mem_handle_t     my_smsg_mdh_mailbox;
1304     int      i;
1305     gni_return_t status;
1306     uint32_t              vmdh_index = -1;
1307     mdh_addr_t            base_infor;
1308     mdh_addr_t            *base_addr_vec;
1309     if(mysize <=4096)
1310     {
1311         SMSG_MAX_MSG = 1024;
1312         //log2_SMSG_MAX_MSG = 10;
1313     }else if (mysize > 4096 && mysize <= 16384)
1314     {
1315         SMSG_MAX_MSG = 512;
1316         //log2_SMSG_MAX_MSG = 9;
1317
1318     }else {
1319         SMSG_MAX_MSG = 256;
1320         //log2_SMSG_MAX_MSG = 8;
1321     }
1322     
1323     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
1324     
1325     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1326     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
1327     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
1328     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
1329     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1330     smsg_mailbox_base = memalign(64, smsg_memlen*(mysize));
1331     _MEMCHECK(smsg_mailbox_base);
1332     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
1333     if (myrank == 0) CmiPrintf("Charm++> allocates %.2fMB for SMSG. \n", smsg_memlen*mysize/1e6);
1334     
1335     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
1336             smsg_memlen*(mysize), smsg_rx_cqh,
1337             GNI_MEM_READWRITE,   
1338             vmdh_index,
1339             &my_smsg_mdh_mailbox);
1340
1341     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1342
1343 #if 1
1344     base_infor.addr =  (uint64_t)smsg_mailbox_base;
1345     base_infor.mdh =  my_smsg_mdh_mailbox;
1346     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
1347
1348     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
1349  
1350     for(i=0; i<mysize; i++)
1351     {
1352         if(i==myrank)
1353             continue;
1354         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1355         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1356         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1357         smsg_attr[i].mbox_offset = i*smsg_memlen;
1358         smsg_attr[i].buff_size = smsg_memlen;
1359         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
1360         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1361     }
1362
1363     for(i=0; i<mysize; i++)
1364     {
1365         if (myrank == i) continue;
1366
1367         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1368         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1369         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1370         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
1371         remote_smsg_attr.buff_size = smsg_memlen;
1372         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
1373         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
1374
1375         /* initialize the smsg channel */
1376         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
1377         GNI_RC_CHECK("SMSG Init", status);
1378     } //end initialization
1379
1380     free(base_addr_vec);
1381 #else
1382     for(i=0; i<mysize; i++)
1383     {
1384         if(i==myrank)
1385             continue;
1386         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1387         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1388         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1389         if(i<myrank)
1390             smsg_attr[i].mbox_offset = i*smsg_memlen;
1391         else
1392             smsg_attr[i].mbox_offset = (i-1)*smsg_memlen;
1393
1394         smsg_attr[i].msg_buffer = smsg_mailbox_base;
1395         smsg_attr[i].buff_size = smsg_memlen;
1396         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1397 #if 0 
1398         if(i<2)
1399         {
1400             CmiPrintf("[assign %d==%d]  offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, (smsg_attr[i]).mbox_offset, (smsg_attr[i]).msg_buffer, (smsg_attr[i]).mbox_maxcredit, (smsg_attr[i]).msg_maxsize, (smsg_attr[i]).buff_size ); 
1401 #endif 
1402         }
1403     }
1404     smsg_attr_vec = (gni_smsg_attr_t*)malloc(mysize * mysize * sizeof(gni_smsg_attr_t));
1405     _MEMCHECK(smsg_attr_vec);
1406  
1407     if(myrank==0)
1408         CmiPrintf("total size=%d\n", mysize*mysize*sizeof(gni_smsg_attr_t));
1409     allgather(smsg_attr, smsg_attr_vec,  mysize*sizeof(gni_smsg_attr_t));
1410     
1411     CmiBarrier();
1412     for(i=0; i<mysize; i++)
1413     {
1414         if (myrank == i) continue;
1415         /* initialize the smsg channel */
1416         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &smsg_attr_vec[i*mysize+myrank]);
1417 #if 0 
1418         if(i<2)
1419         {
1420             CmiPrintf("[%d==%d] SmsgInit rc=%s, offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, gni_err_str[status], (smsg_attr_vec[i*mysize+myrank]).mbox_offset, (smsg_attr_vec[i*mysize+myrank]).msg_buffer, (smsg_attr_vec[i*mysize+myrank]).mbox_maxcredit, (smsg_attr_vec[i*mysize+myrank]).msg_maxsize, (smsg_attr_vec[i*mysize+myrank]).buff_size ); 
1421         }
1422 #endif
1423         GNI_RC_CHECK("SMSG Init", status);
1424     } //end initialization
1425     CmiBarrier();
1426     free(smsg_attr_vec);
1427 #endif
1428     free(smsg_attr);
1429     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
1430     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
1431     smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
1432     for(i =0; i<mysize; i++)
1433     {
1434         smsg_msglist_index[i].next = -1;
1435         smsg_msglist_index[i].head = 0;
1436         smsg_msglist_index[i].tail = 0;
1437     }
1438     smsg_head_index = -1;
1439
1440
1441 static void _init_static_msgq()
1442 {
1443     gni_return_t status;
1444     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
1445     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
1446     msgq_attrs.smsg_q_sz = 1;
1447     msgq_attrs.rcv_pool_sz = 1;
1448     msgq_attrs.num_msgq_eps = 2;
1449     msgq_attrs.nloc_insts = 8;
1450     msgq_attrs.modes = 0;
1451     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
1452
1453     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
1454     GNI_RC_CHECK("MSGQ Init", status);
1455
1456
1457 }
1458
1459 static void _init_DMA_buffer()
1460 {
1461     gni_return_t            status = GNI_RC_SUCCESS;
1462     /*AUTO tuning */
1463     /* suppose max_smsg is 1024, DMA buffer is split into 2048, 4096, 8192, ... */
1464     /*  This method might be better for SMP, but it is bad for Nonsmp since msgs are sharing same slots */
1465     /*
1466      * DMA_slots = 19-log2_SMSG_MAX_MSG;
1467     DMA_incoming_avail_tag = malloc(DMA_slots);
1468     DMA_buffer_size = 2*(DMA_max_single_msg - SMSG_MAX_MSG); 
1469     DMA_incoming_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1470     DMA_outgoing_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1471     
1472     status = GNI_MemRegister(nic_hndl, (uint64_t)DMA_incoming_base_addr,
1473             DMA_buffer_size, smsg_rx_cqh,
1474             GNI_MEM_READWRITE ,   
1475             vmdh_index,
1476             &);
1477             */
1478     // one is reserved to avoid deadlock
1479     DMA_slots           = 17; // each one is 8K  16*8K + 1 slot reserved to avoid deadlock
1480     DMA_buffer_size     = DMA_max_single_msg + 8192;
1481     DMA_buffer_base_mdh_addr.addr = (uint64_t)memalign(ALIGNBUF, DMA_buffer_size);
1482     status = GNI_MemRegister(nic_hndl, DMA_buffer_base_mdh_addr.addr,
1483         DMA_buffer_size, smsg_rx_cqh,
1484         GNI_MEM_READWRITE ,   
1485         -1,
1486         &(DMA_buffer_base_mdh_addr.mdh));
1487     GNI_RC_CHECK("GNI_MemRegister", status);
1488     DMA_buffer_base_mdh_addr_vec = (mdh_addr_t*) malloc(sizeof(mdh_addr_t) * mysize);
1489
1490     allgather(&DMA_buffer_base_mdh_addr, DMA_buffer_base_mdh_addr_vec, sizeof(mdh_addr_t) );
1491 }
1492
1493 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
1494 {
1495     register int            i;
1496     int                     rc;
1497     int                     device_id = 0;
1498     unsigned int            remote_addr;
1499     gni_cdm_handle_t        cdm_hndl;
1500     gni_return_t            status = GNI_RC_SUCCESS;
1501     uint32_t                vmdh_index = -1;
1502     uint8_t                 ptag;
1503     unsigned int            local_addr, *MPID_UGNI_AllAddr;
1504     int                     first_spawned;
1505     int                     physicalID;
1506     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
1507     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
1508     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
1509    
1510     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
1511     useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
1512     if (myrank==0) 
1513     {
1514         if(useDynamicSMSG) 
1515             CmiPrintf("Charm++> use Dynamic SMSG\n"); 
1516         else 
1517             CmiPrintf("Charm++> use Static SMSG\n");
1518     };
1519     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
1520     
1521     status = PMI_Init(&first_spawned);
1522     GNI_RC_CHECK("PMI_Init", status);
1523
1524     status = PMI_Get_size(&mysize);
1525     GNI_RC_CHECK("PMI_Getsize", status);
1526
1527     status = PMI_Get_rank(&myrank);
1528     GNI_RC_CHECK("PMI_getrank", status);
1529
1530     //physicalID = CmiPhysicalNodeID(myrank);
1531     
1532     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
1533
1534     *myNodeID = myrank;
1535     *numNodes = mysize;
1536   
1537     if(myrank == 0)
1538     {
1539         printf("Charm++> Running on Gemini (GNI) using %d  cores\n", mysize);
1540     }
1541 #ifdef USE_ONESIDED
1542     onesided_init(NULL, &onesided_hnd);
1543
1544     // this is a GNI test, so use the libonesided bypass functionality
1545     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
1546     local_addr = gniGetNicAddress();
1547 #else
1548     ptag = get_ptag();
1549     cookie = get_cookie();
1550 #if 0
1551     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
1552 #endif
1553     //Create and attach to the communication  domain */
1554     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
1555     GNI_RC_CHECK("GNI_CdmCreate", status);
1556     //* device id The device id is the minor number for the device
1557     //that is assigned to the device by the system when the device is created.
1558     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
1559     //where X is the device number 0 default 
1560     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
1561     GNI_RC_CHECK("GNI_CdmAttach", status);
1562     local_addr = get_gni_nic_address(0);
1563 #endif
1564     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
1565     _MEMCHECK(MPID_UGNI_AllAddr);
1566     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
1567     /* create the local completion queue */
1568     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
1569     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
1570     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
1571     
1572     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
1573     //GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
1574     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
1575
1576     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
1577     GNI_RC_CHECK("Create CQ (rx)", status);
1578     
1579     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
1580     //GNI_RC_CHECK("Create Post CQ (rx)", status);
1581     
1582     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
1583     //GNI_RC_CHECK("Create BTE CQ", status);
1584
1585     /* create the endpoints. they need to be bound to allow later CQWrites to them */
1586     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
1587     _MEMCHECK(ep_hndl_array);
1588
1589     for (i=0; i<mysize; i++) {
1590         if(i == myrank) continue;
1591         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
1592         GNI_RC_CHECK("GNI_EpCreate ", status);   
1593         remote_addr = MPID_UGNI_AllAddr[i];
1594         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
1595         GNI_RC_CHECK("GNI_EpBind ", status);   
1596     }
1597     /* Depending on the number of cores in the job, decide different method */
1598     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
1599     if(mysize > 1)
1600     {
1601         if(useDynamicSMSG == 0)
1602         {
1603             _init_static_smsg();
1604         }else
1605         {
1606             _init_dynamic_smsg();
1607         }
1608     }
1609 #if     USE_LRTS_MEMPOOL
1610     CmiGetArgLong(*argv, "+useMemorypoolSize", &_mempool_size);
1611     if (myrank==0) CmiPrintf("Charm++> use memorypool size: %1.fMB\n", _mempool_size/1024.0/1024);
1612     init_mempool(_mempool_size);
1613     //init_mempool(Mempool_MaxSize);
1614 #endif
1615
1616     /* init DMA buffer for medium message */
1617
1618     //_init_DMA_buffer();
1619     
1620     free(MPID_UGNI_AllAddr);
1621 }
1622
1623
1624 void* LrtsAlloc(int n_bytes, int header)
1625 {
1626     void *ptr;
1627 #if 0
1628     CmiPrintf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d\n", CmiMyPe(), n_bytes, header);
1629 #endif
1630     if(n_bytes <= SMSG_MAX_MSG)
1631     {
1632         int totalsize = n_bytes+header;
1633         ptr = malloc(totalsize);
1634     }else 
1635     {
1636
1637         CmiAssert(header <= ALIGNBUF);
1638 #if     USE_LRTS_MEMPOOL
1639         n_bytes = ALIGN64(n_bytes);
1640         char *res = syh_mempool_malloc(ALIGNBUF+n_bytes);
1641 #else
1642         n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
1643         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1644 #endif
1645         ptr = res + ALIGNBUF - header;
1646     }
1647 #if 0 
1648     CmiPrintf("Done Alloc Lrts for bytes=%d, head=%d\n", n_bytes, header);
1649 #endif
1650     return ptr;
1651 }
1652
1653 void  LrtsFree(void *msg)
1654 {
1655     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1656     if (size <= SMSG_MAX_MSG)
1657       free(msg);
1658     else
1659     {
1660 #if 0
1661         CmiPrintf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1662 #endif
1663 #if     USE_LRTS_MEMPOOL
1664         syh_mempool_free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1665 #else
1666         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1667 #endif
1668     }
1669 #if 0 
1670     CmiPrintf("Done Free lrts for bytes=%d\n", size);
1671 #endif
1672 }
1673
1674 static void LrtsExit()
1675 {
1676     /* free memory ? */
1677 #if     USE_LRTS_MEMPOOL
1678     kill_allmempool();
1679 #endif
1680     PMI_Finalize();
1681     exit(0);
1682 }
1683
1684 static void LrtsDrainResources()
1685 {
1686     while (!SendBufferMsg()) {
1687         PumpNetworkSmsg();
1688         PumpNetworkRdmaMsgs();
1689         PumpLocalSmsgTransactions();
1690         PumpLocalRdmaTransactions();
1691     }
1692     PMI_Barrier();
1693 }
1694
1695 void CmiAbort(const char *message) {
1696
1697     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1698     PMI_Abort(-1, message);
1699 }
1700
1701 /**************************  TIMER FUNCTIONS **************************/
1702 #if CMK_TIMER_USE_SPECIAL
1703 /* MPI calls are not threadsafe, even the timer on some machines */
1704 static CmiNodeLock  timerLock = 0;
1705 static int _absoluteTime = 0;
1706 static int _is_global = 0;
1707 static struct timespec start_ns;
1708
1709 inline int CmiTimerIsSynchronized() {
1710     return 1;
1711 }
1712
1713 inline int CmiTimerAbsolute() {
1714     return _absoluteTime;
1715 }
1716
1717 double CmiStartTimer() {
1718     return 0.0;
1719 }
1720
1721 double CmiInitTime() {
1722     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
1723 }
1724
1725 void CmiTimerInit(char **argv) {
1726     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1727     if (_absoluteTime && CmiMyPe() == 0)
1728         printf("Charm++> absolute  timer is used\n");
1729     
1730     _is_global = CmiTimerIsSynchronized();
1731
1732
1733     if (_is_global) {
1734         if (CmiMyRank() == 0) {
1735             clock_gettime(CLOCK_MONOTONIC, &start_ts)
1736         }
1737     } else { /* we don't have a synchronous timer, set our own start time */
1738         CmiBarrier();
1739         CmiBarrier();
1740         CmiBarrier();
1741         clock_gettime(CLOCK_MONOTONIC, &start_ts)
1742     }
1743     CmiNodeAllBarrier();          /* for smp */
1744 }
1745
1746 /**
1747  * Since the timerLock is never created, and is
1748  * always NULL, then all the if-condition inside
1749  * the timer functions could be disabled right
1750  * now in the case of SMP.
1751  */
1752 double CmiTimer(void) {
1753     struct timespec now_ts;
1754     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1755     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1756         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1757 }
1758
1759 double CmiWallTimer(void) {
1760     struct timespec now_ts;
1761     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1762     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1763         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1764 }
1765
1766 double CmiCpuTimer(void) {
1767     struct timespec now_ts;
1768     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1769     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1770         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1771 }
1772
1773 #endif
1774 /************Barrier Related Functions****************/
1775
1776 int CmiBarrier()
1777 {
1778     int status;
1779     status = PMI_Barrier();
1780     return status;
1781
1782 }
1783
1784
1785 #if CMK_PERSISTENT_COMM
1786 #include "machine-persistent.c"
1787 #endif
1788
1789