remove remote event for gemini
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <stdint.h>
16 #include <errno.h>
17 #include <malloc.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #if defined(_WIN32) && ! defined(__CYGWIN__)
26 #include <windows.h>
27 #include <wincon.h>
28 #include <sys/types.h>
29 #include <sys/timeb.h>
30
31 static void sleep(int secs) {
32     Sleep(1000*secs);
33 }
34 #else
35 #include <unistd.h> /*For getpid()*/
36 #endif
37
38 #define USE_LRTS_MEMPOOL   1
39
40 #if USE_LRTS_MEMPOOL
41 static size_t _mempool_size = 1024ll*1024*8;
42 #endif
43
44 #define PRINT_SYH  0
45
46 #if PRINT_SYH
47 int         lrts_smsg_success = 0;
48 int         lrts_send_msg_id = 0;
49 int         lrts_send_rdma_id = 0;
50 int         lrts_send_rdma_success = 0;
51 int         lrts_received_msg = 0;
52 int         lrts_local_done_msg = 0;
53 #endif
54
55 #include "machine.h"
56
57 #include "pcqueue.h"
58
59 #if CMK_PERSISTENT_COMM
60 #include "machine-persistent.h"
61 #endif
62
63 //#define  USE_ONESIDED 1
64 #ifdef USE_ONESIDED
65 //onesided implementation is wrong, since no place to restore omdh
66 #include "onesided.h"
67 onesided_hnd_t   onesided_hnd;
68 onesided_md_t    omdh;
69 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
70
71 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
72
73 #else
74 uint8_t   onesided_hnd, omdh;
75 //#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
76 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
77
78 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
79 #endif
80
81 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
82 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
83
84 #define ALIGNBUF                64
85
86 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
87 /* If SMSG is not used */
88 #define FMA_PER_CORE  1024
89 #define FMA_BUFFER_SIZE 1024
90 /* If SMSG is used */
91 static int  SMSG_MAX_MSG;
92 //static int  log2_SMSG_MAX_MSG;
93 #define SMSG_MAX_CREDIT  36
94
95 #define MSGQ_MAXSIZE       4096
96 /* large message transfer with FMA or BTE */
97 #define LRTS_GNI_RDMA_THRESHOLD  4096
98
99 #define REMOTE_QUEUE_ENTRIES  1048576
100 #define LOCAL_QUEUE_ENTRIES   1024
101
102 #define PUT_DONE_TAG      0x29
103 #define ACK_TAG           0x30
104 /* SMSG is data message */
105 #define SMALL_DATA_TAG          0x31
106 /* SMSG is a control message to initialize a BTE */
107 #define MEDIUM_HEAD_TAG         0x32
108 #define MEDIUM_DATA_TAG         0x33
109 #define LMSG_INIT_TAG     0x39 
110
111 #define DEBUG
112 #ifdef GNI_RC_CHECK
113 #undef GNI_RC_CHECK
114 #endif
115 #ifdef DEBUG
116 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
117 #else
118 #define GNI_RC_CHECK(msg,rc)
119 #endif
120
121 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
122 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
123
124 static int Mempool_MaxSize = 1024*1024*128;
125 static int useStaticSMSG   = 1;
126 static int useStaticMSGQ = 0;
127 static int useStaticFMA = 0;
128 static int mysize, myrank;
129 static gni_nic_handle_t      nic_hndl;
130
131
132 static void             *smsg_mailbox_base;
133 gni_msgq_attr_t         msgq_attrs;
134 gni_msgq_handle_t       msgq_handle;
135 gni_msgq_ep_attr_t      msgq_ep_attrs;
136 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
137
138 /* preallocated memory buffer for FMA for short message and control message */
139 typedef struct {
140     gni_mem_handle_t mdh;
141     uint64_t addr;
142 } mdh_addr_t;
143
144
145 /* preallocated DMA buffer */
146 int                     DMA_slots;
147 uint64_t                DMA_avail_tag = 0;
148 uint32_t                DMA_incoming_avail_tag = 0;
149 uint32_t                DMA_outgoing_avail_tag = 0;
150 void                    *DMA_incoming_base_addr;
151 void                    *DMA_outgoing_base_addr;
152 mdh_addr_t              DMA_buffer_base_mdh_addr;
153 mdh_addr_t              *DMA_buffer_base_mdh_addr_vec;
154 int                     DMA_buffer_size;
155 int                     DMA_max_single_msg = 131072;//524288 ;
156
157 #define                 DMA_SIZE_PER_SLOT       8192
158
159
160 typedef struct dma_msgid_map
161 {
162     uint64_t     msg_id;
163     int     msg_subid
164 } dma_msgid_map_t;
165
166 dma_msgid_map_t         *dma_map_list;
167
168 typedef struct msg_trace
169 {
170     uint64_t    msg_id;
171     int         done_num;
172 }msg_trace_t;
173
174 msg_trace_t             *pending_msg_list;
175 /* =====Beginning of Declarations of Machine Specific Variables===== */
176 static int cookie;
177 static int modes = 0;
178 static gni_cq_handle_t       smsg_rx_cqh = NULL;
179 static gni_cq_handle_t       smsg_tx_cqh = NULL;
180 static gni_cq_handle_t       post_rx_cqh = NULL;
181 static gni_cq_handle_t       post_tx_cqh = NULL;
182 static gni_ep_handle_t       *ep_hndl_array;
183
184 typedef    struct  pending_smg
185 {
186     int     inst_id;
187     struct  pending_smg *next;
188 } PENDING_GETNEXT;
189
190
191 typedef struct msg_list
192 {
193     uint32_t destNode;
194     uint32_t size;
195     void *msg;
196     struct msg_list *next;
197     uint8_t tag;
198 }MSG_LIST;
199
200 typedef struct medium_msg_list
201 {
202     uint32_t destNode;
203     uint32_t msg_id;
204     uint32_t msg_subid;
205     uint32_t remain_size;
206     void *msg;
207     struct medium_msg_list *next;
208 }MEDIUM_MSG_LIST;
209
210
211 typedef struct control_msg
212 {
213     uint64_t            source_addr;
214     int                 source;               /* source rank */
215     int                 length;
216     gni_mem_handle_t    source_mem_hndl;
217     struct control_msg *next;
218 }CONTROL_MSG;
219
220 typedef struct medium_msg_control
221 {
222     uint64_t            dma_offset;     //the dma_buffer for this block of msg
223     int                 msg_id;         //Id for the total index
224     int                 msg_subid;      //offset inside the message id 
225 }MEDIUM_MSG_CONTROL;
226
227 typedef struct  rmda_msg
228 {
229     int                   destNode;
230     gni_post_descriptor_t *pd;
231     struct  rmda_msg      *next;
232 }RDMA_REQUEST;
233
234 /* reuse PendingMsg memory */
235 static CONTROL_MSG          *control_freelist=0;
236 static MSG_LIST             *msglist_freelist=0;
237 static MSG_LIST             **smsg_msglist_head= 0;
238 static MSG_LIST             **smsg_msglist_tail= 0;
239 static MSG_LIST             *smsg_free_head=0;
240 static MSG_LIST             *smsg_free_tail=0;
241
242 /*
243 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
244     if(free_head == 0)  free_head = free_tail = msg_head;    \
245     else   free_tail = free_tail->next;    \
246     if( msg_head->next == msg_tail) msg_head =0;   \
247     else msg_head= msg_head->next;    
248
249 #define MallocMsgList(d, msg_head, msg_tail, free_head, free_tail, msgsize) \
250     if(free_head == 0) {d= malloc(msgsize);  \
251         if(msg_head == 0)   msg_head =msg_tail = msg_head->next = msg_tail->next = d; \
252         else { msg_tail->next = d; d->next = msg_head; msg_tail=d;} \
253     }else {d = free_head; free_head = free_head->next; if(free_tail->next == free_head) free_head =0;} \
254 */
255
256 #define FreeMsgList(d)  \
257   (d)->next = msglist_freelist;\
258   msglist_freelist = d;
259
260
261
262 #define MallocMsgList(d) \
263   d = msglist_freelist;\
264   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
265              _MEMCHECK(d);\
266   } else msglist_freelist = d->next;
267
268
269 #define FreeControlMsg(d)       \
270   (d)->next = control_freelist;\
271   control_freelist = d;
272
273 #define MallocControlMsg(d) \
274   d = control_freelist;\
275   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
276              _MEMCHECK(d);\
277   } else control_freelist = d->next;
278
279
280 static RDMA_REQUEST         *rdma_freelist = NULL;
281
282 #define FreeControlMsg(d)       \
283   (d)->next = control_freelist;\
284   control_freelist = d;
285
286 #define MallocControlMsg(d) \
287   d = control_freelist;\
288   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
289              _MEMCHECK(d);\
290   } else control_freelist = d->next;
291
292 #define FreeMediumControlMsg(d)       \
293   (d)->next = medium_control_freelist;\
294   medium_control_freelist = d;
295
296
297 #define MallocMediumControlMsg(d) \
298     d = medium_control_freelist;\
299     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
300     _MEMCHECK(d);\
301 } else mediumcontrol_freelist = d->next;
302
303 #define FreeRdmaRequest(d)       \
304   (d)->next = rdma_freelist;\
305   rdma_freelist = d;
306
307 #define MallocRdmaRequest(d) \
308   d = rdma_freelist;\
309   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
310              _MEMCHECK(d);\
311   } else rdma_freelist = d->next;
312
313 /* reuse gni_post_descriptor_t */
314 static gni_post_descriptor_t *post_freelist=0;
315
316 #if 1
317 #define FreePostDesc(d)       \
318     (d)->next_descr = post_freelist;\
319     post_freelist = d;
320
321 #define MallocPostDesc(d) \
322   d = post_freelist;\
323   if (d==0) { \
324      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
325      _MEMCHECK(d);\
326   } else post_freelist = d->next_descr;
327 #else
328
329 #define FreePostDesc(d)     free(d);
330 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
331
332 #endif
333
334 static PENDING_GETNEXT     *pending_smsg_head = 0;
335 static PENDING_GETNEXT     *pending_smsg_tail = 0;
336
337 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
338 static int      buffered_smsg_counter = 0;
339
340 /* SmsgSend return success but message sent is not confirmed by remote side */
341
342 static RDMA_REQUEST  *pending_rdma_head = 0;
343 static RDMA_REQUEST  *pending_rdma_tail = 0;
344 static MSG_LIST *buffered_fma_head = 0;
345 static MSG_LIST *buffered_fma_tail = 0;
346
347 /* functions  */
348 #define IsFree(a,ind)  !( a& (1<<(ind) ))
349 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
350 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
351
352 #include "mempool.c"
353
354 /* get the upper bound of log 2 */
355 int mylog2(int size)
356 {
357     int op = size;
358     unsigned int ret=0;
359     unsigned int mask = 0;
360     int i;
361     while(op>0)
362     {
363         op = op >> 1;
364         ret++;
365
366     }
367     for(i=1; i<ret; i++)
368     {
369         mask = mask << 1;
370         mask +=1;
371     }
372
373     ret -= ((size &mask) ? 0:1);
374     return ret;
375 }
376
377 static void
378 allgather(void *in,void *out, int len)
379 {
380     static int *ivec_ptr=NULL,already_called=0,job_size=0;
381     int i,rc;
382     int my_rank;
383     char *tmp_buf,*out_ptr;
384
385     if(!already_called) {
386
387         rc = PMI_Get_size(&job_size);
388         CmiAssert(rc == PMI_SUCCESS);
389         rc = PMI_Get_rank(&my_rank);
390         CmiAssert(rc == PMI_SUCCESS);
391
392         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
393         CmiAssert(ivec_ptr != NULL);
394
395         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
396         CmiAssert(rc == PMI_SUCCESS);
397
398         already_called = 1;
399
400     }
401
402     tmp_buf = (char *)malloc(job_size * len);
403     CmiAssert(tmp_buf);
404
405     rc = PMI_Allgather(in,tmp_buf,len);
406     CmiAssert(rc == PMI_SUCCESS);
407
408     out_ptr = out;
409
410     for(i=0;i<job_size;i++) {
411
412         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
413
414     }
415
416     free(tmp_buf);
417 }
418 static void
419 allgather_2(void *in,void *out, int len)
420 {
421     //PMI_Allgather is out of order
422     int i,rc, extend_len;
423     int  rank_index;
424     char *out_ptr, *out_ref;
425     char *in2;
426
427     extend_len = sizeof(int) + len;
428     in2 = (char*)malloc(extend_len);
429
430     memcpy(in2, &myrank, sizeof(int));
431     memcpy(in2+sizeof(int), in, len);
432
433     out_ptr = (char*)malloc(mysize*extend_len);
434
435     rc = PMI_Allgather(in2, out_ptr, extend_len);
436     GNI_RC_CHECK("allgather", rc);
437
438     out_ref = out;
439
440     for(i=0;i<mysize;i++) {
441         //rank index 
442         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
443         //copy to the rank index slot
444         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
445     }
446
447     free(out_ptr);
448     free(in2);
449
450 }
451
452 static unsigned int get_gni_nic_address(int device_id)
453 {
454     unsigned int address, cpu_id;
455     gni_return_t status;
456     int i, alps_dev_id=-1,alps_address=-1;
457     char *token, *p_ptr;
458
459     p_ptr = getenv("PMI_GNI_DEV_ID");
460     if (!p_ptr) {
461         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
462        
463         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
464     } else {
465         while ((token = strtok(p_ptr,":")) != NULL) {
466             alps_dev_id = atoi(token);
467             if (alps_dev_id == device_id) {
468                 break;
469             }
470             p_ptr = NULL;
471         }
472         CmiAssert(alps_dev_id != -1);
473         p_ptr = getenv("PMI_GNI_LOC_ADDR");
474         CmiAssert(p_ptr != NULL);
475         i = 0;
476         while ((token = strtok(p_ptr,":")) != NULL) {
477             if (i == alps_dev_id) {
478                 alps_address = atoi(token);
479                 break;
480             }
481             p_ptr = NULL;
482             ++i;
483         }
484         CmiAssert(alps_address != -1);
485         address = alps_address;
486     }
487     return address;
488 }
489
490 static uint8_t get_ptag(void)
491 {
492     char *p_ptr, *token;
493     uint8_t ptag;
494
495     p_ptr = getenv("PMI_GNI_PTAG");
496     CmiAssert(p_ptr != NULL);
497     token = strtok(p_ptr, ":");
498     ptag = (uint8_t)atoi(token);
499     return ptag;
500         
501 }
502
503 static uint32_t get_cookie(void)
504 {
505     uint32_t cookie;
506     char *p_ptr, *token;
507
508     p_ptr = getenv("PMI_GNI_COOKIE");
509     CmiAssert(p_ptr != NULL);
510     token = strtok(p_ptr, ":");
511     cookie = (uint32_t)atoi(token);
512
513     return cookie;
514 }
515
516 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
517 /* TODO: add any that are related */
518 /* =====End of Definitions of Message-Corruption Related Macros=====*/
519
520
521 #include "machine-lrts.h"
522 #include "machine-common-core.c"
523
524 /* Network progress function is used to poll the network when for
525    messages. This flushes receive buffers on some  implementations*/
526 #if CMK_MACHINE_PROGRESS_DEFINED
527 void CmiMachineProgressImpl() {
528 }
529 #endif
530
531 inline
532 static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
533 {
534     MSG_LIST        *msg_tmp;
535     MallocMsgList(msg_tmp);
536     msg_tmp->destNode = destNode;
537     msg_tmp->size   = size;
538     msg_tmp->msg    = msg;
539     msg_tmp->tag    = tag;
540     msg_tmp->next   = 0;
541     if (smsg_msglist_head[destNode] == 0) {
542         smsg_msglist_head[destNode]  = msg_tmp;
543     }
544     else {
545       smsg_msglist_tail[destNode]->next    = msg_tmp;
546     }
547     smsg_msglist_tail[destNode]          = msg_tmp;
548 #if PRINT_SYH
549     buffered_smsg_counter++;
550 #endif
551 }
552
553 // messages smaller than max single SMSG
554 inline
555 static void send_small_messages(int destNode, int size, char *msg)
556 {
557     gni_return_t        status  =   GNI_RC_SUCCESS;
558     const uint8_t       tag_data    = SMALL_DATA_TAG;
559     
560     if(smsg_msglist_head[destNode] == 0)
561     {
562         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, SMALL_DATA_TAG);
563         if (status == GNI_RC_SUCCESS)
564         {
565 #if PRINT_SYH
566             lrts_smsg_success++;
567             if(lrts_smsg_success == lrts_send_msg_id)
568                 CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
569             else
570                 CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
571 #endif
572             CmiFree(msg);
573             return;
574         }else if (status == GNI_RC_INVALID_PARAM)
575         {
576             CmiAbort("SmsgSen fails\n");
577         }
578     }
579     //buffer this message when buffer_msg_head!=0 or send fails
580     delay_send_small_msg(msg, size, destNode, SMALL_DATA_TAG);
581 }
582
583 // Get first 0 in DMA_tags starting from index
584 static int get_first_avail_bit(uint64_t DMA_tags, int start_index)
585 {
586
587     uint64_t         mask = 0x1;
588     register    int     i=0;
589     while((DMA_tags & mask) && i<DMA_slots) {mask << 1; i++;}
590
591 }
592
593 static int send_medium_messages(int destNode, int size, char *msg)
594 {
595 #if 0
596     gni_return_t status = GNI_RC_SUCCESS;
597     int first_avail_bit=0;
598     uint64_t mask = 0x1;
599     MEDIUM_MSG_CONTROL  *medium_msg_control_tmp;
600     MEDIUM_MSG_LIST        *msg_tmp;
601     int blocksize, remain_size, pos;
602     int sub_id = 0;
603     remain_size = size;
604     pos = 0;  //offset before which data are sent
605     /* copy blocks of the message to DMA preallocated buffer and send SMSG */
606     //Check whether there is any available DMA buffer
607     
608     do{
609         while((DMA_avail_tag & mask) && first_avail_bit<DMA_slots) {mask << 1; first_avail_bit++;}
610         if(first_avail_bit == DMA_slots) //No available DMA, buffer this message
611         {
612             MallocMediumMsgList(msg_tmp);
613             msg_tmp->destNode = destNode;
614             msg_tmp->msg_id   = lrts_send_msg_id;
615             msg_tmp->msg_subid   = sub_id;
616             msg_tmp->size   = remain_size;
617             msg_tmp->msg    = msg+pos;
618             msg_tmp->next   = NULL;
619             break;
620         }else
621         {
622             //copy this part of the message into this DMA buffer
623             //TODO optimize here, some data can go with this SMSG
624             blocksize = (remain_size>DMA_SIZE_PER_SLOT)?DMA_SIZE_PER_SLOT: remain_size;
625             memcpy(DMA_buffer_base_mdh_addr.addr[first_avail_bit], msg+pos, blocksize);
626             pos += blocksize;
627             remain_size -= blocksize;
628             SET_BITS(DMA_avail_tag, first_avail_bit);
629            
630             MallocMediumControlMsg(medium_msg_control_tmp);
631             medium_msg_control_tmp->msg_id = lrts_send_msg_id;
632             medium_msg_control_tmp->msg_subid = sub_id;
633             if(status == GNI_RC_SUCCESS)
634             {
635                 if(sub_id==0)
636                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_HEAD_TAG);
637                 else
638                     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), 0, MEDIUM_DATA_TAG);
639             }
640             //buffer this smsg
641             if(status != GNI_RC_SUCCESS)
642             {
643                 delay_send_small_msg(medium_msg_tmp, sizeof(MEDIUM_MSG_CONTROL), destNode, MEDIUM_HEAD_TAG);
644             }
645             sub_id++;
646         }while(remain_size > 0 );
647
648         }
649     }
650 #endif
651 }
652
653 // Large message, send control to receiver, receiver register memory and do a GET 
654 inline
655 static int send_large_messages(int destNode, int size, char *msg)
656 {
657 #if     USE_LRTS_MEMPOOL
658     gni_return_t        status  =   GNI_RC_SUCCESS;
659     CONTROL_MSG         *control_msg_tmp;
660     uint32_t            vmdh_index  = -1;
661     /* construct a control message and send */
662     MallocControlMsg(control_msg_tmp);
663     control_msg_tmp->source_addr    = (uint64_t)msg;
664     control_msg_tmp->source         = myrank;
665     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
666     //memcpy( &(control_msg_tmp->source_mem_hndl), GetMemHndl(msg), sizeof(gni_mem_handle_t)) ;
667     control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
668 #if PRINT_SYH
669     lrts_send_msg_id++;
670     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
671 #endif
672     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
673     if(status == GNI_RC_SUCCESS)
674     {
675 #if PRINT_SYH
676         lrts_smsg_success++;
677         if(lrts_smsg_success == lrts_send_msg_id)
678             CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
679         else
680             CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
681 #endif
682         FreeControlMsg(control_msg_tmp);
683         return 1;
684     }
685
686     delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
687     return 0;
688 // NOT use mempool, should slow 
689 #else
690     gni_return_t        status  =   GNI_RC_SUCCESS;
691     CONTROL_MSG         *control_msg_tmp;
692     uint32_t            vmdh_index  = -1;
693     /* construct a control message and send */
694     MallocControlMsg(control_msg_tmp);
695     control_msg_tmp->source_addr    = (uint64_t)msg;
696     control_msg_tmp->source         = myrank;
697     control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
698     control_msg_tmp->source_mem_hndl.qword1 = 0;
699     control_msg_tmp->source_mem_hndl.qword2 = 0;
700 #if PRINT_SYH
701     lrts_send_msg_id++;
702     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
703 #endif
704     if(smsg_msglist_head[destNode] == 0)
705     {
706         status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
707         if(status == GNI_RC_SUCCESS)
708         {
709             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
710             if(status == GNI_RC_SUCCESS)
711             {
712 #if PRINT_SYH
713                 lrts_smsg_success++;
714                 if(lrts_smsg_success == lrts_send_msg_id)
715                     CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
716                 else
717                     CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
718 #endif
719                 FreeControlMsg(control_msg_tmp);
720                 return 1;
721             }
722
723         } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
724         {
725             CmiAbort("Memory registor for large msg\n");
726         }
727     }
728     delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
729     return 0;
730 #endif
731 }
732
733 inline void LrtsPrepareEnvelope(char *msg, int size)
734 {
735     CmiSetMsgSize(msg, size);
736 }
737
738 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
739 {
740     LrtsPrepareEnvelope(msg, size);
741
742     if(size <= SMSG_MAX_MSG)
743     {
744 #if PRINT_SYH
745         lrts_send_msg_id++;
746         CmiPrintf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
747 #endif
748         send_small_messages(destNode, size, msg);
749     }
750     else
751     {
752         send_large_messages(destNode, size, msg);
753     }
754     return 0;
755 }
756
757 static void LrtsPreCommonInit(int everReturn){}
758
759 /* Idle-state related functions: called in non-smp mode */
760 void CmiNotifyIdleForGemini(void) {
761     LrtsAdvanceCommunication();
762 }
763
764 static void LrtsPostCommonInit(int everReturn)
765 {
766 #if CMK_SMP
767     CmiIdleState *s=CmiNotifyGetState();
768     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
769     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
770 #else
771     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
772 #endif
773
774 }
775
776
777 void LrtsPostNonLocal(){}
778 /* pooling CQ to receive network message */
779 static void PumpNetworkRdmaMsgs()
780 {
781     gni_cq_entry_t      event_data;
782     gni_return_t        status;
783     while( (status = GNI_CqGetEvent(post_rx_cqh, &event_data)) == GNI_RC_SUCCESS);
784 }
785
786 static int SendRdmaMsg();
787 static void getLargeMsgRequest(void* header, uint64_t inst_id);
788 static void PumpNetworkSmsg()
789 {
790     uint64_t            inst_id;
791     PENDING_GETNEXT     *pending_next;
792     int                 ret;
793     gni_cq_entry_t      event_data;
794     gni_return_t        status;
795     void                *header;
796     uint8_t             msg_tag;
797     int                 msg_nbytes;
798     void                *msg_data;
799     gni_mem_handle_t    msg_mem_hndl;
800  
801
802     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
803     {
804         inst_id = GNI_CQ_GET_INST_ID(event_data);
805         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
806 #if PRINT_SYH
807         CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
808 #endif
809         msg_tag = GNI_SMSG_ANY_TAG;
810         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
811         {
812 #if PRINT_SYH
813             lrts_received_msg++;
814             CmiPrintf("+++[%d] PumpNetwork msg is received, messageid:%d tag=%d\n", myrank, lrts_received_msg, msg_tag);
815 #endif
816             /* copy msg out and then put into queue (small message) */
817             switch (msg_tag) {
818             case SMALL_DATA_TAG:
819             {
820                 msg_nbytes = CmiGetMsgSize(header);
821                 msg_data    = CmiAlloc(msg_nbytes);
822                 memcpy(msg_data, (char*)header, msg_nbytes);
823                 handleOneRecvedMsg(msg_nbytes, msg_data);
824                 break;
825             }
826             case LMSG_INIT_TAG:
827             {
828 #if PRINT_SYH
829                 CmiPrintf("+++[%d] from %d PumpNetwork Rdma Request msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
830 #endif
831                 getLargeMsgRequest(header, inst_id);
832                 break;
833             }
834             case ACK_TAG:
835             {
836                 /* Get is done, release message . Now put is not used yet*/
837 #if         !USE_LRTS_MEMPOOL
838                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
839 #endif
840                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
841                 SendRdmaMsg();
842                 break;
843             }
844             case PUT_DONE_TAG: //persistent message
845             {
846                 handleOneRecvedMsg(((CONTROL_MSG *) header)->length,((void*) (CONTROL_MSG *) header)); 
847                 break;
848             }
849             default: {
850                 CmiPrintf("weird tag problem\n");
851                 CmiAbort("Unknown tag\n");
852             }
853             }
854             GNI_SmsgRelease(ep_hndl_array[inst_id]);
855             msg_tag = GNI_SMSG_ANY_TAG;
856         } //endwhile getNext
857     }   //end while GetEvent
858     if(status == GNI_RC_ERROR_RESOURCE)
859     {
860         GNI_RC_CHECK("Smsg_rx_cq full", status);
861     }
862 }
863
864 static void getLargeMsgRequest(void* header, uint64_t inst_id)
865 {
866 #if     USE_LRTS_MEMPOOL
867     CONTROL_MSG         *request_msg;
868     gni_return_t        status;
869     void                *msg_data;
870     gni_post_descriptor_t *pd;
871     RDMA_REQUEST        *rdma_request_msg;
872     gni_mem_handle_t    msg_mem_hndl;
873     int source;
874     // initial a get to transfer data from the sender side */
875     request_msg = (CONTROL_MSG *) header;
876     source = request_msg->source;
877     msg_data = CmiAlloc(request_msg->length);
878     _MEMCHECK(msg_data);
879     //memcpy(&msg_mem_hndl, GetMemHndl(msg_data), sizeof(gni_mem_handle_t));
880     msg_mem_hndl = GetMemHndl(msg_data);
881
882     MallocPostDesc(pd);
883     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
884         pd->type            = GNI_POST_FMA_GET;
885     else
886         pd->type            = GNI_POST_RDMA_GET;
887
888     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
889     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
890     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
891     pd->length          = ALIGN4(request_msg->length);
892     pd->local_addr      = (uint64_t) msg_data;
893     pd->local_mem_hndl  = msg_mem_hndl;
894     pd->remote_addr     = request_msg->source_addr;
895     pd->remote_mem_hndl = request_msg->source_mem_hndl;
896     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
897     pd->rdma_mode       = 0;
898
899     if(pd->type == GNI_POST_RDMA_GET) 
900         status = GNI_PostRdma(ep_hndl_array[source], pd);
901     else
902         status = GNI_PostFma(ep_hndl_array[source],  pd);
903     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
904     {
905         MallocRdmaRequest(rdma_request_msg);
906         rdma_request_msg->next = 0;
907         rdma_request_msg->destNode = inst_id;
908         rdma_request_msg->pd = pd;
909         if(pending_rdma_head == 0)
910         {
911             pending_rdma_head = rdma_request_msg;
912         }else
913         {
914             pending_rdma_tail->next = rdma_request_msg;
915         }
916         pending_rdma_tail = rdma_request_msg;
917     }else
918         GNI_RC_CHECK("AFter posting", status);
919
920 #else
921     CONTROL_MSG         *request_msg;
922     gni_return_t        status;
923     void                *msg_data;
924     gni_post_descriptor_t *pd;
925     RDMA_REQUEST        *rdma_request_msg;
926     gni_mem_handle_t    msg_mem_hndl;
927     int source;
928     // initial a get to transfer data from the sender side */
929     request_msg = (CONTROL_MSG *) header;
930     source = request_msg->source;
931     msg_data = CmiAlloc(request_msg->length);
932     _MEMCHECK(msg_data);
933
934     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
935
936     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
937     {
938         GNI_RC_CHECK("Mem Register before post", status);
939     }
940
941     MallocPostDesc(pd);
942     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
943         pd->type            = GNI_POST_FMA_GET;
944     else
945         pd->type            = GNI_POST_RDMA_GET;
946
947     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
948     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
949     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
950     pd->length          = ALIGN4(request_msg->length);
951     pd->local_addr      = (uint64_t) msg_data;
952     pd->remote_addr     = request_msg->source_addr;
953     pd->remote_mem_hndl = request_msg->source_mem_hndl;
954     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
955     pd->rdma_mode       = 0;
956
957     //memory registration successful
958     if(status == GNI_RC_SUCCESS)
959     {
960         pd->local_mem_hndl  = msg_mem_hndl;
961         if(pd->type == GNI_POST_RDMA_GET) 
962             status = GNI_PostRdma(ep_hndl_array[source], pd);
963         else
964             status = GNI_PostFma(ep_hndl_array[source],  pd);
965     }else
966     {
967         pd->local_mem_hndl.qword1  = 0; 
968         pd->local_mem_hndl.qword1  = 0; 
969     }
970     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
971     {
972         MallocRdmaRequest(rdma_request_msg);
973         rdma_request_msg->next = 0;
974         rdma_request_msg->destNode = inst_id;
975         rdma_request_msg->pd = pd;
976         if(pending_rdma_head == 0)
977         {
978             pending_rdma_head = rdma_request_msg;
979         }else
980         {
981             pending_rdma_tail->next = rdma_request_msg;
982         }
983         pending_rdma_tail = rdma_request_msg;
984     }else
985         GNI_RC_CHECK("AFter posting", status);
986 #endif
987 }
988
989 /* Check whether message send or get is confirmed by remote */
990 static void PumpLocalSmsgTransactions()
991 {
992     gni_return_t            status;
993     gni_cq_entry_t          ev;
994     while ((status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
995     {
996 #if PRINT_SYH
997         lrts_local_done_msg++;
998         //CmiPrintf("*[%d]  PumpLocalSmsgTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
999 #endif
1000         if(GNI_CQ_OVERRUN(ev))
1001         {
1002             CmiPrintf("Overrun detected in local CQ");
1003             CmiAbort("Overrun in TX");
1004         }
1005     }
1006     if(status == GNI_RC_ERROR_RESOURCE)
1007     {
1008         GNI_RC_CHECK("Smsg_tx_cq full", status);
1009     }
1010 }
1011
1012 static void PumpLocalRdmaTransactions()
1013 {
1014     gni_cq_entry_t          ev;
1015     gni_return_t            status;
1016     uint64_t                type, inst_id;
1017     gni_post_descriptor_t   *tmp_pd;
1018     MSG_LIST                *ptr;
1019     CONTROL_MSG             *ack_msg_tmp;
1020     uint8_t             msg_tag;
1021
1022    // while ( (status = GNI_CqGetEvent(post_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1023     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
1024     {
1025         type        = GNI_CQ_GET_TYPE(ev);
1026 #if PRINT_SYH
1027         //lrts_local_done_msg++;
1028         CmiPrintf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
1029 #endif
1030         /*if(type == GNI_CQ_EVENT_TYPE_SMSG)
1031         {
1032         }else */
1033         if (type == GNI_CQ_EVENT_TYPE_POST)
1034         {
1035             inst_id     = GNI_CQ_GET_INST_ID(ev);
1036 #if PRINT_SYH
1037             CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1038 #endif
1039             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
1040             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1041             //CmiPrintf("**[%d] SMSGPumpLocalTransactions local done(type=%d) length=%d, size=%d\n", myrank, type, tmp_pd->length, SIZEFIELD((void*)(tmp_pd->local_addr)) );
1042             ////Message is sent, free message , put is not used now
1043             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
1044             {   //persistent message 
1045                 CmiFree((void *)tmp_pd->local_addr);
1046                 msg_tag = PUT_DONE_TAG;  
1047             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1048             {
1049                 msg_tag = ACK_TAG;  
1050             }
1051             MallocControlMsg(ack_msg_tmp);
1052             ack_msg_tmp->source             = myrank;
1053             ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
1054             ack_msg_tmp->length             = tmp_pd->length; 
1055             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1056 #if PRINT_SYH
1057             lrts_send_msg_id++;
1058             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
1059 #endif
1060             if(smsg_msglist_head[inst_id]!=0)
1061             {
1062                 delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
1063             }else
1064             {
1065                 status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, msg_tag);
1066                 if(status == GNI_RC_SUCCESS)
1067                 {
1068 #if PRINT_SYH
1069                     lrts_smsg_success++;
1070                     CmiPrintf("[%d==>%d] sent ACK done%d\n", myrank, inst_id, lrts_smsg_success);
1071 #endif
1072                     FreeControlMsg(ack_msg_tmp);
1073                 }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
1074                 {
1075                     delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
1076                 }
1077                 else
1078                     GNI_RC_CHECK("GNI_SmsgSendWTag", status);
1079             }
1080 #if     !USE_LRTS_MEMPOOL
1081             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1082 #endif
1083             CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1084             handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1085             SendRdmaMsg(); 
1086             FreePostDesc(tmp_pd);
1087         }
1088     } //end while
1089 }
1090
1091 static int SendRdmaMsg()
1092 {
1093     gni_return_t            status = GNI_RC_SUCCESS;
1094     gni_mem_handle_t        msg_mem_hndl;
1095
1096     RDMA_REQUEST *ptr = pending_rdma_head;
1097     RDMA_REQUEST *prev = NULL;
1098
1099     while (ptr != NULL)
1100     {
1101         gni_post_descriptor_t *pd = ptr->pd;
1102         // register memory first
1103         if( pd->local_mem_hndl.qword1 == 0 && pd->local_mem_hndl.qword2 == 0)
1104         {
1105             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1106         }
1107         if(status == GNI_RC_SUCCESS)
1108         {
1109             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1110                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1111             else
1112                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1113             if(status == GNI_RC_SUCCESS)
1114             {
1115                 RDMA_REQUEST *tmp = ptr;
1116                 if (prev)
1117                   prev->next = ptr->next;
1118                 else
1119                   pending_rdma_head = ptr->next;
1120                 ptr = ptr->next;
1121                 FreeRdmaRequest(tmp);
1122                 continue;
1123             }
1124         }
1125         prev = ptr;
1126         ptr = ptr->next;
1127     } //end while
1128     return 0;
1129 }
1130
1131 // return 1 if all messages are sent
1132 static int SendBufferMsg()
1133 {
1134     MSG_LIST            *ptr;
1135     CONTROL_MSG         *control_msg_tmp;
1136     gni_return_t        status;
1137     int done = 1;
1138     register    int     i;
1139     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {CmiPrintf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
1140     /* can add flow control here to control the number of messages sent before handle message */
1141     for(i=0; i<mysize; i++)
1142     {
1143         if(i==myrank || smsg_msglist_head[i]==0) continue;
1144        
1145         while(smsg_msglist_head[i]!=0)
1146         {
1147         if(useStaticSMSG)
1148         {
1149             ptr = smsg_msglist_head[i];
1150             CmiAssert(ptr!=NULL);
1151             if(ptr->tag == SMALL_DATA_TAG)
1152             {
1153                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], NULL, 0, ptr->msg, ptr->size, 0, SMALL_DATA_TAG);
1154                // CmiPrintf("[%d==>%d] buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1155                 if(status == GNI_RC_SUCCESS) {
1156 #if PRINT_SYH
1157                     lrts_smsg_success++;
1158                     if(lrts_smsg_success == lrts_send_msg_id)
1159                         CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1160                     else
1161                         CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1162 #endif
1163                     CmiFree(ptr->msg);
1164                 }
1165             }
1166             else if(ptr->tag == LMSG_INIT_TAG)
1167             {
1168                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1169 #if PRINT_SYH
1170                 CmiPrintf("[%d==>%d] LMSG buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1171 #endif
1172                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
1173                 {
1174                     MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
1175                     if(status != GNI_RC_SUCCESS) {
1176                         done = 0;
1177                         break;
1178                     }
1179                 }
1180                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
1181                 if(status == GNI_RC_SUCCESS) {
1182 #if PRINT_SYH
1183                     lrts_smsg_success++;
1184                     CmiPrintf("[%d==>%d] sent LMSG done%d\n", myrank, ptr->destNode, lrts_smsg_success);
1185 #endif
1186                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
1187                 }
1188             }else if (ptr->tag == ACK_TAG)
1189             {
1190 #if PRINT_SYH
1191                 CmiPrintf("[%d==>%d] ACK buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
1192 #endif
1193                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, ACK_TAG);
1194                 if(status == GNI_RC_SUCCESS) {
1195 #if PRINT_SYH
1196                     lrts_smsg_success++;
1197                     if(lrts_smsg_success == lrts_send_msg_id)
1198                         CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1199                     else
1200                         CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1201 #endif
1202                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1203                 }
1204             }else
1205             {
1206                 CmiPrintf("Weird tag\n");
1207                 CmiAbort("should not happen\n");
1208             }
1209         } 
1210         if(status == GNI_RC_SUCCESS)
1211         {
1212             smsg_msglist_head[i] = smsg_msglist_head[i]->next;
1213             FreeMsgList(ptr);
1214 #if PRINT_SYH
1215             buffered_smsg_counter--;
1216             if(lrts_smsg_success == lrts_send_msg_id)
1217                 CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1218             else
1219                 CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
1220 #endif
1221         }else {
1222             done = 0;
1223             break;
1224         }
1225         } //end pooling this i-th core
1226     }   // end pooling for all cores
1227 #if PRINT_SYH
1228     if(lrts_send_msg_id-lrts_smsg_success !=0)
1229         CmiPrintf("WRONG [%d buffered msg is empty(%p) (actually=%d)(buffercounter=%d)\n", myrank, smsg_msglist_head[i], (lrts_send_msg_id-lrts_smsg_success, buffered_smsg_counter));
1230 #endif
1231     return done;
1232 }
1233
1234 static void LrtsAdvanceCommunication()
1235 {
1236     /*  Receive Msg first */
1237     //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
1238     PumpNetworkSmsg();
1239     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
1240     //PumpNetworkRdmaMsgs();
1241     /* Release Sent Msg */
1242     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
1243     //PumpLocalSmsgTransactions();
1244     //CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", CmiMyPe());
1245     PumpLocalRdmaTransactions();
1246     //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
1247     /* Send buffered Message */
1248     SendBufferMsg();
1249     SendRdmaMsg();
1250 }
1251
1252 static void _init_static_smsg()
1253 {
1254     gni_smsg_attr_t      *smsg_attr;
1255     gni_smsg_attr_t      remote_smsg_attr;
1256     gni_smsg_attr_t      *smsg_attr_vec;
1257     unsigned int         smsg_memlen;
1258     gni_mem_handle_t     my_smsg_mdh_mailbox;
1259     int      i;
1260     gni_return_t status;
1261     uint32_t              vmdh_index = -1;
1262     mdh_addr_t            base_infor;
1263     mdh_addr_t            *base_addr_vec;
1264     if(mysize <=1024)
1265     {
1266         SMSG_MAX_MSG = 1024;
1267         //log2_SMSG_MAX_MSG = 10;
1268     }else if (mysize > 1024 && mysize <= 16384)
1269     {
1270         SMSG_MAX_MSG = 512;
1271         //log2_SMSG_MAX_MSG = 9;
1272
1273     }else {
1274         SMSG_MAX_MSG = 256;
1275         //log2_SMSG_MAX_MSG = 8;
1276     }
1277     
1278     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
1279     
1280     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1281     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
1282     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
1283     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
1284     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1285     smsg_mailbox_base = memalign(64, smsg_memlen*(mysize));
1286     _MEMCHECK(smsg_mailbox_base);
1287     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
1288     
1289     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
1290             smsg_memlen*(mysize), smsg_rx_cqh,
1291             GNI_MEM_READWRITE,   
1292             vmdh_index,
1293             &my_smsg_mdh_mailbox);
1294
1295     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
1296
1297 #if 1
1298     base_infor.addr =  (uint64_t)smsg_mailbox_base;
1299     base_infor.mdh =  my_smsg_mdh_mailbox;
1300     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
1301
1302     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
1303  
1304     for(i=0; i<mysize; i++)
1305     {
1306         if(i==myrank)
1307             continue;
1308         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1309         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1310         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1311         smsg_attr[i].mbox_offset = i*smsg_memlen;
1312         smsg_attr[i].buff_size = smsg_memlen;
1313         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
1314         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1315     }
1316
1317     for(i=0; i<mysize; i++)
1318     {
1319         if (myrank == i) continue;
1320
1321         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1322         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
1323         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
1324         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
1325         remote_smsg_attr.buff_size = smsg_memlen;
1326         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
1327         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
1328
1329         /* initialize the smsg channel */
1330         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
1331         GNI_RC_CHECK("SMSG Init", status);
1332     } //end initialization
1333
1334     free(base_addr_vec);
1335 #else
1336     for(i=0; i<mysize; i++)
1337     {
1338         if(i==myrank)
1339             continue;
1340         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1341         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
1342         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
1343         if(i<myrank)
1344             smsg_attr[i].mbox_offset = i*smsg_memlen;
1345         else
1346             smsg_attr[i].mbox_offset = (i-1)*smsg_memlen;
1347
1348         smsg_attr[i].msg_buffer = smsg_mailbox_base;
1349         smsg_attr[i].buff_size = smsg_memlen;
1350         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
1351 #if 0 
1352         if(i<2)
1353         {
1354             CmiPrintf("[assign %d==%d]  offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, (smsg_attr[i]).mbox_offset, (smsg_attr[i]).msg_buffer, (smsg_attr[i]).mbox_maxcredit, (smsg_attr[i]).msg_maxsize, (smsg_attr[i]).buff_size ); 
1355 #endif 
1356         }
1357     }
1358     smsg_attr_vec = (gni_smsg_attr_t*)malloc(mysize * mysize * sizeof(gni_smsg_attr_t));
1359     _MEMCHECK(smsg_attr_vec);
1360  
1361     if(myrank==0)
1362         CmiPrintf("total size=%d\n", mysize*mysize*sizeof(gni_smsg_attr_t));
1363     allgather(smsg_attr, smsg_attr_vec,  mysize*sizeof(gni_smsg_attr_t));
1364     
1365     CmiBarrier();
1366     for(i=0; i<mysize; i++)
1367     {
1368         if (myrank == i) continue;
1369         /* initialize the smsg channel */
1370         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &smsg_attr_vec[i*mysize+myrank]);
1371 #if 0 
1372         if(i<2)
1373         {
1374             CmiPrintf("[%d==%d] SmsgInit rc=%s, offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, gni_err_str[status], (smsg_attr_vec[i*mysize+myrank]).mbox_offset, (smsg_attr_vec[i*mysize+myrank]).msg_buffer, (smsg_attr_vec[i*mysize+myrank]).mbox_maxcredit, (smsg_attr_vec[i*mysize+myrank]).msg_maxsize, (smsg_attr_vec[i*mysize+myrank]).buff_size ); 
1375         }
1376 #endif
1377         GNI_RC_CHECK("SMSG Init", status);
1378     } //end initialization
1379     CmiBarrier();
1380     free(smsg_attr_vec);
1381 #endif
1382     free(smsg_attr);
1383     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
1384     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
1385     smsg_msglist_head = (MSG_LIST**) malloc(mysize*sizeof(MSG_LIST*));
1386     memset(smsg_msglist_head, 0, mysize*sizeof(MSG_LIST*));
1387     smsg_msglist_tail = (MSG_LIST**) malloc(mysize*sizeof(MSG_LIST*));
1388     memset(smsg_msglist_tail, 0, mysize*sizeof(MSG_LIST*));
1389
1390
1391 static void _init_static_msgq()
1392 {
1393     gni_return_t status;
1394     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
1395     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
1396     msgq_attrs.smsg_q_sz = 1;
1397     msgq_attrs.rcv_pool_sz = 1;
1398     msgq_attrs.num_msgq_eps = 2;
1399     msgq_attrs.nloc_insts = 8;
1400     msgq_attrs.modes = 0;
1401     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
1402
1403     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
1404     GNI_RC_CHECK("MSGQ Init", status);
1405
1406
1407 }
1408
1409 static void _init_DMA_buffer()
1410 {
1411     gni_return_t            status = GNI_RC_SUCCESS;
1412     /*AUTO tuning */
1413     /* suppose max_smsg is 1024, DMA buffer is split into 2048, 4096, 8192, ... */
1414     /*  This method might be better for SMP, but it is bad for Nonsmp since msgs are sharing same slots */
1415     /*
1416      * DMA_slots = 19-log2_SMSG_MAX_MSG;
1417     DMA_incoming_avail_tag = malloc(DMA_slots);
1418     DMA_buffer_size = 2*(DMA_max_single_msg - SMSG_MAX_MSG); 
1419     DMA_incoming_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1420     DMA_outgoing_base_addr =  memalign(ALIGNBUF, DMA_buffer_size);
1421     
1422     status = GNI_MemRegister(nic_hndl, (uint64_t)DMA_incoming_base_addr,
1423             DMA_buffer_size, smsg_rx_cqh,
1424             GNI_MEM_READWRITE ,   
1425             vmdh_index,
1426             &);
1427             */
1428     // one is reserved to avoid deadlock
1429     DMA_slots           = 17; // each one is 8K  16*8K + 1 slot reserved to avoid deadlock
1430     DMA_buffer_size     = DMA_max_single_msg + 8192;
1431     DMA_buffer_base_mdh_addr.addr = (uint64_t)memalign(ALIGNBUF, DMA_buffer_size);
1432     status = GNI_MemRegister(nic_hndl, DMA_buffer_base_mdh_addr.addr,
1433         DMA_buffer_size, smsg_rx_cqh,
1434         GNI_MEM_READWRITE ,   
1435         -1,
1436         &(DMA_buffer_base_mdh_addr.mdh));
1437     GNI_RC_CHECK("GNI_MemRegister", status);
1438     DMA_buffer_base_mdh_addr_vec = (mdh_addr_t*) malloc(sizeof(mdh_addr_t) * mysize);
1439
1440     allgather(&DMA_buffer_base_mdh_addr, DMA_buffer_base_mdh_addr_vec, sizeof(mdh_addr_t) );
1441 }
1442
1443 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
1444 {
1445     register int            i;
1446     int                     rc;
1447     int                     device_id = 0;
1448     unsigned int            remote_addr;
1449     gni_cdm_handle_t        cdm_hndl;
1450     gni_return_t            status = GNI_RC_SUCCESS;
1451     uint32_t                vmdh_index = -1;
1452     uint8_t                 ptag;
1453     unsigned int            local_addr, *MPID_UGNI_AllAddr;
1454     int                     first_spawned;
1455     int                     physicalID;
1456     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
1457     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
1458     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
1459    
1460     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
1461     //useStaticSMSG = CmiGetArgFlag(*argv, "+useStaticSmsg");
1462     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
1463     
1464     status = PMI_Init(&first_spawned);
1465     GNI_RC_CHECK("PMI_Init", status);
1466
1467     status = PMI_Get_size(&mysize);
1468     GNI_RC_CHECK("PMI_Getsize", status);
1469
1470     status = PMI_Get_rank(&myrank);
1471     GNI_RC_CHECK("PMI_getrank", status);
1472
1473     //physicalID = CmiPhysicalNodeID(myrank);
1474     
1475     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
1476
1477     *myNodeID = myrank;
1478     *numNodes = mysize;
1479   
1480     if(myrank == 0)
1481     {
1482         printf("Charm++> Running on Gemini (GNI) using %d  cores\n", mysize);
1483     }
1484 #ifdef USE_ONESIDED
1485     onesided_init(NULL, &onesided_hnd);
1486
1487     // this is a GNI test, so use the libonesided bypass functionality
1488     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
1489     local_addr = gniGetNicAddress();
1490 #else
1491     ptag = get_ptag();
1492     cookie = get_cookie();
1493 #if 0
1494     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
1495 #endif
1496     //Create and attach to the communication  domain */
1497     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
1498     GNI_RC_CHECK("GNI_CdmCreate", status);
1499     //* device id The device id is the minor number for the device
1500     //that is assigned to the device by the system when the device is created.
1501     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
1502     //where X is the device number 0 default 
1503     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
1504     GNI_RC_CHECK("GNI_CdmAttach", status);
1505     local_addr = get_gni_nic_address(0);
1506 #endif
1507     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
1508     _MEMCHECK(MPID_UGNI_AllAddr);
1509     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
1510     /* create the local completion queue */
1511     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
1512     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
1513     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
1514     
1515     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
1516     //GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
1517     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
1518
1519     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
1520     GNI_RC_CHECK("Create CQ (rx)", status);
1521     
1522     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
1523     //GNI_RC_CHECK("Create Post CQ (rx)", status);
1524     
1525     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
1526     //GNI_RC_CHECK("Create BTE CQ", status);
1527
1528     /* create the endpoints. they need to be bound to allow later CQWrites to them */
1529     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
1530     _MEMCHECK(ep_hndl_array);
1531
1532     for (i=0; i<mysize; i++) {
1533         if(i == myrank) continue;
1534         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
1535         GNI_RC_CHECK("GNI_EpCreate ", status);   
1536         remote_addr = MPID_UGNI_AllAddr[i];
1537         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
1538         GNI_RC_CHECK("GNI_EpBind ", status);   
1539     }
1540     /* Depending on the number of cores in the job, decide different method */
1541     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
1542     if(mysize > 1)
1543     {
1544         if(useStaticSMSG == 1)
1545         {
1546             _init_static_smsg(mysize);
1547         }else if(useStaticMSGQ == 1)
1548         {
1549             _init_static_msgq();
1550         }
1551     }
1552 #if     USE_LRTS_MEMPOOL
1553     init_mempool(_mempool_size);
1554     //init_mempool(Mempool_MaxSize);
1555 #endif
1556
1557     /* init DMA buffer for medium message */
1558
1559     //_init_DMA_buffer();
1560     
1561     free(MPID_UGNI_AllAddr);
1562 }
1563
1564
1565 void* LrtsAlloc(int n_bytes, int header)
1566 {
1567     void *ptr;
1568 #if 0
1569     CmiPrintf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d\n", CmiMyPe(), n_bytes, header);
1570 #endif
1571     if(n_bytes <= SMSG_MAX_MSG)
1572     {
1573         int totalsize = n_bytes+header;
1574         ptr = malloc(totalsize);
1575     }else 
1576     {
1577
1578         CmiAssert(header <= ALIGNBUF);
1579 #if     USE_LRTS_MEMPOOL
1580         n_bytes = ALIGN64(n_bytes);
1581         char *res = syh_mempool_malloc(ALIGNBUF+n_bytes);
1582 #else
1583         n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
1584         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1585 #endif
1586         ptr = res + ALIGNBUF - header;
1587     }
1588 #if 0 
1589     CmiPrintf("Done Alloc Lrts for bytes=%d, head=%d\n", n_bytes, header);
1590 #endif
1591     return ptr;
1592 }
1593
1594 void  LrtsFree(void *msg)
1595 {
1596     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1597     if (size <= SMSG_MAX_MSG)
1598       free(msg);
1599     else
1600     {
1601 #if 0
1602         CmiPrintf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1603 #endif
1604 #if     USE_LRTS_MEMPOOL
1605         syh_mempool_free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1606 #else
1607         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1608 #endif
1609     }
1610 #if 0 
1611     CmiPrintf("Done Free lrts for bytes=%d\n", size);
1612 #endif
1613 }
1614
1615 static void LrtsExit()
1616 {
1617     /* free memory ? */
1618 #if     USE_LRTS_MEMPOOL
1619     kill_allmempool();
1620 #endif
1621     PMI_Finalize();
1622     exit(0);
1623 }
1624
1625 static void LrtsDrainResources()
1626 {
1627     while (!SendBufferMsg()) {
1628         PumpNetworkSmsg();
1629         PumpNetworkRdmaMsgs();
1630         PumpLocalSmsgTransactions();
1631         PumpLocalRdmaTransactions();
1632     }
1633     PMI_Barrier();
1634 }
1635
1636 void CmiAbort(const char *message) {
1637
1638     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1639     PMI_Abort(-1, message);
1640 }
1641
1642 /**************************  TIMER FUNCTIONS **************************/
1643 #if CMK_TIMER_USE_SPECIAL
1644 /* MPI calls are not threadsafe, even the timer on some machines */
1645 static CmiNodeLock  timerLock = 0;
1646 static int _absoluteTime = 0;
1647 static int _is_global = 0;
1648 static struct timespec start_ns;
1649
1650 inline int CmiTimerIsSynchronized() {
1651     return 1;
1652 }
1653
1654 inline int CmiTimerAbsolute() {
1655     return _absoluteTime;
1656 }
1657
1658 double CmiStartTimer() {
1659     return 0.0;
1660 }
1661
1662 double CmiInitTime() {
1663     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
1664 }
1665
1666 void CmiTimerInit(char **argv) {
1667     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1668     if (_absoluteTime && CmiMyPe() == 0)
1669         printf("Charm++> absolute  timer is used\n");
1670     
1671     _is_global = CmiTimerIsSynchronized();
1672
1673
1674     if (_is_global) {
1675         if (CmiMyRank() == 0) {
1676             clock_gettime(CLOCK_MONOTONIC, &start_ts)
1677         }
1678     } else { /* we don't have a synchronous timer, set our own start time */
1679         CmiBarrier();
1680         CmiBarrier();
1681         CmiBarrier();
1682         clock_gettime(CLOCK_MONOTONIC, &start_ts)
1683     }
1684     CmiNodeAllBarrier();          /* for smp */
1685 }
1686
1687 /**
1688  * Since the timerLock is never created, and is
1689  * always NULL, then all the if-condition inside
1690  * the timer functions could be disabled right
1691  * now in the case of SMP.
1692  */
1693 double CmiTimer(void) {
1694     struct timespec now_ts;
1695     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1696     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1697         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1698 }
1699
1700 double CmiWallTimer(void) {
1701     struct timespec now_ts;
1702     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1703     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1704         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1705 }
1706
1707 double CmiCpuTimer(void) {
1708     struct timespec now_ts;
1709     clock_gettime(CLOCK_MONOTONIC, &now_ts)
1710     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
1711         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
1712 }
1713
1714 #endif
1715 /************Barrier Related Functions****************/
1716
1717 int CmiBarrier()
1718 {
1719     int status;
1720     status = PMI_Barrier();
1721     return status;
1722
1723 }
1724
1725
1726 #if CMK_PERSISTENT_COMM
1727 #include "machine-persistent.c"
1728 #endif
1729
1730