code clean up
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     export CHARM_UGNI_MEMPOOL_SIZE=8M
12     export CHARM_UGNI_MEMPOOL_MAX=20M
13     export CHARM_UGNI_SEND_MAX=10M
14
15     CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
16     CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
17  */
18 /*@{*/
19
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <stdint.h>
23 #include <errno.h>
24 #include <malloc.h>
25 #include <unistd.h>
26
27 #include "gni_pub.h"
28 #include "pmi.h"
29
30 #include "converse.h"
31 #include <time.h>
32
33 #define USE_OOB                     0
34
35 #define PRINT_SYH  0
36
37 // Trace communication thread
38 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
39 #define TRACE_THRESHOLD     0.00005
40 #define CMI_MPI_TRACE_MOREDETAILED 0
41 #undef CMI_MPI_TRACE_USEREVENTS
42 #define CMI_MPI_TRACE_USEREVENTS 1
43 #else
44 #undef CMK_SMP_TRACE_COMMTHREAD
45 #define CMK_SMP_TRACE_COMMTHREAD 0
46 #endif
47
48 #define CMK_TRACE_COMMOVERHEAD 0
49 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
50 #undef CMI_MPI_TRACE_USEREVENTS
51 #define CMI_MPI_TRACE_USEREVENTS 1
52 #else
53 #undef CMK_TRACE_COMMOVERHEAD
54 #define CMK_TRACE_COMMOVERHEAD 0
55 #endif
56
57 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
58 CpvStaticDeclare(double, projTraceStart);
59 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
60 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
61 #else
62 #define  START_EVENT()
63 #define  END_EVENT(x)
64 #endif
65
66 #define CMI_EXERT_SEND_CAP      0
67 #define CMI_EXERT_RECV_CAP      0
68
69 #if CMI_EXERT_SEND_CAP
70 #define SEND_CAP 16
71 #endif
72
73 #if CMI_EXERT_RECV_CAP
74 #define RECV_CAP 2
75 #endif
76
77
78
79 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
80
81 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
82
83 static int avg_smsg_connection = 32;
84 static int                 *smsg_connected_flag= 0;
85 static gni_smsg_attr_t     **smsg_attr_vector_local;
86 static gni_smsg_attr_t     **smsg_attr_vector_remote;
87 static gni_ep_handle_t     ep_hndl_unbound;
88 static gni_smsg_attr_t     send_smsg_attr;
89 static gni_smsg_attr_t     recv_smsg_attr;
90
91 typedef struct _dynamic_smsg_mailbox{
92    void     *mailbox_base;
93    int      size;
94    int      offset;
95    gni_mem_handle_t  mem_hndl;
96    struct      _dynamic_smsg_mailbox  *next;
97 }dynamic_smsg_mailbox_t;
98
99 static dynamic_smsg_mailbox_t  *mailbox_list;
100
101 #define REMOTE_EVENT                      0
102 #define USE_LRTS_MEMPOOL                  1
103
104 #if USE_LRTS_MEMPOOL
105 #if CMK_SMP
106 #define STEAL_MEMPOOL                     0
107 #endif
108
109 #define oneMB (1024ll*1024)
110 #if CMK_SMP
111 static CmiInt8 _mempool_size = 8*oneMB;
112 #else
113 static CmiInt8 _mempool_size = 32*oneMB;
114 #endif
115 static CmiInt8 _expand_mem =  4*oneMB;
116 #endif
117
118 //Dynamic flow control about memory registration
119 static CmiInt8  MAX_BUFF_SEND  =  2*oneMB*oneMB;
120 static CmiInt8  MAX_REG_MEM    =  2*oneMB*oneMB;
121 static CmiInt8 buffered_send_msg = 0;
122
123 #define BIG_MSG                  16*oneMB
124 #define ONE_SEG                  4*oneMB
125 #define BIG_MSG_PIPELINE         4
126
127 #if CMK_SMP
128 #define COMM_THREAD_SEND 1
129 #endif
130 int         rdma_id = 0;
131 #if PRINT_SYH
132 int         lrts_smsg_success = 0;
133 int         lrts_send_msg_id = 0;
134 int         lrts_send_rdma_success = 0;
135 int         lrts_local_done_msg = 0;
136
137 #endif
138 #include "machine.h"
139
140 #include "pcqueue.h"
141
142 #include "mempool.h"
143
144 #if CMK_PERSISTENT_COMM
145 #include "machine-persistent.h"
146 #endif
147
148 //#define  USE_ONESIDED 1
149 #ifdef USE_ONESIDED
150 //onesided implementation is wrong, since no place to restore omdh
151 #include "onesided.h"
152 onesided_hnd_t   onesided_hnd;
153 onesided_md_t    omdh;
154 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
155
156 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
157
158 #else
159 uint8_t   onesided_hnd, omdh;
160 #if REMOTE_EVENT
161 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
162 #else
163 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
164 #endif
165 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
166 #endif
167 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
168 #define   DecreaseMsgInRecv(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)--
169 #define   IncreaseMsgInSend(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)++
170                                    //printf("++++[%d]%p   ----- %d\n", myrank, ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
171 #define   DecreaseMsgInSend(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)--
172                                      //printf("---[%d]%p   ----- %d\n", myrank,((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
173 #define   GetMempooladdr(x)  ((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr
174 #define   GetMempoolptr(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mptr
175 #define   GetMempoolsize(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->size
176 #define   GetMemHndl(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mem_hndl
177 #define   GetMemHndlFromHeader(x) ((block_header*)x)->mem_hndl
178 #define   GetSizeFromHeader(x) ((block_header*)x)->size
179 #define   NoMsgInSend(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send == 0
180 #define   NoMsgInRecv(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv == 0
181 #define   NoMsgInFlight(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send + ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv  == 0
182 #define   IsMemHndlZero(x)  (x.qword1 == 0 && x.qword2 == 0)
183 #define   SetMemHndlZero(x)  x.qword1 = 0; x.qword2 = 0
184 #define   NotRegistered(x)  IsMemHndlZero(((block_header*)x)->mem_hndl)
185
186 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
187 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
188 #define CmiGetMsgSeq(m)  ((CmiMsgHeaderExt*)m)->seq
189 #define CmiSetMsgSeq(m, s)  ((((CmiMsgHeaderExt*)m)->seq) = (s))
190
191 #define ALIGNBUF                64
192
193 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
194 /* If SMSG is not used */
195
196 #define FMA_PER_CORE  1024
197 #define FMA_BUFFER_SIZE 1024
198
199 /* If SMSG is used */
200 static int  SMSG_MAX_MSG = 1024;
201 #define SMSG_MAX_CREDIT 72 
202
203 #define MSGQ_MAXSIZE       2048
204 /* large message transfer with FMA or BTE */
205 #define LRTS_GNI_RDMA_THRESHOLD  1024 
206
207 #if CMK_SMP
208 static int  REMOTE_QUEUE_ENTRIES=163840; 
209 static int LOCAL_QUEUE_ENTRIES=163840; 
210 #else
211 static int  REMOTE_QUEUE_ENTRIES=20480;
212 static int LOCAL_QUEUE_ENTRIES=20480; 
213 #endif
214
215 #define BIG_MSG_TAG  0x26
216 #define PUT_DONE_TAG      0x28
217 #define DIRECT_PUT_DONE_TAG      0x29
218 #define ACK_TAG           0x30
219 /* SMSG is data message */
220 #define SMALL_DATA_TAG          0x31
221 /* SMSG is a control message to initialize a BTE */
222 #define MEDIUM_HEAD_TAG         0x32
223 #define MEDIUM_DATA_TAG         0x33
224 #define LMSG_INIT_TAG           0x39 
225 #define VERY_LMSG_INIT_TAG      0x40 
226 #define VERY_LMSG_TAG           0x41 
227
228 #define DEBUG
229 #ifdef GNI_RC_CHECK
230 #undef GNI_RC_CHECK
231 #endif
232 #ifdef DEBUG
233 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
234 #else
235 #define GNI_RC_CHECK(msg,rc)
236 #endif
237
238 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
239 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
240
241 static int useStaticMSGQ = 0;
242 static int useStaticFMA = 0;
243 static int mysize, myrank;
244 gni_nic_handle_t      nic_hndl;
245
246 typedef struct {
247     gni_mem_handle_t mdh;
248     uint64_t addr;
249 } mdh_addr_t ;
250 // this is related to dynamic SMSG
251
252 typedef struct mdh_addr_list{
253     gni_mem_handle_t mdh;
254    void *addr;
255     struct mdh_addr_list *next;
256 }mdh_addr_list_t;
257
258 static unsigned int         smsg_memlen;
259 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
260 mdh_addr_t          setup_mem;
261 mdh_addr_t          *smsg_connection_vec = 0;
262 gni_mem_handle_t    smsg_connection_memhndl;
263 static int          smsg_expand_slots = 10;
264 static int          smsg_available_slot = 0;
265 static void         *smsg_mailbox_mempool = 0;
266 mdh_addr_list_t     *smsg_dynamic_list = 0;
267
268 static void             *smsg_mailbox_base;
269 gni_msgq_attr_t         msgq_attrs;
270 gni_msgq_handle_t       msgq_handle;
271 gni_msgq_ep_attr_t      msgq_ep_attrs;
272 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
273
274 /* =====Beginning of Declarations of Machine Specific Variables===== */
275 static int cookie;
276 static int modes = 0;
277 static gni_cq_handle_t       smsg_rx_cqh = NULL;
278 static gni_cq_handle_t       smsg_tx_cqh = NULL;
279 static gni_cq_handle_t       post_rx_cqh = NULL;
280 static gni_cq_handle_t       post_tx_cqh = NULL;
281 static gni_ep_handle_t       *ep_hndl_array;
282 #if CMK_SMP && !COMM_THREAD_SEND
283 static CmiNodeLock           *ep_lock_array;
284 static CmiNodeLock           tx_cq_lock; 
285 static CmiNodeLock           rx_cq_lock; 
286 #endif
287 typedef struct msg_list
288 {
289     uint32_t destNode;
290     uint32_t size;
291     void *msg;
292     uint8_t tag;
293 #if !CMK_SMP
294     struct msg_list *next;
295 #endif
296 }MSG_LIST;
297
298
299 typedef struct control_msg
300 {
301     uint64_t            source_addr;    /* address from the start of buffer  */
302     uint64_t            dest_addr;      /* address from the start of buffer */
303     //int                 source;         /* source rank */
304     int                 total_length;   /* total length */
305     int                 length;         /* length of this packet */
306     uint8_t             seq_id;         //big message   0 meaning single message
307     gni_mem_handle_t    source_mem_hndl;
308     struct control_msg *next;
309 }CONTROL_MSG;
310
311 #ifdef CMK_DIRECT
312 typedef struct{
313     void *recverBuf;
314     void (*callbackFnPtr)(void *);
315     void *callbackData;
316 }CMK_DIRECT_HEADER;
317 #endif
318 typedef struct  rmda_msg
319 {
320     int                   destNode;
321     gni_post_descriptor_t *pd;
322 #if !CMK_SMP
323     struct  rmda_msg      *next;
324 #endif
325 }RDMA_REQUEST;
326
327 #if CMK_SMP
328 PCQueue sendRdmaBuf;
329 typedef struct  msg_list_index
330 {
331     int         next;
332     PCQueue     sendSmsgBuf;
333 } MSG_LIST_INDEX;
334 #else
335 static RDMA_REQUEST        *sendRdmaBuf = 0;
336 static RDMA_REQUEST        *sendRdmaTail = 0;
337 typedef struct  msg_list_index
338 {
339     int         next;
340     MSG_LIST    *sendSmsgBuf;
341     MSG_LIST    *tail;
342 } MSG_LIST_INDEX;
343 #endif
344
345 /* reuse PendingMsg memory */
346 static CONTROL_MSG          *control_freelist=0;
347 static MSG_LIST             *msglist_freelist=0;
348
349 typedef struct smsg_queue
350 {
351     MSG_LIST_INDEX   *smsg_msglist_index;
352     int               smsg_head_index;
353 } SMSG_QUEUE;
354
355 SMSG_QUEUE                  smsg_queue;
356 SMSG_QUEUE                  smsg_oob_queue;
357
358 #if CMK_SMP
359
360 #define FreeMsgList(d)   free(d);
361 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
362
363 #else
364
365 #define FreeMsgList(d)  \
366   (d)->next = msglist_freelist;\
367   msglist_freelist = d;
368
369 #define MallocMsgList(d) \
370   d = msglist_freelist;\
371   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
372              _MEMCHECK(d);\
373   } else msglist_freelist = d->next; \
374   d->next =0;
375 #endif
376
377 #if CMK_SMP
378
379 #define FreeControlMsg(d)      free(d);
380 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
381
382 #else
383
384 #define FreeControlMsg(d)       \
385   (d)->next = control_freelist;\
386   control_freelist = d;
387
388 #define MallocControlMsg(d) \
389   d = control_freelist;\
390   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
391              _MEMCHECK(d);\
392   } else control_freelist = d->next;
393
394 #endif
395
396 static RDMA_REQUEST         *rdma_freelist = NULL;
397
398 #define FreeMediumControlMsg(d)       \
399   (d)->next = medium_control_freelist;\
400   medium_control_freelist = d;
401
402
403 #define MallocMediumControlMsg(d) \
404     d = medium_control_freelist;\
405     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
406     _MEMCHECK(d);\
407 } else mediumcontrol_freelist = d->next;
408
409 # if CMK_SMP
410 #define FreeRdmaRequest(d)       free(d);
411 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
412 #else
413
414 #define FreeRdmaRequest(d)       \
415   (d)->next = rdma_freelist;\
416   rdma_freelist = d;
417
418 #define MallocRdmaRequest(d) \
419   d = rdma_freelist;\
420   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
421              _MEMCHECK(d);\
422   } else rdma_freelist = d->next; \
423     d->next =0;
424 #endif
425
426 /* reuse gni_post_descriptor_t */
427 static gni_post_descriptor_t *post_freelist=0;
428
429 #if  !CMK_SMP
430 #define FreePostDesc(d)       \
431     (d)->next_descr = post_freelist;\
432     post_freelist = d;
433
434 #define MallocPostDesc(d) \
435   d = post_freelist;\
436   if (d==0) { \
437      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
438      _MEMCHECK(d);\
439   } else post_freelist = d->next_descr;
440 #else
441
442 #define FreePostDesc(d)     free(d);
443 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
444
445 #endif
446
447
448 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
449 static int      buffered_smsg_counter = 0;
450
451 /* SmsgSend return success but message sent is not confirmed by remote side */
452 static MSG_LIST *buffered_fma_head = 0;
453 static MSG_LIST *buffered_fma_tail = 0;
454
455 /* functions  */
456 #define IsFree(a,ind)  !( a& (1<<(ind) ))
457 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
458 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
459
460 CpvDeclare(mempool_type*, mempool);
461
462 /* get the upper bound of log 2 */
463 int mylog2(int size)
464 {
465     int op = size;
466     unsigned int ret=0;
467     unsigned int mask = 0;
468     int i;
469     while(op>0)
470     {
471         op = op >> 1;
472         ret++;
473
474     }
475     for(i=1; i<ret; i++)
476     {
477         mask = mask << 1;
478         mask +=1;
479     }
480
481     ret -= ((size &mask) ? 0:1);
482     return ret;
483 }
484
485 static void
486 allgather(void *in,void *out, int len)
487 {
488     static int *ivec_ptr=NULL,already_called=0,job_size=0;
489     int i,rc;
490     int my_rank;
491     char *tmp_buf,*out_ptr;
492
493     if(!already_called) {
494
495         rc = PMI_Get_size(&job_size);
496         CmiAssert(rc == PMI_SUCCESS);
497         rc = PMI_Get_rank(&my_rank);
498         CmiAssert(rc == PMI_SUCCESS);
499
500         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
501         CmiAssert(ivec_ptr != NULL);
502
503         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
504         CmiAssert(rc == PMI_SUCCESS);
505
506         already_called = 1;
507
508     }
509
510     tmp_buf = (char *)malloc(job_size * len);
511     CmiAssert(tmp_buf);
512
513     rc = PMI_Allgather(in,tmp_buf,len);
514     CmiAssert(rc == PMI_SUCCESS);
515
516     out_ptr = out;
517
518     for(i=0;i<job_size;i++) {
519
520         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
521
522     }
523
524     free(tmp_buf);
525 }
526
527 static void
528 allgather_2(void *in,void *out, int len)
529 {
530     //PMI_Allgather is out of order
531     int i,rc, extend_len;
532     int  rank_index;
533     char *out_ptr, *out_ref;
534     char *in2;
535
536     extend_len = sizeof(int) + len;
537     in2 = (char*)malloc(extend_len);
538
539     memcpy(in2, &myrank, sizeof(int));
540     memcpy(in2+sizeof(int), in, len);
541
542     out_ptr = (char*)malloc(mysize*extend_len);
543
544     rc = PMI_Allgather(in2, out_ptr, extend_len);
545     GNI_RC_CHECK("allgather", rc);
546
547     out_ref = out;
548
549     for(i=0;i<mysize;i++) {
550         //rank index 
551         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
552         //copy to the rank index slot
553         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
554     }
555
556     free(out_ptr);
557     free(in2);
558
559 }
560
561 static unsigned int get_gni_nic_address(int device_id)
562 {
563     unsigned int address, cpu_id;
564     gni_return_t status;
565     int i, alps_dev_id=-1,alps_address=-1;
566     char *token, *p_ptr;
567
568     p_ptr = getenv("PMI_GNI_DEV_ID");
569     if (!p_ptr) {
570         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
571        
572         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
573     } else {
574         while ((token = strtok(p_ptr,":")) != NULL) {
575             alps_dev_id = atoi(token);
576             if (alps_dev_id == device_id) {
577                 break;
578             }
579             p_ptr = NULL;
580         }
581         CmiAssert(alps_dev_id != -1);
582         p_ptr = getenv("PMI_GNI_LOC_ADDR");
583         CmiAssert(p_ptr != NULL);
584         i = 0;
585         while ((token = strtok(p_ptr,":")) != NULL) {
586             if (i == alps_dev_id) {
587                 alps_address = atoi(token);
588                 break;
589             }
590             p_ptr = NULL;
591             ++i;
592         }
593         CmiAssert(alps_address != -1);
594         address = alps_address;
595     }
596     return address;
597 }
598
599 static uint8_t get_ptag(void)
600 {
601     char *p_ptr, *token;
602     uint8_t ptag;
603
604     p_ptr = getenv("PMI_GNI_PTAG");
605     CmiAssert(p_ptr != NULL);
606     token = strtok(p_ptr, ":");
607     ptag = (uint8_t)atoi(token);
608     return ptag;
609         
610 }
611
612 static uint32_t get_cookie(void)
613 {
614     uint32_t cookie;
615     char *p_ptr, *token;
616
617     p_ptr = getenv("PMI_GNI_COOKIE");
618     CmiAssert(p_ptr != NULL);
619     token = strtok(p_ptr, ":");
620     cookie = (uint32_t)atoi(token);
621
622     return cookie;
623 }
624
625 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
626 /* TODO: add any that are related */
627 /* =====End of Definitions of Message-Corruption Related Macros=====*/
628
629
630 #include "machine-lrts.h"
631 #include "machine-common-core.c"
632
633 /* Network progress function is used to poll the network when for
634    messages. This flushes receive buffers on some  implementations*/
635 #if CMK_MACHINE_PROGRESS_DEFINED
636 void CmiMachineProgressImpl() {
637 }
638 #endif
639
640 static void SendRdmaMsg();
641 static void PumpNetworkSmsg();
642 static void PumpLocalRdmaTransactions();
643 static int SendBufferMsg();
644 static     int register_mempool = 0;
645
646 #if MACHINE_DEBUG_LOG
647 FILE *debugLog = NULL;
648 static CmiInt8 buffered_recv_msg = 0;
649 int         lrts_smsg_success = 0;
650 int         lrts_received_msg = 0;
651 #endif
652 static void sweep_mempool(mempool_type *mptr)
653 {
654     block_header *current = &(mptr->block_head);
655
656 printf("[n %d] sweep_mempool slot START .\n", myrank);
657     while( current!= NULL) {
658 printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
659         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
660     }
661 printf("[n %d] sweep_mempool slot END .\n", myrank);
662 }
663
664 inline
665 static  gni_return_t deregisterMempool(mempool_type *mptr, block_header **from)
666 {
667     gni_return_t status;
668     block_header *current = *from;
669
670     while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
671        current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
672
673     *from = current;
674     if(current == NULL) return GNI_RC_ERROR_RESOURCE;
675     status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh);
676     GNI_RC_CHECK("registerMemorypool de-register", status);
677     register_mempool -= GetSizeFromHeader(current);
678 //printf("[n %d] deregisterMempool done. register_mempool: %d\n", myrank, register_mempool);
679     SetMemHndlZero(GetMemHndlFromHeader(current));
680     return GNI_RC_SUCCESS;
681 }
682
683 inline 
684 static gni_return_t registerMempool(void *msg)
685 {
686     gni_return_t status = GNI_RC_SUCCESS;
687     int size = GetMempoolsize(msg);
688     void *addr = GetMempooladdr(msg);
689     gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
690    
691     mempool_type *mptr = (mempool_type*)GetMempoolptr(msg);
692     block_header *current = &(mptr->block_head);
693
694     while(register_mempool > MAX_REG_MEM)
695     {
696         status = deregisterMempool(mptr, &current);
697         if (status != GNI_RC_SUCCESS) break;
698     };
699     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_mempool); 
700     while(1)
701     {
702         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, addr, size, memhndl, &omdh);
703         if(status == GNI_RC_SUCCESS)
704         {
705             register_mempool += size;
706             break;
707         }
708         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
709         {
710             CmiAbort("Memory registor for mempool fails\n");
711         }
712         else
713         {
714             status = deregisterMempool(mptr, &current);
715             if (status != GNI_RC_SUCCESS) break;
716         }
717     }; 
718     return status;
719 }
720
721 inline
722 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
723 {
724     MSG_LIST        *msg_tmp;
725     MallocMsgList(msg_tmp);
726     msg_tmp->destNode = destNode;
727     msg_tmp->size   = size;
728     msg_tmp->msg    = msg;
729     msg_tmp->tag    = tag;
730
731 #if !CMK_SMP
732     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
733         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
734         queue->smsg_head_index = destNode;
735         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
736     }else
737     {
738         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
739         queue->smsg_msglist_index[destNode].tail = msg_tmp;
740     }
741 #else
742     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
743 #endif
744 #if PRINT_SYH
745     buffered_smsg_counter++;
746 #endif
747 }
748
749 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
750 {
751     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
752 }
753
754 inline
755 static void setup_smsg_connection(int destNode)
756 {
757     mdh_addr_list_t  *new_entry = 0;
758     gni_post_descriptor_t *pd;
759     gni_smsg_attr_t      *smsg_attr;
760     gni_return_t status = GNI_RC_NOT_DONE;
761     RDMA_REQUEST        *rdma_request_msg;
762     
763     if(smsg_available_slot == smsg_expand_slots)
764     {
765         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
766         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
767         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
768
769         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
770             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
771             GNI_MEM_READWRITE,   
772             -1,
773             &(new_entry->mdh));
774         smsg_available_slot = 0; 
775         new_entry->next = smsg_dynamic_list;
776         smsg_dynamic_list = new_entry;
777     }
778     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
779     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
780     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
781     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
782     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
783     smsg_attr->buff_size = smsg_memlen;
784     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
785     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
786     smsg_local_attr_vec[destNode] = smsg_attr;
787     smsg_available_slot++;
788     MallocPostDesc(pd);
789     pd->type            = GNI_POST_FMA_PUT;
790     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
791     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
792     pd->length          = sizeof(gni_smsg_attr_t);
793     pd->local_addr      = (uint64_t) smsg_attr;
794     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
795     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
796     pd->src_cq_hndl     = 0;
797     pd->rdma_mode       = 0;
798     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
799     print_smsg_attr(smsg_attr);
800     if(status == GNI_RC_ERROR_RESOURCE )
801     {
802         MallocRdmaRequest(rdma_request_msg);
803         rdma_request_msg->destNode = destNode;
804         rdma_request_msg->pd = pd;
805         /* buffer this request */
806     }
807 #if PRINT_SYH
808     if(status != GNI_RC_SUCCESS)
809        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
810     else
811         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
812 #endif
813 }
814
815 /* useDynamicSMSG */
816 inline 
817 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
818 {
819     gni_return_t status = GNI_RC_NOT_DONE;
820
821     if(mailbox_list->offset == mailbox_list->size)
822     {
823         dynamic_smsg_mailbox_t *new_mailbox_entry;
824         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
825         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
826         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
827         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
828         new_mailbox_entry->offset = 0;
829         
830         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
831             new_mailbox_entry->size, smsg_rx_cqh,
832             GNI_MEM_READWRITE,   
833             -1,
834             &(new_mailbox_entry->mem_hndl));
835
836         GNI_RC_CHECK("register", status);
837         new_mailbox_entry->next = mailbox_list;
838         mailbox_list = new_mailbox_entry;
839     }
840     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
841     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
842     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
843     local_smsg_attr->mbox_offset = mailbox_list->offset;
844     mailbox_list->offset += smsg_memlen;
845     local_smsg_attr->buff_size = smsg_memlen;
846     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
847     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
848 }
849
850 /* useDynamicSMSG */
851 inline 
852 static int connect_to(int destNode)
853 {
854     gni_return_t status = GNI_RC_NOT_DONE;
855     CmiAssert(smsg_connected_flag[destNode] == 0);
856     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
857     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
858     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
859     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
860             
861 #if CMK_SMP && !COMM_THREAD_SEND
862     //CmiLock(ep_lock_array[destNode]);
863     CmiLock(tx_cq_lock);
864 #endif
865     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
866 #if CMK_SMP && !COMM_THREAD_SEND
867     //CmiUnlock(ep_lock_array[destNode]);
868     CmiUnlock(tx_cq_lock);
869 #endif
870     if (status == GNI_RC_ERROR_RESOURCE) {
871       /* possibly destNode is making connection at the same time */
872       free(smsg_attr_vector_local[destNode]);
873       smsg_attr_vector_local[destNode] = NULL;
874       free(smsg_attr_vector_remote[destNode]);
875       smsg_attr_vector_remote[destNode] = NULL;
876       mailbox_list->offset -= smsg_memlen;
877       return 0;
878     }
879     GNI_RC_CHECK("GNI_Post", status);
880     smsg_connected_flag[destNode] = 1;
881     return 1;
882 }
883
884 //inline static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
885 inline static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
886 {
887     unsigned int          remote_address;
888     uint32_t              remote_id;
889     gni_return_t status = GNI_RC_NOT_DONE;
890     gni_smsg_attr_t      *smsg_attr;
891     gni_post_descriptor_t *pd;
892     gni_post_state_t      post_state;
893     char                  *real_data; 
894
895     if (useDynamicSMSG) {
896         switch (smsg_connected_flag[destNode]) {
897         case 0: 
898             connect_to(destNode);         /* continue to case 1 */
899         case 1:                           /* pending connection, do nothing */
900             status = GNI_RC_NOT_DONE;
901             if(inbuff ==0)
902                 buffer_small_msgs(queue, msg, size, destNode, tag);
903             return status;
904         }
905     }
906 #if CMK_SMP
907     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
908     {
909 #else
910     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
911     {
912 #endif
913 #if CMK_SMP && !COMM_THREAD_SEND
914         CmiLock(tx_cq_lock);
915 #endif
916         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
917 #if CMK_SMP && !COMM_THREAD_SEND
918         CmiUnlock(tx_cq_lock);
919 #endif
920         if(status == GNI_RC_SUCCESS)
921         {
922 #if CMK_SMP_TRACE_COMMTHREAD
923             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
924             { 
925                 START_EVENT();
926                 if ( tag == SMALL_DATA_TAG)
927                     real_data = (char*)msg; 
928                 else 
929                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
930                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
931             }
932 #endif
933             return status;
934         }
935     }
936     if(inbuff ==0)
937         buffer_small_msgs(queue, msg, size, destNode, tag);
938     return status;
939 }
940
941
942 inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
943 {
944     /* construct a control message and send */
945     CONTROL_MSG         *control_msg_tmp;
946     MallocControlMsg(control_msg_tmp);
947     control_msg_tmp->source_addr = (uint64_t)msg;
948     control_msg_tmp->seq_id    = seqno;
949     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
950 #if     USE_LRTS_MEMPOOL
951     if(size < BIG_MSG)
952     {
953         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
954     }
955     else
956     {
957         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
958         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
959         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
960     }
961 #else
962     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
963 #endif
964     return control_msg_tmp;
965 }
966
967 // Large message, send control to receiver, receiver register memory and do a GET, 
968 // return 1 - send no success
969 inline
970 static int send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
971 {
972     gni_return_t        status  =   GNI_RC_NOT_DONE;
973     uint32_t            vmdh_index  = -1;
974     int                 size;
975     int                 offset = 0;
976     uint64_t            source_addr;
977     int                 register_size; 
978     size    =   control_msg_tmp->total_length;
979     source_addr = control_msg_tmp->source_addr;
980     register_size = control_msg_tmp->length;
981
982 #if     USE_LRTS_MEMPOOL
983     if( control_msg_tmp->seq_id == 0 ){
984         if(buffered_send_msg >= MAX_BUFF_SEND)
985         {
986             if(!inbuff)
987                 buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
988             return status;
989         }
990         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
991         {
992             //register the corresponding mempool
993             status = registerMempool((void*)source_addr);
994             if(status == GNI_RC_SUCCESS)
995             {
996                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
997             }
998         }else
999         {
1000             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1001             status = GNI_RC_SUCCESS;
1002         }
1003         if(NoMsgInSend( (void*)(control_msg_tmp->source_addr)))
1004             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1005         else
1006             register_size = 0;
1007     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1008     {
1009         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1010         source_addr += offset;
1011         size = control_msg_tmp->length;
1012         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
1013         register_size = 0;  
1014     }
1015
1016     if(status == GNI_RC_SUCCESS)
1017     {
1018         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
1019         if(status == GNI_RC_SUCCESS)
1020         {
1021             buffered_send_msg += register_size;
1022             if(control_msg_tmp->seq_id == 0)
1023             {
1024                 IncreaseMsgInSend(source_addr);
1025             }
1026             FreeControlMsg(control_msg_tmp);
1027             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_mempool, LMSG_INIT_TAG); 
1028         }
1029     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1030     {
1031         CmiAbort("Memory registor for large msg\n");
1032     }else 
1033     {
1034         if(!inbuff)
1035             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1036     }
1037     return status;
1038 #else
1039     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
1040     if(status == GNI_RC_SUCCESS)
1041     {
1042         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
1043         if(status == GNI_RC_SUCCESS)
1044         {
1045             FreeControlMsg(control_msg_tmp);
1046         }
1047     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1048     {
1049         CmiAbort("Memory registor for large msg\n");
1050     }else 
1051     {
1052         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1053     }
1054     return status;
1055 #endif
1056 }
1057
1058 inline void LrtsPrepareEnvelope(char *msg, int size)
1059 {
1060     CmiSetMsgSize(msg, size);
1061 }
1062
1063 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1064 {
1065     gni_return_t        status  =   GNI_RC_SUCCESS;
1066     uint8_t tag;
1067     CONTROL_MSG         *control_msg_tmp;
1068     int                 oob = ( mode & OUT_OF_BAND);
1069     SMSG_QUEUE          *queue;
1070
1071 #if USE_OOB
1072     queue = oob? &smsg_oob_queue : &smsg_queue;
1073 #else
1074     queue = &smsg_queue;
1075 #endif
1076
1077     LrtsPrepareEnvelope(msg, size);
1078
1079 #if PRINT_SYH
1080     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1081 #endif 
1082 #if CMK_SMP && COMM_THREAD_SEND
1083     if(size <= SMSG_MAX_MSG)
1084         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1085     else if (size < BIG_MSG) {
1086         control_msg_tmp =  construct_control_msg(size, msg, 0);
1087         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1088     }
1089     else {
1090           CmiSetMsgSeq(msg, 0);
1091           control_msg_tmp =  construct_control_msg(size, msg, 1);
1092           buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1093     }
1094 #else //non-smp, smp(worker sending)
1095     if(size <= SMSG_MAX_MSG)
1096     {
1097         status = send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0);  
1098         if(status == GNI_RC_SUCCESS)
1099             CmiFree(msg);
1100     }
1101     else if (size < BIG_MSG) {
1102         control_msg_tmp =  construct_control_msg(size, msg, 0);
1103         send_large_messages(queue, destNode, control_msg_tmp, 0);
1104     }
1105     else {
1106 #if     USE_LRTS_MEMPOOL
1107         CmiSetMsgSeq(msg, 0);
1108         control_msg_tmp =  construct_control_msg(size, msg, 1);
1109         send_large_messages(queue, destNode, control_msg_tmp, 0);
1110 #else
1111         control_msg_tmp =  construct_control_msg(size, msg, 0);
1112         send_large_messages(queue, destNode, control_msg_tmp, 0);
1113 #endif
1114     }
1115 #endif
1116     return 0;
1117 }
1118
1119 static void    PumpDatagramConnection();
1120 static void registerUserTraceEvents() {
1121 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1122     traceRegisterUserEvent("setting up connections", 10);
1123     traceRegisterUserEvent("Receiving small msgs", 20);
1124     traceRegisterUserEvent("Release local transaction", 30);
1125     traceRegisterUserEvent("Sending buffered small msgs", 40);
1126     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1127 #endif
1128 }
1129
1130 void LrtsPostCommonInit(int everReturn)
1131 {
1132 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1133     CpvInitialize(double, projTraceStart);
1134     /* only PE 0 needs to care about registration (to generate sts file). */
1135     if (CmiMyPe() == 0) {
1136         registerMachineUserEventsFunction(&registerUserTraceEvents);
1137     }
1138 #endif
1139
1140 #if CMK_SMP
1141     CmiIdleState *s=CmiNotifyGetState();
1142     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1143     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1144 #else
1145     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1146     if (useDynamicSMSG)
1147     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1148 #endif
1149 }
1150
1151 /* this is called by worker thread */
1152 void LrtsPostNonLocal(){
1153 #if CMK_SMP
1154 #if !COMM_THREAD_SEND
1155     if(mysize == 1) return;
1156     PumpLocalRdmaTransactions();
1157     SendBufferMsg();
1158     SendRdmaMsg();
1159 #endif
1160 #endif
1161 }
1162
1163 /* useDynamicSMSG */
1164 static void    PumpDatagramConnection()
1165 {
1166     uint32_t          remote_address;
1167     uint32_t          remote_id;
1168     gni_return_t status;
1169     gni_post_state_t  post_state;
1170     uint64_t          datagram_id;
1171     int i;
1172
1173    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1174    {
1175        if (datagram_id >= mysize) {           /* bound endpoint */
1176            int pe = datagram_id - mysize;
1177 #if CMK_SMP && !COMM_THREAD_SEND
1178            CmiLock(tx_cq_lock);
1179 #endif
1180            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1181 #if CMK_SMP && !COMM_THREAD_SEND
1182            CmiUnlock(tx_cq_lock);
1183 #endif
1184        if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1185        {
1186           CmiAssert(remote_id == pe);
1187           status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1188           GNI_RC_CHECK("Dynamic SMSG Init", status);
1189 #if PRINT_SYH
1190           printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1191 #endif
1192           CmiAssert(smsg_connected_flag[pe] == 1);
1193           smsg_connected_flag[pe] = 2;
1194        }
1195      }
1196      else {         /* unbound ep */
1197        status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1198        if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1199        {
1200           CmiAssert(remote_id<mysize);
1201           CmiAssert(smsg_connected_flag[remote_id] <= 0);
1202           status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1203           GNI_RC_CHECK("Dynamic SMSG Init", status);
1204 #if PRINT_SYH
1205           printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1206 #endif
1207           smsg_connected_flag[remote_id] = 2;
1208
1209           alloc_smsg_attr(&send_smsg_attr);
1210           status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1211           GNI_RC_CHECK("post unbound datagram", status);
1212         }
1213      }
1214    }
1215 }
1216
1217 /* pooling CQ to receive network message */
1218 static void PumpNetworkRdmaMsgs()
1219 {
1220     gni_cq_entry_t      event_data;
1221     gni_return_t        status;
1222
1223 }
1224
1225 inline 
1226 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1227 {
1228     RDMA_REQUEST        *rdma_request_msg;
1229     MallocRdmaRequest(rdma_request_msg);
1230     rdma_request_msg->destNode = inst_id;
1231     rdma_request_msg->pd = pd;
1232 #if CMK_SMP
1233     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1234 #else
1235     if(sendRdmaBuf == 0)
1236     {
1237         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1238     }else{
1239         sendRdmaTail->next = rdma_request_msg;
1240         sendRdmaTail =  rdma_request_msg;
1241     }
1242 #endif
1243
1244 }
1245 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1246 static void PumpNetworkSmsg()
1247 {
1248     uint64_t            inst_id;
1249     int                 ret;
1250     gni_cq_entry_t      event_data;
1251     gni_return_t        status;
1252     void                *header;
1253     uint8_t             msg_tag;
1254     int                 msg_nbytes;
1255     void                *msg_data;
1256     gni_mem_handle_t    msg_mem_hndl;
1257     gni_smsg_attr_t     *smsg_attr;
1258     gni_smsg_attr_t     *remote_smsg_attr;
1259     int                 init_flag;
1260     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1261     uint64_t            source_addr;
1262     SMSG_QUEUE         *queue = &smsg_queue;
1263
1264 #if CMK_SMP && !COMM_THREAD_SEND
1265     while(1)
1266     {
1267         CmiLock(tx_cq_lock);
1268         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1269         CmiUnlock(tx_cq_lock);
1270         if(status != GNI_RC_SUCCESS)
1271             break;
1272 #else
1273     while ( (status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
1274     {
1275 #endif
1276         inst_id = GNI_CQ_GET_INST_ID(event_data);
1277         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1278 #if PRINT_SYH
1279         printf("[%d] PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
1280 #endif
1281         if (useDynamicSMSG) {
1282             /* subtle: smsg may come before connection is setup */
1283             while (smsg_connected_flag[inst_id] != 2) 
1284                PumpDatagramConnection();
1285         }
1286         msg_tag = GNI_SMSG_ANY_TAG;
1287 #if CMK_SMP && !COMM_THREAD_SEND
1288         while(1) {
1289             CmiLock(tx_cq_lock);
1290             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1291             CmiUnlock(tx_cq_lock);
1292             if (status != GNI_RC_SUCCESS)
1293                 break;
1294 #else
1295         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS){
1296 #endif
1297             /* copy msg out and then put into queue (small message) */
1298             switch (msg_tag) {
1299             case SMALL_DATA_TAG:
1300             {
1301                 START_EVENT();
1302                 msg_nbytes = CmiGetMsgSize(header);
1303                 msg_data    = CmiAlloc(msg_nbytes);
1304                 memcpy(msg_data, (char*)header, msg_nbytes);
1305 #if CMK_SMP_TRACE_COMMTHREAD
1306                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1307 #endif
1308                 handleOneRecvedMsg(msg_nbytes, msg_data);
1309                 break;
1310             }
1311             case LMSG_INIT_TAG:
1312             {
1313 #if PRINT_SYH
1314                 printf("[%d] from %d request for Large msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
1315 #endif
1316                 getLargeMsgRequest(header, inst_id);
1317                 break;
1318             }
1319             case ACK_TAG:   //msg fit into mempool
1320             {
1321                 /* Get is done, release message . Now put is not used yet*/
1322 #if ! USE_LRTS_MEMPOOL
1323                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
1324 #else
1325                 DecreaseMsgInSend( ((void*)((CONTROL_MSG *) header)->source_addr));
1326 #endif
1327                 if(NoMsgInSend(((void*)((CONTROL_MSG *) header)->source_addr)))
1328                     buffered_send_msg -= GetMempoolsize((void*)((CONTROL_MSG *) header)->source_addr);
1329                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool, msg_tag); 
1330                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
1331                 break;
1332             }
1333             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1334             {
1335                 header_tmp = (CONTROL_MSG *) header;
1336                 void *msg = (void*)(header_tmp->source_addr);
1337                 int cur_seq = CmiGetMsgSeq(header_tmp->source_addr);
1338                 int offset = ONE_SEG*(cur_seq+1);
1339                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh);
1340                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1341                 if (remain_size < 0) remain_size = 0;
1342                 CmiSetMsgSize(msg, remain_size);
1343                 if(remain_size <= 0) //transaction done
1344                 {
1345                     CmiFree(msg);
1346                 }else if (header_tmp->total_length > offset)
1347                 {
1348                     CmiSetMsgSeq(header_tmp->source_addr, cur_seq+1);
1349                     MallocControlMsg(control_msg_tmp);
1350                     control_msg_tmp->source_addr = header_tmp->source_addr;
1351                     control_msg_tmp->dest_addr    = header_tmp->dest_addr;
1352                     control_msg_tmp->total_length   = header_tmp->total_length; 
1353                     control_msg_tmp->length         = header_tmp->total_length - offset;
1354                     if (control_msg_tmp->length >= ONE_SEG) control_msg_tmp->length = ONE_SEG;
1355                     control_msg_tmp->seq_id         = cur_seq+1+1;
1356                     //send next seg
1357                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1358                          // pipelining
1359                     if (header_tmp->seq_id == 1) {
1360                       int i;
1361                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1362                         int seq = cur_seq+i+2;
1363                         CmiSetMsgSeq(header_tmp->source_addr, seq-1);
1364                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)header_tmp->source_addr, seq);
1365                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1366                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1367                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1368                       }
1369                     }
1370                 }
1371                 break;
1372             }
1373
1374 #if CMK_PERSISTENT_COMM
1375             case PUT_DONE_TAG: //persistent message
1376             void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1377             int size = ((CONTROL_MSG *) header)->length;
1378             CmiReference(msg);
1379             handleOneRecvedMsg(size, msg); 
1380             break;
1381 #endif
1382 #ifdef CMK_DIRECT
1383             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1384             (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1385            break;
1386 #endif
1387             default: {
1388                 printf("weird tag problem\n");
1389                 CmiAbort("Unknown tag\n");
1390                      }
1391             }
1392 #if CMK_SMP && !COMM_THREAD_SEND
1393             CmiLock(tx_cq_lock);
1394 #endif
1395             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1396 #if CMK_SMP && !COMM_THREAD_SEND
1397             CmiUnlock(tx_cq_lock);
1398 #endif
1399             msg_tag = GNI_SMSG_ANY_TAG;
1400         } //endwhile getNext
1401     }   //end while GetEvent
1402     if(status == GNI_RC_ERROR_RESOURCE)
1403     {
1404         GNI_RC_CHECK("Smsg_rx_cq full", status);
1405     }
1406 }
1407
1408 static void printDesc(gni_post_descriptor_t *pd)
1409 {
1410     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1411 }
1412
1413 // for BIG_MSG called on receiver side for receiving control message
1414 // LMSG_INIT_TAG
1415 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1416 {
1417 #if     USE_LRTS_MEMPOOL
1418     CONTROL_MSG         *request_msg;
1419     gni_return_t        status = GNI_RC_SUCCESS;
1420     void                *msg_data;
1421     gni_post_descriptor_t *pd;
1422     gni_mem_handle_t    msg_mem_hndl;
1423     int source, size, transaction_size, offset = 0;
1424     int     register_size = 0;
1425     // initial a get to transfer data from the sender side */
1426     request_msg = (CONTROL_MSG *) header;
1427     size = request_msg->total_length;
1428     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool); 
1429     if(request_msg->seq_id < 2)   {
1430         msg_data = CmiAlloc(size);
1431         CmiSetMsgSeq(msg_data, 0);
1432         _MEMCHECK(msg_data);
1433     }
1434     else {
1435         offset = ONE_SEG*(request_msg->seq_id-1);
1436         msg_data = (char*)request_msg->dest_addr + offset;
1437     }
1438    
1439     MallocPostDesc(pd);
1440     pd->cqwrite_value = request_msg->seq_id;
1441     if( request_msg->seq_id == 0)
1442     {
1443         pd->local_mem_hndl= GetMemHndl(msg_data);
1444         transaction_size = ALIGN64(size);
1445         if(IsMemHndlZero(pd->local_mem_hndl))
1446         {   
1447             status = registerMempool((void*)(msg_data));
1448             if(status == GNI_RC_SUCCESS)
1449             {
1450                 pd->local_mem_hndl = GetMemHndl(msg_data);
1451             }
1452             else
1453             {
1454                 SetMemHndlZero(pd->local_mem_hndl);
1455             }
1456         }
1457         if(NoMsgInRecv( (void*)(msg_data)))
1458             register_size = GetMempoolsize((void*)(msg_data));
1459         else
1460             register_size = 0;
1461     }
1462     else{
1463         transaction_size = ALIGN64(request_msg->length);
1464         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, transaction_size, &(pd->local_mem_hndl), &omdh);
1465         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1466         {
1467             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1468         }
1469     }
1470     pd->first_operand = ALIGN64(size);                   //  total length
1471
1472     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1473         pd->type            = GNI_POST_FMA_GET;
1474     else
1475         pd->type            = GNI_POST_RDMA_GET;
1476 #if REMOTE_EVENT
1477     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1478 #else
1479     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1480 #endif
1481     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1482     pd->length          = transaction_size;
1483     pd->local_addr      = (uint64_t) msg_data;
1484     pd->remote_addr     = request_msg->source_addr + offset;
1485     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1486     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1487     pd->rdma_mode       = 0;
1488     pd->amo_cmd         = 0;
1489
1490     //memory registration success
1491     if(status == GNI_RC_SUCCESS)
1492     {
1493 #if CMK_SMP && !COMM_THREAD_SEND
1494         CmiLock(tx_cq_lock);
1495 #endif
1496         if(pd->type == GNI_POST_RDMA_GET) 
1497             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1498         else
1499             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1500 #if CMK_SMP && !COMM_THREAD_SEND
1501         CmiUnlock(tx_cq_lock);
1502 #endif
1503          
1504         if(status == GNI_RC_SUCCESS )
1505         {
1506             if(pd->cqwrite_value == 0)
1507             {
1508 #if MACHINE_DEBUG_LOG
1509                 buffered_recv_msg += register_size;
1510                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool); 
1511 #endif
1512                 IncreaseMsgInRecv(msg_data);
1513
1514             }
1515         }
1516     }else
1517     {
1518         SetMemHndlZero(pd->local_mem_hndl);
1519     }
1520     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1521     {
1522         bufferRdmaMsg(inst_id, pd); 
1523     }else {
1524         /* printf("source: %d pd:%p\n", source, pd); */
1525         GNI_RC_CHECK("AFter posting", status);
1526     }
1527 #else
1528     CONTROL_MSG         *request_msg;
1529     gni_return_t        status;
1530     void                *msg_data;
1531     gni_post_descriptor_t *pd;
1532     RDMA_REQUEST        *rdma_request_msg;
1533     gni_mem_handle_t    msg_mem_hndl;
1534     //int source;
1535     // initial a get to transfer data from the sender side */
1536     request_msg = (CONTROL_MSG *) header;
1537     msg_data = CmiAlloc(request_msg->length);
1538     _MEMCHECK(msg_data);
1539
1540     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
1541
1542     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1543     {
1544         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1545     }
1546
1547     MallocPostDesc(pd);
1548     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1549         pd->type            = GNI_POST_FMA_GET;
1550     else
1551         pd->type            = GNI_POST_RDMA_GET;
1552 #if REMOTE_EVENT
1553     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1554 #else
1555     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1556 #endif
1557     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1558     pd->length          = ALIGN64(request_msg->length);
1559     pd->local_addr      = (uint64_t) msg_data;
1560     pd->remote_addr     = request_msg->source_addr;
1561     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1562     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1563     pd->rdma_mode       = 0;
1564     pd->amo_cmd         = 0;
1565
1566     //memory registration successful
1567     if(status == GNI_RC_SUCCESS)
1568     {
1569         pd->local_mem_hndl  = msg_mem_hndl;
1570 #if CMK_SMP && !COMM_THREAD_SEND
1571             CmiLock(tx_cq_lock);
1572 #endif
1573         if(pd->type == GNI_POST_RDMA_GET) 
1574             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1575         else
1576             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1577 #if CMK_SMP && !COMM_THREAD_SEND
1578             CmiUnlock(tx_cq_lock);
1579 #endif
1580     }else
1581     {
1582         SetMemHndlZero(pd->local_mem_hndl);
1583     }
1584     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1585     {
1586         MallocRdmaRequest(rdma_request_msg);
1587         rdma_request_msg->next = 0;
1588         rdma_request_msg->destNode = inst_id;
1589         rdma_request_msg->pd = pd;
1590         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1591     }else {
1592         GNI_RC_CHECK("AFter posting", status);
1593     }
1594 #endif
1595 }
1596
1597 static void PumpLocalRdmaTransactions()
1598 {
1599     gni_cq_entry_t          ev;
1600     gni_return_t            status;
1601     uint64_t                type, inst_id;
1602     gni_post_descriptor_t   *tmp_pd;
1603     MSG_LIST                *ptr;
1604     CONTROL_MSG             *ack_msg_tmp;
1605     uint8_t             msg_tag;
1606 #ifdef CMK_DIRECT
1607     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1608 #endif
1609     SMSG_QUEUE         *queue = &smsg_queue;
1610
1611 #if CMK_SMP && !COMM_THREAD_SEND
1612     while(1) {
1613         CmiLock(tx_cq_lock);
1614         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1615         CmiUnlock(tx_cq_lock);
1616         if(status != GNI_RC_SUCCESS)
1617             break;
1618 #else
1619     while( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1620     {
1621 #endif
1622         type = GNI_CQ_GET_TYPE(ev);
1623         if (type == GNI_CQ_EVENT_TYPE_POST)
1624         {
1625             inst_id     = GNI_CQ_GET_INST_ID(ev);
1626 #if PRINT_SYH
1627             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1628 #endif
1629 #if CMK_SMP && !COMM_THREAD_SEND
1630             CmiLock(tx_cq_lock);
1631 #endif
1632             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1633 #if CMK_SMP && !COMM_THREAD_SEND
1634             CmiUnlock(tx_cq_lock);
1635 #endif
1636 #ifdef CMK_DIRECT
1637             if(tmp_pd->amo_cmd == 1)
1638             {
1639                 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1640                 cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
1641                 cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
1642                 cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
1643             }
1644             else{
1645                 MallocControlMsg(ack_msg_tmp);
1646                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1647                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1648             }
1649 #else
1650             MallocControlMsg(ack_msg_tmp);
1651             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1652             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1653 #endif
1654             ////Message is sent, free message , put is not used now
1655             switch (tmp_pd->type) {
1656 #if CMK_PERSISTENT_COMM
1657             case GNI_POST_RDMA_PUT:
1658 #if ! USE_LRTS_MEMPOOL
1659                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1660 #endif
1661             case GNI_POST_FMA_PUT:
1662                 CmiFree((void *)tmp_pd->local_addr);
1663                 msg_tag = PUT_DONE_TAG;
1664                 break;
1665 #endif
1666 #ifdef CMK_DIRECT
1667             case GNI_POST_RDMA_PUT:
1668             case GNI_POST_FMA_PUT:
1669                 //sender ACK to receiver to trigger it is done
1670                 msg_tag = DIRECT_PUT_DONE_TAG;
1671                 break;
1672 #endif
1673             case GNI_POST_RDMA_GET:
1674             case GNI_POST_FMA_GET:
1675 #if  ! USE_LRTS_MEMPOOL
1676                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1677                 msg_tag = ACK_TAG;  
1678 #else
1679                 ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
1680                 if(ack_msg_tmp->seq_id > 0)      // BIG_MSG
1681                 {
1682                     msg_tag = BIG_MSG_TAG; 
1683                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1684                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1685                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1686                 } 
1687                 else
1688                 {
1689                     msg_tag = ACK_TAG;  
1690                     ack_msg_tmp->dest_addr = tmp_pd->local_addr;
1691                 }
1692                 ack_msg_tmp->length = tmp_pd->length;
1693                 ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1694 #endif
1695                 break;
1696             default:
1697                 CmiPrintf("type=%d\n", tmp_pd->type);
1698                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1699             }
1700 #if CMK_DIRECT
1701             if(tmp_pd->amo_cmd == 1)
1702                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1703             else
1704 #endif
1705                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
1706             if(status == GNI_RC_SUCCESS)
1707             {
1708 #if CMK_DIRECT
1709                 if(tmp_pd->amo_cmd == 1)
1710                     free(cmk_direct_done_msg); 
1711                 else
1712 #endif
1713                     FreeControlMsg(ack_msg_tmp);
1714
1715             }
1716 #if CMK_PERSISTENT_COMM
1717             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1718 #endif
1719             {
1720                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1721 #if PRINT_SYH
1722                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1723 #endif
1724 #if CMK_SMP_TRACE_COMMTHREAD
1725                     START_EVENT();
1726 #endif
1727                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1728                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1729 #if MACHINE_DEBUG_LOG
1730                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1731                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1732                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool, msg_tag); 
1733 #endif
1734 #if CMK_SMP_TRACE_COMMTHREAD
1735                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1736 #endif
1737                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1738                 }else if(msg_tag == BIG_MSG_TAG){
1739                   void *msg = (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1740                   CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1741                   if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1742 #if CMK_SMP_TRACE_COMMTHREAD
1743                     START_EVENT();
1744 #endif
1745 #if PRINT_SYH
1746                     printf("Pipeline msg done [%d]\n", myrank);
1747 #endif
1748 #if CMK_SMP_TRACE_COMMTHREAD
1749                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1750 #endif
1751                     handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1752                   }
1753                 }
1754             }
1755             FreePostDesc(tmp_pd);
1756         }
1757     } //end while
1758     if(status == GNI_RC_ERROR_RESOURCE)
1759     {
1760         GNI_RC_CHECK("Smsg_tx_cq full", status);
1761     }
1762 }
1763
1764 static void  SendRdmaMsg()
1765 {
1766     gni_return_t            status = GNI_RC_SUCCESS;
1767     gni_mem_handle_t        msg_mem_hndl;
1768     RDMA_REQUEST *ptr = 0, *tmp_ptr;
1769     RDMA_REQUEST *pre = 0;
1770     int i, register_size = 0;
1771
1772 #if CMK_SMP
1773     int len = PCQueueLength(sendRdmaBuf);
1774     for (i=0; i<len; i++)
1775     {
1776         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
1777         if (ptr == NULL) break;
1778 #else
1779     ptr = sendRdmaBuf;
1780     while (ptr!=0)
1781     {
1782 #endif 
1783         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_mempool); 
1784         gni_post_descriptor_t *pd = ptr->pd;
1785         status = GNI_RC_SUCCESS;
1786         
1787         if(pd->cqwrite_value == 0)
1788         {
1789             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
1790             {
1791                 status = registerMempool((void*)(pd->local_addr));
1792                 if(status == GNI_RC_SUCCESS)
1793                 {
1794                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1795                 }
1796             }else
1797             {
1798                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1799                 status = GNI_RC_SUCCESS;
1800             }
1801             if(NoMsgInRecv( (void*)(pd->local_addr)))
1802                 register_size = GetMempoolsize((void*)(pd->local_addr));
1803             else
1804                 register_size = 0;
1805         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
1806         {
1807             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1808         }
1809         if(status == GNI_RC_SUCCESS)
1810         {
1811 #if CMK_SMP && !COMM_THREAD_SEND
1812             CmiLock(tx_cq_lock);
1813 #endif
1814             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1815                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1816             else
1817                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1818 #if CMK_SMP && !COMM_THREAD_SEND
1819             CmiUnlock(tx_cq_lock);
1820 #endif
1821         }
1822         if(status == GNI_RC_SUCCESS)
1823         {
1824 #if !CMK_SMP
1825             tmp_ptr = ptr;
1826             if(pre != 0) {
1827                 pre->next = ptr->next;
1828             }
1829             else {
1830                 sendRdmaBuf = ptr->next;
1831             }
1832             ptr = ptr->next;
1833             FreeRdmaRequest(tmp_ptr);
1834 #endif
1835             if(pd->cqwrite_value == 0)
1836             {
1837                 IncreaseMsgInRecv(((void*)(pd->local_addr)));
1838             }
1839 #if MACHINE_DEBUG_LOG
1840             buffered_recv_msg += register_size;
1841             MACHSTATE(8, "GO request from buffered\n"); 
1842 #endif
1843         }else
1844         {
1845 #if CMK_SMP
1846             PCQueuePush(sendRdmaBuf, (char*)ptr);
1847 #else
1848             pre = ptr;
1849             ptr = ptr->next;
1850 #endif
1851         }
1852     } //end while
1853 #if ! CMK_SMP
1854     sendRdmaTail = pre;
1855 #endif
1856 }
1857
1858 // return 1 if all messages are sent
1859 static int SendBufferMsg(SMSG_QUEUE *queue)
1860 {
1861     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
1862     CONTROL_MSG         *control_msg_tmp;
1863     gni_return_t        status;
1864     int                 done = 1;
1865     int                 register_size;
1866     void                *register_addr;
1867     int                 index_previous = -1;
1868     int                 index = queue->smsg_head_index;
1869 #if CMI_EXERT_SEND_CAP
1870     int                 sent_cnt = 0;
1871 #endif
1872
1873 #if ! CMK_SMP
1874     index = queue->smsg_head_index;
1875 #else
1876     index = 0;
1877 #endif
1878 #if CMK_SMP
1879     while(index <mysize)
1880     {
1881         int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
1882         for (i=0; i<len; i++) 
1883         {
1884             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
1885             if (ptr == NULL) break;
1886 #else
1887     while(index != -1)
1888     {
1889         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
1890         pre = 0;
1891         while(ptr != 0)
1892         {
1893 #endif
1894             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_mempool, ptr->tag); 
1895             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
1896                 /* connection not exists yet */
1897               done = 0;
1898               break;
1899             }
1900             status = GNI_RC_ERROR_RESOURCE;
1901             switch(ptr->tag)
1902             {
1903             case SMALL_DATA_TAG:
1904                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
1905                 if(status == GNI_RC_SUCCESS)
1906                 {
1907                     CmiFree(ptr->msg);
1908                 }
1909                 break;
1910             case LMSG_INIT_TAG:
1911                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1912                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
1913                 if(status != GNI_RC_SUCCESS)
1914                     done = 0;
1915                 break;
1916             case   ACK_TAG:
1917             case   BIG_MSG_TAG:
1918                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
1919                 if(status == GNI_RC_SUCCESS)
1920                 {
1921                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1922                 }else
1923                     done = 0;
1924                 break;
1925 #ifdef CMK_DIRECT
1926             case DIRECT_PUT_DONE_TAG:
1927                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
1928                 if(status == GNI_RC_SUCCESS)
1929                 {
1930                     free((CMK_DIRECT_HEADER*)ptr->msg);
1931                 }
1932                 break;
1933
1934 #endif
1935             default:
1936                 printf("Weird tag\n");
1937                 CmiAbort("should not happen\n");
1938             }
1939             if(status == GNI_RC_SUCCESS)
1940             {
1941 #if !CMK_SMP
1942                 tmp_ptr = ptr;
1943                 if(pre)
1944                 {
1945                     ptr = pre ->next = ptr->next;
1946                 }else
1947                 {
1948                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
1949                 }
1950                 FreeMsgList(tmp_ptr);
1951 #else
1952                 FreeMsgList(ptr);
1953 #endif
1954 #if PRINT_SYH
1955                 buffered_smsg_counter--;
1956                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
1957 #endif
1958 #if CMI_EXERT_SEND_CAP
1959                 sent_cnt++;
1960                 if(sent_cnt == SEND_CAP)
1961                     break;
1962 #endif
1963             }else {
1964 #if CMK_SMP
1965                 PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
1966 #else
1967                 pre = ptr;
1968                 ptr=ptr->next;
1969 #endif
1970                 done = 0;
1971             } 
1972         } //end while
1973 #if !CMK_SMP
1974         queue->smsg_msglist_index[index].tail = pre;
1975         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
1976         {
1977             if(index_previous != -1)
1978                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
1979             else
1980                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
1981         }else
1982         {
1983             index_previous = index;
1984         }
1985         index = queue->smsg_msglist_index[index].next;
1986 #else
1987         index++;
1988 #endif
1989
1990 #if CMI_EXERT_SEND_CAP
1991         if(sent_cnt == SEND_CAP)
1992                 break;
1993 #endif
1994     }   // end pooling for all cores
1995     return done;
1996 }
1997
1998 void LrtsAdvanceCommunication(int whileidle)
1999 {
2000     /*  Receive Msg first */
2001     MACHSTATE(8, "calling advance comm \n") ; 
2002 #if CMK_SMP_TRACE_COMMTHREAD
2003     double startT, endT;
2004 #endif
2005     if (useDynamicSMSG && whileidle)
2006     {
2007 #if CMK_SMP_TRACE_COMMTHREAD
2008         startT = CmiWallTimer();
2009 #endif
2010         PumpDatagramConnection();
2011 #if CMK_SMP_TRACE_COMMTHREAD
2012         endT = CmiWallTimer();
2013         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2014 #endif
2015     }
2016
2017 #if CMK_SMP_TRACE_COMMTHREAD
2018     startT = CmiWallTimer();
2019 #endif
2020     PumpNetworkSmsg();
2021     MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2022 #if CMK_SMP_TRACE_COMMTHREAD
2023     endT = CmiWallTimer();
2024     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2025 #endif
2026
2027 #if CMK_SMP_TRACE_COMMTHREAD
2028     startT = CmiWallTimer();
2029 #endif
2030     PumpLocalRdmaTransactions();
2031     MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2032 #if CMK_SMP_TRACE_COMMTHREAD
2033     endT = CmiWallTimer();
2034     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2035 #endif
2036     /* Send buffered Message */
2037 #if CMK_SMP_TRACE_COMMTHREAD
2038     startT = CmiWallTimer();
2039 #endif
2040 #if USE_OOB
2041     SendBufferMsg(&smsg_oob_queue);
2042 #endif
2043     SendBufferMsg(&smsg_queue);
2044     MACHSTATE(8, "after SendBufferMsg\n") ; 
2045 #if CMK_SMP_TRACE_COMMTHREAD
2046     endT = CmiWallTimer();
2047     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2048 #endif
2049
2050 #if CMK_SMP_TRACE_COMMTHREAD
2051     startT = CmiWallTimer();
2052 #endif
2053     SendRdmaMsg();
2054     MACHSTATE(8, "after SendRdmaMsg\n") ; 
2055 #if CMK_SMP_TRACE_COMMTHREAD
2056     endT = CmiWallTimer();
2057     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2058 #endif
2059 }
2060
2061 /* useDynamicSMSG */
2062 static void _init_dynamic_smsg()
2063 {
2064     gni_return_t status;
2065     uint32_t     vmdh_index = -1;
2066     int i;
2067
2068     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2069     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2070     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2071     for(i=0; i<mysize; i++) {
2072         smsg_connected_flag[i] = 0;
2073         smsg_attr_vector_local[i] = NULL;
2074         smsg_attr_vector_remote[i] = NULL;
2075     }
2076     if(mysize <=512)
2077     {
2078         SMSG_MAX_MSG = 4096;
2079     }else if (mysize <= 4096)
2080     {
2081         SMSG_MAX_MSG = 4096/mysize * 1024;
2082     }else if (mysize <= 16384)
2083     {
2084         SMSG_MAX_MSG = 512;
2085     }else {
2086         SMSG_MAX_MSG = 256;
2087     }
2088
2089     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2090     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2091     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2092     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2093     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2094
2095     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2096     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2097     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2098     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2099     mailbox_list->offset = 0;
2100     mailbox_list->next = 0;
2101     
2102     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2103         mailbox_list->size, smsg_rx_cqh,
2104         GNI_MEM_READWRITE,   
2105         vmdh_index,
2106         &(mailbox_list->mem_hndl));
2107     GNI_RC_CHECK("MEMORY registration for smsg", status);
2108
2109     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2110     GNI_RC_CHECK("Unbound EP", status);
2111     
2112     alloc_smsg_attr(&send_smsg_attr);
2113
2114     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2115     GNI_RC_CHECK("post unbound datagram", status);
2116
2117       /* always pre-connect to proc 0 */
2118     //if (myrank != 0) connect_to(0);
2119 }
2120
2121 static void _init_static_smsg()
2122 {
2123     gni_smsg_attr_t      *smsg_attr;
2124     gni_smsg_attr_t      remote_smsg_attr;
2125     gni_smsg_attr_t      *smsg_attr_vec;
2126     gni_mem_handle_t     my_smsg_mdh_mailbox;
2127     int      ret, i;
2128     gni_return_t status;
2129     uint32_t              vmdh_index = -1;
2130     mdh_addr_t            base_infor;
2131     mdh_addr_t            *base_addr_vec;
2132     char *env;
2133
2134     if(mysize <=512)
2135     {
2136         SMSG_MAX_MSG = 1024;
2137     }else if (mysize <= 4096)
2138     {
2139         SMSG_MAX_MSG = 1024;
2140     }else if (mysize <= 16384)
2141     {
2142         SMSG_MAX_MSG = 512;
2143     }else {
2144         SMSG_MAX_MSG = 256;
2145     }
2146     
2147     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2148     if (env) SMSG_MAX_MSG = atoi(env);
2149     CmiAssert(SMSG_MAX_MSG > 0);
2150
2151     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2152     
2153     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2154     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2155     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2156     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2157     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2158     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2159     CmiAssert(ret == 0);
2160     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2161     //if (myrank == 0) printf("Charm++> allocates %.2fMB for SMSG mailbox. \n", smsg_memlen*mysize/1e6);
2162     
2163     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2164             smsg_memlen*(mysize), smsg_rx_cqh,
2165             GNI_MEM_READWRITE,   
2166             vmdh_index,
2167             &my_smsg_mdh_mailbox);
2168
2169     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2170
2171     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2172     base_infor.mdh =  my_smsg_mdh_mailbox;
2173     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2174
2175     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2176  
2177     for(i=0; i<mysize; i++)
2178     {
2179         if(i==myrank)
2180             continue;
2181         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2182         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2183         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2184         smsg_attr[i].mbox_offset = i*smsg_memlen;
2185         smsg_attr[i].buff_size = smsg_memlen;
2186         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2187         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2188     }
2189
2190     for(i=0; i<mysize; i++)
2191     {
2192         if (myrank == i) continue;
2193
2194         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2195         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2196         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2197         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2198         remote_smsg_attr.buff_size = smsg_memlen;
2199         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2200         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2201
2202         /* initialize the smsg channel */
2203         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2204         GNI_RC_CHECK("SMSG Init", status);
2205     } //end initialization
2206
2207     free(base_addr_vec);
2208     free(smsg_attr);
2209
2210     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2211     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2212
2213
2214 inline
2215 static void _init_send_queue(SMSG_QUEUE *queue)
2216 {
2217      int i;
2218      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2219      for(i =0; i<mysize; i++)
2220      {
2221         queue->smsg_msglist_index[i].next = -1;
2222 #if CMK_SMP
2223         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2224 #else
2225         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2226 #endif
2227         
2228      }
2229      queue->smsg_head_index = -1;
2230 }
2231
2232 inline
2233 static void _init_smsg()
2234 {
2235     if(mysize > 1) {
2236         if (useDynamicSMSG)
2237             _init_dynamic_smsg();
2238         else
2239             _init_static_smsg();
2240     }
2241
2242     _init_send_queue(&smsg_queue);
2243 #if USE_OOB
2244     _init_send_queue(&smsg_oob_queue);
2245 #endif
2246 }
2247
2248 static void _init_static_msgq()
2249 {
2250     gni_return_t status;
2251     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2252     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2253     msgq_attrs.smsg_q_sz = 1;
2254     msgq_attrs.rcv_pool_sz = 1;
2255     msgq_attrs.num_msgq_eps = 2;
2256     msgq_attrs.nloc_insts = 8;
2257     msgq_attrs.modes = 0;
2258     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2259
2260     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2261     GNI_RC_CHECK("MSGQ Init", status);
2262
2263
2264 }
2265
2266 #if CMK_SMP && STEAL_MEMPOOL
2267 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2268 {
2269     void *pool = NULL;
2270     int i, k;
2271     // check other ranks
2272     for (k=0; k<CmiMyNodeSize()+1; k++) {
2273         i = (CmiMyRank()+k)%CmiMyNodeSize();
2274         if (i==CmiMyRank()) continue;
2275         mempool_type *mptr = CpvAccessOther(mempool, i);
2276         CmiLock(mptr->mempoolLock);
2277         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2278         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2279             CmiUnlock(mptr->mempoolLock);
2280             continue;
2281         }
2282         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2283         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2284             /* search in the free list */
2285           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2286           mempool_header *current = free_header;
2287           while (current) {
2288             if (current->next_free == (char*)header-(char*)mptr) break;
2289             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2290           }
2291           if (current == NULL) {         /* not found in free list */
2292             CmiUnlock(mptr->mempoolLock);
2293             continue;
2294           }
2295 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2296             /* search the previous memblock, and remove the tail */
2297           mempool_block *ptr = (mempool_block *)mptr;
2298           while (ptr) {
2299             if (ptr->memblock_next == mptr->memblock_tail) break;
2300             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2301           }
2302           CmiAssert(ptr!=NULL);
2303           ptr->memblock_next = 0;
2304           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2305
2306             /* remove memblock from the free list */
2307           current->next_free = header->next_free;
2308           if (header == free_header) mptr->freelist_head = header->next_free;
2309
2310           CmiUnlock(mptr->mempoolLock);
2311
2312           pool = (void*)tail;
2313           *mem_hndl = tail->mem_hndl;
2314           *size = tail->size;
2315           return pool;
2316         }
2317         CmiUnlock(mptr->mempoolLock);
2318     }
2319
2320       /* steal failed, deregister and free memblock now */
2321     int freed = 0;
2322     for (k=0; k<CmiMyNodeSize()+1; k++) {
2323         i = (CmiMyRank()+k)%CmiMyNodeSize();
2324         mempool_type *mptr = CpvAccessOther(mempool, i);
2325         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2326
2327         mempool_block *mempools_head = &(mptr->mempools_head);
2328         mempool_block *current = mempools_head;
2329         mempool_block *prev = NULL;
2330
2331         while (current) {
2332           int isfree = 0;
2333           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2334 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2335           mempool_header *cur = free_header;
2336           mempool_header *header;
2337           if (current != mempools_head) {
2338             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2339              /* search in free list */
2340             if (header->size == current->size - sizeof(mempool_block)) {
2341               cur = free_header;
2342               while (cur) {
2343                 if (cur->next_free == (char*)header-(char*)mptr) break;
2344                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2345               }
2346               if (cur != NULL) isfree = 1;
2347             }
2348           }
2349           if (isfree) {
2350               /* remove from free list */
2351             cur->next_free = header->next_free;
2352             if (header == free_header) mptr->freelist_head = header->next_free;
2353              // deregister
2354             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh);
2355             GNI_RC_CHECK("steal Mempool de-register", status);
2356             mempool_block *ptr = current;
2357             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2358             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2359 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2360             freed += ptr->size;
2361             free(ptr);
2362              // try now
2363             if (freed > *size) {
2364               if (pool == NULL) {
2365                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2366                 CmiAssert(ret == 0);
2367               }
2368               status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh);
2369               if (status == GNI_RC_SUCCESS) {
2370                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2371 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2372                 return pool;
2373               }
2374 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2375             }
2376           }
2377           else {
2378              prev = current;
2379              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2380           }
2381         }
2382
2383         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2384     }
2385       /* still no luck registering pool */
2386     if (pool) free(pool);
2387     return NULL;
2388 }
2389 #endif
2390 static long long int total_mempool_size = 0;
2391 static long long int total_mempool_calls = 0;
2392 #if USE_LRTS_MEMPOOL
2393 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2394 {
2395     void *pool;
2396     int ret;
2397
2398     int default_size =  expand_flag? _expand_mem : _mempool_size;
2399     if (*size < default_size) *size = default_size;
2400     total_mempool_size += *size;
2401     total_mempool_calls += 1;
2402     ret = posix_memalign(&pool, ALIGNBUF, *size);
2403     if (ret != 0) {
2404 #if CMK_SMP && STEAL_MEMPOOL
2405       pool = steal_mempool_block(size, mem_hndl);
2406       if (pool != NULL) return pool;
2407 #endif
2408       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2409       if (ret == ENOMEM)
2410         CmiAbort("alloc_mempool_block: out of memory.");
2411       else
2412         CmiAbort("alloc_mempool_block: posix_memalign failed");
2413     }
2414         SetMemHndlZero((*mem_hndl));
2415     
2416     return pool;
2417 }
2418
2419 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2420 {
2421     if(!(IsMemHndlZero(mem_hndl)))
2422     {
2423         gni_return_t status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2424         GNI_RC_CHECK("free_mempool_block Mempool de-register", status);
2425     }
2426     free(ptr);
2427 }
2428 #endif
2429
2430 void LrtsPreCommonInit(int everReturn){
2431 #if USE_LRTS_MEMPOOL
2432     CpvInitialize(mempool_type*, mempool);
2433     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block);
2434     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2435 #endif
2436 }
2437
2438
2439 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2440 {
2441     register int            i;
2442     int                     rc;
2443     int                     device_id = 0;
2444     unsigned int            remote_addr;
2445     gni_cdm_handle_t        cdm_hndl;
2446     gni_return_t            status = GNI_RC_SUCCESS;
2447     uint32_t                vmdh_index = -1;
2448     uint8_t                 ptag;
2449     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2450     int                     first_spawned;
2451     int                     physicalID;
2452     char                   *env;
2453
2454     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2455     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2456     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2457    
2458     status = PMI_Init(&first_spawned);
2459     GNI_RC_CHECK("PMI_Init", status);
2460
2461     status = PMI_Get_size(&mysize);
2462     GNI_RC_CHECK("PMI_Getsize", status);
2463
2464     status = PMI_Get_rank(&myrank);
2465     GNI_RC_CHECK("PMI_getrank", status);
2466
2467     //physicalID = CmiPhysicalNodeID(myrank);
2468     
2469     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2470
2471     *myNodeID = myrank;
2472     *numNodes = mysize;
2473   
2474 #if !COMM_THREAD_SEND
2475     /* Currently, we only consider the case that comm. thread will only recv msgs */
2476     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2477 #endif
2478
2479     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2480     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2481     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2482
2483     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2484     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2485     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2486
2487     useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2488     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2489     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2490     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2491     
2492     if(myrank == 0)
2493     {
2494         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2495         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2496     }
2497 #ifdef USE_ONESIDED
2498     onesided_init(NULL, &onesided_hnd);
2499
2500     // this is a GNI test, so use the libonesided bypass functionality
2501     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2502     local_addr = gniGetNicAddress();
2503 #else
2504     ptag = get_ptag();
2505     cookie = get_cookie();
2506 #if 0
2507     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2508 #endif
2509     //Create and attach to the communication  domain */
2510     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2511     GNI_RC_CHECK("GNI_CdmCreate", status);
2512     //* device id The device id is the minor number for the device
2513     //that is assigned to the device by the system when the device is created.
2514     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2515     //where X is the device number 0 default 
2516     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2517     GNI_RC_CHECK("GNI_CdmAttach", status);
2518     local_addr = get_gni_nic_address(0);
2519 #endif
2520     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2521     _MEMCHECK(MPID_UGNI_AllAddr);
2522     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2523     /* create the local completion queue */
2524     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2525     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2526     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2527     
2528     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2529     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2530     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2531
2532     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2533     GNI_RC_CHECK("Create CQ (rx)", status);
2534     
2535     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2536     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2537     
2538     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2539     //GNI_RC_CHECK("Create BTE CQ", status);
2540
2541     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2542     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2543     _MEMCHECK(ep_hndl_array);
2544 #if CMK_SMP && !COMM_THREAD_SEND
2545     tx_cq_lock  = CmiCreateLock();
2546     rx_cq_lock  = CmiCreateLock();
2547 #endif
2548     for (i=0; i<mysize; i++) {
2549         if(i == myrank) continue;
2550         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2551         GNI_RC_CHECK("GNI_EpCreate ", status);   
2552         remote_addr = MPID_UGNI_AllAddr[i];
2553         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2554         GNI_RC_CHECK("GNI_EpBind ", status);   
2555     }
2556
2557     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2558     _init_smsg();
2559     PMI_Barrier();
2560
2561 #if     USE_LRTS_MEMPOOL
2562     env = getenv("CHARM_UGNI_MEMPOOL_SIZE");
2563     if (env) _mempool_size = CmiReadSize(env);
2564     if (CmiGetArgStringDesc(*argv,"+useMemorypoolSize",&env,"Set the memory pool size")) 
2565         _mempool_size = CmiReadSize(env);
2566
2567     if (myrank==0) printf("Charm++> memory pool size: %1.fMB\n", _mempool_size/1024.0/1024);
2568
2569     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2570     if (env) {
2571         MAX_REG_MEM = CmiReadSize(env);
2572         if (myrank==0) printf("Charm++> memory pool maximum size: %1.fMB\n", MAX_REG_MEM/1024.0/1024);
2573     }
2574
2575     env = getenv("CHARM_UGNI_SEND_MAX");
2576     if (env) {
2577         MAX_BUFF_SEND = CmiReadSize(env);
2578         if (myrank==0) printf("Charm++> maximum pending memory pool for sending: %1.fMB\n", MAX_BUFF_SEND/1024.0/1024);
2579     }
2580
2581     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2582     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2583 #endif
2584
2585     /* init DMA buffer for medium message */
2586
2587     //_init_DMA_buffer();
2588     
2589     free(MPID_UGNI_AllAddr);
2590 #if CMK_SMP
2591     sendRdmaBuf = PCQueueCreate();
2592 #else
2593     sendRdmaBuf = 0;
2594 #endif
2595 #if MACHINE_DEBUG_LOG
2596     char ln[200];
2597     sprintf(ln,"debugLog.%d",myrank);
2598     debugLog=fopen(ln,"w");
2599 #endif
2600
2601 }
2602
2603 void* LrtsAlloc(int n_bytes, int header)
2604 {
2605     void *ptr;
2606 #if 0
2607     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2608 #endif
2609     if(n_bytes <= SMSG_MAX_MSG)
2610     {
2611         int totalsize = n_bytes+header;
2612         ptr = malloc(totalsize);
2613     }
2614     else {
2615         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2616 #if     USE_LRTS_MEMPOOL
2617         n_bytes = ALIGN64(n_bytes);
2618         if(n_bytes <= BIG_MSG)
2619         {
2620             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2621             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2622         }else 
2623         {
2624             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2625             ptr = res + ALIGNBUF - header;
2626
2627         }
2628 #else
2629         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2630         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2631         ptr = res + ALIGNBUF - header;
2632 #endif
2633     }
2634     return ptr;
2635 }
2636
2637 void  LrtsFree(void *msg)
2638 {
2639     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2640     if (size <= SMSG_MAX_MSG)
2641       free(msg);
2642     else if(size>BIG_MSG)
2643     {
2644         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2645
2646     }else
2647     {
2648 #if     USE_LRTS_MEMPOOL
2649 #if CMK_SMP
2650         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2651 #else
2652         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2653 #endif
2654 #else
2655         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2656 #endif
2657     }
2658 }
2659
2660 void LrtsExit()
2661 {
2662     /* free memory ? */
2663 #if USE_LRTS_MEMPOOL
2664     mempool_destroy(CpvAccess(mempool));
2665 #endif
2666     PMI_Finalize();
2667     exit(0);
2668 }
2669
2670 void LrtsDrainResources()
2671 {
2672     if(mysize == 1) return;
2673     while (
2674 #if USE_OOB
2675            !SendBufferMsg(&smsg_oob_queue) ||
2676 #endif
2677            !SendBufferMsg(&smsg_queue))
2678     {
2679         if (useDynamicSMSG)
2680             PumpDatagramConnection();
2681         PumpNetworkSmsg();
2682         PumpLocalRdmaTransactions();
2683         SendRdmaMsg();
2684     }
2685     PMI_Barrier();
2686 }
2687
2688 void LrtsAbort(const char *message) {
2689     printf("CmiAbort is calling on PE:%d\n", myrank);
2690     CmiPrintStackTrace(0);
2691     PMI_Abort(-1, message);
2692 }
2693
2694 /**************************  TIMER FUNCTIONS **************************/
2695 #if CMK_TIMER_USE_SPECIAL
2696 /* MPI calls are not threadsafe, even the timer on some machines */
2697 static CmiNodeLock  timerLock = 0;
2698 static int _absoluteTime = 0;
2699 static int _is_global = 0;
2700 static struct timespec start_ts;
2701
2702 inline int CmiTimerIsSynchronized() {
2703     return 0;
2704 }
2705
2706 inline int CmiTimerAbsolute() {
2707     return _absoluteTime;
2708 }
2709
2710 double CmiStartTimer() {
2711     return 0.0;
2712 }
2713
2714 double CmiInitTime() {
2715     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
2716 }
2717
2718 void CmiTimerInit(char **argv) {
2719     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2720     if (_absoluteTime && CmiMyPe() == 0)
2721         printf("Charm++> absolute  timer is used\n");
2722     
2723     _is_global = CmiTimerIsSynchronized();
2724
2725
2726     if (_is_global) {
2727         if (CmiMyRank() == 0) {
2728             clock_gettime(CLOCK_MONOTONIC, &start_ts);
2729         }
2730     } else { /* we don't have a synchronous timer, set our own start time */
2731         CmiBarrier();
2732         CmiBarrier();
2733         CmiBarrier();
2734         clock_gettime(CLOCK_MONOTONIC, &start_ts);
2735     }
2736     CmiNodeAllBarrier();          /* for smp */
2737 }
2738
2739 /**
2740  * Since the timerLock is never created, and is
2741  * always NULL, then all the if-condition inside
2742  * the timer functions could be disabled right
2743  * now in the case of SMP.
2744  */
2745 double CmiTimer(void) {
2746     struct timespec now_ts;
2747     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2748     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2749         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2750 }
2751
2752 double CmiWallTimer(void) {
2753     struct timespec now_ts;
2754     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2755     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2756         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
2757 }
2758
2759 double CmiCpuTimer(void) {
2760     struct timespec now_ts;
2761     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2762     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2763         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2764 }
2765
2766 #endif
2767 /************Barrier Related Functions****************/
2768
2769 int CmiBarrier()
2770 {
2771     int status;
2772
2773 #if CMK_SMP
2774     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2775     CmiNodeAllBarrier();
2776     if (CmiMyRank() == CmiMyNodeSize())
2777 #else
2778     if (CmiMyRank() == 0)
2779 #endif
2780     {
2781         /**
2782          *  The call of CmiBarrier is usually before the initialization
2783          *  of trace module of Charm++, therefore, the START_EVENT
2784          *  and END_EVENT are disabled here. -Chao Mei
2785          */
2786         /*START_EVENT();*/
2787         status = PMI_Barrier();
2788         GNI_RC_CHECK("PMI_Barrier", status);
2789         /*END_EVENT(10);*/
2790     }
2791     CmiNodeAllBarrier();
2792     return status;
2793
2794 }
2795 #if CMK_DIRECT
2796 #include "machine-cmidirect.c"
2797 #endif
2798 #if CMK_PERSISTENT_COMM
2799 #include "machine-persistent.c"
2800 #endif
2801
2802