Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  */
10 /*@{*/
11
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <stdint.h>
15 #include <errno.h>
16 #include <malloc.h>
17 #include <unistd.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23 #include <time.h>
24
25 #define PRINT_SYH  0
26 // Trace communication thread
27 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
28 #define TRACE_THRESHOLD     0.00005
29 #define CMI_MPI_TRACE_MOREDETAILED 0
30 #undef CMI_MPI_TRACE_USEREVENTS
31 #define CMI_MPI_TRACE_USEREVENTS 1
32 #else
33 #undef CMK_SMP_TRACE_COMMTHREAD
34 #define CMK_SMP_TRACE_COMMTHREAD 0
35 #endif
36
37 #define CMK_TRACE_COMMOVERHEAD 0
38 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
39 #undef CMI_MPI_TRACE_USEREVENTS
40 #define CMI_MPI_TRACE_USEREVENTS 1
41 #else
42 #undef CMK_TRACE_COMMOVERHEAD
43 #define CMK_TRACE_COMMOVERHEAD 0
44 #endif
45
46 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
47 CpvStaticDeclare(double, projTraceStart);
48 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
49 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
50 #else
51 #define  START_EVENT()
52 #define  END_EVENT(x)
53 #endif
54
55 #define CMI_EXERT_SEND_CAP      0
56 #define CMI_EXERT_RECV_CAP      0
57
58 #if CMI_EXERT_SEND_CAP
59 #define SEND_CAP 16
60 #endif
61
62 #if CMI_EXERT_RECV_CAP
63 #define RECV_CAP 2
64 #endif
65
66
67
68 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
69
70 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
71
72 static int avg_smsg_connection = 32;
73 static int                 *smsg_connected_flag= 0;
74 static gni_smsg_attr_t     **smsg_attr_vector_local;
75 static gni_smsg_attr_t     **smsg_attr_vector_remote;
76 static gni_ep_handle_t     ep_hndl_unbound;
77 static gni_smsg_attr_t     send_smsg_attr;
78 static gni_smsg_attr_t     recv_smsg_attr;
79
80 typedef struct _dynamic_smsg_mailbox{
81    void     *mailbox_base;
82    int      size;
83    int      offset;
84    gni_mem_handle_t  mem_hndl;
85    struct      _dynamic_smsg_mailbox  *next;
86 }dynamic_smsg_mailbox_t;
87
88 static dynamic_smsg_mailbox_t  *mailbox_list;
89
90 #define REMOTE_EVENT                      0
91 #define USE_LRTS_MEMPOOL                  1
92
93 #if USE_LRTS_MEMPOOL
94 #if CMK_SMP
95 #define STEAL_MEMPOOL                     0
96 #endif
97
98 #define oneMB (1024ll*1024)
99 #if CMK_SMP
100 static CmiInt8 _mempool_size = 8*oneMB;
101 #else
102 static CmiInt8 _mempool_size = 32*oneMB;
103 #endif
104 static CmiInt8 _expand_mem =  4*oneMB;
105 #endif
106
107 //Dynamic flow control about memory registration
108 #define  MAX_BUFF_SEND      512*oneMB
109 static CmiInt8 buffered_send_msg = 0;
110
111 #define BIG_MSG                  16*oneMB
112 #define ONE_SEG                  4*oneMB
113 #define BIG_MSG_PIPELINE         4
114
115 #if CMK_SMP
116 #define COMM_THREAD_SEND 1
117 #endif
118 int         rdma_id = 0;
119 #if PRINT_SYH
120 int         lrts_smsg_success = 0;
121 int         lrts_send_msg_id = 0;
122 int         lrts_send_rdma_success = 0;
123 int         lrts_received_msg = 0;
124 int         lrts_local_done_msg = 0;
125
126 #endif
127 #include "machine.h"
128
129 #include "pcqueue.h"
130
131 #include "mempool.h"
132
133 #if CMK_PERSISTENT_COMM
134 #include "machine-persistent.h"
135 #endif
136
137 //#define  USE_ONESIDED 1
138 #ifdef USE_ONESIDED
139 //onesided implementation is wrong, since no place to restore omdh
140 #include "onesided.h"
141 onesided_hnd_t   onesided_hnd;
142 onesided_md_t    omdh;
143 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
144
145 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
146
147 #else
148 uint8_t   onesided_hnd, omdh;
149 #if REMOTE_EVENT
150 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
151 #else
152 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
153 #endif
154 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
155 #endif
156 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
157 #define   DecreaseMsgInRecv(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)--
158 #define   IncreaseMsgInSend(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)++
159                                    //printf("++++[%d]%p   ----- %d\n", myrank, ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
160 #define   DecreaseMsgInSend(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)--
161                                      //printf("---[%d]%p   ----- %d\n", myrank,((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
162 #define   GetMempooladdr(x)  ((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr
163 #define   GetMempoolsize(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->size
164 #define   GetMemHndl(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mem_hndl
165 #define   GetMemHndlFromHeader(x) ((block_header*)x)->mem_hndl
166 #define   GetSizeFromHeader(x) ((block_header*)x)->size
167 #define   NoMsgInSend(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send == 0
168 #define   NoMsgInFlight(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send + ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv  == 0
169 #define   IsMemHndlZero(x)  (x.qword1 == 0 && x.qword2 == 0)
170 #define   SetMemHndlZero(x)  x.qword1 = 0; x.qword2 = 0
171 #define   NotRegistered(x)  IsMemHndlZero(((block_header*)x)->mem_hndl)
172
173 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
174 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
175 #define CmiGetMsgSeq(m)  ((CmiMsgHeaderExt*)m)->seq
176 #define CmiSetMsgSeq(m, s)  ((((CmiMsgHeaderExt*)m)->seq) = (s))
177
178 #define ALIGNBUF                64
179
180 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
181 /* If SMSG is not used */
182
183 #define FMA_PER_CORE  1024
184 #define FMA_BUFFER_SIZE 1024
185
186 /* If SMSG is used */
187 static int  SMSG_MAX_MSG = 1024;
188 #define SMSG_MAX_CREDIT 72 
189
190 #define MSGQ_MAXSIZE       2048
191 /* large message transfer with FMA or BTE */
192 #define LRTS_GNI_RDMA_THRESHOLD  1024 
193
194 #if CMK_SMP
195 static int  REMOTE_QUEUE_ENTRIES=163840; 
196 static int LOCAL_QUEUE_ENTRIES=163840; 
197 #else
198 static int  REMOTE_QUEUE_ENTRIES=20480;
199 static int LOCAL_QUEUE_ENTRIES=20480; 
200 #endif
201
202 #define BIG_MSG_TAG  0x26
203 #define PUT_DONE_TAG      0x28
204 #define DIRECT_PUT_DONE_TAG      0x29
205 #define ACK_TAG           0x30
206 /* SMSG is data message */
207 #define SMALL_DATA_TAG          0x31
208 /* SMSG is a control message to initialize a BTE */
209 #define MEDIUM_HEAD_TAG         0x32
210 #define MEDIUM_DATA_TAG         0x33
211 #define LMSG_INIT_TAG           0x39 
212 #define VERY_LMSG_INIT_TAG      0x40 
213 #define VERY_LMSG_TAG           0x41 
214
215 #define DEBUG
216 #ifdef GNI_RC_CHECK
217 #undef GNI_RC_CHECK
218 #endif
219 #ifdef DEBUG
220 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
221 #else
222 #define GNI_RC_CHECK(msg,rc)
223 #endif
224
225 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
226 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
227
228 static int useStaticMSGQ = 0;
229 static int useStaticFMA = 0;
230 static int mysize, myrank;
231 gni_nic_handle_t      nic_hndl;
232
233 typedef struct {
234     gni_mem_handle_t mdh;
235     uint64_t addr;
236 } mdh_addr_t ;
237 // this is related to dynamic SMSG
238
239 typedef struct mdh_addr_list{
240     gni_mem_handle_t mdh;
241    void *addr;
242     struct mdh_addr_list *next;
243 }mdh_addr_list_t;
244
245 static unsigned int         smsg_memlen;
246 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
247 mdh_addr_t          setup_mem;
248 mdh_addr_t          *smsg_connection_vec = 0;
249 gni_mem_handle_t    smsg_connection_memhndl;
250 static int          smsg_expand_slots = 10;
251 static int          smsg_available_slot = 0;
252 static void         *smsg_mailbox_mempool = 0;
253 mdh_addr_list_t     *smsg_dynamic_list = 0;
254
255 static void             *smsg_mailbox_base;
256 gni_msgq_attr_t         msgq_attrs;
257 gni_msgq_handle_t       msgq_handle;
258 gni_msgq_ep_attr_t      msgq_ep_attrs;
259 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
260
261 /* =====Beginning of Declarations of Machine Specific Variables===== */
262 static int cookie;
263 static int modes = 0;
264 static gni_cq_handle_t       smsg_rx_cqh = NULL;
265 static gni_cq_handle_t       smsg_tx_cqh = NULL;
266 static gni_cq_handle_t       post_rx_cqh = NULL;
267 static gni_cq_handle_t       post_tx_cqh = NULL;
268 static gni_ep_handle_t       *ep_hndl_array;
269 #if CMK_SMP && !COMM_THREAD_SEND
270 static CmiNodeLock           *ep_lock_array;
271 static CmiNodeLock           tx_cq_lock; 
272 static CmiNodeLock           rx_cq_lock; 
273 #endif
274 typedef struct msg_list
275 {
276     uint32_t destNode;
277     uint32_t size;
278     void *msg;
279     uint8_t tag;
280 #if !CMK_SMP
281     struct msg_list *next;
282 #endif
283 }MSG_LIST;
284
285
286 typedef struct control_msg
287 {
288     uint64_t            source_addr;    /* address from the start of buffer  */
289     uint64_t            dest_addr;      /* address from the start of buffer */
290     //int                 source;         /* source rank */
291     int                 total_length;   /* total length */
292     int                 length;         /* length of this packet */
293     uint8_t             seq_id;         //big message   0 meaning single message
294     gni_mem_handle_t    source_mem_hndl;
295     struct control_msg *next;
296 }CONTROL_MSG;
297
298 #ifdef CMK_DIRECT
299 typedef struct{
300     void *recverBuf;
301     void (*callbackFnPtr)(void *);
302     void *callbackData;
303 }CMK_DIRECT_HEADER;
304 #endif
305 typedef struct  rmda_msg
306 {
307     int                   destNode;
308     gni_post_descriptor_t *pd;
309 #if !CMK_SMP
310     struct  rmda_msg      *next;
311 #endif
312 }RDMA_REQUEST;
313
314 #if CMK_SMP
315 PCQueue sendRdmaBuf;
316 typedef struct  msg_list_index
317 {
318     int         next;
319     PCQueue     sendSmsgBuf;
320 } MSG_LIST_INDEX;
321 #else
322 RDMA_REQUEST        *sendRdmaBuf;
323 RDMA_REQUEST        *sendRdmaTail;
324 typedef struct  msg_list_index
325 {
326     int         next;
327     MSG_LIST    *sendSmsgBuf;
328     MSG_LIST    *tail;
329 } MSG_LIST_INDEX;
330 #endif
331 /* reuse PendingMsg memory */
332 static CONTROL_MSG          *control_freelist=0;
333 static MSG_LIST             *msglist_freelist=0;
334 static int                  smsg_head_index = -1;
335 static MSG_LIST_INDEX       *smsg_msglist_index= 0;
336 static MSG_LIST             *smsg_free_head=0;
337 static MSG_LIST             *smsg_free_tail=0;
338
339 /*
340 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
341     if(free_head == 0)  free_head = free_tail = msg_head;    \
342     else   free_tail = free_tail->next;    \
343     if( msg_head->next == msg_tail) msg_head =0;   \
344     else msg_head= msg_head->next;    
345
346 #define MallocMsgList(d, msg_head, msg_tail, free_head, free_tail, msgsize) \
347     if(free_head == 0) {d= malloc(msgsize);  \
348         if(msg_head == 0)   msg_head =msg_tail = msg_head->next = msg_tail->next = d; \
349         else { msg_tail->next = d; d->next = msg_head; msg_tail=d;} \
350     }else {d = free_head; free_head = free_head->next; if(free_tail->next == free_head) free_head =0;} \
351 */
352
353 #if CMK_SMP
354
355 #define FreeMsgList(d)   free(d);
356 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
357
358 #else
359
360 #define FreeMsgList(d)  \
361   (d)->next = msglist_freelist;\
362   msglist_freelist = d;
363
364 #define MallocMsgList(d) \
365   d = msglist_freelist;\
366   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
367              _MEMCHECK(d);\
368   } else msglist_freelist = d->next; \
369   d->next =0;
370 #endif
371
372 #if CMK_SMP
373
374 #define FreeControlMsg(d)      free(d);
375 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
376
377 #else
378
379 #define FreeControlMsg(d)       \
380   (d)->next = control_freelist;\
381   control_freelist = d;
382
383 #define MallocControlMsg(d) \
384   d = control_freelist;\
385   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
386              _MEMCHECK(d);\
387   } else control_freelist = d->next;
388
389 #endif
390
391 static RDMA_REQUEST         *rdma_freelist = NULL;
392
393 #define FreeMediumControlMsg(d)       \
394   (d)->next = medium_control_freelist;\
395   medium_control_freelist = d;
396
397
398 #define MallocMediumControlMsg(d) \
399     d = medium_control_freelist;\
400     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
401     _MEMCHECK(d);\
402 } else mediumcontrol_freelist = d->next;
403
404 # if CMK_SMP
405 #define FreeRdmaRequest(d)       free(d);
406 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
407 #else
408
409 #define FreeRdmaRequest(d)       \
410   (d)->next = rdma_freelist;\
411   rdma_freelist = d;
412
413 #define MallocRdmaRequest(d) \
414   d = rdma_freelist;\
415   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
416              _MEMCHECK(d);\
417   } else rdma_freelist = d->next; \
418     d->next =0;
419 #endif
420
421 /* reuse gni_post_descriptor_t */
422 static gni_post_descriptor_t *post_freelist=0;
423
424 #if  !CMK_SMP
425 #define FreePostDesc(d)       \
426     (d)->next_descr = post_freelist;\
427     post_freelist = d;
428
429 #define MallocPostDesc(d) \
430   d = post_freelist;\
431   if (d==0) { \
432      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
433      _MEMCHECK(d);\
434   } else post_freelist = d->next_descr;
435 #else
436
437 #define FreePostDesc(d)     free(d);
438 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
439
440 #endif
441
442
443 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
444 static int      buffered_smsg_counter = 0;
445
446 /* SmsgSend return success but message sent is not confirmed by remote side */
447 static MSG_LIST *buffered_fma_head = 0;
448 static MSG_LIST *buffered_fma_tail = 0;
449
450 /* functions  */
451 #define IsFree(a,ind)  !( a& (1<<(ind) ))
452 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
453 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
454
455 CpvDeclare(mempool_type*, mempool);
456
457 /* get the upper bound of log 2 */
458 int mylog2(int size)
459 {
460     int op = size;
461     unsigned int ret=0;
462     unsigned int mask = 0;
463     int i;
464     while(op>0)
465     {
466         op = op >> 1;
467         ret++;
468
469     }
470     for(i=1; i<ret; i++)
471     {
472         mask = mask << 1;
473         mask +=1;
474     }
475
476     ret -= ((size &mask) ? 0:1);
477     return ret;
478 }
479
480 static void
481 allgather(void *in,void *out, int len)
482 {
483     static int *ivec_ptr=NULL,already_called=0,job_size=0;
484     int i,rc;
485     int my_rank;
486     char *tmp_buf,*out_ptr;
487
488     if(!already_called) {
489
490         rc = PMI_Get_size(&job_size);
491         CmiAssert(rc == PMI_SUCCESS);
492         rc = PMI_Get_rank(&my_rank);
493         CmiAssert(rc == PMI_SUCCESS);
494
495         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
496         CmiAssert(ivec_ptr != NULL);
497
498         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
499         CmiAssert(rc == PMI_SUCCESS);
500
501         already_called = 1;
502
503     }
504
505     tmp_buf = (char *)malloc(job_size * len);
506     CmiAssert(tmp_buf);
507
508     rc = PMI_Allgather(in,tmp_buf,len);
509     CmiAssert(rc == PMI_SUCCESS);
510
511     out_ptr = out;
512
513     for(i=0;i<job_size;i++) {
514
515         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
516
517     }
518
519     free(tmp_buf);
520 }
521
522 static void
523 allgather_2(void *in,void *out, int len)
524 {
525     //PMI_Allgather is out of order
526     int i,rc, extend_len;
527     int  rank_index;
528     char *out_ptr, *out_ref;
529     char *in2;
530
531     extend_len = sizeof(int) + len;
532     in2 = (char*)malloc(extend_len);
533
534     memcpy(in2, &myrank, sizeof(int));
535     memcpy(in2+sizeof(int), in, len);
536
537     out_ptr = (char*)malloc(mysize*extend_len);
538
539     rc = PMI_Allgather(in2, out_ptr, extend_len);
540     GNI_RC_CHECK("allgather", rc);
541
542     out_ref = out;
543
544     for(i=0;i<mysize;i++) {
545         //rank index 
546         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
547         //copy to the rank index slot
548         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
549     }
550
551     free(out_ptr);
552     free(in2);
553
554 }
555
556 static unsigned int get_gni_nic_address(int device_id)
557 {
558     unsigned int address, cpu_id;
559     gni_return_t status;
560     int i, alps_dev_id=-1,alps_address=-1;
561     char *token, *p_ptr;
562
563     p_ptr = getenv("PMI_GNI_DEV_ID");
564     if (!p_ptr) {
565         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
566        
567         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
568     } else {
569         while ((token = strtok(p_ptr,":")) != NULL) {
570             alps_dev_id = atoi(token);
571             if (alps_dev_id == device_id) {
572                 break;
573             }
574             p_ptr = NULL;
575         }
576         CmiAssert(alps_dev_id != -1);
577         p_ptr = getenv("PMI_GNI_LOC_ADDR");
578         CmiAssert(p_ptr != NULL);
579         i = 0;
580         while ((token = strtok(p_ptr,":")) != NULL) {
581             if (i == alps_dev_id) {
582                 alps_address = atoi(token);
583                 break;
584             }
585             p_ptr = NULL;
586             ++i;
587         }
588         CmiAssert(alps_address != -1);
589         address = alps_address;
590     }
591     return address;
592 }
593
594 static uint8_t get_ptag(void)
595 {
596     char *p_ptr, *token;
597     uint8_t ptag;
598
599     p_ptr = getenv("PMI_GNI_PTAG");
600     CmiAssert(p_ptr != NULL);
601     token = strtok(p_ptr, ":");
602     ptag = (uint8_t)atoi(token);
603     return ptag;
604         
605 }
606
607 static uint32_t get_cookie(void)
608 {
609     uint32_t cookie;
610     char *p_ptr, *token;
611
612     p_ptr = getenv("PMI_GNI_COOKIE");
613     CmiAssert(p_ptr != NULL);
614     token = strtok(p_ptr, ":");
615     cookie = (uint32_t)atoi(token);
616
617     return cookie;
618 }
619
620 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
621 /* TODO: add any that are related */
622 /* =====End of Definitions of Message-Corruption Related Macros=====*/
623
624
625 #include "machine-lrts.h"
626 #include "machine-common-core.c"
627
628 /* Network progress function is used to poll the network when for
629    messages. This flushes receive buffers on some  implementations*/
630 #if CMK_MACHINE_PROGRESS_DEFINED
631 void CmiMachineProgressImpl() {
632 }
633 #endif
634
635 static void SendRdmaMsg();
636 static void PumpNetworkSmsg();
637 static void PumpLocalRdmaTransactions();
638 static int SendBufferMsg();
639
640 #define         DEBUG_POOL      0
641 #ifdef          DEBUG_POOL
642 static     int register_mempool = 0;
643 #endif
644 inline 
645 static gni_return_t registerMempool(void *msg)
646 {
647     gni_return_t status= GNI_RC_ERROR_RESOURCE;
648     int size = GetMempoolsize(msg);
649     void *addr = GetMempooladdr(msg);
650     gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
651    
652     mempool_type *mptr = CpvAccess(mempool);
653     block_header *current = &(mptr->block_head);
654 #if         DEBUG_POOL
655     if(register_mempool >= 16*oneMB)
656         return status;
657 #endif
658
659     while(1)
660     {
661        status = MEMORY_REGISTER(onesided_hnd, nic_hndl, addr, size, memhndl, &omdh);
662        //find one slot to de-register
663        if(status == GNI_RC_SUCCESS)
664        {
665 #if          DEBUG_POOL
666            register_mempool += size;
667            printf("[%d]Increase msg pool to %d, size=%d\n", myrank, register_mempool, size);
668 #endif
669            break;
670        }
671        else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
672        {
673                 CmiAbort("Memory registor for mempool fails\n");
674        }
675         
676        while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
677            current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
678        
679        if(current == NULL)
680        { status = GNI_RC_ERROR_RESOURCE; break;}
681        status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh);
682     
683        GNI_RC_CHECK("registerMemorypool de-register", status);
684 #ifdef          DEBUG_POOL
685        register_mempool -= GetSizeFromHeader(current);
686 #endif
687        SetMemHndlZero(GetMemHndlFromHeader(current));
688     }; 
689     return status;
690 }
691
692 inline
693 static void buffer_small_msgs(void *msg, int size, int destNode, uint8_t tag)
694 {
695     MSG_LIST        *msg_tmp;
696     MallocMsgList(msg_tmp);
697     msg_tmp->destNode = destNode;
698     msg_tmp->size   = size;
699     msg_tmp->msg    = msg;
700     msg_tmp->tag    = tag;
701     //msg_tmp->next   = 0;
702
703 #if !CMK_SMP
704     if (smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
705         smsg_msglist_index[destNode].next = smsg_head_index;
706         smsg_head_index = destNode;
707         smsg_msglist_index[destNode].tail = smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
708     }else
709     {
710         smsg_msglist_index[destNode].tail->next = msg_tmp;
711         smsg_msglist_index[destNode].tail = msg_tmp;
712     }
713 #else
714     PCQueuePush(smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
715 #endif
716 #if PRINT_SYH
717     buffered_smsg_counter++;
718 #endif
719 }
720
721 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
722 {
723     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
724 }
725
726 inline
727 static void setup_smsg_connection(int destNode)
728 {
729     mdh_addr_list_t  *new_entry = 0;
730     gni_post_descriptor_t *pd;
731     gni_smsg_attr_t      *smsg_attr;
732     gni_return_t status = GNI_RC_NOT_DONE;
733     RDMA_REQUEST        *rdma_request_msg;
734     
735     if(smsg_available_slot == smsg_expand_slots)
736     {
737         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
738         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
739         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
740
741         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
742             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
743             GNI_MEM_READWRITE,   
744             -1,
745             &(new_entry->mdh));
746         smsg_available_slot = 0; 
747         new_entry->next = smsg_dynamic_list;
748         smsg_dynamic_list = new_entry;
749     }
750     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
751     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
752     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
753     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
754     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
755     smsg_attr->buff_size = smsg_memlen;
756     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
757     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
758     smsg_local_attr_vec[destNode] = smsg_attr;
759     smsg_available_slot++;
760     MallocPostDesc(pd);
761     pd->type            = GNI_POST_FMA_PUT;
762     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
763     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
764     pd->length          = sizeof(gni_smsg_attr_t);
765     pd->local_addr      = (uint64_t) smsg_attr;
766     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
767     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
768     pd->src_cq_hndl     = 0;
769     pd->rdma_mode       = 0;
770     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
771     print_smsg_attr(smsg_attr);
772     if(status == GNI_RC_ERROR_RESOURCE )
773     {
774         MallocRdmaRequest(rdma_request_msg);
775         rdma_request_msg->destNode = destNode;
776         rdma_request_msg->pd = pd;
777         /* buffer this request */
778     }
779 #if PRINT_SYH
780     if(status != GNI_RC_SUCCESS)
781        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
782     else
783         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
784 #endif
785 }
786
787 /* useDynamicSMSG */
788 inline 
789 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
790 {
791     gni_return_t status = GNI_RC_NOT_DONE;
792
793     if(mailbox_list->offset == mailbox_list->size)
794     {
795         dynamic_smsg_mailbox_t *new_mailbox_entry;
796         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
797         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
798         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
799         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
800         new_mailbox_entry->offset = 0;
801         
802         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
803             new_mailbox_entry->size, smsg_rx_cqh,
804             GNI_MEM_READWRITE,   
805             -1,
806             &(new_mailbox_entry->mem_hndl));
807
808         //status = MEMORY_REGISTER(onesided_hnd, nic_hndl, new_mailbox_entry->mailbox_base, new_mailbox_entry->size, &(new_mailbox_entry->mem_hndl), &omdh);
809         GNI_RC_CHECK("register", status);
810         new_mailbox_entry->next = mailbox_list;
811         mailbox_list = new_mailbox_entry;
812     }
813     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
814     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
815     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
816     local_smsg_attr->mbox_offset = mailbox_list->offset;
817     mailbox_list->offset += smsg_memlen;
818     local_smsg_attr->buff_size = smsg_memlen;
819     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
820     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
821 }
822
823 /* useDynamicSMSG */
824 inline 
825 static int connect_to(int destNode)
826 {
827     gni_return_t status = GNI_RC_NOT_DONE;
828     CmiAssert(smsg_connected_flag[destNode] == 0);
829     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
830     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
831     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
832     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
833             
834 #if CMK_SMP && !COMM_THREAD_SEND
835     //CmiLock(ep_lock_array[destNode]);
836     CmiLock(tx_cq_lock);
837 #endif
838     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
839 #if CMK_SMP && !COMM_THREAD_SEND
840     //CmiUnlock(ep_lock_array[destNode]);
841     CmiUnlock(tx_cq_lock);
842 #endif
843     if (status == GNI_RC_ERROR_RESOURCE) {
844       /* possibly destNode is making connection at the same time */
845       free(smsg_attr_vector_local[destNode]);
846       smsg_attr_vector_local[destNode] = NULL;
847       free(smsg_attr_vector_remote[destNode]);
848       smsg_attr_vector_remote[destNode] = NULL;
849       mailbox_list->offset -= smsg_memlen;
850       return 0;
851     }
852     GNI_RC_CHECK("GNI_Post", status);
853     //printf("[%d] setting up %d -> %d\n", myrank, myrank, destNode);
854     smsg_connected_flag[destNode] = 1;
855     return 1;
856 }
857
858 //inline static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
859 inline static gni_return_t send_smsg_message(int destNode, void *msg, int size, uint8_t tag, int inbuff )
860 {
861     unsigned int          remote_address;
862     uint32_t              remote_id;
863     gni_return_t status = GNI_RC_NOT_DONE;
864     gni_smsg_attr_t      *smsg_attr;
865     gni_post_descriptor_t *pd;
866     gni_post_state_t      post_state;
867     char                  *real_data; 
868
869     if (useDynamicSMSG) {
870         switch (smsg_connected_flag[destNode]) {
871         case 0: 
872             connect_to(destNode);         /* continue to case 1 */
873         case 1:                           /* pending connection, do nothing */
874             status = GNI_RC_NOT_DONE;
875             if(inbuff ==0)
876                 buffer_small_msgs(msg, size, destNode, tag);
877             return status;
878         }
879     }
880 #if CMK_SMP
881     if(PCQueueEmpty(smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
882     {
883 #else
884     if(smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
885     {
886 #endif
887 #if CMK_SMP && !COMM_THREAD_SEND
888         CmiLock(tx_cq_lock);
889 #endif
890         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
891 #if CMK_SMP && !COMM_THREAD_SEND
892         CmiUnlock(tx_cq_lock);
893 #endif
894         if(status == GNI_RC_SUCCESS)
895         {
896 #if CMK_SMP_TRACE_COMMTHREAD
897             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
898             { 
899                 START_EVENT();
900                 if ( tag == SMALL_DATA_TAG)
901                     real_data = (char*)msg; 
902                 else 
903                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
904                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
905             }
906 #endif
907 #if PRINT_SYH
908             lrts_smsg_success++;
909             printf("[%d==>%d] send done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
910 #endif     
911             return status;
912         }
913     }
914     if(inbuff ==0)
915         buffer_small_msgs(msg, size, destNode, tag);
916     return status;
917 }
918
919
920 inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
921 {
922     /* construct a control message and send */
923     CONTROL_MSG         *control_msg_tmp;
924     MallocControlMsg(control_msg_tmp);
925     control_msg_tmp->source_addr = (uint64_t)msg;
926     control_msg_tmp->seq_id    = seqno;
927     //control_msg_tmp->source    = myrank;
928     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
929 #if     USE_LRTS_MEMPOOL
930     if(size < BIG_MSG)
931     {
932         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
933     }
934     else
935     {
936         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
937         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
938         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
939     }
940 #else
941     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
942 #endif
943     return control_msg_tmp;
944 }
945
946 // Large message, send control to receiver, receiver register memory and do a GET, 
947 // return 1 - send no success
948 inline
949 static int send_large_messages(int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
950 {
951     gni_return_t        status  =   GNI_RC_NOT_DONE;
952     uint32_t            vmdh_index  = -1;
953     int                 size;
954     int                 offset = 0;
955     uint64_t            source_addr;
956     int                 register_size; 
957     size    =   control_msg_tmp->total_length;
958     source_addr = control_msg_tmp->source_addr;
959     register_size = control_msg_tmp->length;
960
961 #if     USE_LRTS_MEMPOOL
962     if( control_msg_tmp->seq_id == 0 ){
963         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
964         {
965             //register the corresponding mempool
966             if(buffered_send_msg >= MAX_BUFF_SEND)
967             {
968                 if(!inbuff)
969                     buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
970                // if(myrank == 0)
971                //     CmiPrintf(" [%d] send_large hit max %lld(%d)\n", myrank, buffered_send_msg, register_mempool); 
972                 return status;
973             }
974             status = registerMempool((void*)source_addr);
975             if(status == GNI_RC_SUCCESS)
976             {
977                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
978             }
979         }else
980         {
981             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
982             status = GNI_RC_SUCCESS;
983         }
984         if(NoMsgInSend( (void*)(control_msg_tmp->source_addr)))
985             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
986         else
987             register_size = 0;
988     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
989     {
990         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
991         source_addr += offset;
992         size = control_msg_tmp->length;
993 //printf("[%d] send_large_messages seq: %d  addr: %p size: %d %d\n", CmiMyPe(), control_msg_tmp->seq_id, source_addr, size, control_msg_tmp->length);
994         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
995         register_size = 0;  //TODOTODO
996     }
997
998     if(status == GNI_RC_SUCCESS)
999     {
1000         status = send_smsg_message( destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
1001         if(status == GNI_RC_SUCCESS)
1002         {
1003             buffered_send_msg += register_size;
1004             if(control_msg_tmp->seq_id == 0)
1005             {
1006                 IncreaseMsgInSend(source_addr);
1007             }
1008             FreeControlMsg(control_msg_tmp);
1009 #if         DEBUG_POOL
1010             CmiPrintf(" [%d]==>%d send_large GOOD %lld(%d) size=%d\n", myrank, destNode, buffered_send_msg, register_mempool, size); 
1011 #endif
1012         }
1013     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1014     {
1015         CmiAbort("Memory registor for large msg\n");
1016     }else 
1017     {
1018         if(!inbuff)
1019             buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1020     }
1021     return status;
1022 #else
1023     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
1024     if(status == GNI_RC_SUCCESS)
1025     {
1026         status = send_smsg_message( destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
1027         if(status == GNI_RC_SUCCESS)
1028         {
1029             FreeControlMsg(control_msg_tmp);
1030         }
1031     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1032     {
1033         CmiAbort("Memory registor for large msg\n");
1034     }else 
1035     {
1036         buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1037     }
1038     return status;
1039 #endif
1040 }
1041
1042 inline void LrtsPrepareEnvelope(char *msg, int size)
1043 {
1044     CmiSetMsgSize(msg, size);
1045 }
1046
1047 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1048 {
1049     gni_return_t        status  =   GNI_RC_SUCCESS;
1050     uint8_t tag;
1051     CONTROL_MSG         *control_msg_tmp;
1052
1053     LrtsPrepareEnvelope(msg, size);
1054
1055 #if PRINT_SYH
1056     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1057 #endif 
1058 #if CMK_SMP && COMM_THREAD_SEND
1059     if(size <= SMSG_MAX_MSG)
1060         buffer_small_msgs(msg, size, destNode, SMALL_DATA_TAG);
1061     else if (size < BIG_MSG) {
1062         control_msg_tmp =  construct_control_msg(size, msg, 0);
1063         buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1064     }
1065     else {
1066           CmiSetMsgSeq(msg, 0);
1067           control_msg_tmp =  construct_control_msg(size, msg, 1);
1068           buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1069     }
1070 #else //non-smp, smp(worker sending)
1071     if(size <= SMSG_MAX_MSG)
1072     {
1073         status = send_smsg_message( destNode,  msg, size, SMALL_DATA_TAG, 0);  
1074         if(status == GNI_RC_SUCCESS)
1075             CmiFree(msg);
1076     }
1077     else if (size < BIG_MSG) {
1078         control_msg_tmp =  construct_control_msg(size, msg, 0);
1079         send_large_messages(destNode, control_msg_tmp, 0);
1080     }
1081     else {
1082 #if     USE_LRTS_MEMPOOL
1083         CmiSetMsgSeq(msg, 0);
1084         control_msg_tmp =  construct_control_msg(size, msg, 1);
1085         send_large_messages(destNode, control_msg_tmp, 0);
1086 #else
1087         control_msg_tmp =  construct_control_msg(size, msg, 0);
1088         send_large_messages(destNode, control_msg_tmp, 0);
1089 #endif
1090     }
1091 #endif
1092     return 0;
1093 }
1094
1095 static void    PumpDatagramConnection();
1096 static void registerUserTraceEvents() {
1097 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1098     traceRegisterUserEvent("setting up connections", 10);
1099     traceRegisterUserEvent("Receiving small msgs", 20);
1100     traceRegisterUserEvent("Release local transaction", 30);
1101     traceRegisterUserEvent("Sending buffered small msgs", 40);
1102     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1103 #endif
1104 }
1105
1106 void LrtsPostCommonInit(int everReturn)
1107 {
1108 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1109     CpvInitialize(double, projTraceStart);
1110     /* only PE 0 needs to care about registration (to generate sts file). */
1111     if (CmiMyPe() == 0) {
1112         registerMachineUserEventsFunction(&registerUserTraceEvents);
1113     }
1114 #endif
1115
1116 #if CMK_SMP
1117     CmiIdleState *s=CmiNotifyGetState();
1118     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1119     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1120 #else
1121     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1122     if (useDynamicSMSG)
1123     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1124 #endif
1125 }
1126
1127 /* this is called by worker thread */
1128 void LrtsPostNonLocal(){
1129 #if CMK_SMP
1130 #if !COMM_THREAD_SEND
1131     if(mysize == 1) return;
1132     PumpLocalRdmaTransactions();
1133     SendBufferMsg();
1134     SendRdmaMsg();
1135 #endif
1136 #endif
1137 }
1138
1139 /* useDynamicSMSG */
1140 static void    PumpDatagramConnection()
1141 {
1142     uint32_t          remote_address;
1143     uint32_t          remote_id;
1144     gni_return_t status;
1145     gni_post_state_t  post_state;
1146     uint64_t          datagram_id;
1147     int i;
1148
1149    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1150    {
1151        if (datagram_id >= mysize) {           /* bound endpoint */
1152            int pe = datagram_id - mysize;
1153 #if CMK_SMP && !COMM_THREAD_SEND
1154            CmiLock(tx_cq_lock);
1155 #endif
1156            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1157 #if CMK_SMP && !COMM_THREAD_SEND
1158            CmiUnlock(tx_cq_lock);
1159 #endif
1160        if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1161        {
1162           CmiAssert(remote_id == pe);
1163           status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1164           GNI_RC_CHECK("Dynamic SMSG Init", status);
1165 #if PRINT_SYH
1166           printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1167 #endif
1168           CmiAssert(smsg_connected_flag[pe] == 1);
1169           smsg_connected_flag[pe] = 2;
1170        }
1171      }
1172      else {         /* unbound ep */
1173        status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1174        if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1175        {
1176           CmiAssert(remote_id<mysize);
1177           CmiAssert(smsg_connected_flag[remote_id] <= 0);
1178           status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1179           GNI_RC_CHECK("Dynamic SMSG Init", status);
1180 #if PRINT_SYH
1181           printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1182 #endif
1183           smsg_connected_flag[remote_id] = 2;
1184
1185           alloc_smsg_attr(&send_smsg_attr);
1186           status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1187           GNI_RC_CHECK("post unbound datagram", status);
1188         }
1189      }
1190    }
1191 }
1192
1193 /* pooling CQ to receive network message */
1194 static void PumpNetworkRdmaMsgs()
1195 {
1196     gni_cq_entry_t      event_data;
1197     gni_return_t        status;
1198
1199 }
1200
1201 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1202 static void PumpNetworkSmsg()
1203 {
1204     uint64_t            inst_id;
1205     int                 ret;
1206     gni_cq_entry_t      event_data;
1207     gni_return_t        status;
1208     void                *header;
1209     uint8_t             msg_tag;
1210     int                 msg_nbytes;
1211     void                *msg_data;
1212     gni_mem_handle_t    msg_mem_hndl;
1213     gni_smsg_attr_t     *smsg_attr;
1214     gni_smsg_attr_t     *remote_smsg_attr;
1215     int                 init_flag;
1216     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1217     uint64_t            source_addr;
1218 #if CMK_SMP && !COMM_THREAD_SEND
1219     while(1)
1220     {
1221         CmiLock(tx_cq_lock);
1222         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1223         CmiUnlock(tx_cq_lock);
1224         if(status != GNI_RC_SUCCESS)
1225             break;
1226 #else
1227     while ( (status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
1228     {
1229 #endif
1230         inst_id = GNI_CQ_GET_INST_ID(event_data);
1231         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1232 #if PRINT_SYH
1233         printf("[%d] PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
1234 #endif
1235         if (useDynamicSMSG) {
1236             /* subtle: smsg may come before connection is setup */
1237             while (smsg_connected_flag[inst_id] != 2) 
1238                PumpDatagramConnection();
1239         }
1240         msg_tag = GNI_SMSG_ANY_TAG;
1241 #if CMK_SMP && !COMM_THREAD_SEND
1242         while(1) {
1243             CmiLock(tx_cq_lock);
1244             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1245             CmiUnlock(tx_cq_lock);
1246             if (status != GNI_RC_SUCCESS)
1247                 break;
1248 #else
1249         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS){
1250 #endif
1251             /* copy msg out and then put into queue (small message) */
1252             switch (msg_tag) {
1253             case SMALL_DATA_TAG:
1254             {
1255                 START_EVENT();
1256                 msg_nbytes = CmiGetMsgSize(header);
1257                 msg_data    = CmiAlloc(msg_nbytes);
1258                 memcpy(msg_data, (char*)header, msg_nbytes);
1259 #if CMK_SMP_TRACE_COMMTHREAD
1260                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1261 #endif
1262                 handleOneRecvedMsg(msg_nbytes, msg_data);
1263                 break;
1264             }
1265             case LMSG_INIT_TAG:
1266             {
1267 #if PRINT_SYH
1268                 printf("[%d] from %d request for Large msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
1269 #endif
1270                 getLargeMsgRequest(header, inst_id);
1271                 break;
1272             }
1273             case ACK_TAG:   //msg fit into mempool
1274             {
1275                 /* Get is done, release message . Now put is not used yet*/
1276 #if ! USE_LRTS_MEMPOOL
1277                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
1278 #else
1279                 //if(myrank == 0)
1280                 DecreaseMsgInSend( ((void*)((CONTROL_MSG *) header)->source_addr));
1281 #endif
1282                 if(NoMsgInSend(((void*)((CONTROL_MSG *) header)->source_addr)))
1283                     buffered_send_msg -= GetMempoolsize((void*)((CONTROL_MSG *) header)->source_addr);
1284 #if             DEBUG_POOL
1285                 printf("send done [%d===>%d] %lld, %lld\n", myrank, inst_id, buffered_send_msg, register_mempool);
1286 #endif
1287                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
1288                 //SendRdmaMsg();
1289                 break;
1290             }
1291             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1292             {
1293                 header_tmp = (CONTROL_MSG *) header;
1294                 void *msg = (void*)(header_tmp->source_addr);
1295                 int cur_seq = CmiGetMsgSeq(header_tmp->source_addr);
1296                 int offset = ONE_SEG*(cur_seq+1);
1297                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh);
1298                 //buffered_send_msg -= ((CONTROL_MSG *) header)->length;
1299                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1300                 if (remain_size < 0) remain_size = 0;
1301                 CmiSetMsgSize(msg, remain_size);
1302 //printf("[%d] PumpNetworkSmsg: free %p len: %d offset: %d seq: %d msg size: %d %d\n", myrank, header_tmp->source_addr_orig, header_tmp->total_length, offset, cur_seq, CmiGetMsgSize(msg), header_tmp->length);
1303                 if(remain_size <= 0) //transaction done
1304                 {
1305                     CmiFree(msg);
1306                 }else if (header_tmp->total_length > offset)
1307                 {
1308                     CmiSetMsgSeq(header_tmp->source_addr, cur_seq+1);
1309                     MallocControlMsg(control_msg_tmp);
1310                     control_msg_tmp->source_addr = header_tmp->source_addr;
1311                     control_msg_tmp->dest_addr    = header_tmp->dest_addr;
1312                     control_msg_tmp->total_length   = header_tmp->total_length; 
1313                     //control_msg_tmp->source         = myrank;
1314                     control_msg_tmp->length         = header_tmp->total_length - offset;
1315                     if (control_msg_tmp->length >= ONE_SEG) control_msg_tmp->length = ONE_SEG;
1316                     control_msg_tmp->seq_id         = cur_seq+1+1;
1317                     //send next seg
1318                     send_large_messages(inst_id, control_msg_tmp, 0);
1319                          // pipelining
1320                     if (header_tmp->seq_id == 1) {
1321                       int i;
1322                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1323                         int seq = cur_seq+i+2;
1324                         CmiSetMsgSeq(header_tmp->source_addr, seq-1);
1325                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)header_tmp->source_addr, seq);
1326                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1327                         send_large_messages(inst_id, control_msg_tmp, 0);
1328                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1329                       }
1330                     }
1331                 }
1332                 //SendRdmaMsg();
1333                 break;
1334             }
1335
1336 #if CMK_PERSISTENT_COMM
1337             case PUT_DONE_TAG: //persistent message
1338             void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1339             int size = ((CONTROL_MSG *) header)->length;
1340             CmiReference(msg);
1341             handleOneRecvedMsg(size, msg); 
1342             break;
1343 #endif
1344 #ifdef CMK_DIRECT
1345             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1346             (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1347            break;
1348 #endif
1349             default: {
1350                 printf("weird tag problem\n");
1351                 CmiAbort("Unknown tag\n");
1352                      }
1353             }
1354 #if CMK_SMP && !COMM_THREAD_SEND
1355             CmiLock(tx_cq_lock);
1356 #endif
1357             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1358 #if CMK_SMP && !COMM_THREAD_SEND
1359             CmiUnlock(tx_cq_lock);
1360 #endif
1361             msg_tag = GNI_SMSG_ANY_TAG;
1362         } //endwhile getNext
1363     }   //end while GetEvent
1364     if(status == GNI_RC_ERROR_RESOURCE)
1365     {
1366         GNI_RC_CHECK("Smsg_rx_cq full", status);
1367     }
1368 }
1369
1370 static void printDesc(gni_post_descriptor_t *pd)
1371 {
1372     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1373 }
1374
1375 // for BIG_MSG called on receiver side for receiving control message
1376 // LMSG_INIT_TAG
1377 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1378 {
1379 #if     USE_LRTS_MEMPOOL
1380     CONTROL_MSG         *request_msg;
1381     gni_return_t        status = GNI_RC_SUCCESS;
1382     void                *msg_data;
1383     gni_post_descriptor_t *pd;
1384     RDMA_REQUEST        *rdma_request_msg;
1385     gni_mem_handle_t    msg_mem_hndl;
1386     int source, size, transaction_size, offset = 0;
1387
1388     // initial a get to transfer data from the sender side */
1389     request_msg = (CONTROL_MSG *) header;
1390     //source = request_msg->source;
1391     size = request_msg->total_length;
1392 #if         DEBUG_POOL
1393     printf("[%d]Get Recv requst from %d (pool=%d)\n", myrank, inst_id, register_mempool);
1394 #endif
1395 //printf("[%d] getLargeMsgRequest seq: %d size: %d\n", CmiMyPe(), request_msg->seq_id , size);
1396     if(request_msg->seq_id < 2)   {
1397         msg_data = CmiAlloc(size);
1398         CmiSetMsgSeq(msg_data, 0);
1399         _MEMCHECK(msg_data);
1400     }
1401     else {
1402         offset = ONE_SEG*(request_msg->seq_id-1);
1403         msg_data = (char*)request_msg->dest_addr + offset;
1404     }
1405    
1406     MallocPostDesc(pd);
1407     pd->cqwrite_value = request_msg->seq_id;
1408     if( request_msg->seq_id == 0)
1409     {
1410         pd->local_mem_hndl= GetMemHndl(msg_data);
1411         transaction_size = ALIGN64(size);
1412         if(IsMemHndlZero(pd->local_mem_hndl))
1413         {
1414             status = registerMempool((void*)(msg_data));
1415             if(status == GNI_RC_SUCCESS)
1416             {
1417                 pd->local_mem_hndl = GetMemHndl(msg_data);
1418             }
1419             else
1420             {
1421                 SetMemHndlZero(pd->local_mem_hndl);
1422             }
1423         }
1424     }
1425     else{
1426         transaction_size = ALIGN64(request_msg->length);
1427         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, transaction_size, &(pd->local_mem_hndl), &omdh);
1428         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1429         {
1430             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1431         }
1432     }
1433     pd->first_operand = ALIGN64(size);                   //  total length
1434
1435     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1436         pd->type            = GNI_POST_FMA_GET;
1437     else
1438         pd->type            = GNI_POST_RDMA_GET;
1439 #if REMOTE_EVENT
1440     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1441 #else
1442     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1443 #endif
1444     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1445     pd->length          = transaction_size;
1446     pd->local_addr      = (uint64_t) msg_data;
1447     pd->remote_addr     = request_msg->source_addr + offset;
1448     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1449     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1450     pd->rdma_mode       = 0;
1451     pd->amo_cmd         = 0;
1452
1453     //memory registration success
1454     if(status == GNI_RC_SUCCESS)
1455     {
1456        // CmiPrintf(" PE:%d reigster(size=%d)(%s) (%lld, %lld), (%lld, %lld)\n", myrank, pd->length, gni_err_str[status], (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1457 #if CMK_SMP && !COMM_THREAD_SEND
1458         CmiLock(tx_cq_lock);
1459 #endif
1460         if(pd->type == GNI_POST_RDMA_GET) 
1461             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1462         else
1463             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1464 #if CMK_SMP && !COMM_THREAD_SEND
1465         CmiUnlock(tx_cq_lock);
1466 #endif
1467          
1468         if(status == GNI_RC_SUCCESS )
1469         {
1470             if(pd->cqwrite_value == 0)
1471             {
1472 #if         DEBUG_POOL
1473                 printf("[%d]Recv requst from %d (%d){%d}\n", myrank, inst_id, transaction_size, register_mempool );
1474 #endif
1475                 IncreaseMsgInRecv(msg_data);
1476
1477             }
1478         }
1479     }else
1480     {
1481         SetMemHndlZero(pd->local_mem_hndl);
1482     }
1483     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1484     {
1485         MallocRdmaRequest(rdma_request_msg);
1486         rdma_request_msg->destNode = inst_id;
1487         rdma_request_msg->pd = pd;
1488 #if CMK_SMP
1489         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1490 #else
1491         if(sendRdmaBuf == 0)
1492         {
1493             sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1494         }else{
1495             sendRdmaTail->next = rdma_request_msg;
1496             sendRdmaTail =  rdma_request_msg;
1497         }
1498 #endif
1499     }else {
1500         /* printf("source: %d pd:%p\n", source, pd); */
1501         GNI_RC_CHECK("AFter posting", status);
1502     }
1503 #else
1504     CONTROL_MSG         *request_msg;
1505     gni_return_t        status;
1506     void                *msg_data;
1507     gni_post_descriptor_t *pd;
1508     RDMA_REQUEST        *rdma_request_msg;
1509     gni_mem_handle_t    msg_mem_hndl;
1510     //int source;
1511     // initial a get to transfer data from the sender side */
1512     request_msg = (CONTROL_MSG *) header;
1513     //source = request_msg->source;
1514     msg_data = CmiAlloc(request_msg->length);
1515     _MEMCHECK(msg_data);
1516
1517     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
1518
1519     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1520     {
1521         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1522     }
1523
1524     MallocPostDesc(pd);
1525     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1526         pd->type            = GNI_POST_FMA_GET;
1527     else
1528         pd->type            = GNI_POST_RDMA_GET;
1529 #if REMOTE_EVENT
1530     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1531 #else
1532     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1533 #endif
1534     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1535     pd->length          = ALIGN64(request_msg->length);
1536     pd->local_addr      = (uint64_t) msg_data;
1537     pd->remote_addr     = request_msg->source_addr;
1538     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1539     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1540     pd->rdma_mode       = 0;
1541     pd->amo_cmd         = 0;
1542
1543     //memory registration successful
1544     if(status == GNI_RC_SUCCESS)
1545     {
1546         pd->local_mem_hndl  = msg_mem_hndl;
1547 #if CMK_SMP && !COMM_THREAD_SEND
1548             CmiLock(tx_cq_lock);
1549 #endif
1550         if(pd->type == GNI_POST_RDMA_GET) 
1551             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1552         else
1553             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1554 #if CMK_SMP && !COMM_THREAD_SEND
1555             CmiUnlock(tx_cq_lock);
1556 #endif
1557     }else
1558     {
1559         SetMemHndlZero(pd->local_mem_hndl);
1560     }
1561     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1562     {
1563         MallocRdmaRequest(rdma_request_msg);
1564         rdma_request_msg->next = 0;
1565         rdma_request_msg->destNode = inst_id;
1566         rdma_request_msg->pd = pd;
1567         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1568     }else {
1569         /* printf("source: %d pd:%p\n", source, pd); */
1570         GNI_RC_CHECK("AFter posting", status);
1571     }
1572 #endif
1573 }
1574
1575 static void PumpLocalRdmaTransactions()
1576 {
1577     gni_cq_entry_t          ev;
1578     gni_return_t            status;
1579     uint64_t                type, inst_id;
1580     gni_post_descriptor_t   *tmp_pd;
1581     MSG_LIST                *ptr;
1582     CONTROL_MSG             *ack_msg_tmp;
1583     uint8_t             msg_tag;
1584 #ifdef CMK_DIRECT
1585     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1586 #endif
1587 #if CMK_SMP && !COMM_THREAD_SEND
1588     while(1) {
1589         CmiLock(tx_cq_lock);
1590         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1591         CmiUnlock(tx_cq_lock);
1592         if(status != GNI_RC_SUCCESS)
1593             break;
1594 #else
1595     while( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1596     {
1597 #endif
1598         type = GNI_CQ_GET_TYPE(ev);
1599         if (type == GNI_CQ_EVENT_TYPE_POST)
1600         {
1601             inst_id     = GNI_CQ_GET_INST_ID(ev);
1602 #if PRINT_SYH
1603             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1604 #endif
1605             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
1606 #if CMK_SMP && !COMM_THREAD_SEND
1607             CmiLock(tx_cq_lock);
1608 #endif
1609             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1610 #if CMK_SMP && !COMM_THREAD_SEND
1611             CmiUnlock(tx_cq_lock);
1612 #endif
1613 #ifdef CMK_DIRECT
1614             if(tmp_pd->amo_cmd == 1)
1615             {
1616                 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1617                 cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
1618                 cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
1619                 cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
1620             }
1621             else{
1622                 MallocControlMsg(ack_msg_tmp);
1623                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1624                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1625             }
1626 #else
1627             MallocControlMsg(ack_msg_tmp);
1628             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1629             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1630 #endif
1631             ////Message is sent, free message , put is not used now
1632             switch (tmp_pd->type) {
1633 #if CMK_PERSISTENT_COMM
1634             case GNI_POST_RDMA_PUT:
1635 #if ! USE_LRTS_MEMPOOL
1636                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1637 #endif
1638             case GNI_POST_FMA_PUT:
1639                 CmiFree((void *)tmp_pd->local_addr);
1640                 msg_tag = PUT_DONE_TAG;
1641                 break;
1642 #endif
1643 #ifdef CMK_DIRECT
1644             case GNI_POST_RDMA_PUT:
1645             case GNI_POST_FMA_PUT:
1646                 //sender ACK to receiver to trigger it is done
1647                 msg_tag = DIRECT_PUT_DONE_TAG;
1648                 break;
1649 #endif
1650             case GNI_POST_RDMA_GET:
1651             case GNI_POST_FMA_GET:
1652 #if  ! USE_LRTS_MEMPOOL
1653                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1654                 msg_tag = ACK_TAG;  
1655 #else
1656                 ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
1657                 if(ack_msg_tmp->seq_id > 0)      // BIG_MSG
1658                 {
1659                     msg_tag = BIG_MSG_TAG; 
1660                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1661                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1662                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1663                 } 
1664                 else
1665                 {
1666                     msg_tag = ACK_TAG;  
1667                     ack_msg_tmp->dest_addr = tmp_pd->local_addr;
1668                 }
1669                 ack_msg_tmp->length = tmp_pd->length;
1670                 ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1671 #endif
1672                 break;
1673             default:
1674                 CmiPrintf("type=%d\n", tmp_pd->type);
1675                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1676             }
1677 #if CMK_DIRECT
1678             if(tmp_pd->amo_cmd == 1)
1679                 status = send_smsg_message(inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1680             else
1681 #endif
1682                 status = send_smsg_message(inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
1683 #if         DEBUG_POOL
1684             printf("%d Recv done from %d (ack=%s) pool=%lld\n", myrank, inst_id, gni_err_str[status], register_mempool);
1685 #endif
1686             if(status == GNI_RC_SUCCESS)
1687             {
1688 #if CMK_DIRECT
1689                 if(tmp_pd->amo_cmd == 1)
1690                     free(cmk_direct_done_msg); 
1691                 else
1692 #endif
1693                     FreeControlMsg(ack_msg_tmp);
1694
1695             }
1696 #if CMK_PERSISTENT_COMM
1697             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1698 #endif
1699             {
1700                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1701 #if PRINT_SYH
1702                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1703 #endif
1704 #if CMK_SMP_TRACE_COMMTHREAD
1705                     START_EVENT();
1706 #endif
1707                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1708                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1709 #if CMK_SMP_TRACE_COMMTHREAD
1710                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1711 #endif
1712                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1713                 }else if(msg_tag == BIG_MSG_TAG){
1714                   void *msg = (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1715                   CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1716                   if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1717 #if CMK_SMP_TRACE_COMMTHREAD
1718                     START_EVENT();
1719 #endif
1720 #if PRINT_SYH
1721                     printf("Pipeline msg done [%d]\n", myrank);
1722 #endif
1723 #if CMK_SMP_TRACE_COMMTHREAD
1724                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1725 #endif
1726                     handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1727                   }
1728                 }
1729                 //SendRdmaMsg();
1730             }
1731             FreePostDesc(tmp_pd);
1732         }
1733     } //end while
1734     if(status == GNI_RC_ERROR_RESOURCE)
1735     {
1736         GNI_RC_CHECK("Smsg_tx_cq full", status);
1737     }
1738 }
1739
1740 static void  SendRdmaMsg()
1741 {
1742     gni_return_t            status = GNI_RC_SUCCESS;
1743     gni_mem_handle_t        msg_mem_hndl;
1744     RDMA_REQUEST *ptr = NULL;
1745
1746 #if CMK_SMP
1747     while (!(PCQueueEmpty(sendRdmaBuf)))
1748     {
1749         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
1750 #else
1751     while( sendRdmaBuf != 0) {
1752         ptr = sendRdmaBuf;
1753 #endif
1754         gni_post_descriptor_t *pd = ptr->pd;
1755         status = GNI_RC_SUCCESS;
1756         
1757         if(pd->cqwrite_value == 0)
1758         {
1759             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
1760             {
1761                 status = registerMempool((void*)(pd->local_addr));
1762                 if(status == GNI_RC_SUCCESS)
1763                 {
1764                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1765                 }
1766             }else
1767             {
1768                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1769                 status = GNI_RC_SUCCESS;
1770             }
1771         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
1772         {
1773             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1774         }
1775         if(status == GNI_RC_SUCCESS)
1776         {
1777 #if CMK_SMP && !COMM_THREAD_SEND
1778             CmiLock(tx_cq_lock);
1779 #endif
1780             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1781                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1782             else
1783                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1784 #if CMK_SMP && !COMM_THREAD_SEND
1785             CmiUnlock(tx_cq_lock);
1786 #endif
1787             if(status == GNI_RC_SUCCESS)
1788             {
1789 #if !CMK_SMP
1790                 sendRdmaBuf = sendRdmaBuf->next;
1791 #endif
1792                 if(pd->cqwrite_value == 0)
1793                 {
1794                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
1795                 }
1796                 FreeRdmaRequest(ptr);
1797                 continue;
1798             }else
1799             {
1800 #if CMK_SMP
1801                 PCQueuePush(sendRdmaBuf, (char*)ptr);
1802 #endif
1803             break;
1804             }
1805         }else
1806         {
1807 #if CMK_SMP
1808             PCQueuePush(sendRdmaBuf, (char*)ptr);
1809 #endif
1810             break;
1811         }
1812     } //end while
1813 }
1814
1815 // return 1 if all messages are sent
1816 static int SendBufferMsg()
1817 {
1818     MSG_LIST            *ptr, *tmp_ptr, *previous_head, *current_head;
1819     CONTROL_MSG         *control_msg_tmp;
1820     gni_return_t        status;
1821     int done = 1;
1822     register    int     i, register_size;
1823     void                *register_addr;
1824     int                 index_previous = -1;
1825     int                 index = smsg_head_index;
1826 #if CMI_EXERT_SEND_CAP
1827     int                 sent_cnt = 0;
1828 #endif
1829
1830 #if ! CMK_SMP
1831     index = smsg_head_index;
1832 #else
1833     index = 0;
1834 #endif
1835     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {printf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
1836     /* can add flow control here to control the number of messages sent before handle message */
1837
1838 #if CMK_SMP
1839     while(index <mysize)
1840     {
1841         ptr = (MSG_LIST*)PCQueuePop(smsg_msglist_index[index].sendSmsgBuf);
1842 #else
1843     while(index != -1)
1844         {
1845         ptr = smsg_msglist_index[index].sendSmsgBuf;
1846 #endif
1847         while(ptr != 0)
1848         {
1849             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
1850                 /* connection not exists yet */
1851               done = 0;
1852               break;
1853             }
1854             status = GNI_RC_ERROR_RESOURCE;
1855             switch(ptr->tag)
1856             {
1857             case SMALL_DATA_TAG:
1858                 status = send_smsg_message( ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
1859                 if(status == GNI_RC_SUCCESS)
1860                 {
1861                     CmiFree(ptr->msg);
1862                 }
1863                 break;
1864             case LMSG_INIT_TAG:
1865                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1866                 status = send_large_messages( ptr->destNode, control_msg_tmp, 1);
1867                 if(status != GNI_RC_SUCCESS)
1868                     done = 0;
1869                 break;
1870             case   ACK_TAG:
1871             case   BIG_MSG_TAG:
1872                 status = send_smsg_message( ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
1873                 if(status == GNI_RC_SUCCESS)
1874                 {
1875                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1876                 }else
1877                     done = 0;
1878                 break;
1879 #ifdef CMK_DIRECT
1880             case DIRECT_PUT_DONE_TAG:
1881                 status = send_smsg_message( ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
1882                 if(status == GNI_RC_SUCCESS)
1883                 {
1884                     free((CMK_DIRECT_HEADER*)ptr->msg);
1885                 }
1886                 break;
1887
1888 #endif
1889             default:
1890                 printf("Weird tag\n");
1891                 CmiAbort("should not happen\n");
1892             }
1893             if(status == GNI_RC_SUCCESS)
1894             {
1895 #if !CMK_SMP
1896                 smsg_msglist_index[index].sendSmsgBuf = smsg_msglist_index[index].sendSmsgBuf->next;
1897                 FreeMsgList(ptr);
1898                 ptr= smsg_msglist_index[index].sendSmsgBuf;
1899 #else
1900                 FreeMsgList(ptr);
1901                 ptr = (MSG_LIST*)PCQueuePop(smsg_msglist_index[index].sendSmsgBuf);
1902 #endif
1903 #if PRINT_SYH
1904                 buffered_smsg_counter--;
1905                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
1906 #endif
1907 #if CMI_EXERT_SEND_CAP
1908                 sent_cnt++;
1909                 if(sent_cnt == SEND_CAP)
1910                     break;
1911 #endif
1912             }else {
1913 #if CMK_SMP
1914                 PCQueuePush(smsg_msglist_index[index].sendSmsgBuf, (char*)ptr); //ERROR
1915 //#else
1916 //                previous_head = ptr;
1917 //                ptr=ptr->next;
1918 #endif
1919                 done = 0;
1920                 break;
1921             } 
1922         
1923         } //end while
1924 #if !CMK_SMP
1925         if(smsg_msglist_index[index].sendSmsgBuf == 0)
1926         {
1927             if(index_previous != -1)
1928                 smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
1929             else
1930                 smsg_head_index = smsg_msglist_index[index].next;
1931         }else
1932         {
1933             index_previous = index;
1934         }
1935         index = smsg_msglist_index[index].next;
1936 #else
1937         index++;
1938 #endif
1939
1940 #if CMI_EXERT_SEND_CAP
1941         if(sent_cnt == SEND_CAP)
1942                 break;
1943 #endif
1944     }   // end pooling for all cores
1945     return done;
1946 }
1947
1948 void LrtsAdvanceCommunication(int whileidle)
1949 {
1950     /*  Receive Msg first */
1951 #if 0
1952     if(myrank == 0)
1953     printf("Calling Lrts Pump Msg PE:%d\n", myrank);
1954 #endif
1955 #if CMK_SMP_TRACE_COMMTHREAD
1956     double startT, endT;
1957 #endif
1958     //if(mysize == 1) return;
1959     if (useDynamicSMSG && whileidle)
1960     {
1961 #if CMK_SMP_TRACE_COMMTHREAD
1962         startT = CmiWallTimer();
1963 #endif
1964         PumpDatagramConnection();
1965 #if CMK_SMP_TRACE_COMMTHREAD
1966         endT = CmiWallTimer();
1967         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
1968 #endif
1969     }
1970
1971 #if CMK_SMP_TRACE_COMMTHREAD
1972     startT = CmiWallTimer();
1973 #endif
1974     PumpNetworkSmsg();
1975 #if CMK_SMP_TRACE_COMMTHREAD
1976     endT = CmiWallTimer();
1977     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
1978 #endif
1979
1980 #if CMK_SMP_TRACE_COMMTHREAD
1981     startT = CmiWallTimer();
1982 #endif
1983     PumpLocalRdmaTransactions();
1984 #if CMK_SMP_TRACE_COMMTHREAD
1985     endT = CmiWallTimer();
1986     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
1987 #endif
1988 #if 0
1989     if(myrank == 0)
1990     printf("Calling Lrts Send Buffmsg PE:%d\n", myrank);
1991 #endif
1992     /* Send buffered Message */
1993 #if CMK_SMP_TRACE_COMMTHREAD
1994     startT = CmiWallTimer();
1995 #endif
1996     SendBufferMsg();
1997 #if CMK_SMP_TRACE_COMMTHREAD
1998     endT = CmiWallTimer();
1999     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2000 #endif
2001
2002 #if CMK_SMP_TRACE_COMMTHREAD
2003     startT = CmiWallTimer();
2004 #endif
2005     SendRdmaMsg();
2006 #if CMK_SMP_TRACE_COMMTHREAD
2007     endT = CmiWallTimer();
2008     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2009 #endif
2010     //CmiPrintf("[%d]send buffer=%dM\n", myrank, buffered_send_msg/(1024*1024));
2011 #if 0
2012     if(myrank == 0)
2013     printf("done PE:%d\n", myrank);
2014 #endif
2015 }
2016
2017 /* useDynamicSMSG */
2018 static void _init_dynamic_smsg()
2019 {
2020     gni_return_t status;
2021     uint32_t     vmdh_index = -1;
2022     int i;
2023
2024     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2025     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2026     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2027     for(i=0; i<mysize; i++) {
2028         smsg_connected_flag[i] = 0;
2029         smsg_attr_vector_local[i] = NULL;
2030         smsg_attr_vector_remote[i] = NULL;
2031     }
2032     if(mysize <=512)
2033     {
2034         SMSG_MAX_MSG = 4096;
2035     }else if (mysize <= 4096)
2036     {
2037         SMSG_MAX_MSG = 4096/mysize * 1024;
2038     }else if (mysize <= 16384)
2039     {
2040         SMSG_MAX_MSG = 512;
2041     }else {
2042         SMSG_MAX_MSG = 256;
2043     }
2044
2045     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2046     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2047     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2048     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2049     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2050
2051     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2052     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2053     //mailbox_list->mailbox_base = malloc(mailbox_list->size);
2054     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2055     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2056     mailbox_list->offset = 0;
2057     mailbox_list->next = 0;
2058     
2059     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2060         mailbox_list->size, smsg_rx_cqh,
2061         GNI_MEM_READWRITE,   
2062         vmdh_index,
2063         &(mailbox_list->mem_hndl));
2064     //status = MEMORY_REGISTER(onesided_hnd, nic_hndl, mailbox_list->mailbox_base, mailbox_list->size, &(mailbox_list->mem_hndl), &omdh);
2065     GNI_RC_CHECK("MEMORY registration for smsg", status);
2066
2067     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2068     GNI_RC_CHECK("Unbound EP", status);
2069     
2070     alloc_smsg_attr(&send_smsg_attr);
2071
2072     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2073     GNI_RC_CHECK("post unbound datagram", status);
2074
2075       /* always pre-connect to proc 0 */
2076     //if (myrank != 0) connect_to(0);
2077 }
2078
2079 static void _init_static_smsg()
2080 {
2081     gni_smsg_attr_t      *smsg_attr;
2082     gni_smsg_attr_t      remote_smsg_attr;
2083     gni_smsg_attr_t      *smsg_attr_vec;
2084     gni_mem_handle_t     my_smsg_mdh_mailbox;
2085     int      ret, i;
2086     gni_return_t status;
2087     uint32_t              vmdh_index = -1;
2088     mdh_addr_t            base_infor;
2089     mdh_addr_t            *base_addr_vec;
2090     char *env;
2091
2092     if(mysize <=512)
2093     {
2094         SMSG_MAX_MSG = 1024;
2095     }else if (mysize <= 4096)
2096     {
2097         SMSG_MAX_MSG = 1024;
2098     }else if (mysize <= 16384)
2099     {
2100         SMSG_MAX_MSG = 512;
2101     }else {
2102         SMSG_MAX_MSG = 256;
2103     }
2104     
2105     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2106     if (env) SMSG_MAX_MSG = atoi(env);
2107     CmiAssert(SMSG_MAX_MSG > 0);
2108
2109     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2110     
2111     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2112     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2113     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2114     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2115     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2116     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2117     CmiAssert(ret == 0);
2118     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2119     //if (myrank == 0) printf("Charm++> allocates %.2fMB for SMSG mailbox. \n", smsg_memlen*mysize/1e6);
2120     
2121     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2122             smsg_memlen*(mysize), smsg_rx_cqh,
2123             GNI_MEM_READWRITE,   
2124             vmdh_index,
2125             &my_smsg_mdh_mailbox);
2126
2127     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2128
2129     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2130     base_infor.mdh =  my_smsg_mdh_mailbox;
2131     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2132
2133     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2134  
2135     for(i=0; i<mysize; i++)
2136     {
2137         if(i==myrank)
2138             continue;
2139         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2140         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2141         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2142         smsg_attr[i].mbox_offset = i*smsg_memlen;
2143         smsg_attr[i].buff_size = smsg_memlen;
2144         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2145         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2146     }
2147
2148     for(i=0; i<mysize; i++)
2149     {
2150         if (myrank == i) continue;
2151
2152         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2153         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2154         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2155         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2156         remote_smsg_attr.buff_size = smsg_memlen;
2157         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2158         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2159
2160         /* initialize the smsg channel */
2161         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2162         GNI_RC_CHECK("SMSG Init", status);
2163     } //end initialization
2164
2165     free(base_addr_vec);
2166     free(smsg_attr);
2167
2168     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2169     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2170
2171
2172 inline
2173 static void _init_smsg()
2174 {
2175     int i;
2176
2177     if(mysize > 1) {
2178         if (useDynamicSMSG)
2179             _init_dynamic_smsg();
2180         else
2181             _init_static_smsg();
2182     }
2183
2184      smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2185      for(i =0; i<mysize; i++)
2186      {
2187         smsg_msglist_index[i].next = -1;
2188 #if CMK_SMP
2189         smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2190 #else
2191         smsg_msglist_index[i].sendSmsgBuf = 0; 
2192 #endif
2193         
2194      }
2195      smsg_head_index = -1;
2196 }
2197
2198 static void _init_static_msgq()
2199 {
2200     gni_return_t status;
2201     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2202     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2203     msgq_attrs.smsg_q_sz = 1;
2204     msgq_attrs.rcv_pool_sz = 1;
2205     msgq_attrs.num_msgq_eps = 2;
2206     msgq_attrs.nloc_insts = 8;
2207     msgq_attrs.modes = 0;
2208     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2209
2210     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2211     GNI_RC_CHECK("MSGQ Init", status);
2212
2213
2214 }
2215
2216 #if CMK_SMP && STEAL_MEMPOOL
2217 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2218 {
2219     void *pool = NULL;
2220     int i, k;
2221     // check other ranks
2222     for (k=0; k<CmiMyNodeSize()+1; k++) {
2223         i = (CmiMyRank()+k)%CmiMyNodeSize();
2224         if (i==CmiMyRank()) continue;
2225         mempool_type *mptr = CpvAccessOther(mempool, i);
2226         CmiLock(mptr->mempoolLock);
2227         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2228         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2229             CmiUnlock(mptr->mempoolLock);
2230             continue;
2231         }
2232         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2233         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2234             /* search in the free list */
2235           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2236           mempool_header *current = free_header;
2237           while (current) {
2238             if (current->next_free == (char*)header-(char*)mptr) break;
2239             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2240           }
2241           if (current == NULL) {         /* not found in free list */
2242             CmiUnlock(mptr->mempoolLock);
2243             continue;
2244           }
2245 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2246             /* search the previous memblock, and remove the tail */
2247           mempool_block *ptr = (mempool_block *)mptr;
2248           while (ptr) {
2249             if (ptr->memblock_next == mptr->memblock_tail) break;
2250             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2251           }
2252           CmiAssert(ptr!=NULL);
2253           ptr->memblock_next = 0;
2254           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2255
2256             /* remove memblock from the free list */
2257           current->next_free = header->next_free;
2258           if (header == free_header) mptr->freelist_head = header->next_free;
2259
2260           CmiUnlock(mptr->mempoolLock);
2261
2262           pool = (void*)tail;
2263           *mem_hndl = tail->mem_hndl;
2264           *size = tail->size;
2265           return pool;
2266         }
2267         CmiUnlock(mptr->mempoolLock);
2268     }
2269
2270       /* steal failed, deregister and free memblock now */
2271     int freed = 0;
2272     for (k=0; k<CmiMyNodeSize()+1; k++) {
2273         i = (CmiMyRank()+k)%CmiMyNodeSize();
2274         mempool_type *mptr = CpvAccessOther(mempool, i);
2275         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2276
2277         mempool_block *mempools_head = &(mptr->mempools_head);
2278         mempool_block *current = mempools_head;
2279         mempool_block *prev = NULL;
2280
2281         while (current) {
2282           int isfree = 0;
2283           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2284 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2285           mempool_header *cur = free_header;
2286           mempool_header *header;
2287           if (current != mempools_head) {
2288             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2289              /* search in free list */
2290             if (header->size == current->size - sizeof(mempool_block)) {
2291               cur = free_header;
2292               while (cur) {
2293                 if (cur->next_free == (char*)header-(char*)mptr) break;
2294                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2295               }
2296               if (cur != NULL) isfree = 1;
2297             }
2298           }
2299           if (isfree) {
2300               /* remove from free list */
2301             cur->next_free = header->next_free;
2302             if (header == free_header) mptr->freelist_head = header->next_free;
2303              // deregister
2304             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh);
2305             GNI_RC_CHECK("steal Mempool de-register", status);
2306             mempool_block *ptr = current;
2307             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2308             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2309 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2310             freed += ptr->size;
2311             free(ptr);
2312              // try now
2313             if (freed > *size) {
2314               if (pool == NULL) {
2315                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2316                 CmiAssert(ret == 0);
2317               }
2318               status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh);
2319               if (status == GNI_RC_SUCCESS) {
2320                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2321 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2322                 return pool;
2323               }
2324 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2325             }
2326           }
2327           else {
2328              prev = current;
2329              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2330           }
2331         }
2332
2333         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2334     }
2335       /* still no luck registering pool */
2336     if (pool) free(pool);
2337     return NULL;
2338 }
2339 #endif
2340 static long long int total_mempool_size = 0;
2341 static long long int total_mempool_calls = 0;
2342 #if USE_LRTS_MEMPOOL
2343 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2344 {
2345     void *pool;
2346     int ret;
2347
2348     int default_size =  expand_flag? _expand_mem : _mempool_size;
2349     if (*size < default_size) *size = default_size;
2350     total_mempool_size += *size;
2351     total_mempool_calls += 1;
2352     ret = posix_memalign(&pool, ALIGNBUF, *size);
2353     if (ret != 0) {
2354 #if CMK_SMP && STEAL_MEMPOOL
2355       pool = steal_mempool_block(size, mem_hndl);
2356       if (pool != NULL) return pool;
2357 #endif
2358       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2359       if (ret == ENOMEM)
2360         CmiAbort("alloc_mempool_block: out of memory.");
2361       else
2362         CmiAbort("alloc_mempool_block: posix_memalign failed");
2363     }
2364     /*
2365     gni_return_t status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh);
2366 #if CMK_SMP && STEAL_MEMPOOL
2367     if(expand_flag && status != GNI_RC_SUCCESS) {
2368       free(pool);
2369       pool = steal_mempool_block(size, mem_hndl);
2370       if (pool != NULL) status = GNI_RC_SUCCESS;
2371     }
2372 #endif
2373 */
2374   //  if(status != GNI_RC_SUCCESS)
2375     {
2376         //printf("[%d] Charm++> too much registering memory of %d bytes: Please try to use large page (module load craype-hugepages8m) or contact charm++ developer for help.[%lld, %lld]\n", CmiMyPe(), *size, total_mempool_size, total_mempool_calls);
2377         SetMemHndlZero((*mem_hndl));
2378     
2379    }
2380     //if(expand_flag)
2381     //    printf("[%d] Alloc more Memory pool of %d bytes: [mempool=%lld, calls=%lld]\n", CmiMyPe(), *size, total_mempool_size, total_mempool_calls);
2382     return pool;
2383 }
2384
2385 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2386 {
2387     if(!(IsMemHndlZero(mem_hndl)))
2388     {
2389         gni_return_t status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2390         GNI_RC_CHECK("free_mempool_block Mempool de-register", status);
2391     }
2392     free(ptr);
2393 }
2394 #endif
2395
2396 void LrtsPreCommonInit(int everReturn){
2397 #if USE_LRTS_MEMPOOL
2398     CpvInitialize(mempool_type*, mempool);
2399     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block);
2400 #endif
2401 }
2402
2403
2404 FILE *debugLog = NULL;
2405 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2406 {
2407     register int            i;
2408     int                     rc;
2409     int                     device_id = 0;
2410     unsigned int            remote_addr;
2411     gni_cdm_handle_t        cdm_hndl;
2412     gni_return_t            status = GNI_RC_SUCCESS;
2413     uint32_t                vmdh_index = -1;
2414     uint8_t                 ptag;
2415     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2416     int                     first_spawned;
2417     int                     physicalID;
2418     char                   *env;
2419
2420     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2421     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2422     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2423    
2424     status = PMI_Init(&first_spawned);
2425     GNI_RC_CHECK("PMI_Init", status);
2426
2427     status = PMI_Get_size(&mysize);
2428     GNI_RC_CHECK("PMI_Getsize", status);
2429
2430     status = PMI_Get_rank(&myrank);
2431     GNI_RC_CHECK("PMI_getrank", status);
2432
2433     //physicalID = CmiPhysicalNodeID(myrank);
2434     
2435     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2436
2437     *myNodeID = myrank;
2438     *numNodes = mysize;
2439   
2440 #if !COMM_THREAD_SEND
2441     /* Currently, we only consider the case that comm. thread will only recv msgs */
2442     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2443 #endif
2444
2445     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2446     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2447     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2448
2449     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2450     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2451     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2452
2453     useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2454     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2455     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2456     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2457     
2458     if(myrank == 0)
2459     {
2460         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2461         printf("Charm++> %s SMSG\n", useDynamicSMSG?"Dynamic":"Static");
2462     }
2463 #ifdef USE_ONESIDED
2464     onesided_init(NULL, &onesided_hnd);
2465
2466     // this is a GNI test, so use the libonesided bypass functionality
2467     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2468     local_addr = gniGetNicAddress();
2469 #else
2470     ptag = get_ptag();
2471     cookie = get_cookie();
2472 #if 0
2473     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2474 #endif
2475     //Create and attach to the communication  domain */
2476     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2477     GNI_RC_CHECK("GNI_CdmCreate", status);
2478     //* device id The device id is the minor number for the device
2479     //that is assigned to the device by the system when the device is created.
2480     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2481     //where X is the device number 0 default 
2482     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2483     GNI_RC_CHECK("GNI_CdmAttach", status);
2484     local_addr = get_gni_nic_address(0);
2485 #endif
2486     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2487     _MEMCHECK(MPID_UGNI_AllAddr);
2488     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2489     /* create the local completion queue */
2490     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2491     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2492     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2493     
2494     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2495     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2496     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2497
2498     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2499     GNI_RC_CHECK("Create CQ (rx)", status);
2500     
2501     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2502     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2503     
2504     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2505     //GNI_RC_CHECK("Create BTE CQ", status);
2506
2507     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2508     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2509     _MEMCHECK(ep_hndl_array);
2510 #if CMK_SMP && !COMM_THREAD_SEND
2511     tx_cq_lock  = CmiCreateLock();
2512     rx_cq_lock  = CmiCreateLock();
2513 #endif
2514     for (i=0; i<mysize; i++) {
2515         if(i == myrank) continue;
2516         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2517         GNI_RC_CHECK("GNI_EpCreate ", status);   
2518         remote_addr = MPID_UGNI_AllAddr[i];
2519         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2520         GNI_RC_CHECK("GNI_EpBind ", status);   
2521     }
2522
2523     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2524     _init_smsg();
2525     PMI_Barrier();
2526
2527 #if     USE_LRTS_MEMPOOL
2528     char *str;
2529     str = getenv("CHARM_UGNI_MEMPOOL_SIZE");
2530     if (str) _mempool_size = CmiReadSize(env);
2531     if (CmiGetArgStringDesc(*argv,"+useMemorypoolSize",&str,"Set the memory pool size")) 
2532         _mempool_size = CmiReadSize(str);
2533
2534     if (myrank==0) printf("Charm++> memory pool size: %1.fMB\n", _mempool_size/1024.0/1024);
2535 #endif
2536
2537     /* init DMA buffer for medium message */
2538
2539     //_init_DMA_buffer();
2540     
2541     free(MPID_UGNI_AllAddr);
2542 #if CMK_SMP
2543     sendRdmaBuf = PCQueueCreate();
2544 #else
2545     sendRdmaBuf = 0;
2546 #endif
2547 }
2548
2549 void* LrtsAlloc(int n_bytes, int header)
2550 {
2551     void *ptr;
2552 #if 0
2553     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2554 #endif
2555     if(n_bytes <= SMSG_MAX_MSG)
2556     {
2557         int totalsize = n_bytes+header;
2558         ptr = malloc(totalsize);
2559     }
2560     else {
2561         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2562 #if     USE_LRTS_MEMPOOL
2563         n_bytes = ALIGN64(n_bytes);
2564         if(n_bytes <= BIG_MSG)
2565         {
2566             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2567             mempool_header* mh = ((mempool_header*)(res-sizeof(mempool_header)));
2568             block_header *bh = (block_header*)(mh->mempool_ptr);
2569             mem_handle_t hh=  bh->mem_hndl;
2570             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2571         }else 
2572         {
2573             //printf("$$$$ [%d] Large message  %d\n", myrank, n_bytes); 
2574             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2575             ptr = res + ALIGNBUF - header;
2576
2577         }
2578 #else
2579         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2580         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2581         ptr = res + ALIGNBUF - header;
2582 #endif
2583     }
2584 #if 0 
2585     printf("Done Alloc Lrts for bytes=%d, head=%d\n", n_bytes, header);
2586 #endif
2587     return ptr;
2588 }
2589
2590 void  LrtsFree(void *msg)
2591 {
2592     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2593     if (size <= SMSG_MAX_MSG)
2594       free(msg);
2595     else if(size>BIG_MSG)
2596     {
2597         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2598
2599     }else
2600     {
2601 #if 0
2602         printf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2603 #endif
2604 #if     USE_LRTS_MEMPOOL
2605 #if CMK_SMP
2606         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2607 #else
2608         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2609 #endif
2610 #else
2611         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2612 #endif
2613     }
2614 #if 0 
2615     printf("Done Free lrts for bytes=%d\n", size);
2616 #endif
2617 }
2618
2619 void LrtsExit()
2620 {
2621     /* free memory ? */
2622 #if USE_LRTS_MEMPOOL
2623     mempool_destroy(CpvAccess(mempool));
2624 #endif
2625     PMI_Finalize();
2626     exit(0);
2627 }
2628
2629 void LrtsDrainResources()
2630 {
2631     if(mysize == 1) return;
2632     while (!SendBufferMsg()) {
2633         if (useDynamicSMSG)
2634             PumpDatagramConnection();
2635         PumpNetworkSmsg();
2636         //PumpNetworkRdmaMsgs();
2637         PumpLocalRdmaTransactions();
2638         SendRdmaMsg();
2639     }
2640     PMI_Barrier();
2641 }
2642
2643 void LrtsAbort(const char *message) {
2644     printf("CmiAbort is calling on PE:%d\n", myrank);
2645     CmiPrintStackTrace(0);
2646     PMI_Abort(-1, message);
2647 }
2648
2649 /**************************  TIMER FUNCTIONS **************************/
2650 #if CMK_TIMER_USE_SPECIAL
2651 /* MPI calls are not threadsafe, even the timer on some machines */
2652 static CmiNodeLock  timerLock = 0;
2653 static int _absoluteTime = 0;
2654 static int _is_global = 0;
2655 static struct timespec start_ts;
2656
2657 inline int CmiTimerIsSynchronized() {
2658     return 0;
2659 }
2660
2661 inline int CmiTimerAbsolute() {
2662     return _absoluteTime;
2663 }
2664
2665 double CmiStartTimer() {
2666     return 0.0;
2667 }
2668
2669 double CmiInitTime() {
2670     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
2671 }
2672
2673 void CmiTimerInit(char **argv) {
2674     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2675     if (_absoluteTime && CmiMyPe() == 0)
2676         printf("Charm++> absolute  timer is used\n");
2677     
2678     _is_global = CmiTimerIsSynchronized();
2679
2680
2681     if (_is_global) {
2682         if (CmiMyRank() == 0) {
2683             clock_gettime(CLOCK_MONOTONIC, &start_ts);
2684         }
2685     } else { /* we don't have a synchronous timer, set our own start time */
2686         CmiBarrier();
2687         CmiBarrier();
2688         CmiBarrier();
2689         clock_gettime(CLOCK_MONOTONIC, &start_ts);
2690     }
2691     CmiNodeAllBarrier();          /* for smp */
2692 }
2693
2694 /**
2695  * Since the timerLock is never created, and is
2696  * always NULL, then all the if-condition inside
2697  * the timer functions could be disabled right
2698  * now in the case of SMP.
2699  */
2700 double CmiTimer(void) {
2701     struct timespec now_ts;
2702     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2703     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2704         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2705 }
2706
2707 double CmiWallTimer(void) {
2708     struct timespec now_ts;
2709     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2710     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2711         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
2712 }
2713
2714 double CmiCpuTimer(void) {
2715     struct timespec now_ts;
2716     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2717     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2718         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2719 }
2720
2721 #endif
2722 /************Barrier Related Functions****************/
2723
2724 int CmiBarrier()
2725 {
2726     int status;
2727
2728 #if CMK_SMP
2729     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2730     CmiNodeAllBarrier();
2731     if (CmiMyRank() == CmiMyNodeSize())
2732 #else
2733     if (CmiMyRank() == 0)
2734 #endif
2735     {
2736         /**
2737          *  The call of CmiBarrier is usually before the initialization
2738          *  of trace module of Charm++, therefore, the START_EVENT
2739          *  and END_EVENT are disabled here. -Chao Mei
2740          */
2741         /*START_EVENT();*/
2742         status = PMI_Barrier();
2743         GNI_RC_CHECK("PMI_Barrier", status);
2744         /*END_EVENT(10);*/
2745     }
2746     CmiNodeAllBarrier();
2747     return status;
2748
2749 }
2750 #if CMK_DIRECT
2751 #include "machine-cmidirect.c"
2752 #endif
2753 #if CMK_PERSISTENT_COMM
2754 #include "machine-persistent.c"
2755 #endif
2756
2757