comment the change in PostNonLocal about adding tracing
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37
38 #include <gni_pub.h>
39 #include <pmi.h>
40 //#include <numatoolkit.h>
41
42 #include "converse.h"
43
44 #define     LARGEPAGE              0
45
46 #if LARGEPAGE
47 #include <hugetlbfs.h>
48 #endif
49
50 #if CMK_DIRECT
51 #include "cmidirect.h"
52 #endif
53
54 #if CMK_SMP
55 #define MULTI_THREAD_SEND          0
56 #define COMM_THREAD_SEND           1
57 #endif
58
59 #if CMK_SMP && COMM_THREAD_SEND
60 #define PIGGYBACK_ACK              0
61 #endif
62
63 #define CMI_EXERT_SEND_CAP      0
64 #define CMI_EXERT_RECV_CAP      0
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP 16
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP 2
72 #endif
73
74 #define USE_LRTS_MEMPOOL                  1
75
76 #define REMOTE_EVENT                      0
77
78 #define PRINT_SYH  0
79
80 // Trace communication thread
81 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
82 #define TRACE_THRESHOLD     0.00005
83 #define CMI_MPI_TRACE_MOREDETAILED 0
84 #undef CMI_MPI_TRACE_USEREVENTS
85 #define CMI_MPI_TRACE_USEREVENTS 1
86 #else
87 #undef CMK_SMP_TRACE_COMMTHREAD
88 #define CMK_SMP_TRACE_COMMTHREAD 0
89 #endif
90
91 #define CMK_TRACE_COMMOVERHEAD 0
92 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
93 #undef CMI_MPI_TRACE_USEREVENTS
94 #define CMI_MPI_TRACE_USEREVENTS 1
95 #else
96 #undef CMK_TRACE_COMMOVERHEAD
97 #define CMK_TRACE_COMMOVERHEAD 0
98 #endif
99
100 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
101 CpvStaticDeclare(double, projTraceStart);
102 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
103 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
104 #else
105 #define  START_EVENT()
106 #define  END_EVENT(x)
107 #endif
108
109 #if USE_LRTS_MEMPOOL
110
111 #define oneMB (1024ll*1024)
112 #define oneGB (1024ll*1024*1024)
113
114 static CmiInt8 _mempool_size = 8*oneMB;
115 static CmiInt8 _expand_mem =  4*oneMB;
116 static CmiInt8 _mempool_size_limit = 0;
117
118 static CmiInt8 _totalmem = 0.8*oneGB;
119
120 #if LARGEPAGE
121 static int BIG_MSG  =  16*oneMB;
122 static int ONE_SEG  =  4*oneMB;
123 #else
124 static int BIG_MSG  =  4*oneMB;
125 static int ONE_SEG  =  2*oneMB;
126 #endif
127 #if MULTI_THREAD_SEND
128 static int BIG_MSG_PIPELINE = 1;
129 #else
130 static int BIG_MSG_PIPELINE = 4;
131 #endif
132
133 // dynamic flow control
134 static CmiInt8 buffered_send_msg = 0;
135 static CmiInt8 register_memory_size = 0;
136
137 #if LARGEPAGE
138 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
139 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
140 static CmiInt8  register_count = 0;
141 #else
142 #if CMK_SMP && COMM_THREAD_SEND 
143 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
144 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
145 #else
146 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
147 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
148 #endif
149
150
151 #endif
152
153 #endif     /* end USE_LRTS_MEMPOOL */
154
155 #if MULTI_THREAD_SEND
156 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
157 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
158 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
159 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
160 #else
161 #define     CMI_GNI_LOCK
162 #define     CMI_GNI_UNLOCK
163 #define     CMI_PCQUEUEPOP_LOCK(Q)   
164 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
165 #endif
166
167 static int _tlbpagesize = 4096;
168
169 //static int _smpd_count  = 0;
170
171 static int   user_set_flag  = 0;
172
173 static int _checkProgress = 1;             /* check deadlock */
174 static int _detected_hang = 0;
175
176 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
177
178 // dynamic SMSG
179 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
180
181 static int avg_smsg_connection = 32;
182 static int                 *smsg_connected_flag= 0;
183 static gni_smsg_attr_t     **smsg_attr_vector_local;
184 static gni_smsg_attr_t     **smsg_attr_vector_remote;
185 static gni_ep_handle_t     ep_hndl_unbound;
186 static gni_smsg_attr_t     send_smsg_attr;
187 static gni_smsg_attr_t     recv_smsg_attr;
188
189 typedef struct _dynamic_smsg_mailbox{
190    void     *mailbox_base;
191    int      size;
192    int      offset;
193    gni_mem_handle_t  mem_hndl;
194    struct      _dynamic_smsg_mailbox  *next;
195 }dynamic_smsg_mailbox_t;
196
197 static dynamic_smsg_mailbox_t  *mailbox_list;
198
199 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
200 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
201
202 #if PRINT_SYH
203 int         lrts_send_msg_id = 0;
204 int         lrts_local_done_msg = 0;
205 int         lrts_send_rdma_success = 0;
206 #endif
207
208 #include "machine.h"
209
210 #include "pcqueue.h"
211
212 #include "mempool.h"
213
214 #if CMK_PERSISTENT_COMM
215 #include "machine-persistent.h"
216 #endif
217
218 //#define  USE_ONESIDED 1
219 #ifdef USE_ONESIDED
220 //onesided implementation is wrong, since no place to restore omdh
221 #include "onesided.h"
222 onesided_hnd_t   onesided_hnd;
223 onesided_md_t    omdh;
224 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
225
226 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
227
228 #else
229 uint8_t   onesided_hnd, omdh;
230 #if REMOTE_EVENT
231 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
232          status = GNI_RC_ERROR_NOMEM;} \
233         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
234                 if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
235 #else
236 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
237     do {   \
238         if (register_memory_size + size >= MAX_REG_MEM) { \
239             status = GNI_RC_ERROR_NOMEM; \
240         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
241             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
242     } while(0)
243 #endif
244 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
245     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
246              register_memory_size -= size; \
247          else CmiAbort("MEM_DEregister");  \
248     } while (0)
249 #endif
250
251 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
252 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
253 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
254 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
255 #define   IncreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)++
256 #define   DecreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)--
257 #define   IncreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)++
258 #define   DecreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)--
259 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
260 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
261 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
262 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
263 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
264 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
265
266 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
267 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
268
269 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
270 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
271 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
272 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
273
274 #define ALIGNBUF                64
275
276 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
277 /* If SMSG is not used */
278
279 #define FMA_PER_CORE  1024
280 #define FMA_BUFFER_SIZE 1024
281
282 /* If SMSG is used */
283 static int  SMSG_MAX_MSG = 1024;
284 #define SMSG_MAX_CREDIT 72 
285
286 #define MSGQ_MAXSIZE       2048
287 /* large message transfer with FMA or BTE */
288 #define LRTS_GNI_RDMA_THRESHOLD  1024 
289
290 #if CMK_SMP
291 static int  REMOTE_QUEUE_ENTRIES=163840; 
292 static int LOCAL_QUEUE_ENTRIES=163840; 
293 #else
294 static int  REMOTE_QUEUE_ENTRIES=20480;
295 static int LOCAL_QUEUE_ENTRIES=20480; 
296 #endif
297
298 #define BIG_MSG_TAG             0x26
299 #define PUT_DONE_TAG            0x28
300 #define DIRECT_PUT_DONE_TAG     0x29
301 #define ACK_TAG                 0x30
302 /* SMSG is data message */
303 #define SMALL_DATA_TAG          0x31
304 #define SMALL_DATA_ACK_TAG      0x32
305 /* SMSG is a control message to initialize a BTE */
306 #define LMSG_INIT_TAG           0x39 
307 #define LMSG_INIT_ACK_TAG       0x3a 
308
309 #define DEBUG
310 #ifdef GNI_RC_CHECK
311 #undef GNI_RC_CHECK
312 #endif
313 #ifdef DEBUG
314 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
315 #else
316 #define GNI_RC_CHECK(msg,rc)
317 #endif
318
319 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
320 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
321 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
322
323 static int useStaticMSGQ = 0;
324 static int useStaticFMA = 0;
325 static int mysize, myrank;
326 static gni_nic_handle_t   nic_hndl;
327
328 typedef struct {
329     gni_mem_handle_t mdh;
330     uint64_t addr;
331 } mdh_addr_t ;
332 // this is related to dynamic SMSG
333
334 typedef struct mdh_addr_list{
335     gni_mem_handle_t mdh;
336    void *addr;
337     struct mdh_addr_list *next;
338 }mdh_addr_list_t;
339
340 static unsigned int         smsg_memlen;
341 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
342 mdh_addr_t          setup_mem;
343 mdh_addr_t          *smsg_connection_vec = 0;
344 gni_mem_handle_t    smsg_connection_memhndl;
345 static int          smsg_expand_slots = 10;
346 static int          smsg_available_slot = 0;
347 static void         *smsg_mailbox_mempool = 0;
348 mdh_addr_list_t     *smsg_dynamic_list = 0;
349
350 static void             *smsg_mailbox_base;
351 gni_msgq_attr_t         msgq_attrs;
352 gni_msgq_handle_t       msgq_handle;
353 gni_msgq_ep_attr_t      msgq_ep_attrs;
354 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
355
356 /* =====Beginning of Declarations of Machine Specific Variables===== */
357 static int cookie;
358 static int modes = 0;
359 static gni_cq_handle_t       smsg_rx_cqh = NULL;
360 static gni_cq_handle_t       smsg_tx_cqh = NULL;
361 static gni_cq_handle_t       post_rx_cqh = NULL;
362 static gni_cq_handle_t       post_tx_cqh = NULL;
363 static gni_ep_handle_t       *ep_hndl_array;
364 #if MULTI_THREAD_SEND
365 static CmiNodeLock           *ep_lock_array;
366 static CmiNodeLock           tx_cq_lock; 
367 static CmiNodeLock           rx_cq_lock;
368 static CmiNodeLock           *mempool_lock;
369 #endif
370
371 typedef struct msg_list
372 {
373     uint32_t destNode;
374     uint32_t size;
375     void *msg;
376     uint8_t tag;
377 #if !CMK_SMP
378     struct msg_list *next;
379 #endif
380 }MSG_LIST;
381
382
383 typedef struct control_msg
384 {
385     uint64_t            source_addr;    /* address from the start of buffer  */
386     uint64_t            dest_addr;      /* address from the start of buffer */
387     int                 total_length;   /* total length */
388     int                 length;         /* length of this packet */
389     uint8_t             seq_id;         //big message   0 meaning single message
390     gni_mem_handle_t    source_mem_hndl;
391     struct control_msg *next;
392 } CONTROL_MSG;
393
394 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
395
396 typedef struct ack_msg
397 {
398     uint64_t            source_addr;    /* address from the start of buffer  */
399 #if ! USE_LRTS_MEMPOOL
400     gni_mem_handle_t    source_mem_hndl;
401     int                 length;          /* total length */
402 #endif
403     struct ack_msg     *next;
404 } ACK_MSG;
405
406 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
407
408 #if CMK_DIRECT
409 typedef struct{
410     uint64_t    handler_addr;
411 }CMK_DIRECT_HEADER;
412
413 typedef struct {
414     char core[CmiMsgHeaderSizeBytes];
415     uint64_t handler;
416 }cmidirectMsg;
417
418 //SYH
419 CpvDeclare(int, CmiHandleDirectIdx);
420 void CmiHandleDirectMsg(cmidirectMsg* msg)
421 {
422
423     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
424    (*(_handle->callbackFnPtr))(_handle->callbackData);
425    CmiFree(msg);
426 }
427
428 void CmiDirectInit()
429 {
430     CpvInitialize(int,  CmiHandleDirectIdx);
431     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
432 }
433
434 #endif
435 typedef struct  rmda_msg
436 {
437     int                   destNode;
438     gni_post_descriptor_t *pd;
439 #if !CMK_SMP
440     struct  rmda_msg      *next;
441 #endif
442 }RDMA_REQUEST;
443
444
445 #if CMK_SMP
446 #define SMP_LOCKS               0
447 #define ONE_SEND_QUEUE                  0
448 PCQueue sendRdmaBuf;
449 typedef struct  msg_list_index
450 {
451     PCQueue     sendSmsgBuf;
452     int         pushed;
453     CmiNodeLock   lock;
454 } MSG_LIST_INDEX;
455 char                *destpe_avail;
456 #if  !ONE_SEND_QUEUE && SMP_LOCKS
457     PCQueue     nonEmptyQueues;
458 #endif
459 #else         /* non-smp */
460
461 static RDMA_REQUEST        *sendRdmaBuf = 0;
462 static RDMA_REQUEST        *sendRdmaTail = 0;
463 typedef struct  msg_list_index
464 {
465     int         next;
466     MSG_LIST    *sendSmsgBuf;
467     MSG_LIST    *tail;
468 } MSG_LIST_INDEX;
469
470 #endif
471
472 // buffered send queue
473 #if ! ONE_SEND_QUEUE
474 typedef struct smsg_queue
475 {
476     MSG_LIST_INDEX   *smsg_msglist_index;
477     int               smsg_head_index;
478 } SMSG_QUEUE;
479 #else
480 typedef struct smsg_queue
481 {
482     PCQueue       sendMsgBuf;
483 }  SMSG_QUEUE;
484 #endif
485
486 SMSG_QUEUE                  smsg_queue;
487 #if PIGGYBACK_ACK
488 SMSG_QUEUE                  smsg_ack_queue;
489 #endif
490 #if CMK_USE_OOB
491 SMSG_QUEUE                  smsg_oob_queue;
492 #endif
493
494 #if CMK_SMP
495
496 #define FreeMsgList(d)   free(d);
497 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
498
499 #else
500
501 static MSG_LIST       *msglist_freelist=0;
502
503 #define FreeMsgList(d)  \
504   do { \
505   (d)->next = msglist_freelist;\
506   msglist_freelist = d; \
507   } while (0)
508
509 #define MallocMsgList(d) \
510   do {  \
511   d = msglist_freelist;\
512   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
513              _MEMCHECK(d);\
514   } else msglist_freelist = d->next; \
515   d->next =0;  \
516   } while (0)
517
518 #endif
519
520 #if CMK_SMP
521
522 #define FreeControlMsg(d)      free(d);
523 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
524
525 #else
526
527 static CONTROL_MSG    *control_freelist=0;
528
529 #define FreeControlMsg(d)       \
530   do { \
531   (d)->next = control_freelist;\
532   control_freelist = d; \
533   } while (0);
534
535 #define MallocControlMsg(d) \
536   do {  \
537   d = control_freelist;\
538   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
539              _MEMCHECK(d);\
540   } else control_freelist = d->next;  \
541   } while (0);
542
543 #endif
544
545 #if CMK_SMP
546
547 #define FreeAckMsg(d)      free(d);
548 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
549
550 #else
551
552 static ACK_MSG        *ack_freelist=0;
553
554 #define FreeAckMsg(d)       \
555   do { \
556   (d)->next = ack_freelist;\
557   ack_freelist = d; \
558   } while (0)
559
560 #define MallocAckMsg(d) \
561   do { \
562   d = ack_freelist;\
563   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
564              _MEMCHECK(d);\
565   } else ack_freelist = d->next; \
566   } while (0)
567
568 #endif
569
570
571 # if CMK_SMP
572 #define FreeRdmaRequest(d)       free(d);
573 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
574 #else
575
576 static RDMA_REQUEST         *rdma_freelist = NULL;
577
578 #define FreeRdmaRequest(d)       \
579   do {  \
580   (d)->next = rdma_freelist;\
581   rdma_freelist = d;    \
582   } while (0)
583
584 #define MallocRdmaRequest(d) \
585   do {   \
586   d = rdma_freelist;\
587   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
588              _MEMCHECK(d);\
589   } else rdma_freelist = d->next; \
590   d->next =0;   \
591   } while (0)
592 #endif
593
594 /* reuse gni_post_descriptor_t */
595 static gni_post_descriptor_t *post_freelist=0;
596
597 #if  !CMK_SMP
598 #define FreePostDesc(d)       \
599     (d)->next_descr = post_freelist;\
600     post_freelist = d;
601
602 #define MallocPostDesc(d) \
603   d = post_freelist;\
604   if (d==0) { \
605      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
606      d->next_descr = 0;\
607       _MEMCHECK(d);\
608   } else post_freelist = d->next_descr;
609 #else
610
611 #define FreePostDesc(d)     free(d);
612 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
613
614 #endif
615
616
617 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
618 static int      buffered_smsg_counter = 0;
619
620 /* SmsgSend return success but message sent is not confirmed by remote side */
621 static MSG_LIST *buffered_fma_head = 0;
622 static MSG_LIST *buffered_fma_tail = 0;
623
624 /* functions  */
625 #define IsFree(a,ind)  !( a& (1<<(ind) ))
626 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
627 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
628
629 CpvDeclare(mempool_type*, mempool);
630
631 #if CMK_WITH_STATS
632 typedef struct comm_thread_stats
633 {
634 int      count_in_send_buffered_ack;
635 double   time_in_send_buffered_ack;
636 double   max_time_in_send_buffered_ack;
637 int      count_in_send_buffered_smsg;
638 double   time_in_send_buffered_smsg;
639 double   max_time_in_send_buffered_smsg;
640 } Comm_Thread_Stats;
641
642 static Comm_Thread_Stats   comm_stats;
643
644 static void init_comm_stats()
645 {
646   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
647 }
648
649 #define STATS_ACK_TIME(x)   \
650         { double t = CmiWallTimer(); \
651           x;        \
652           t = CmiWallTimer() - t;          \
653           comm_stats.count_in_send_buffered_ack ++;        \
654           comm_stats.time_in_send_buffered_ack += t;   \
655           if (t>comm_stats.max_time_in_send_buffered_ack)      \
656               comm_stats.max_time_in_send_buffered_ack = t;    \
657         }
658
659 static void print_comm_stats()
660 {
661     printf("PE[%d]  count/time in send buffered ack:   %d %f\n",  myrank, comm_stats.count_in_send_buffered_ack, comm_stats.time_in_send_buffered_ack);
662     printf("PE[%d]  max time in send buffered ack:     %f\n",  myrank, comm_stats.max_time_in_send_buffered_ack);
663 }
664 #else
665 #define STATS_ACK_TIME(x)            x
666 #endif
667
668 static int print_stats = 0;
669
670 static void
671 allgather(void *in,void *out, int len)
672 {
673     static int *ivec_ptr=NULL,already_called=0,job_size=0;
674     int i,rc;
675     int my_rank;
676     char *tmp_buf,*out_ptr;
677
678     if(!already_called) {
679
680         rc = PMI_Get_size(&job_size);
681         CmiAssert(rc == PMI_SUCCESS);
682         rc = PMI_Get_rank(&my_rank);
683         CmiAssert(rc == PMI_SUCCESS);
684
685         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
686         CmiAssert(ivec_ptr != NULL);
687
688         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
689         CmiAssert(rc == PMI_SUCCESS);
690
691         already_called = 1;
692
693     }
694
695     tmp_buf = (char *)malloc(job_size * len);
696     CmiAssert(tmp_buf);
697
698     rc = PMI_Allgather(in,tmp_buf,len);
699     CmiAssert(rc == PMI_SUCCESS);
700
701     out_ptr = out;
702
703     for(i=0;i<job_size;i++) {
704
705         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
706
707     }
708
709     free(tmp_buf);
710 }
711
712 static void
713 allgather_2(void *in,void *out, int len)
714 {
715     //PMI_Allgather is out of order
716     int i,rc, extend_len;
717     int  rank_index;
718     char *out_ptr, *out_ref;
719     char *in2;
720
721     extend_len = sizeof(int) + len;
722     in2 = (char*)malloc(extend_len);
723
724     memcpy(in2, &myrank, sizeof(int));
725     memcpy(in2+sizeof(int), in, len);
726
727     out_ptr = (char*)malloc(mysize*extend_len);
728
729     rc = PMI_Allgather(in2, out_ptr, extend_len);
730     GNI_RC_CHECK("allgather", rc);
731
732     out_ref = out;
733
734     for(i=0;i<mysize;i++) {
735         //rank index 
736         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
737         //copy to the rank index slot
738         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
739     }
740
741     free(out_ptr);
742     free(in2);
743
744 }
745
746 static unsigned int get_gni_nic_address(int device_id)
747 {
748     unsigned int address, cpu_id;
749     gni_return_t status;
750     int i, alps_dev_id=-1,alps_address=-1;
751     char *token, *p_ptr;
752
753     p_ptr = getenv("PMI_GNI_DEV_ID");
754     if (!p_ptr) {
755         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
756        
757         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
758     } else {
759         while ((token = strtok(p_ptr,":")) != NULL) {
760             alps_dev_id = atoi(token);
761             if (alps_dev_id == device_id) {
762                 break;
763             }
764             p_ptr = NULL;
765         }
766         CmiAssert(alps_dev_id != -1);
767         p_ptr = getenv("PMI_GNI_LOC_ADDR");
768         CmiAssert(p_ptr != NULL);
769         i = 0;
770         while ((token = strtok(p_ptr,":")) != NULL) {
771             if (i == alps_dev_id) {
772                 alps_address = atoi(token);
773                 break;
774             }
775             p_ptr = NULL;
776             ++i;
777         }
778         CmiAssert(alps_address != -1);
779         address = alps_address;
780     }
781     return address;
782 }
783
784 static uint8_t get_ptag(void)
785 {
786     char *p_ptr, *token;
787     uint8_t ptag;
788
789     p_ptr = getenv("PMI_GNI_PTAG");
790     CmiAssert(p_ptr != NULL);
791     token = strtok(p_ptr, ":");
792     ptag = (uint8_t)atoi(token);
793     return ptag;
794         
795 }
796
797 static uint32_t get_cookie(void)
798 {
799     uint32_t cookie;
800     char *p_ptr, *token;
801
802     p_ptr = getenv("PMI_GNI_COOKIE");
803     CmiAssert(p_ptr != NULL);
804     token = strtok(p_ptr, ":");
805     cookie = (uint32_t)atoi(token);
806
807     return cookie;
808 }
809
810 #if LARGEPAGE
811
812 /* directly mmap memory from hugetlbfs for large pages */
813
814 #include <sys/stat.h>
815 #include <fcntl.h>
816 #include <sys/mman.h>
817
818 // size must be _tlbpagesize aligned
819 void *my_get_huge_pages(size_t size)
820 {
821     char filename[512];
822     int fd;
823     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
824     void *ptr = NULL;
825
826     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
827     fd = open(filename, O_RDWR | O_CREAT, mode);
828     if (fd == -1) {
829         CmiAbort("my_get_huge_pages: open filed");
830     }
831     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
832     if (ptr == MAP_FAILED) ptr = NULL;
833 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
834     close(fd);
835     unlink(filename);
836     return ptr;
837 }
838
839 void my_free_huge_pages(void *ptr, int size)
840 {
841 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
842     int ret = munmap(ptr, size);
843     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
844 }
845
846 #endif
847
848 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
849 /* TODO: add any that are related */
850 /* =====End of Definitions of Message-Corruption Related Macros=====*/
851
852
853 #include "machine-lrts.h"
854 #include "machine-common-core.c"
855
856 /* Network progress function is used to poll the network when for
857    messages. This flushes receive buffers on some  implementations*/
858 #if CMK_MACHINE_PROGRESS_DEFINED
859 void CmiMachineProgressImpl() {
860 }
861 #endif
862
863 static void SendRdmaMsg();
864 static void PumpNetworkSmsg();
865 static void PumpLocalRdmaTransactions();
866 static int SendBufferMsg(SMSG_QUEUE *queue);
867
868 #if MACHINE_DEBUG_LOG
869 FILE *debugLog = NULL;
870 static CmiInt8 buffered_recv_msg = 0;
871 int         lrts_smsg_success = 0;
872 int         lrts_received_msg = 0;
873 #endif
874
875 static void sweep_mempool(mempool_type *mptr)
876 {
877     int n = 0;
878     block_header *current = &(mptr->block_head);
879
880     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
881     while( current!= NULL) {
882         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
883         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
884     }
885     printf("[n %d] sweep_mempool slot END.\n", myrank);
886 }
887
888 inline
889 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
890 {
891     block_header *current = *from;
892
893     //while(register_memory_size>= MAX_REG_MEM)
894     //{
895         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
896             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
897
898         *from = current;
899         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
900         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
901         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
902     //}
903     return GNI_RC_SUCCESS;
904 }
905
906 inline 
907 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl)
908 {
909     gni_return_t status = GNI_RC_SUCCESS;
910     //int size = GetMempoolsize(msg);
911     //void *blockaddr = GetMempoolBlockPtr(msg);
912     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
913    
914     block_header *current = &(mptr->block_head);
915     while(register_memory_size>= MAX_REG_MEM)
916     {
917         status = deregisterMemory(mptr, &current);
918         if (status != GNI_RC_SUCCESS) break;
919     }
920     if(register_memory_size>= MAX_REG_MEM) return status;
921
922     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
923     while(1)
924     {
925         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
926         if(status == GNI_RC_SUCCESS)
927         {
928             break;
929         }
930         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
931         {
932             CmiAbort("Memory registor for mempool fails\n");
933         }
934         else
935         {
936             status = deregisterMemory(mptr, &current);
937             if (status != GNI_RC_SUCCESS) break;
938         }
939     }; 
940     return status;
941 }
942
943 inline 
944 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t)
945 {
946     static int rank = -1;
947     int i;
948     gni_return_t status;
949     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
950     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
951     mempool_type *mptr;
952
953     status = registerFromMempool(mptr1, msg, size, t);
954     if (status == GNI_RC_SUCCESS) return status;
955 #if CMK_SMP 
956     for (i=0; i<CmiMyNodeSize()+1; i++) {
957       rank = (rank+1)%(CmiMyNodeSize()+1);
958       mptr = CpvAccessOther(mempool, rank);
959       if (mptr == mptr1) continue;
960       status = registerFromMempool(mptr, msg, size, t);
961       if (status == GNI_RC_SUCCESS) return status;
962     }
963 #endif
964     return  GNI_RC_ERROR_RESOURCE;
965 }
966
967 inline
968 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
969 {
970     MSG_LIST        *msg_tmp;
971     MallocMsgList(msg_tmp);
972     msg_tmp->destNode = destNode;
973     msg_tmp->size   = size;
974     msg_tmp->msg    = msg;
975     msg_tmp->tag    = tag;
976
977 #if !CMK_SMP
978     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
979         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
980         queue->smsg_head_index = destNode;
981         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
982     }else
983     {
984         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
985         queue->smsg_msglist_index[destNode].tail = msg_tmp;
986     }
987 #else
988 #if ONE_SEND_QUEUE
989     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
990 #else
991 #if SMP_LOCKS
992     CmiLock(queue->smsg_msglist_index[destNode].lock);
993     if(queue->smsg_msglist_index[destNode].pushed == 0)
994     {
995         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
996     }
997     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
998     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
999 #else
1000     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1001 #endif
1002 #endif
1003 #endif
1004 #if PRINT_SYH
1005     buffered_smsg_counter++;
1006 #endif
1007 }
1008
1009 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1010 {
1011     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1012 }
1013
1014 inline
1015 static void setup_smsg_connection(int destNode)
1016 {
1017     mdh_addr_list_t  *new_entry = 0;
1018     gni_post_descriptor_t *pd;
1019     gni_smsg_attr_t      *smsg_attr;
1020     gni_return_t status = GNI_RC_NOT_DONE;
1021     RDMA_REQUEST        *rdma_request_msg;
1022     
1023     if(smsg_available_slot == smsg_expand_slots)
1024     {
1025         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1026         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1027         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1028
1029         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1030             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1031             GNI_MEM_READWRITE,   
1032             -1,
1033             &(new_entry->mdh));
1034         smsg_available_slot = 0; 
1035         new_entry->next = smsg_dynamic_list;
1036         smsg_dynamic_list = new_entry;
1037     }
1038     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1039     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1040     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1041     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1042     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1043     smsg_attr->buff_size = smsg_memlen;
1044     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1045     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1046     smsg_local_attr_vec[destNode] = smsg_attr;
1047     smsg_available_slot++;
1048     MallocPostDesc(pd);
1049     pd->type            = GNI_POST_FMA_PUT;
1050     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1051     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1052     pd->length          = sizeof(gni_smsg_attr_t);
1053     pd->local_addr      = (uint64_t) smsg_attr;
1054     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1055     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1056     pd->src_cq_hndl     = 0;
1057     pd->rdma_mode       = 0;
1058     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1059     print_smsg_attr(smsg_attr);
1060     if(status == GNI_RC_ERROR_RESOURCE )
1061     {
1062         MallocRdmaRequest(rdma_request_msg);
1063         rdma_request_msg->destNode = destNode;
1064         rdma_request_msg->pd = pd;
1065         /* buffer this request */
1066     }
1067 #if PRINT_SYH
1068     if(status != GNI_RC_SUCCESS)
1069        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1070     else
1071         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1072 #endif
1073 }
1074
1075 /* useDynamicSMSG */
1076 inline 
1077 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1078 {
1079     gni_return_t status = GNI_RC_NOT_DONE;
1080
1081     if(mailbox_list->offset == mailbox_list->size)
1082     {
1083         dynamic_smsg_mailbox_t *new_mailbox_entry;
1084         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1085         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1086         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1087         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1088         new_mailbox_entry->offset = 0;
1089         
1090         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1091             new_mailbox_entry->size, smsg_rx_cqh,
1092             GNI_MEM_READWRITE,   
1093             -1,
1094             &(new_mailbox_entry->mem_hndl));
1095
1096         GNI_RC_CHECK("register", status);
1097         new_mailbox_entry->next = mailbox_list;
1098         mailbox_list = new_mailbox_entry;
1099     }
1100     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1101     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1102     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1103     local_smsg_attr->mbox_offset = mailbox_list->offset;
1104     mailbox_list->offset += smsg_memlen;
1105     local_smsg_attr->buff_size = smsg_memlen;
1106     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1107     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1108 }
1109
1110 /* useDynamicSMSG */
1111 inline 
1112 static int connect_to(int destNode)
1113 {
1114     gni_return_t status = GNI_RC_NOT_DONE;
1115     CmiAssert(smsg_connected_flag[destNode] == 0);
1116     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1117     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1118     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1119     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1120     
1121     CMI_GNI_LOCK
1122     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1123     CMI_GNI_UNLOCK
1124     if (status == GNI_RC_ERROR_RESOURCE) {
1125       /* possibly destNode is making connection at the same time */
1126       free(smsg_attr_vector_local[destNode]);
1127       smsg_attr_vector_local[destNode] = NULL;
1128       free(smsg_attr_vector_remote[destNode]);
1129       smsg_attr_vector_remote[destNode] = NULL;
1130       mailbox_list->offset -= smsg_memlen;
1131       return 0;
1132     }
1133     GNI_RC_CHECK("GNI_Post", status);
1134     smsg_connected_flag[destNode] = 1;
1135     return 1;
1136 }
1137
1138 #if PIGGYBACK_ACK
1139 static void * piggyback_ack(int destNode, int msgsize, int *count)
1140 {
1141     int i;
1142     if (PCQueueEmpty(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)) return NULL;
1143     int len = PCQueueLength(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
1144     int piggycount = (SMSG_MAX_MSG - msgsize)/sizeof(uint64_t);
1145     if (piggycount > len+1) piggycount = len + 1;
1146     if (piggycount <= 5) return NULL;
1147     uint64_t * buf = (uint64_t*)CmiTmpAlloc(piggycount * sizeof(uint64_t));
1148     CmiAssert(buf != NULL);
1149 //printf("[%d] piggyback_ack: %d\n", myrank, piggycount);
1150     for (i=0; i<piggycount-1; i++) {
1151         CMI_PCQUEUEPOP_LOCK(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)
1152         MSG_LIST *ptr = (MSG_LIST*)PCQueuePop(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
1153         CMI_PCQUEUEPOP_UNLOCK(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)
1154         ACK_MSG *msg = ptr->msg;
1155         buf[i+1] = msg->source_addr;
1156         FreeAckMsg(msg);
1157         FreeMsgList(ptr);
1158     }
1159     buf[0] = i;
1160     *count = i + 1;
1161     return buf;
1162 }
1163
1164
1165 static void piggyback_ack_done(int destNode, uint64_t *buf, int done)
1166 {
1167     if (!done)
1168     {
1169         int i;
1170         for (i=0; i<buf[0]; i++) {
1171             MSG_LIST *msg_tmp;
1172             MallocMsgList(msg_tmp);
1173             ACK_MSG  *ack_msg;
1174             MallocAckMsg(ack_msg);
1175             ack_msg->source_addr = buf[i+1];
1176             msg_tmp->size = ACK_MSG_SIZE;
1177             msg_tmp->msg = ack_msg;
1178             msg_tmp->tag = ACK_TAG;
1179             msg_tmp->destNode = destNode;
1180             PCQueuePush(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1181         }
1182     }
1183     CmiTmpFree(buf);
1184 }
1185 #endif
1186
1187 inline 
1188 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
1189 {
1190     unsigned int          remote_address;
1191     uint32_t              remote_id;
1192     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1193     gni_smsg_attr_t       *smsg_attr;
1194     gni_post_descriptor_t *pd;
1195     gni_post_state_t      post_state;
1196     char                  *real_data; 
1197
1198     if (useDynamicSMSG) {
1199         switch (smsg_connected_flag[destNode]) {
1200         case 0: 
1201             connect_to(destNode);         /* continue to case 1 */
1202         case 1:                           /* pending connection, do nothing */
1203             status = GNI_RC_NOT_DONE;
1204             if(inbuff ==0)
1205                 buffer_small_msgs(queue, msg, size, destNode, tag);
1206             return status;
1207         }
1208     }
1209 #if CMK_SMP
1210 #if ! ONE_SEND_QUEUE
1211     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1212 #endif
1213     {
1214 #else
1215     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1216     {
1217 #endif
1218         uint64_t *buf = NULL;
1219         int bufsize = 0;
1220 #if PIGGYBACK_ACK
1221         if (tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG) {
1222             int nack = 0;
1223             buf = piggyback_ack(destNode, size, &nack);
1224             if (buf) {
1225                 tag = (tag == SMALL_DATA_TAG) ? SMALL_DATA_ACK_TAG : LMSG_INIT_ACK_TAG;
1226                 bufsize = nack * sizeof(uint64_t);
1227             }
1228         }
1229 #endif
1230         CMI_GNI_LOCK
1231 #if CMK_SMP_TRACE_COMMTHREAD
1232         int oldpe = -1;
1233         int oldeventid = -1;
1234         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1235         { 
1236             START_EVENT();
1237             if ( tag == SMALL_DATA_TAG)
1238                 real_data = (char*)msg; 
1239             else 
1240                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1241             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1242             TRACE_COMM_SET_COMM_MSGID(real_data);
1243         }
1244 #endif
1245         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
1246 #if CMK_SMP_TRACE_COMMTHREAD
1247         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1248 #endif
1249         CMI_GNI_UNLOCK
1250         if(status == GNI_RC_SUCCESS)
1251         {
1252 #if CMK_SMP_TRACE_COMMTHREAD
1253             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == SMALL_DATA_ACK_TAG || tag == LMSG_INIT_ACK_TAG)
1254             { 
1255                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1256             }
1257 #endif
1258             smsg_send_count ++;
1259         }else
1260             status = GNI_RC_ERROR_RESOURCE;
1261 #if PIGGYBACK_ACK
1262         if (buf) {
1263             piggyback_ack_done(destNode, buf, status==GNI_RC_SUCCESS);
1264             tag = (tag == SMALL_DATA_ACK_TAG) ? SMALL_DATA_TAG : LMSG_INIT_TAG;
1265         }
1266 #endif
1267     }
1268     if(status != GNI_RC_SUCCESS && inbuff ==0)
1269         buffer_small_msgs(queue, msg, size, destNode, tag);
1270     return status;
1271 }
1272
1273 inline 
1274 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1275 {
1276     /* construct a control message and send */
1277     CONTROL_MSG         *control_msg_tmp;
1278     MallocControlMsg(control_msg_tmp);
1279     control_msg_tmp->source_addr = (uint64_t)msg;
1280     control_msg_tmp->seq_id    = seqno;
1281     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1282 #if     USE_LRTS_MEMPOOL
1283     if(size < BIG_MSG)
1284     {
1285         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1286     }
1287     else
1288     {
1289         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1290         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1291         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1292     }
1293 #else
1294     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1295 #endif
1296     return control_msg_tmp;
1297 }
1298
1299 #define BLOCKING_SEND_CONTROL    0
1300
1301 // Large message, send control to receiver, receiver register memory and do a GET, 
1302 // return 1 - send no success
1303 inline
1304 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1305 {
1306     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1307     uint32_t            vmdh_index  = -1;
1308     int                 size;
1309     int                 offset = 0;
1310     uint64_t            source_addr;
1311     int                 register_size; 
1312     void                *msg;
1313
1314     size    =   control_msg_tmp->total_length;
1315     source_addr = control_msg_tmp->source_addr;
1316     register_size = control_msg_tmp->length;
1317
1318 #if  USE_LRTS_MEMPOOL
1319     if( control_msg_tmp->seq_id == 0 ){
1320 #if BLOCKING_SEND_CONTROL
1321         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1322             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1323                 LrtsAdvanceCommunication(0);
1324         }
1325 #endif
1326         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1327         {
1328             msg = (void*)source_addr;
1329             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1330             {
1331                 if(!inbuff)
1332                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1333                 return GNI_RC_ERROR_NOMEM;
1334             }
1335             //register the corresponding mempool
1336             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1337             if(status == GNI_RC_SUCCESS)
1338             {
1339                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1340             }
1341         }else
1342         {
1343             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1344             status = GNI_RC_SUCCESS;
1345         }
1346         if(NoMsgInSend( control_msg_tmp->source_addr))
1347             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1348         else
1349             register_size = 0;
1350     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1351     {
1352         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1353         source_addr += offset;
1354         size = control_msg_tmp->length;
1355 #if BLOCKING_SEND_CONTROL
1356         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1357             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1358                 LrtsAdvanceCommunication(0);
1359         }
1360 #endif
1361         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1362             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1363             {
1364                 if(!inbuff)
1365                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1366                 return GNI_RC_ERROR_NOMEM;
1367             }
1368             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1369             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1370         }
1371         else
1372         {
1373             status = GNI_RC_SUCCESS;
1374         }
1375         register_size = 0;  
1376     }
1377
1378     if(status == GNI_RC_SUCCESS)
1379     {
1380         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff);  
1381         if(status == GNI_RC_SUCCESS)
1382         {
1383             buffered_send_msg += register_size;
1384             if(control_msg_tmp->seq_id == 0)
1385             {
1386                 IncreaseMsgInSend(source_addr);
1387             }
1388             FreeControlMsg(control_msg_tmp);
1389             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1390         }else
1391             status = GNI_RC_ERROR_RESOURCE;
1392
1393     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1394     {
1395         CmiAbort("Memory registor for large msg\n");
1396     }else 
1397     {
1398         status = GNI_RC_ERROR_NOMEM; 
1399         if(!inbuff)
1400             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1401     }
1402     return status;
1403 #else
1404     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1405     if(status == GNI_RC_SUCCESS)
1406     {
1407         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0);  
1408         if(status == GNI_RC_SUCCESS)
1409         {
1410             FreeControlMsg(control_msg_tmp);
1411         }
1412     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1413     {
1414         CmiAbort("Memory registor for large msg\n");
1415     }else 
1416     {
1417         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1418     }
1419     return status;
1420 #endif
1421 }
1422
1423 inline void LrtsPrepareEnvelope(char *msg, int size)
1424 {
1425     CmiSetMsgSize(msg, size);
1426 }
1427
1428 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1429 {
1430     gni_return_t        status  =   GNI_RC_SUCCESS;
1431     uint8_t tag;
1432     CONTROL_MSG         *control_msg_tmp;
1433     int                 oob = ( mode & OUT_OF_BAND);
1434     SMSG_QUEUE          *queue;
1435
1436     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1437 #if CMK_USE_OOB
1438     queue = oob? &smsg_oob_queue : &smsg_queue;
1439 #else
1440     queue = &smsg_queue;
1441 #endif
1442
1443     LrtsPrepareEnvelope(msg, size);
1444
1445 #if PRINT_SYH
1446     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1447 #endif 
1448 #if CMK_SMP && COMM_THREAD_SEND
1449     if(size <= SMSG_MAX_MSG)
1450         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1451     else if (size < BIG_MSG) {
1452         control_msg_tmp =  construct_control_msg(size, msg, 0);
1453         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1454     }
1455     else {
1456           CmiSetMsgSeq(msg, 0);
1457           control_msg_tmp =  construct_control_msg(size, msg, 1);
1458           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1459     }
1460 #else   //non-smp, smp(worker sending)
1461     if(size <= SMSG_MAX_MSG)
1462     {
1463         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1464             CmiFree(msg);
1465     }
1466     else if (size < BIG_MSG) {
1467         control_msg_tmp =  construct_control_msg(size, msg, 0);
1468         send_large_messages(queue, destNode, control_msg_tmp, 0);
1469     }
1470     else {
1471 #if     USE_LRTS_MEMPOOL
1472         CmiSetMsgSeq(msg, 0);
1473         control_msg_tmp =  construct_control_msg(size, msg, 1);
1474         send_large_messages(queue, destNode, control_msg_tmp, 0);
1475 #else
1476         control_msg_tmp =  construct_control_msg(size, msg, 0);
1477         send_large_messages(queue, destNode, control_msg_tmp, 0);
1478 #endif
1479     }
1480 #endif
1481     return 0;
1482 }
1483
1484 static void    PumpDatagramConnection();
1485 static void registerUserTraceEvents() {
1486 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1487     traceRegisterUserEvent("setting up connections", 10);
1488     traceRegisterUserEvent("Receiving small msgs", 20);
1489     traceRegisterUserEvent("Release local transaction", 30);
1490     traceRegisterUserEvent("Sending buffered small msgs", 40);
1491     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1492 #endif
1493 }
1494
1495 static void ProcessDeadlock()
1496 {
1497     static CmiUInt8 *ptr = NULL;
1498     static CmiUInt8  last = 0, mysum, sum;
1499     static int count = 0;
1500     gni_return_t status;
1501     int i;
1502
1503 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1504 //sweep_mempool(CpvAccess(mempool));
1505     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1506     mysum = smsg_send_count + smsg_recv_count;
1507     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1508     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1509     GNI_RC_CHECK("PMI_Allgather", status);
1510     sum = 0;
1511     for (i=0; i<mysize; i++)  sum+= ptr[i];
1512     if (last == 0 || sum == last) 
1513         count++;
1514     else
1515         count = 0;
1516     last = sum;
1517     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1518     if (count == 2) { 
1519         /* detected twice, it is a real deadlock */
1520         if (myrank == 0)  {
1521             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1522             CmiAbort("Fatal> Deadlock detected.");
1523         }
1524
1525     }
1526     _detected_hang = 0;
1527 }
1528
1529 static void CheckProgress()
1530 {
1531     if (smsg_send_count == last_smsg_send_count &&
1532         smsg_recv_count == last_smsg_recv_count ) 
1533     {
1534         _detected_hang = 1;
1535 #if !CMK_SMP
1536         if (_detected_hang) ProcessDeadlock();
1537 #endif
1538
1539     }
1540     else {
1541         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1542         last_smsg_send_count = smsg_send_count;
1543         last_smsg_recv_count = smsg_recv_count;
1544         _detected_hang = 0;
1545     }
1546 }
1547
1548 static void set_limit()
1549 {
1550     //if (!user_set_flag && CmiMyRank() == 0) {
1551     if (CmiMyRank() == 0) {
1552         int mynode = CmiPhysicalNodeID(CmiMyPe());
1553         int numpes = CmiNumPesOnPhysicalNode(mynode);
1554         int numprocesses = numpes / CmiMyNodeSize();
1555         MAX_REG_MEM  = _totalmem / numprocesses;
1556         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1557         if (CmiMyPe() == 0)
1558            printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM, MAX_BUFF_SEND);
1559     }
1560 }
1561
1562 void LrtsPostCommonInit(int everReturn)
1563 {
1564 #if CMK_DIRECT
1565     CmiDirectInit();
1566 #endif
1567 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1568     CpvInitialize(double, projTraceStart);
1569     /* only PE 0 needs to care about registration (to generate sts file). */
1570     if (CmiMyPe() == 0) {
1571         registerMachineUserEventsFunction(&registerUserTraceEvents);
1572     }
1573 #endif
1574
1575 #if CMK_SMP
1576     CmiIdleState *s=CmiNotifyGetState();
1577     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1578     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1579 #else
1580     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1581     if (useDynamicSMSG)
1582     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1583 #endif
1584
1585     if (_checkProgress)
1586 #if CMK_SMP
1587     if (CmiMyRank() == 0)
1588 #endif
1589     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1590 #if !LARGEPAGE
1591     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1592 #endif
1593 }
1594
1595 /* this is called by worker thread */
1596 void LrtsPostNonLocal(){
1597 #if MULTI_THREAD_SEND
1598     if(mysize == 1) return;
1599 //#if CMK_SMP_TRACE_COMMTHREAD
1600 //    traceEndIdle();
1601 //#endif
1602     //printf("[%d,%d] worker call communication\n", CmiMyNode(), CmiMyRank());
1603     PumpNetworkSmsg();
1604     PumpLocalRdmaTransactions();
1605     
1606 #if CMK_USE_OOB
1607     if (SendBufferMsg(&smsg_oob_queue) == 1)
1608 #endif
1609     {
1610 #if PIGGYBACK_ACK
1611     //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
1612     if (SendBufferMsg(&smsg_queue) == 1) {
1613         //if (count++ % 10 == 0) 
1614         SendBufferMsg(&smsg_ack_queue);
1615     }
1616 #else
1617     SendBufferMsg(&smsg_queue);
1618 #endif
1619     }
1620
1621     SendRdmaMsg();
1622     //LrtsAdvanceCommunication(1);
1623 //#if CMK_SMP_TRACE_COMMTHREAD
1624 //    traceBeginIdle();
1625 //#endif
1626 #endif
1627 }
1628
1629 /* useDynamicSMSG */
1630 static void    PumpDatagramConnection()
1631 {
1632     uint32_t          remote_address;
1633     uint32_t          remote_id;
1634     gni_return_t status;
1635     gni_post_state_t  post_state;
1636     uint64_t          datagram_id;
1637     int i;
1638
1639    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1640    {
1641        if (datagram_id >= mysize) {           /* bound endpoint */
1642            int pe = datagram_id - mysize;
1643            CMI_GNI_LOCK
1644            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1645            CMI_GNI_UNLOCK
1646            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1647            {
1648                CmiAssert(remote_id == pe);
1649                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1650                GNI_RC_CHECK("Dynamic SMSG Init", status);
1651 #if PRINT_SYH
1652                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1653 #endif
1654                CmiAssert(smsg_connected_flag[pe] == 1);
1655                smsg_connected_flag[pe] = 2;
1656            }
1657        }
1658        else {         /* unbound ep */
1659            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1660            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1661            {
1662                CmiAssert(remote_id<mysize);
1663                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1664                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1665                GNI_RC_CHECK("Dynamic SMSG Init", status);
1666 #if PRINT_SYH
1667                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1668 #endif
1669                smsg_connected_flag[remote_id] = 2;
1670
1671                alloc_smsg_attr(&send_smsg_attr);
1672                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1673                GNI_RC_CHECK("post unbound datagram", status);
1674            }
1675        }
1676    }
1677 }
1678
1679 /* pooling CQ to receive network message */
1680 static void PumpNetworkRdmaMsgs()
1681 {
1682     gni_cq_entry_t      event_data;
1683     gni_return_t        status;
1684
1685 }
1686
1687 inline 
1688 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1689 {
1690     RDMA_REQUEST        *rdma_request_msg;
1691     MallocRdmaRequest(rdma_request_msg);
1692     rdma_request_msg->destNode = inst_id;
1693     rdma_request_msg->pd = pd;
1694 #if CMK_SMP
1695     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1696 #else
1697     if(sendRdmaBuf == 0)
1698     {
1699         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1700     }else{
1701         sendRdmaTail->next = rdma_request_msg;
1702         sendRdmaTail =  rdma_request_msg;
1703     }
1704 #endif
1705
1706 }
1707
1708 #if PIGGYBACK_ACK
1709 int processPiggybackAckHeader(void *header)
1710 {
1711     int i;
1712     uint64_t *buf = (uint64_t*)header;
1713     int piggycount = buf[0];
1714 //printf("[%d] got piggyback msg: %d\n", myrank, piggycount);
1715     for (i=0; i<piggycount; i++) {
1716         void *msg = (void*)(buf[i+1]);
1717         CmiAssert(msg != NULL);
1718         DecreaseMsgInSend(msg);
1719         if(NoMsgInSend(msg))
1720             buffered_send_msg -= GetMempoolsize(msg);
1721         CmiFree(msg);
1722     }
1723     return piggycount;
1724 }
1725 #endif
1726
1727 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1728
1729 static void PumpNetworkSmsg()
1730 {
1731     uint64_t            inst_id;
1732     int                 ret;
1733     gni_cq_entry_t      event_data;
1734     gni_return_t        status, status2;
1735     void                *header;
1736     uint8_t             msg_tag;
1737     int                 msg_nbytes;
1738     void                *msg_data;
1739     gni_mem_handle_t    msg_mem_hndl;
1740     gni_smsg_attr_t     *smsg_attr;
1741     gni_smsg_attr_t     *remote_smsg_attr;
1742     int                 init_flag;
1743     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1744     uint64_t            source_addr;
1745     SMSG_QUEUE         *queue = &smsg_queue;
1746 #if     CMK_DIRECT
1747     cmidirectMsg        *direct_msg;
1748 #endif
1749     while(1)
1750     {
1751         CMI_GNI_LOCK
1752         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1753         CMI_GNI_UNLOCK
1754         if(status != GNI_RC_SUCCESS)
1755             break;
1756         inst_id = GNI_CQ_GET_INST_ID(event_data);
1757         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1758 #if PRINT_SYH
1759         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1760 #endif
1761         if (useDynamicSMSG) {
1762             /* subtle: smsg may come before connection is setup */
1763             while (smsg_connected_flag[inst_id] != 2) 
1764                PumpDatagramConnection();
1765         }
1766         msg_tag = GNI_SMSG_ANY_TAG;
1767         while(1) {
1768             CMI_GNI_LOCK
1769             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1770             if (status != GNI_RC_SUCCESS)
1771             {
1772                 CMI_GNI_UNLOCK
1773                 break;
1774             }
1775 #if PRINT_SYH
1776             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1777 #endif
1778             /* copy msg out and then put into queue (small message) */
1779             switch (msg_tag) {
1780 #if PIGGYBACK_ACK
1781             case SMALL_DATA_ACK_TAG:
1782             {
1783                 int piggycount = processPiggybackAckHeader(header);
1784                 header = (uint64_t*)header + piggycount + 1;
1785             }
1786 #endif
1787             case SMALL_DATA_TAG:
1788             {
1789                 START_EVENT();
1790                 msg_nbytes = CmiGetMsgSize(header);
1791                 msg_data    = CmiAlloc(msg_nbytes);
1792                 memcpy(msg_data, (char*)header, msg_nbytes);
1793                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1794                 CMI_GNI_UNLOCK
1795 #if CMK_SMP_TRACE_COMMTHREAD
1796                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1797 #endif
1798                 handleOneRecvedMsg(msg_nbytes, msg_data);
1799                 break;
1800             }
1801 #if PIGGYBACK_ACK
1802             case LMSG_INIT_ACK_TAG:
1803             {
1804                 int piggycount = processPiggybackAckHeader(header);
1805                 header = (uint64_t*)header + piggycount + 1;
1806             }
1807 #endif
1808             case LMSG_INIT_TAG:
1809             {
1810 #if MULTI_THREAD_SEND
1811                 MallocControlMsg(control_msg_tmp);
1812                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
1813                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1814                 CMI_GNI_UNLOCK
1815                 getLargeMsgRequest(control_msg_tmp, inst_id);
1816                 FreeControlMsg(control_msg_tmp);
1817 #else
1818                 getLargeMsgRequest(header, inst_id);
1819                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1820 #endif
1821                 break;
1822             }
1823             case ACK_TAG:   //msg fit into mempool
1824             {
1825                 /* Get is done, release message . Now put is not used yet*/
1826                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1827                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1828                 CMI_GNI_UNLOCK
1829 #if ! USE_LRTS_MEMPOOL
1830                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1831 #else
1832                 DecreaseMsgInSend(msg);
1833 #endif
1834                 if(NoMsgInSend(msg))
1835                     buffered_send_msg -= GetMempoolsize(msg);
1836                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1837                 CmiFree(msg);
1838                 break;
1839             }
1840             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1841             {
1842 #if MULTI_THREAD_SEND
1843                 MallocControlMsg(header_tmp);
1844                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
1845                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1846                 CMI_GNI_UNLOCK
1847                    /* FIXME: leak */
1848 #else
1849                 header_tmp = (CONTROL_MSG *) header;
1850 #endif
1851                 void *msg = (void*)(header_tmp->source_addr);
1852                 int cur_seq = CmiGetMsgSeq(msg);
1853                 int offset = ONE_SEG*(cur_seq+1);
1854                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1855                 buffered_send_msg -= header_tmp->length;
1856                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1857                 if (remain_size < 0) remain_size = 0;
1858                 CmiSetMsgSize(msg, remain_size);
1859                 if(remain_size <= 0) //transaction done
1860                 {
1861                     CmiFree(msg);
1862                 }else if (header_tmp->total_length > offset)
1863                 {
1864                     CmiSetMsgSeq(msg, cur_seq+1);
1865                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1866                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1867                     //send next seg
1868                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1869                          // pipelining
1870                     if (header_tmp->seq_id == 1) {
1871                       int i;
1872                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1873                         int seq = cur_seq+i+2;
1874                         CmiSetMsgSeq(msg, seq-1);
1875                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1876                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1877                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1878                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1879                       }
1880                     }
1881                 }
1882 #if MULTI_THREAD_SEND
1883                 FreeControlMsg(header_tmp);
1884 #else
1885                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1886 #endif
1887                 break;
1888             }
1889 #if CMK_PERSISTENT_COMM
1890             case PUT_DONE_TAG: //persistent message
1891                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1892                 int size = ((CONTROL_MSG *) header)->length;
1893                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1894                 CMI_GNI_UNLOCK
1895                 CmiReference(msg);
1896                 handleOneRecvedMsg(size, msg); 
1897                 break;
1898 #endif
1899 #if CMK_DIRECT
1900             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1901                 //create a trigger message
1902                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
1903                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
1904                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1905                 CMI_GNI_UNLOCK
1906                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
1907                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
1908                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1909                 break;
1910 #endif
1911             default: {
1912                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1913                 CMI_GNI_UNLOCK
1914                 printf("weird tag problem\n");
1915                 CmiAbort("Unknown tag\n");
1916                      }
1917             }               // end switch
1918 #if PRINT_SYH
1919             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1920 #endif
1921             smsg_recv_count ++;
1922             msg_tag = GNI_SMSG_ANY_TAG;
1923         } //endwhile getNext
1924     }   //end while GetEvent
1925     if(status == GNI_RC_ERROR_RESOURCE)
1926     {
1927         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
1928         GNI_RC_CHECK("Smsg_rx_cq full", status);
1929     }
1930 }
1931
1932 static void printDesc(gni_post_descriptor_t *pd)
1933 {
1934     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1935 }
1936
1937 // for BIG_MSG called on receiver side for receiving control message
1938 // LMSG_INIT_TAG
1939 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1940 {
1941 #if     USE_LRTS_MEMPOOL
1942     CONTROL_MSG         *request_msg;
1943     gni_return_t        status = GNI_RC_SUCCESS;
1944     void                *msg_data;
1945     gni_post_descriptor_t *pd;
1946     gni_mem_handle_t    msg_mem_hndl;
1947     int source, size, transaction_size, offset = 0;
1948     size_t     register_size = 0;
1949
1950     // initial a get to transfer data from the sender side */
1951     request_msg = (CONTROL_MSG *) header;
1952     size = request_msg->total_length;
1953     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1954     if(request_msg->seq_id < 2)   {
1955         msg_data = CmiAlloc(size);
1956         CmiSetMsgSeq(msg_data, 0);
1957         _MEMCHECK(msg_data);
1958     }
1959     else {
1960         offset = ONE_SEG*(request_msg->seq_id-1);
1961         msg_data = (char*)request_msg->dest_addr + offset;
1962     }
1963    
1964     MallocPostDesc(pd);
1965     pd->cqwrite_value = request_msg->seq_id;
1966     if( request_msg->seq_id == 0)
1967     {
1968         pd->local_mem_hndl= GetMemHndl(msg_data);
1969         transaction_size = ALIGN64(size);
1970         if(IsMemHndlZero(pd->local_mem_hndl))
1971         {   
1972             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1973             if(status == GNI_RC_SUCCESS)
1974             {
1975                 pd->local_mem_hndl = GetMemHndl(msg_data);
1976             }
1977             else
1978             {
1979                 SetMemHndlZero(pd->local_mem_hndl);
1980             }
1981         }
1982         if(NoMsgInRecv( (void*)(msg_data)))
1983             register_size = GetMempoolsize((void*)(msg_data));
1984         else
1985             register_size = 0;
1986     }
1987     else{
1988         transaction_size = ALIGN64(request_msg->length);
1989         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1990         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1991         {
1992             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1993         }
1994     }
1995     pd->first_operand = ALIGN64(size);                   //  total length
1996
1997     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1998         pd->type            = GNI_POST_FMA_GET;
1999     else
2000         pd->type            = GNI_POST_RDMA_GET;
2001 #if REMOTE_EVENT
2002     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
2003 #else
2004     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2005 #endif
2006     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2007     pd->length          = transaction_size;
2008     pd->local_addr      = (uint64_t) msg_data;
2009     pd->remote_addr     = request_msg->source_addr + offset;
2010     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2011     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
2012     pd->rdma_mode       = 0;
2013     pd->amo_cmd         = 0;
2014
2015     //memory registration success
2016     if(status == GNI_RC_SUCCESS)
2017     {
2018         CMI_GNI_LOCK
2019         if(pd->type == GNI_POST_RDMA_GET) 
2020             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2021         else
2022             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2023         CMI_GNI_UNLOCK
2024
2025         if(status == GNI_RC_SUCCESS )
2026         {
2027             if(pd->cqwrite_value == 0)
2028             {
2029 #if MACHINE_DEBUG_LOG
2030                 buffered_recv_msg += register_size;
2031                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2032 #endif
2033                 IncreaseMsgInRecv(msg_data);
2034             }
2035         }
2036     }else
2037     {
2038         SetMemHndlZero((pd->local_mem_hndl));
2039     }
2040     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2041     {
2042         bufferRdmaMsg(inst_id, pd); 
2043     }else {
2044          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
2045         GNI_RC_CHECK("GetLargeAFter posting", status);
2046     }
2047 #else
2048     CONTROL_MSG         *request_msg;
2049     gni_return_t        status;
2050     void                *msg_data;
2051     gni_post_descriptor_t *pd;
2052     RDMA_REQUEST        *rdma_request_msg;
2053     gni_mem_handle_t    msg_mem_hndl;
2054     //int source;
2055     // initial a get to transfer data from the sender side */
2056     request_msg = (CONTROL_MSG *) header;
2057     msg_data = CmiAlloc(request_msg->length);
2058     _MEMCHECK(msg_data);
2059
2060     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
2061
2062     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2063     {
2064         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2065     }
2066
2067     MallocPostDesc(pd);
2068     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2069         pd->type            = GNI_POST_FMA_GET;
2070     else
2071         pd->type            = GNI_POST_RDMA_GET;
2072 #if REMOTE_EVENT
2073     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
2074 #else
2075     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2076 #endif
2077     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2078     pd->length          = ALIGN64(request_msg->length);
2079     pd->local_addr      = (uint64_t) msg_data;
2080     pd->remote_addr     = request_msg->source_addr;
2081     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2082     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
2083     pd->rdma_mode       = 0;
2084     pd->amo_cmd         = 0;
2085
2086     //memory registration successful
2087     if(status == GNI_RC_SUCCESS)
2088     {
2089         pd->local_mem_hndl  = msg_mem_hndl;
2090         CMI_GNI_LOCK
2091         if(pd->type == GNI_POST_RDMA_GET) 
2092             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2093         else
2094             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2095         CMI_GNI_UNLOCK
2096     }else
2097     {
2098         SetMemHndlZero(pd->local_mem_hndl);
2099     }
2100     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2101     {
2102         MallocRdmaRequest(rdma_request_msg);
2103         rdma_request_msg->next = 0;
2104         rdma_request_msg->destNode = inst_id;
2105         rdma_request_msg->pd = pd;
2106         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2107     }else {
2108         GNI_RC_CHECK("AFter posting", status);
2109     }
2110 #endif
2111 }
2112
2113 static void PumpLocalRdmaTransactions()
2114 {
2115     gni_cq_entry_t          ev;
2116     gni_return_t            status;
2117     uint64_t                type, inst_id;
2118     gni_post_descriptor_t   *tmp_pd;
2119     MSG_LIST                *ptr;
2120     CONTROL_MSG             *ack_msg_tmp;
2121     ACK_MSG                 *ack_msg;
2122     uint8_t                 msg_tag;
2123 #if CMK_DIRECT
2124     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2125 #endif
2126     SMSG_QUEUE         *queue = &smsg_queue;
2127
2128     while(1) {
2129         CMI_GNI_LOCK 
2130         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
2131         CMI_GNI_UNLOCK
2132         if(status != GNI_RC_SUCCESS) break;
2133         
2134         type = GNI_CQ_GET_TYPE(ev);
2135         if (type == GNI_CQ_EVENT_TYPE_POST)
2136         {
2137             inst_id     = GNI_CQ_GET_INST_ID(ev);
2138 #if PRINT_SYH
2139             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2140 #endif
2141             CMI_GNI_LOCK
2142             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
2143             CMI_GNI_UNLOCK
2144
2145             switch (tmp_pd->type) {
2146 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2147             case GNI_POST_RDMA_PUT:
2148 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2149                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2150 #endif
2151             case GNI_POST_FMA_PUT:
2152                 if(tmp_pd->amo_cmd == 1) {
2153                     //sender ACK to receiver to trigger it is done
2154                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2155                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2156                     msg_tag = DIRECT_PUT_DONE_TAG;
2157                 }
2158                 else {
2159                     CmiFree((void *)tmp_pd->local_addr);
2160                     MallocControlMsg(ack_msg_tmp);
2161                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2162                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2163                     msg_tag = PUT_DONE_TAG;
2164                 }
2165                 break;
2166 #endif
2167             case GNI_POST_RDMA_GET:
2168             case GNI_POST_FMA_GET:  {
2169 #if  ! USE_LRTS_MEMPOOL
2170                 MallocControlMsg(ack_msg_tmp);
2171                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2172                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2173                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2174                 msg_tag = ACK_TAG;  
2175 #else
2176                 int seq_id = tmp_pd->cqwrite_value;
2177                 if(seq_id > 0)      // BIG_MSG
2178                 {
2179                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2180                     MallocControlMsg(ack_msg_tmp);
2181                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2182                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2183                     ack_msg_tmp->seq_id = seq_id;
2184                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2185                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2186                     ack_msg_tmp->length = tmp_pd->length;
2187                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2188                     msg_tag = BIG_MSG_TAG; 
2189                 } 
2190                 else
2191                 {
2192                     MallocAckMsg(ack_msg);
2193                     ack_msg->source_addr = tmp_pd->remote_addr;
2194                     msg_tag = ACK_TAG;  
2195                     // ack_msg_tmp->dest_addr = tmp_pd->local_addr; ???
2196                 }
2197 #endif
2198                 break;
2199             }
2200             default:
2201                 CmiPrintf("type=%d\n", tmp_pd->type);
2202                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
2203             }      /* end of switch */
2204
2205 #if CMK_DIRECT
2206             if (tmp_pd->amo_cmd == 1) {
2207                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
2208                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2209             }
2210             else
2211 #endif
2212             if (msg_tag == ACK_TAG) {
2213 #if ! PIGGYBACK_ACK
2214                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
2215                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2216 #else
2217                 buffer_small_msgs(&smsg_ack_queue, ack_msg, ACK_MSG_SIZE, inst_id, msg_tag);
2218 #endif
2219             }
2220             else {
2221                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
2222                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2223             }
2224 #if CMK_PERSISTENT_COMM
2225             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2226 #endif
2227             {
2228                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2229 #if PRINT_SYH
2230                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2231 #endif
2232                     START_EVENT();
2233                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2234                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2235 #if MACHINE_DEBUG_LOG
2236                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2237                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2238                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2239 #endif
2240 #if CMK_SMP_TRACE_COMMTHREAD
2241                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2242 #endif
2243                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2244                 }else if(msg_tag == BIG_MSG_TAG){
2245                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2246                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2247                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2248                         START_EVENT();
2249 #if PRINT_SYH
2250                         printf("Pipeline msg done [%d]\n", myrank);
2251 #endif
2252 #if CMK_SMP_TRACE_COMMTHREAD
2253                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2254 #endif
2255                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2256                     }
2257                 }
2258             }
2259             FreePostDesc(tmp_pd);
2260         }
2261     } //end while
2262     if(status == GNI_RC_ERROR_RESOURCE)
2263     {
2264         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2265         GNI_RC_CHECK("Smsg_tx_cq full", status);
2266     }
2267 }
2268
2269 static void  SendRdmaMsg()
2270 {
2271     gni_return_t            status = GNI_RC_SUCCESS;
2272     gni_mem_handle_t        msg_mem_hndl;
2273     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2274     RDMA_REQUEST            *pre = 0;
2275     uint64_t                register_size = 0;
2276     void                    *msg;
2277     int                     i;
2278 #if CMK_SMP
2279     int len = PCQueueLength(sendRdmaBuf);
2280     for (i=0; i<len; i++)
2281     {
2282         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2283         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2284         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2285         if (ptr == NULL) break;
2286 #else
2287     ptr = sendRdmaBuf;
2288     while (ptr!=0)
2289     {
2290 #endif 
2291         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2292         gni_post_descriptor_t *pd = ptr->pd;
2293         status = GNI_RC_SUCCESS;
2294         
2295         if(pd->cqwrite_value == 0)
2296         {
2297             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2298             {
2299                 msg = (void*)(pd->local_addr);
2300                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
2301                 if(status == GNI_RC_SUCCESS)
2302                 {
2303                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2304                 }
2305             }else
2306             {
2307                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2308             }
2309             if(NoMsgInRecv( (void*)(pd->local_addr)))
2310                 register_size = GetMempoolsize((void*)(pd->local_addr));
2311             else
2312                 register_size = 0;
2313         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2314         {
2315             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
2316         }
2317         if(status == GNI_RC_SUCCESS)        //mem register good
2318         {
2319             CMI_GNI_LOCK
2320             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2321                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2322             else
2323                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2324             CMI_GNI_UNLOCK
2325             if(status == GNI_RC_SUCCESS)    //post good
2326             {
2327 #if !CMK_SMP
2328                 tmp_ptr = ptr;
2329                 if(pre != 0) {
2330                     pre->next = ptr->next;
2331                 }
2332                 else {
2333                     sendRdmaBuf = ptr->next;
2334                 }
2335                 ptr = ptr->next;
2336                 FreeRdmaRequest(tmp_ptr);
2337 #endif
2338                 if(pd->cqwrite_value == 0)
2339                 {
2340                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2341                 }
2342 #if MACHINE_DEBUG_LOG
2343                 buffered_recv_msg += register_size;
2344                 MACHSTATE(8, "GO request from buffered\n"); 
2345 #endif
2346             }else           // cannot post
2347             {
2348 #if CMK_SMP
2349                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2350 #else
2351                 pre = ptr;
2352                 ptr = ptr->next;
2353 #endif
2354                 break;
2355             }
2356         } else          //memory registration fails
2357         {
2358 #if CMK_SMP
2359             PCQueuePush(sendRdmaBuf, (char*)ptr);
2360 #else
2361             pre = ptr;
2362             ptr = ptr->next;
2363 #endif
2364         }
2365     } //end while
2366 #if ! CMK_SMP
2367     if(ptr == 0)
2368         sendRdmaTail = pre;
2369 #endif
2370 }
2371
2372 // return 1 if all messages are sent
2373 static int SendBufferMsg(SMSG_QUEUE *queue)
2374 {
2375     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2376     CONTROL_MSG         *control_msg_tmp;
2377     gni_return_t        status;
2378     int                 done = 1;
2379     uint64_t            register_size;
2380     void                *register_addr;
2381     int                 index_previous = -1;
2382 #if CMI_EXERT_SEND_CAP
2383     int                 sent_cnt = 0;
2384 #endif
2385
2386 #if CMK_SMP
2387     int          index = 0;
2388 #if ONE_SEND_QUEUE
2389     memset(destpe_avail, 0, mysize * sizeof(char));
2390     for (index=0; index<1; index++)
2391     {
2392         int i, len = PCQueueLength(queue->sendMsgBuf);
2393         for (i=0; i<len; i++) 
2394         {
2395             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2396             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2397             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2398             if(ptr == NULL) break;
2399             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2400                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2401                 continue;
2402             }
2403 #else
2404 #if SMP_LOCKS
2405     int nonempty = PCQueueLength(nonEmptyQueues);
2406     for(index =0; index<nonempty; index++)
2407     {
2408         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2409         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2410         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2411         if(current_list == NULL) break; 
2412         PCQueue current_queue= current_list-> sendSmsgBuf;
2413         CmiLock(current_list->lock);
2414         int i, len = PCQueueLength(current_queue);
2415         current_list->pushed = 0;
2416         CmiUnlock(current_list->lock);
2417 #else
2418     for(index =0; index<mysize; index++)
2419     {
2420         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2421         int i, len = PCQueueLength(current_queue);
2422 #endif
2423         for (i=0; i<len; i++) 
2424         {
2425             CMI_PCQUEUEPOP_LOCK(current_queue)
2426             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2427             CMI_PCQUEUEPOP_UNLOCK(current_queue)
2428             if (ptr == 0) break;
2429 #endif
2430 #else
2431     int index = queue->smsg_head_index;
2432     while(index != -1)
2433     {
2434         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2435         pre = 0;
2436         while(ptr != 0)
2437         {
2438 #endif
2439             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2440             status = GNI_RC_ERROR_RESOURCE;
2441             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2442                 /* connection not exists yet */
2443             }
2444             else
2445             switch(ptr->tag)
2446             {
2447             case SMALL_DATA_TAG:
2448                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2449                 if(status == GNI_RC_SUCCESS)
2450                 {
2451                     CmiFree(ptr->msg);
2452                 }
2453                 break;
2454             case LMSG_INIT_TAG:
2455                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2456                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2457                 break;
2458             case ACK_TAG:
2459                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2460                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2461                 break;
2462             case BIG_MSG_TAG:
2463                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2464                 if(status == GNI_RC_SUCCESS)
2465                 {
2466                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2467                 }
2468                 break;
2469 #if CMK_DIRECT
2470             case DIRECT_PUT_DONE_TAG:
2471                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2472                 if(status == GNI_RC_SUCCESS)
2473                 {
2474                     free((CMK_DIRECT_HEADER*)ptr->msg);
2475                 }
2476                 break;
2477
2478 #endif
2479             default:
2480                 printf("Weird tag\n");
2481                 CmiAbort("should not happen\n");
2482             }       // end switch
2483             if(status == GNI_RC_SUCCESS)
2484             {
2485 #if !CMK_SMP
2486                 tmp_ptr = ptr;
2487                 if(pre)
2488                 {
2489                     ptr = pre ->next = ptr->next;
2490                 }else
2491                 {
2492                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2493                 }
2494                 FreeMsgList(tmp_ptr);
2495 #else
2496                 FreeMsgList(ptr);
2497 #endif
2498 #if PRINT_SYH
2499                 buffered_smsg_counter--;
2500                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2501 #endif
2502 #if CMI_EXERT_SEND_CAP
2503                 sent_cnt++;
2504                 if(sent_cnt == SEND_CAP)
2505                     break;
2506 #endif
2507             }else {
2508 #if CMK_SMP
2509 #if ONE_SEND_QUEUE
2510                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2511 #else
2512                 PCQueuePush(current_queue, (char*)ptr);
2513 #endif
2514 #else
2515                 pre = ptr;
2516                 ptr=ptr->next;
2517 #endif
2518                 done = 0;
2519                 if(status == GNI_RC_ERROR_RESOURCE)
2520                 {
2521 #if CMK_SMP && ONE_SEND_QUEUE 
2522                     destpe_avail[ptr->destNode] = 1;
2523 #else
2524                     break;
2525 #endif
2526                 }
2527             } 
2528         } //end while
2529 #if !CMK_SMP
2530         if(ptr == 0)
2531             queue->smsg_msglist_index[index].tail = pre;
2532         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2533         {
2534             if(index_previous != -1)
2535                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2536             else
2537                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2538         }
2539         else {
2540             index_previous = index;
2541         }
2542         index = queue->smsg_msglist_index[index].next;
2543 #else
2544 #if !ONE_SEND_QUEUE && SMP_LOCKS
2545         CmiLock(current_list->lock);
2546         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2547         {
2548             current_list->pushed = 1;
2549             PCQueuePush(nonEmptyQueues, current_list);
2550         }
2551         CmiUnlock(current_list->lock); 
2552 #endif
2553 #endif
2554
2555 #if CMI_EXERT_SEND_CAP
2556         if(sent_cnt == SEND_CAP)
2557                 break;
2558 #endif
2559     }   // end pooling for all cores
2560     return done;
2561 }
2562
2563 static void ProcessDeadlock();
2564 void LrtsAdvanceCommunication(int whileidle)
2565 {
2566     static int count = 0;
2567     /*  Receive Msg first */
2568 #if CMK_SMP_TRACE_COMMTHREAD
2569     double startT, endT;
2570 #endif
2571     if (useDynamicSMSG && whileidle)
2572     {
2573 #if CMK_SMP_TRACE_COMMTHREAD
2574         startT = CmiWallTimer();
2575 #endif
2576         PumpDatagramConnection();
2577 #if CMK_SMP_TRACE_COMMTHREAD
2578         endT = CmiWallTimer();
2579         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2580 #endif
2581     }
2582
2583 #if CMK_SMP_TRACE_COMMTHREAD
2584     startT = CmiWallTimer();
2585 #endif
2586     PumpNetworkSmsg();
2587     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2588 #if CMK_SMP_TRACE_COMMTHREAD
2589     endT = CmiWallTimer();
2590     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2591 #endif
2592
2593 #if CMK_SMP_TRACE_COMMTHREAD
2594     startT = CmiWallTimer();
2595 #endif
2596     PumpLocalRdmaTransactions();
2597     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2598 #if CMK_SMP_TRACE_COMMTHREAD
2599     endT = CmiWallTimer();
2600     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2601 #endif
2602     /* Send buffered Message */
2603 #if CMK_SMP_TRACE_COMMTHREAD
2604     startT = CmiWallTimer();
2605 #endif
2606 #if CMK_USE_OOB
2607     if (SendBufferMsg(&smsg_oob_queue) == 1)
2608 #endif
2609     {
2610 #if PIGGYBACK_ACK
2611     //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
2612     if (SendBufferMsg(&smsg_queue) == 1 || count++ % 10 == 0) {
2613         STATS_ACK_TIME(SendBufferMsg(&smsg_ack_queue));
2614     }
2615 #else
2616     SendBufferMsg(&smsg_queue);
2617 #endif
2618     }
2619     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2620 #if CMK_SMP_TRACE_COMMTHREAD
2621     endT = CmiWallTimer();
2622     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2623 #endif
2624
2625 #if CMK_SMP_TRACE_COMMTHREAD
2626     startT = CmiWallTimer();
2627 #endif
2628     SendRdmaMsg();
2629     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2630 #if CMK_SMP_TRACE_COMMTHREAD
2631     endT = CmiWallTimer();
2632     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2633 #endif
2634
2635 #if CMK_SMP
2636     if (_detected_hang)  ProcessDeadlock();
2637 #endif
2638 }
2639
2640 /* useDynamicSMSG */
2641 static void _init_dynamic_smsg()
2642 {
2643     gni_return_t status;
2644     uint32_t     vmdh_index = -1;
2645     int i;
2646
2647     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2648     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2649     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2650     for(i=0; i<mysize; i++) {
2651         smsg_connected_flag[i] = 0;
2652         smsg_attr_vector_local[i] = NULL;
2653         smsg_attr_vector_remote[i] = NULL;
2654     }
2655     if(mysize <=512)
2656     {
2657         SMSG_MAX_MSG = 4096;
2658     }else if (mysize <= 4096)
2659     {
2660         SMSG_MAX_MSG = 4096/mysize * 1024;
2661     }else if (mysize <= 16384)
2662     {
2663         SMSG_MAX_MSG = 512;
2664     }else {
2665         SMSG_MAX_MSG = 256;
2666     }
2667
2668     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2669     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2670     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2671     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2672     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2673
2674     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2675     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2676     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2677     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2678     mailbox_list->offset = 0;
2679     mailbox_list->next = 0;
2680     
2681     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2682         mailbox_list->size, smsg_rx_cqh,
2683         GNI_MEM_READWRITE,   
2684         vmdh_index,
2685         &(mailbox_list->mem_hndl));
2686     GNI_RC_CHECK("MEMORY registration for smsg", status);
2687
2688     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2689     GNI_RC_CHECK("Unbound EP", status);
2690     
2691     alloc_smsg_attr(&send_smsg_attr);
2692
2693     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2694     GNI_RC_CHECK("post unbound datagram", status);
2695
2696       /* always pre-connect to proc 0 */
2697     //if (myrank != 0) connect_to(0);
2698 }
2699
2700 static void _init_static_smsg()
2701 {
2702     gni_smsg_attr_t      *smsg_attr;
2703     gni_smsg_attr_t      remote_smsg_attr;
2704     gni_smsg_attr_t      *smsg_attr_vec;
2705     gni_mem_handle_t     my_smsg_mdh_mailbox;
2706     int      ret, i;
2707     gni_return_t status;
2708     uint32_t              vmdh_index = -1;
2709     mdh_addr_t            base_infor;
2710     mdh_addr_t            *base_addr_vec;
2711     char *env;
2712
2713     if(mysize <=512)
2714     {
2715         SMSG_MAX_MSG = 1024;
2716     }else if (mysize <= 4096)
2717     {
2718         SMSG_MAX_MSG = 1024;
2719     }else if (mysize <= 16384)
2720     {
2721         SMSG_MAX_MSG = 512;
2722     }else {
2723         SMSG_MAX_MSG = 256;
2724     }
2725     
2726     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2727     if (env) SMSG_MAX_MSG = atoi(env);
2728     CmiAssert(SMSG_MAX_MSG > 0);
2729
2730     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2731     
2732     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2733     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2734     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2735     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2736     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2737     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2738     CmiAssert(ret == 0);
2739     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2740     
2741     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2742             smsg_memlen*(mysize), smsg_rx_cqh,
2743             GNI_MEM_READWRITE,   
2744             vmdh_index,
2745             &my_smsg_mdh_mailbox);
2746     register_memory_size += smsg_memlen*(mysize);
2747     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2748
2749     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2750     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2751
2752     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2753     base_infor.mdh =  my_smsg_mdh_mailbox;
2754     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2755
2756     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2757  
2758     for(i=0; i<mysize; i++)
2759     {
2760         if(i==myrank)
2761             continue;
2762         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2763         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2764         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2765         smsg_attr[i].mbox_offset = i*smsg_memlen;
2766         smsg_attr[i].buff_size = smsg_memlen;
2767         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2768         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2769     }
2770
2771     for(i=0; i<mysize; i++)
2772     {
2773         if (myrank == i) continue;
2774
2775         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2776         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2777         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2778         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2779         remote_smsg_attr.buff_size = smsg_memlen;
2780         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2781         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2782
2783         /* initialize the smsg channel */
2784         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2785         GNI_RC_CHECK("SMSG Init", status);
2786     } //end initialization
2787
2788     free(base_addr_vec);
2789     free(smsg_attr);
2790
2791     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2792     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2793
2794
2795 inline
2796 static void _init_send_queue(SMSG_QUEUE *queue)
2797 {
2798      int i;
2799 #if ONE_SEND_QUEUE
2800      queue->sendMsgBuf = PCQueueCreate();
2801      destpe_avail = (char*)malloc(mysize * sizeof(char));
2802 #else
2803      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2804 #if CMK_SMP && SMP_LOCKS
2805      nonEmptyQueues = PCQueueCreate();
2806 #endif
2807      for(i =0; i<mysize; i++)
2808      {
2809 #if CMK_SMP
2810          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2811 #if SMP_LOCKS
2812          queue->smsg_msglist_index[i].pushed = 0;
2813          queue->smsg_msglist_index[i].lock = CmiCreateLock();
2814 #endif
2815 #else
2816          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2817          queue->smsg_msglist_index[i].next = -1;
2818          queue->smsg_head_index = -1;
2819 #endif
2820         
2821      }
2822 #endif
2823 }
2824
2825 inline
2826 static void _init_smsg()
2827 {
2828     if(mysize > 1) {
2829         if (useDynamicSMSG)
2830             _init_dynamic_smsg();
2831         else
2832             _init_static_smsg();
2833     }
2834
2835     _init_send_queue(&smsg_queue);
2836 #if PIGGYBACK_ACK
2837     _init_send_queue(&smsg_ack_queue);
2838 #endif
2839 #if CMK_USE_OOB
2840     _init_send_queue(&smsg_oob_queue);
2841 #endif
2842 }
2843
2844 static void _init_static_msgq()
2845 {
2846     gni_return_t status;
2847     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2848     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2849     msgq_attrs.smsg_q_sz = 1;
2850     msgq_attrs.rcv_pool_sz = 1;
2851     msgq_attrs.num_msgq_eps = 2;
2852     msgq_attrs.nloc_insts = 8;
2853     msgq_attrs.modes = 0;
2854     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2855
2856     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2857     GNI_RC_CHECK("MSGQ Init", status);
2858
2859
2860 }
2861
2862 #if CMK_SMP && STEAL_MEMPOOL
2863 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2864 {
2865     void *pool = NULL;
2866     int i, k;
2867     // check other ranks
2868     for (k=0; k<CmiMyNodeSize()+1; k++) {
2869         i = (CmiMyRank()+k)%CmiMyNodeSize();
2870         if (i==CmiMyRank()) continue;
2871         mempool_type *mptr = CpvAccessOther(mempool, i);
2872         CmiLock(mptr->mempoolLock);
2873         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2874         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2875             CmiUnlock(mptr->mempoolLock);
2876             continue;
2877         }
2878         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2879         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2880             /* search in the free list */
2881           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2882           mempool_header *current = free_header;
2883           while (current) {
2884             if (current->next_free == (char*)header-(char*)mptr) break;
2885             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2886           }
2887           if (current == NULL) {         /* not found in free list */
2888             CmiUnlock(mptr->mempoolLock);
2889             continue;
2890           }
2891 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2892             /* search the previous memblock, and remove the tail */
2893           mempool_block *ptr = (mempool_block *)mptr;
2894           while (ptr) {
2895             if (ptr->memblock_next == mptr->memblock_tail) break;
2896             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2897           }
2898           CmiAssert(ptr!=NULL);
2899           ptr->memblock_next = 0;
2900           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2901
2902             /* remove memblock from the free list */
2903           current->next_free = header->next_free;
2904           if (header == free_header) mptr->freelist_head = header->next_free;
2905
2906           CmiUnlock(mptr->mempoolLock);
2907
2908           pool = (void*)tail;
2909           *mem_hndl = tail->mem_hndl;
2910           *size = tail->size;
2911           return pool;
2912         }
2913         CmiUnlock(mptr->mempoolLock);
2914     }
2915
2916       /* steal failed, deregister and free memblock now */
2917     int freed = 0;
2918     for (k=0; k<CmiMyNodeSize()+1; k++) {
2919         i = (CmiMyRank()+k)%CmiMyNodeSize();
2920         mempool_type *mptr = CpvAccessOther(mempool, i);
2921         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2922
2923         mempool_block *mempools_head = &(mptr->mempools_head);
2924         mempool_block *current = mempools_head;
2925         mempool_block *prev = NULL;
2926
2927         while (current) {
2928           int isfree = 0;
2929           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2930 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2931           mempool_header *cur = free_header;
2932           mempool_header *header;
2933           if (current != mempools_head) {
2934             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2935              /* search in free list */
2936             if (header->size == current->size - sizeof(mempool_block)) {
2937               cur = free_header;
2938               while (cur) {
2939                 if (cur->next_free == (char*)header-(char*)mptr) break;
2940                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2941               }
2942               if (cur != NULL) isfree = 1;
2943             }
2944           }
2945           if (isfree) {
2946               /* remove from free list */
2947             cur->next_free = header->next_free;
2948             if (header == free_header) mptr->freelist_head = header->next_free;
2949              // deregister
2950             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2951             GNI_RC_CHECK("steal Mempool de-register", status);
2952             mempool_block *ptr = current;
2953             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2954             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2955 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2956             freed += ptr->size;
2957             free(ptr);
2958              // try now
2959             if (freed > *size) {
2960               if (pool == NULL) {
2961                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2962                 CmiAssert(ret == 0);
2963               }
2964               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2965               if (status == GNI_RC_SUCCESS) {
2966                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2967 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2968                 return pool;
2969               }
2970 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2971             }
2972           }
2973           else {
2974              prev = current;
2975              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2976           }
2977         }
2978
2979         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2980     }
2981       /* still no luck registering pool */
2982     if (pool) free(pool);
2983     return NULL;
2984 }
2985 #endif
2986
2987 static CmiUInt8 total_mempool_size = 0;
2988 static CmiUInt8 total_mempool_calls = 0;
2989
2990 #if USE_LRTS_MEMPOOL
2991 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2992 {
2993     void *pool;
2994     int ret;
2995     gni_return_t status = GNI_RC_SUCCESS;
2996
2997     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
2998     if (*size < default_size) *size = default_size;
2999 #if LARGEPAGE
3000     // round up to be multiple of _tlbpagesize
3001     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3002     *size = ALIGNHUGEPAGE(*size);
3003 #endif
3004     total_mempool_size += *size;
3005     total_mempool_calls += 1;
3006 #if   !LARGEPAGE
3007     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3008     {
3009         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3010         CmiAbort("alloc_mempool_block");
3011     }
3012 #endif
3013 #if LARGEPAGE
3014     pool = my_get_huge_pages(*size);
3015     ret = pool==NULL;
3016 #else
3017     ret = posix_memalign(&pool, ALIGNBUF, *size);
3018 #endif
3019     if (ret != 0) {
3020 #if CMK_SMP && STEAL_MEMPOOL
3021       pool = steal_mempool_block(size, mem_hndl);
3022       if (pool != NULL) return pool;
3023 #endif
3024       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3025       if (ret == ENOMEM)
3026         CmiAbort("alloc_mempool_block: out of memory.");
3027       else
3028         CmiAbort("alloc_mempool_block: posix_memalign failed");
3029     }
3030 #if LARGEPAGE
3031     CmiMemLock();
3032     register_count++;
3033     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, status);
3034     CmiMemUnlock();
3035     if(status != GNI_RC_SUCCESS) {
3036         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3037 sweep_mempool(CpvAccess(mempool));
3038     }
3039     GNI_RC_CHECK("MEMORY_REGISTER", status);
3040 #else
3041     SetMemHndlZero((*mem_hndl));
3042 #endif
3043     return pool;
3044 }
3045
3046 // ptr is a block head pointer
3047 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3048 {
3049     if(!(IsMemHndlZero(mem_hndl)))
3050     {
3051         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3052     }
3053 #if LARGEPAGE
3054     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3055 #else
3056     free(ptr);
3057 #endif
3058 }
3059 #endif
3060
3061 void LrtsPreCommonInit(int everReturn){
3062 #if USE_LRTS_MEMPOOL
3063     CpvInitialize(mempool_type*, mempool);
3064     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3065     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3066 #endif
3067 }
3068
3069 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3070 {
3071     register int            i;
3072     int                     rc;
3073     int                     device_id = 0;
3074     unsigned int            remote_addr;
3075     gni_cdm_handle_t        cdm_hndl;
3076     gni_return_t            status = GNI_RC_SUCCESS;
3077     uint32_t                vmdh_index = -1;
3078     uint8_t                 ptag;
3079     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3080     int                     first_spawned;
3081     int                     physicalID;
3082     char                   *env;
3083
3084     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3085     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3086     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3087    
3088     status = PMI_Init(&first_spawned);
3089     GNI_RC_CHECK("PMI_Init", status);
3090
3091     status = PMI_Get_size(&mysize);
3092     GNI_RC_CHECK("PMI_Getsize", status);
3093
3094     status = PMI_Get_rank(&myrank);
3095     GNI_RC_CHECK("PMI_getrank", status);
3096
3097     //physicalID = CmiPhysicalNodeID(myrank);
3098     
3099     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3100
3101     *myNodeID = myrank;
3102     *numNodes = mysize;
3103   
3104 #if MULTI_THREAD_SEND
3105     /* Currently, we only consider the case that comm. thread will only recv msgs */
3106     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3107 #endif
3108
3109     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3110     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3111     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3112
3113     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3114     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3115     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3116
3117     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3118     if (env) useDynamicSMSG = 1;
3119     if (!useDynamicSMSG)
3120       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3121     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3122     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3123     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3124     
3125     if(myrank == 0)
3126     {
3127         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3128         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3129     }
3130 #ifdef USE_ONESIDED
3131     onesided_init(NULL, &onesided_hnd);
3132
3133     // this is a GNI test, so use the libonesided bypass functionality
3134     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3135     local_addr = gniGetNicAddress();
3136 #else
3137     ptag = get_ptag();
3138     cookie = get_cookie();
3139 #if 0
3140     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3141 #endif
3142     //Create and attach to the communication  domain */
3143     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3144     GNI_RC_CHECK("GNI_CdmCreate", status);
3145     //* device id The device id is the minor number for the device
3146     //that is assigned to the device by the system when the device is created.
3147     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3148     //where X is the device number 0 default 
3149     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3150     GNI_RC_CHECK("GNI_CdmAttach", status);
3151     local_addr = get_gni_nic_address(0);
3152 #endif
3153     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3154     _MEMCHECK(MPID_UGNI_AllAddr);
3155     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3156     /* create the local completion queue */
3157     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3158     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
3159     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3160     
3161     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
3162     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
3163     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3164
3165     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3166     GNI_RC_CHECK("Create CQ (rx)", status);
3167     
3168     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
3169     //GNI_RC_CHECK("Create Post CQ (rx)", status);
3170     
3171     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3172     //GNI_RC_CHECK("Create BTE CQ", status);
3173
3174     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3175     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3176     _MEMCHECK(ep_hndl_array);
3177 #if MULTI_THREAD_SEND 
3178     tx_cq_lock  = CmiCreateLock();
3179     rx_cq_lock  = CmiCreateLock();
3180 #endif
3181     for (i=0; i<mysize; i++) {
3182         if(i == myrank) continue;
3183         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
3184         GNI_RC_CHECK("GNI_EpCreate ", status);   
3185         remote_addr = MPID_UGNI_AllAddr[i];
3186         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3187         GNI_RC_CHECK("GNI_EpBind ", status);   
3188     }
3189
3190     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3191     _init_smsg();
3192     PMI_Barrier();
3193
3194 #if     USE_LRTS_MEMPOOL
3195     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3196     if (env) {
3197         _totalmem = CmiReadSize(env);
3198         if (myrank == 0)
3199             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3200     }
3201
3202     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3203     if (env) _mempool_size = CmiReadSize(env);
3204     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3205         _mempool_size = CmiReadSize(env);
3206
3207
3208     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3209     if (env) {
3210         MAX_REG_MEM = CmiReadSize(env);
3211         user_set_flag = 1;
3212     }
3213     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3214         MAX_REG_MEM = CmiReadSize(env);
3215         user_set_flag = 1;
3216     }
3217
3218     env = getenv("CHARM_UGNI_SEND_MAX");
3219     if (env) {
3220         MAX_BUFF_SEND = CmiReadSize(env);
3221         user_set_flag = 1;
3222     }
3223     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3224         MAX_BUFF_SEND = CmiReadSize(env);
3225         user_set_flag = 1;
3226     }
3227
3228     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3229     if (env) {
3230         _mempool_size_limit = CmiReadSize(env);
3231     }
3232
3233     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3234     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3235
3236     if (myrank==0) {
3237         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3238         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3239         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3240             /* memblock can expand to BIG_MSG * 2 size */
3241             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3242             CmiAbort("mempool maximum size is too small. \n");
3243         }
3244 #if MULTI_THREAD_SEND
3245         printf("Charm++> worker thread sending messages\n");
3246 #elif COMM_THREAD_SEND
3247         printf("Charm++> only comm thread send/recv messages\n");
3248 #endif
3249     }
3250
3251 #endif     /* end of USE_LRTS_MEMPOOL */
3252
3253     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3254     if (env) {
3255         BIG_MSG = CmiReadSize(env);
3256         if (BIG_MSG < ONE_SEG)
3257           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3258     }
3259     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3260     if (env) {
3261         BIG_MSG_PIPELINE = atoi(env);
3262     }
3263
3264     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3265     if (env) _checkProgress = 0;
3266     if (mysize == 1) _checkProgress = 0;
3267
3268     
3269     /*
3270     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3271     if (env) 
3272         _tlbpagesize = CmiReadSize(env);
3273     */
3274     /* real gethugepagesize() is only available when hugetlb module linked */
3275     _tlbpagesize = gethugepagesize();
3276     if (myrank == 0) {
3277         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3278     }
3279
3280 #if LARGEPAGE
3281     if (_tlbpagesize == 4096) {
3282         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3283     }
3284 #endif
3285
3286     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3287
3288     /* init DMA buffer for medium message */
3289
3290     //_init_DMA_buffer();
3291     
3292     free(MPID_UGNI_AllAddr);
3293 #if CMK_SMP
3294     sendRdmaBuf = PCQueueCreate();
3295 #else
3296     sendRdmaBuf = 0;
3297 #endif
3298 #if MACHINE_DEBUG_LOG
3299     char ln[200];
3300     sprintf(ln,"debugLog.%d",myrank);
3301     debugLog=fopen(ln,"w");
3302 #endif
3303
3304 //    NTK_Init();
3305 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3306
3307 #if CMK_WITH_STATS
3308     init_comm_stats();
3309 #endif
3310 }
3311
3312 void* LrtsAlloc(int n_bytes, int header)
3313 {
3314     void *ptr = NULL;
3315 #if 0
3316     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3317 #endif
3318     if(n_bytes <= SMSG_MAX_MSG)
3319     {
3320         int totalsize = n_bytes+header;
3321         ptr = malloc(totalsize);
3322     }
3323     else {
3324         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3325 #if     USE_LRTS_MEMPOOL
3326         n_bytes = ALIGN64(n_bytes);
3327         if(n_bytes < BIG_MSG)
3328         {
3329             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3330             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3331         }else 
3332         {
3333 #if LARGEPAGE
3334             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3335             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3336             char *res = my_get_huge_pages(n_bytes);
3337 #else
3338             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3339 #endif
3340             if (res) ptr = res + ALIGNBUF - header;
3341         }
3342 #else
3343         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3344         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3345         ptr = res + ALIGNBUF - header;
3346 #endif
3347     }
3348     return ptr;
3349 }
3350
3351 void  LrtsFree(void *msg)
3352 {
3353     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3354     if (size <= SMSG_MAX_MSG)
3355         free(msg);
3356     else {
3357         size = ALIGN64(size);
3358         if(size>=BIG_MSG)
3359         {
3360 #if LARGEPAGE
3361             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3362             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3363 #else
3364             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3365 #endif
3366         }
3367         else {
3368 #if    USE_LRTS_MEMPOOL
3369 #if CMK_SMP
3370             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3371 #else
3372             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3373 #endif
3374 #else
3375             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3376 #endif
3377         }
3378     }
3379 }
3380
3381 void LrtsExit()
3382 {
3383 #if CMK_WITH_STATS
3384     if (print_stats) print_comm_stats();
3385 #endif
3386     /* free memory ? */
3387 #if USE_LRTS_MEMPOOL
3388     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
3389     mempool_destroy(CpvAccess(mempool));
3390 #endif
3391     PMI_Finalize();
3392     exit(0);
3393 }
3394
3395 void LrtsDrainResources()
3396 {
3397     if(mysize == 1) return;
3398     while (
3399 #if CMK_USE_OOB
3400            !SendBufferMsg(&smsg_oob_queue) ||
3401 #endif
3402            !SendBufferMsg(&smsg_queue) 
3403 #if PIGGYBACK_ACK
3404         || !SendBufferMsg(&smsg_ack_queue)
3405 #endif
3406           )
3407     {
3408         if (useDynamicSMSG)
3409             PumpDatagramConnection();
3410         PumpNetworkSmsg();
3411         PumpLocalRdmaTransactions();
3412         SendRdmaMsg();
3413     }
3414     PMI_Barrier();
3415 }
3416
3417 void LrtsAbort(const char *message) {
3418     printf("CmiAbort is calling on PE:%d\n", myrank);
3419     CmiPrintStackTrace(0);
3420     PMI_Abort(-1, message);
3421 }
3422
3423 /**************************  TIMER FUNCTIONS **************************/
3424 #if CMK_TIMER_USE_SPECIAL
3425 /* MPI calls are not threadsafe, even the timer on some machines */
3426 static CmiNodeLock  timerLock = 0;
3427 static int _absoluteTime = 0;
3428 static int _is_global = 0;
3429 static struct timespec start_ts;
3430
3431 inline int CmiTimerIsSynchronized() {
3432     return 0;
3433 }
3434
3435 inline int CmiTimerAbsolute() {
3436     return _absoluteTime;
3437 }
3438
3439 double CmiStartTimer() {
3440     return 0.0;
3441 }
3442
3443 double CmiInitTime() {
3444     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3445 }
3446
3447 void CmiTimerInit(char **argv) {
3448     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3449     if (_absoluteTime && CmiMyPe() == 0)
3450         printf("Charm++> absolute  timer is used\n");
3451     
3452     _is_global = CmiTimerIsSynchronized();
3453
3454
3455     if (_is_global) {
3456         if (CmiMyRank() == 0) {
3457             clock_gettime(CLOCK_MONOTONIC, &start_ts);
3458         }
3459     } else { /* we don't have a synchronous timer, set our own start time */
3460         CmiBarrier();
3461         CmiBarrier();
3462         CmiBarrier();
3463         clock_gettime(CLOCK_MONOTONIC, &start_ts);
3464     }
3465     CmiNodeAllBarrier();          /* for smp */
3466 }
3467
3468 /**
3469  * Since the timerLock is never created, and is
3470  * always NULL, then all the if-condition inside
3471  * the timer functions could be disabled right
3472  * now in the case of SMP.
3473  */
3474 double CmiTimer(void) {
3475     struct timespec now_ts;
3476     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3477     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3478         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3479 }
3480
3481 double CmiWallTimer(void) {