uncomment trace BTE Init tracing
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37
38 #include <gni_pub.h>
39 #include <pmi.h>
40 //#include <numatoolkit.h>
41
42 #include "converse.h"
43
44 #if CMK_DIRECT
45 #include "cmidirect.h"
46 #endif
47
48 #define     LARGEPAGE              0
49
50 #if CMK_SMP
51 #define MULTI_THREAD_SEND          0
52 #define COMM_THREAD_SEND           1
53 #endif
54
55 #if MULTI_THREAD_SEND
56 #define CMK_WORKER_SINGLE_TASK     0
57 #endif
58
59 #define REMOTE_EVENT               0
60 #define CQWRITE                    0
61
62 #define CMI_EXERT_SEND_CAP      0
63 #define CMI_EXERT_RECV_CAP      0
64
65 #if CMI_EXERT_SEND_CAP
66 #define SEND_CAP 16
67 #endif
68
69 #if CMI_EXERT_RECV_CAP
70 #define RECV_CAP 2
71 #endif
72
73 #define USE_LRTS_MEMPOOL                  1
74
75 #define PRINT_SYH                         0
76
77 // Trace communication thread
78 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
79 #define TRACE_THRESHOLD     0.00005
80 #define CMI_MPI_TRACE_MOREDETAILED 0
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_SMP_TRACE_COMMTHREAD
85 #define CMK_SMP_TRACE_COMMTHREAD 0
86 #endif
87
88 #define CMK_TRACE_COMMOVERHEAD 0
89 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
90 #undef CMI_MPI_TRACE_USEREVENTS
91 #define CMI_MPI_TRACE_USEREVENTS 1
92 #else
93 #undef CMK_TRACE_COMMOVERHEAD
94 #define CMK_TRACE_COMMOVERHEAD 0
95 #endif
96
97 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
98 CpvStaticDeclare(double, projTraceStart);
99 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
100 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
101 #else
102 #define  START_EVENT()
103 #define  END_EVENT(x)
104 #endif
105
106 #if USE_LRTS_MEMPOOL
107
108 #define oneMB (1024ll*1024)
109 #define oneGB (1024ll*1024*1024)
110
111 static CmiInt8 _mempool_size = 8*oneMB;
112 static CmiInt8 _expand_mem =  4*oneMB;
113 static CmiInt8 _mempool_size_limit = 0;
114
115 static CmiInt8 _totalmem = 0.8*oneGB;
116
117 #if LARGEPAGE
118 static CmiInt8 BIG_MSG  =  16*oneMB;
119 static CmiInt8 ONE_SEG  =  4*oneMB;
120 #else
121 static CmiInt8 BIG_MSG  =  4*oneMB;
122 static CmiInt8 ONE_SEG  =  2*oneMB;
123 #endif
124 #if MULTI_THREAD_SEND
125 static int BIG_MSG_PIPELINE = 1;
126 #else
127 static int BIG_MSG_PIPELINE = 4;
128 #endif
129
130 // dynamic flow control
131 static CmiInt8 buffered_send_msg = 0;
132 static CmiInt8 register_memory_size = 0;
133
134 #if LARGEPAGE
135 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
136 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
137 static CmiInt8  register_count = 0;
138 #else
139 #if CMK_SMP && COMM_THREAD_SEND 
140 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
141 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
142 #else
143 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
144 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
145 #endif
146
147
148 #endif
149
150 #endif     /* end USE_LRTS_MEMPOOL */
151
152 #if MULTI_THREAD_SEND 
153 #define     CMI_GNI_LOCK(x)       CmiLock(x);
154 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
155 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
156 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
157 #else
158 #define     CMI_GNI_LOCK(x)
159 #define     CMI_GNI_UNLOCK(x)
160 #define     CMI_PCQUEUEPOP_LOCK(Q)   
161 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
162 #endif
163
164 static int _tlbpagesize = 4096;
165
166 //static int _smpd_count  = 0;
167
168 static int   user_set_flag  = 0;
169
170 static int _checkProgress = 1;             /* check deadlock */
171 static int _detected_hang = 0;
172
173 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
174
175 // dynamic SMSG
176 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
177
178 static int avg_smsg_connection = 32;
179 static int                 *smsg_connected_flag= 0;
180 static gni_smsg_attr_t     **smsg_attr_vector_local;
181 static gni_smsg_attr_t     **smsg_attr_vector_remote;
182 static gni_ep_handle_t     ep_hndl_unbound;
183 static gni_smsg_attr_t     send_smsg_attr;
184 static gni_smsg_attr_t     recv_smsg_attr;
185
186 typedef struct _dynamic_smsg_mailbox{
187    void     *mailbox_base;
188    int      size;
189    int      offset;
190    gni_mem_handle_t  mem_hndl;
191    struct      _dynamic_smsg_mailbox  *next;
192 }dynamic_smsg_mailbox_t;
193
194 static dynamic_smsg_mailbox_t  *mailbox_list;
195
196 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
197 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
198
199 #if PRINT_SYH
200 int         lrts_send_msg_id = 0;
201 int         lrts_local_done_msg = 0;
202 int         lrts_send_rdma_success = 0;
203 #endif
204
205 #include "machine.h"
206
207 #include "pcqueue.h"
208
209 #include "mempool.h"
210
211 #if CMK_PERSISTENT_COMM
212 #include "machine-persistent.h"
213 #endif
214
215 //#define  USE_ONESIDED 1
216 #ifdef USE_ONESIDED
217 //onesided implementation is wrong, since no place to restore omdh
218 #include "onesided.h"
219 onesided_hnd_t   onesided_hnd;
220 onesided_md_t    omdh;
221 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
222
223 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
224
225 #else
226 uint8_t   onesided_hnd, omdh;
227
228 #if REMOTE_EVENT || CQWRITE 
229 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
230     if(register_memory_size+size>= MAX_REG_MEM) { \
231         status = GNI_RC_ERROR_NOMEM;} \
232     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
233         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
234 #else
235 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
236         if (register_memory_size + size >= MAX_REG_MEM) { \
237             status = GNI_RC_ERROR_NOMEM; \
238         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
239             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
240 #endif
241
242 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
243     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
244              register_memory_size -= size; \
245          else CmiAbort("MEM_DEregister");  \
246     } while (0)
247 #endif
248
249 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
250 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
251 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
252 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
253 #define   IncreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)++
254 #define   DecreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)--
255 #define   IncreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)++
256 #define   DecreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)--
257 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
258 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
259 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
260 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
261 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
262 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
263
264 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
265 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
266
267 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
268 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
269 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
270 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
271
272 #define ALIGNBUF                64
273
274 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
275 /* If SMSG is not used */
276
277 #define FMA_PER_CORE  1024
278 #define FMA_BUFFER_SIZE 1024
279
280 /* If SMSG is used */
281 static int  SMSG_MAX_MSG = 1024;
282 #define SMSG_MAX_CREDIT 72 
283
284 #define MSGQ_MAXSIZE       2048
285 /* large message transfer with FMA or BTE */
286 #define LRTS_GNI_RDMA_THRESHOLD  1024 
287
288 #if CMK_SMP
289 static int  REMOTE_QUEUE_ENTRIES=163840; 
290 static int LOCAL_QUEUE_ENTRIES=163840; 
291 #else
292 static int  REMOTE_QUEUE_ENTRIES=20480;
293 static int LOCAL_QUEUE_ENTRIES=20480; 
294 #endif
295
296 #define BIG_MSG_TAG             0x26
297 #define PUT_DONE_TAG            0x28
298 #define DIRECT_PUT_DONE_TAG     0x29
299 #define ACK_TAG                 0x30
300 /* SMSG is data message */
301 #define SMALL_DATA_TAG          0x31
302 /* SMSG is a control message to initialize a BTE */
303 #define LMSG_INIT_TAG           0x39 
304
305 #define DEBUG
306 #ifdef GNI_RC_CHECK
307 #undef GNI_RC_CHECK
308 #endif
309 #ifdef DEBUG
310 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
311 #else
312 #define GNI_RC_CHECK(msg,rc)
313 #endif
314
315 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
316 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
317 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
318
319 static int useStaticMSGQ = 0;
320 static int useStaticFMA = 0;
321 static int mysize, myrank;
322 static gni_nic_handle_t   nic_hndl;
323
324 typedef struct {
325     gni_mem_handle_t mdh;
326     uint64_t addr;
327 } mdh_addr_t ;
328 // this is related to dynamic SMSG
329
330 typedef struct mdh_addr_list{
331     gni_mem_handle_t mdh;
332    void *addr;
333     struct mdh_addr_list *next;
334 }mdh_addr_list_t;
335
336 static unsigned int         smsg_memlen;
337 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
338 mdh_addr_t          setup_mem;
339 mdh_addr_t          *smsg_connection_vec = 0;
340 gni_mem_handle_t    smsg_connection_memhndl;
341 static int          smsg_expand_slots = 10;
342 static int          smsg_available_slot = 0;
343 static void         *smsg_mailbox_mempool = 0;
344 mdh_addr_list_t     *smsg_dynamic_list = 0;
345
346 static void             *smsg_mailbox_base;
347 gni_msgq_attr_t         msgq_attrs;
348 gni_msgq_handle_t       msgq_handle;
349 gni_msgq_ep_attr_t      msgq_ep_attrs;
350 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
351
352 /* =====Beginning of Declarations of Machine Specific Variables===== */
353 static int cookie;
354 static int modes = 0;
355 static gni_cq_handle_t       smsg_rx_cqh = NULL;
356 static gni_cq_handle_t       default_tx_cqh = NULL;
357 static gni_cq_handle_t       rdma_tx_cqh = NULL;
358 static gni_cq_handle_t       rdma_rx_cqh = NULL;
359 static gni_cq_handle_t       post_tx_cqh = NULL;
360 static gni_ep_handle_t       *ep_hndl_array;
361
362 static CmiNodeLock           *ep_lock_array;
363 static CmiNodeLock           default_tx_cq_lock; 
364 static CmiNodeLock           rdma_tx_cq_lock; 
365 static CmiNodeLock           global_gni_lock; 
366 static CmiNodeLock           rx_cq_lock;
367 static CmiNodeLock           smsg_mailbox_lock;
368 static CmiNodeLock           smsg_rx_cq_lock;
369 static CmiNodeLock           *mempool_lock;
370 //#define     CMK_WITH_STATS      0
371 typedef struct msg_list
372 {
373     uint32_t destNode;
374     uint32_t size;
375     void *msg;
376     uint8_t tag;
377 #if !CMK_SMP
378     struct msg_list *next;
379 #endif
380 #if CMK_WITH_STATS
381     double  creation_time;
382 #endif
383 }MSG_LIST;
384
385
386 typedef struct control_msg
387 {
388     uint64_t            source_addr;    /* address from the start of buffer  */
389     uint64_t            dest_addr;      /* address from the start of buffer */
390     int                 total_length;   /* total length */
391     int                 length;         /* length of this packet */
392 #if REMOTE_EVENT
393     int                 ack_index;      /* index from integer to address */
394 #endif
395     uint8_t             seq_id;         //big message   0 meaning single message
396     gni_mem_handle_t    source_mem_hndl;
397     struct control_msg *next;
398 } CONTROL_MSG;
399
400 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
401
402 typedef struct ack_msg
403 {
404     uint64_t            source_addr;    /* address from the start of buffer  */
405 #if ! USE_LRTS_MEMPOOL
406     gni_mem_handle_t    source_mem_hndl;
407     int                 length;          /* total length */
408 #endif
409     struct ack_msg     *next;
410 } ACK_MSG;
411
412 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
413
414 #if CMK_DIRECT
415 typedef struct{
416     uint64_t    handler_addr;
417 }CMK_DIRECT_HEADER;
418
419 typedef struct {
420     char core[CmiMsgHeaderSizeBytes];
421     uint64_t handler;
422 }cmidirectMsg;
423
424 //SYH
425 CpvDeclare(int, CmiHandleDirectIdx);
426 void CmiHandleDirectMsg(cmidirectMsg* msg)
427 {
428
429     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
430    (*(_handle->callbackFnPtr))(_handle->callbackData);
431    CmiFree(msg);
432 }
433
434 void CmiDirectInit()
435 {
436     CpvInitialize(int,  CmiHandleDirectIdx);
437     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
438 }
439
440 #endif
441 typedef struct  rmda_msg
442 {
443     int                   destNode;
444 #if REMOTE_EVENT
445     int                   ack_index;
446 #endif
447     gni_post_descriptor_t *pd;
448 #if !CMK_SMP
449     struct  rmda_msg      *next;
450 #endif
451 }RDMA_REQUEST;
452
453
454 #if CMK_SMP
455 #define SMP_LOCKS               0
456 #define ONE_SEND_QUEUE                  0
457 PCQueue sendRdmaBuf;
458 typedef struct  msg_list_index
459 {
460     PCQueue     sendSmsgBuf;
461     int         pushed;
462     CmiNodeLock   lock;
463 } MSG_LIST_INDEX;
464 char                *destpe_avail;
465 #if  !ONE_SEND_QUEUE && SMP_LOCKS
466     PCQueue     nonEmptyQueues;
467 #endif
468 #else         /* non-smp */
469
470 static RDMA_REQUEST        *sendRdmaBuf = 0;
471 static RDMA_REQUEST        *sendRdmaTail = 0;
472 typedef struct  msg_list_index
473 {
474     int         next;
475     MSG_LIST    *sendSmsgBuf;
476     MSG_LIST    *tail;
477 } MSG_LIST_INDEX;
478
479 #endif
480
481 // buffered send queue
482 #if ! ONE_SEND_QUEUE
483 typedef struct smsg_queue
484 {
485     MSG_LIST_INDEX   *smsg_msglist_index;
486     int               smsg_head_index;
487 } SMSG_QUEUE;
488 #else
489 typedef struct smsg_queue
490 {
491     PCQueue       sendMsgBuf;
492 }  SMSG_QUEUE;
493 #endif
494
495 SMSG_QUEUE                  smsg_queue;
496 #if CMK_USE_OOB
497 SMSG_QUEUE                  smsg_oob_queue;
498 #endif
499
500 #if CMK_SMP
501
502 #define FreeMsgList(d)   free(d);
503 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
504
505 #else
506
507 static MSG_LIST       *msglist_freelist=0;
508
509 #define FreeMsgList(d)  \
510   do { \
511   (d)->next = msglist_freelist;\
512   msglist_freelist = d; \
513   } while (0)
514
515 #define MallocMsgList(d) \
516   do {  \
517   d = msglist_freelist;\
518   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
519              _MEMCHECK(d);\
520   } else msglist_freelist = d->next; \
521   d->next =0;  \
522   } while (0)
523
524 #endif
525
526 #if CMK_SMP
527
528 #define FreeControlMsg(d)      free(d);
529 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
530
531 #else
532
533 static CONTROL_MSG    *control_freelist=0;
534
535 #define FreeControlMsg(d)       \
536   do { \
537   (d)->next = control_freelist;\
538   control_freelist = d; \
539   } while (0);
540
541 #define MallocControlMsg(d) \
542   do {  \
543   d = control_freelist;\
544   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
545              _MEMCHECK(d);\
546   } else control_freelist = d->next;  \
547   } while (0);
548
549 #endif
550
551 #if CMK_SMP
552
553 #define FreeAckMsg(d)      free(d);
554 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
555
556 #else
557
558 static ACK_MSG        *ack_freelist=0;
559
560 #define FreeAckMsg(d)       \
561   do { \
562   (d)->next = ack_freelist;\
563   ack_freelist = d; \
564   } while (0)
565
566 #define MallocAckMsg(d) \
567   do { \
568   d = ack_freelist;\
569   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
570              _MEMCHECK(d);\
571   } else ack_freelist = d->next; \
572   } while (0)
573
574 #endif
575
576
577 # if CMK_SMP
578 #define FreeRdmaRequest(d)       free(d);
579 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
580 #else
581
582 static RDMA_REQUEST         *rdma_freelist = NULL;
583
584 #define FreeRdmaRequest(d)       \
585   do {  \
586   (d)->next = rdma_freelist;\
587   rdma_freelist = d;    \
588   } while (0)
589
590 #define MallocRdmaRequest(d) \
591   do {   \
592   d = rdma_freelist;\
593   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
594              _MEMCHECK(d);\
595   } else rdma_freelist = d->next; \
596   d->next =0;   \
597   } while (0)
598 #endif
599
600 /* reuse gni_post_descriptor_t */
601 static gni_post_descriptor_t *post_freelist=0;
602
603 #if  !CMK_SMP
604 #define FreePostDesc(d)       \
605     (d)->next_descr = post_freelist;\
606     post_freelist = d;
607
608 #define MallocPostDesc(d) \
609   d = post_freelist;\
610   if (d==0) { \
611      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
612      d->next_descr = 0;\
613       _MEMCHECK(d);\
614   } else post_freelist = d->next_descr;
615 #else
616
617 #define FreePostDesc(d)     free(d);
618 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
619
620 #endif
621
622
623 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
624 static int      buffered_smsg_counter = 0;
625
626 /* SmsgSend return success but message sent is not confirmed by remote side */
627 static MSG_LIST *buffered_fma_head = 0;
628 static MSG_LIST *buffered_fma_tail = 0;
629
630 /* functions  */
631 #define IsFree(a,ind)  !( a& (1<<(ind) ))
632 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
633 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
634
635 CpvDeclare(mempool_type*, mempool);
636
637 #if REMOTE_EVENT
638 /* ack pool for remote events */
639
640 #define ACK_SHIFT                  19
641 #define ACK_EVENT(idx)             ((idx<<ACK_SHIFT) | myrank)
642 #define ACK_GET_RANK(evt)          (evt & ((1<<ACK_SHIFT)-1))
643 #define ACK_GET_INDEX(evt)         (evt >> ACK_SHIFT)
644
645 struct AckPool {
646 void *addr;
647 int next;
648 };
649
650 static struct AckPool   *ackpool;
651 static int    ackpoolsize;
652 static int    ackpool_freehead;
653 static CmiNodeLock  ackpool_lock;
654
655 #define  GetAckAddress(s)          (ackpool[s].addr)
656
657 static void AckPool_init()
658 {
659     int i;
660     if ((1<<ACK_SHIFT) < mysize) 
661         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
662     ackpoolsize = 2048;
663     ackpool = (struct AckPool *)malloc(ackpoolsize*sizeof(struct AckPool));
664     for (i=0; i<ackpoolsize-1; i++) {
665         ackpool[i].next = i+1;
666     }
667     ackpool[i].next = -1;
668     ackpool_freehead = 0;
669 #if MULTI_THREAD_SEND 
670     ackpool_lock  = CmiCreateLock();
671 #endif
672 }
673
674 static
675 inline int AckPool_getslot(void *addr)
676 {
677     int i;
678     int s;
679     CMI_GNI_LOCK(ackpool_lock);
680     s = ackpool_freehead;
681     if (s == -1) {
682         int newsize = ackpoolsize * 2;
683         printf("[%d] AckPool_getslot expand to: %d\n", myrank, newsize);
684         if (newsize > (1<<(32-ACK_SHIFT))) CmiAbort("AckPool too large");
685         struct AckPool *old_ackpool = ackpool;
686         ackpool = (struct AckPool *)malloc(newsize*sizeof(struct AckPool));
687         memcpy(ackpool, old_ackpool, ackpoolsize*sizeof(struct AckPool));
688         for (i=ackpoolsize; i<newsize-1; i++) {
689             ackpool[i].next = i+1;
690         }
691         ackpool[i].next = -1;
692         ackpool_freehead = ackpoolsize;
693         s = ackpoolsize;
694         ackpoolsize = newsize;
695         free(old_ackpool);
696     }
697     ackpool_freehead = ackpool[s].next;
698     ackpool[s].addr = addr;
699     CMI_GNI_UNLOCK(ackpool_lock);
700     return s;
701 }
702
703 static
704 inline  void AckPool_freeslot(int s)
705 {
706     CmiAssert(s>=0 && s<ackpoolsize);
707     CMI_GNI_LOCK(ackpool_lock);
708     ackpool[s].next = ackpool_freehead;
709     ackpool_freehead = s;
710     CMI_GNI_UNLOCK(ackpool_lock);
711 }
712
713
714 #endif
715
716 #if CMK_WITH_STATS
717 typedef struct comm_thread_stats
718 {
719     uint64_t  smsg_data_count;
720     uint64_t  lmsg_init_count;
721     uint64_t  ack_count;
722     uint64_t  big_msg_ack_count;
723     uint64_t  smsg_count;
724     //times of calling SmsgSend
725     uint64_t  try_smsg_data_count;
726     uint64_t  try_lmsg_init_count;
727     uint64_t  try_ack_count;
728     uint64_t  try_big_msg_ack_count;
729     uint64_t  try_smsg_count;
730     
731     double    max_time_in_send_buffered_smsg;
732     double    all_time_in_send_buffered_smsg;
733
734     uint64_t  rdma_count;
735     uint64_t  try_rdma_count;
736     double    max_time_from_control_to_rdma_init;
737     double    all_time_from_control_to_rdma_init;
738
739     double    max_time_from_rdma_init_to_rdma_done;
740     double    all_time_from_rdma_init_to_rdma_done;
741 } Comm_Thread_Stats;
742
743 static Comm_Thread_Stats   comm_stats;
744
745 static void init_comm_stats()
746 {
747   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
748 }
749
750 #define SMSG_CREATION( x )  x->creation_time = CmiWallTimer();
751
752 #define SMSG_SENT_DONE(creation_time, tag)  \
753         {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
754             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
755             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
756             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
757             comm_stats.smsg_count++; \
758             double inbuff_time = CmiWallTimer() - creation_time;   \
759             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
760             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
761         }
762
763 #define SMSG_TRY_SEND(tag)  \
764         {   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
765             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
766             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
767             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
768             comm_stats.try_smsg_count++; \
769         }
770
771 #define  RDMA_TRY_SEND()        comm_stats.try_rdma_count++;
772
773 #define  RDMA_Trans_DONE(x)      \
774          {  comm_stats.rdma_count++;  \
775              double rdma_trans_time = CmiWallTimer() - x ; \
776              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
777              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
778          }
779
780 #define  RDMA_Trans_INIT(x)      \
781          {  double rdma_trans_time = CmiWallTimer() - x ; \
782              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
783              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
784          }
785
786
787
788 static void print_comm_stats()
789 {
790     printf("PE[%d]SMSG\t[max:%f\tAverage:%f](milisecond)\n", myrank, 1000*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
791     printf("PE[%d]Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n", myrank, 
792             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
793             comm_stats.ack_count, comm_stats.big_msg_ack_count);
794     
795     printf("PE[%d]SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n", myrank, 
796             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
797             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count);
798 }
799 #else
800 #define STATS_ACK_TIME(x)            x
801 #define STATS_SEND_SMSGS_TIME(x)     x
802 #endif
803
804 static int print_stats = 0;
805
806 static void
807 allgather(void *in,void *out, int len)
808 {
809     static int *ivec_ptr=NULL,already_called=0,job_size=0;
810     int i,rc;
811     int my_rank;
812     char *tmp_buf,*out_ptr;
813
814     if(!already_called) {
815
816         rc = PMI_Get_size(&job_size);
817         CmiAssert(rc == PMI_SUCCESS);
818         rc = PMI_Get_rank(&my_rank);
819         CmiAssert(rc == PMI_SUCCESS);
820
821         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
822         CmiAssert(ivec_ptr != NULL);
823
824         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
825         CmiAssert(rc == PMI_SUCCESS);
826
827         already_called = 1;
828
829     }
830
831     tmp_buf = (char *)malloc(job_size * len);
832     CmiAssert(tmp_buf);
833
834     rc = PMI_Allgather(in,tmp_buf,len);
835     CmiAssert(rc == PMI_SUCCESS);
836
837     out_ptr = out;
838
839     for(i=0;i<job_size;i++) {
840
841         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
842
843     }
844
845     free(tmp_buf);
846 }
847
848 static void
849 allgather_2(void *in,void *out, int len)
850 {
851     //PMI_Allgather is out of order
852     int i,rc, extend_len;
853     int  rank_index;
854     char *out_ptr, *out_ref;
855     char *in2;
856
857     extend_len = sizeof(int) + len;
858     in2 = (char*)malloc(extend_len);
859
860     memcpy(in2, &myrank, sizeof(int));
861     memcpy(in2+sizeof(int), in, len);
862
863     out_ptr = (char*)malloc(mysize*extend_len);
864
865     rc = PMI_Allgather(in2, out_ptr, extend_len);
866     GNI_RC_CHECK("allgather", rc);
867
868     out_ref = out;
869
870     for(i=0;i<mysize;i++) {
871         //rank index 
872         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
873         //copy to the rank index slot
874         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
875     }
876
877     free(out_ptr);
878     free(in2);
879
880 }
881
882 static unsigned int get_gni_nic_address(int device_id)
883 {
884     unsigned int address, cpu_id;
885     gni_return_t status;
886     int i, alps_dev_id=-1,alps_address=-1;
887     char *token, *p_ptr;
888
889     p_ptr = getenv("PMI_GNI_DEV_ID");
890     if (!p_ptr) {
891         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
892        
893         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
894     } else {
895         while ((token = strtok(p_ptr,":")) != NULL) {
896             alps_dev_id = atoi(token);
897             if (alps_dev_id == device_id) {
898                 break;
899             }
900             p_ptr = NULL;
901         }
902         CmiAssert(alps_dev_id != -1);
903         p_ptr = getenv("PMI_GNI_LOC_ADDR");
904         CmiAssert(p_ptr != NULL);
905         i = 0;
906         while ((token = strtok(p_ptr,":")) != NULL) {
907             if (i == alps_dev_id) {
908                 alps_address = atoi(token);
909                 break;
910             }
911             p_ptr = NULL;
912             ++i;
913         }
914         CmiAssert(alps_address != -1);
915         address = alps_address;
916     }
917     return address;
918 }
919
920 static uint8_t get_ptag(void)
921 {
922     char *p_ptr, *token;
923     uint8_t ptag;
924
925     p_ptr = getenv("PMI_GNI_PTAG");
926     CmiAssert(p_ptr != NULL);
927     token = strtok(p_ptr, ":");
928     ptag = (uint8_t)atoi(token);
929     return ptag;
930         
931 }
932
933 static uint32_t get_cookie(void)
934 {
935     uint32_t cookie;
936     char *p_ptr, *token;
937
938     p_ptr = getenv("PMI_GNI_COOKIE");
939     CmiAssert(p_ptr != NULL);
940     token = strtok(p_ptr, ":");
941     cookie = (uint32_t)atoi(token);
942
943     return cookie;
944 }
945
946 #if LARGEPAGE
947
948 /* directly mmap memory from hugetlbfs for large pages */
949
950 #include <sys/stat.h>
951 #include <fcntl.h>
952 #include <sys/mman.h>
953 #include <hugetlbfs.h>
954
955 // size must be _tlbpagesize aligned
956 void *my_get_huge_pages(size_t size)
957 {
958     char filename[512];
959     int fd;
960     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
961     void *ptr = NULL;
962
963     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
964     fd = open(filename, O_RDWR | O_CREAT, mode);
965     if (fd == -1) {
966         CmiAbort("my_get_huge_pages: open filed");
967     }
968     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
969     if (ptr == MAP_FAILED) ptr = NULL;
970 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
971     close(fd);
972     unlink(filename);
973     return ptr;
974 }
975
976 void my_free_huge_pages(void *ptr, int size)
977 {
978 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
979     int ret = munmap(ptr, size);
980     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
981 }
982
983 #endif
984
985 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
986 /* TODO: add any that are related */
987 /* =====End of Definitions of Message-Corruption Related Macros=====*/
988
989
990 #include "machine-lrts.h"
991 #include "machine-common-core.c"
992
993 /* Network progress function is used to poll the network when for
994    messages. This flushes receive buffers on some  implementations*/
995 #if CMK_MACHINE_PROGRESS_DEFINED
996 void CmiMachineProgressImpl() {
997 }
998 #endif
999
1000 static int SendBufferMsg(SMSG_QUEUE *queue);
1001 static void SendRdmaMsg();
1002 static void PumpNetworkSmsg();
1003 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1004 #if CQWRITE
1005 static void PumpCqWriteTransactions();
1006 #endif
1007 #if REMOTE_EVENT
1008 static void PumpRemoteTransactions();
1009 #endif
1010
1011 #if MACHINE_DEBUG_LOG
1012 FILE *debugLog = NULL;
1013 static CmiInt8 buffered_recv_msg = 0;
1014 int         lrts_smsg_success = 0;
1015 int         lrts_received_msg = 0;
1016 #endif
1017
1018 static void sweep_mempool(mempool_type *mptr)
1019 {
1020     int n = 0;
1021     block_header *current = &(mptr->block_head);
1022
1023     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1024     while( current!= NULL) {
1025         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1026         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1027     }
1028     printf("[n %d] sweep_mempool slot END.\n", myrank);
1029 }
1030
1031 inline
1032 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1033 {
1034     block_header *current = *from;
1035
1036     //while(register_memory_size>= MAX_REG_MEM)
1037     //{
1038         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1039             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1040
1041         *from = current;
1042         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1043         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1044         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1045     //}
1046     return GNI_RC_SUCCESS;
1047 }
1048
1049 inline 
1050 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1051 {
1052     gni_return_t status = GNI_RC_SUCCESS;
1053     //int size = GetMempoolsize(msg);
1054     //void *blockaddr = GetMempoolBlockPtr(msg);
1055     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1056    
1057     block_header *current = &(mptr->block_head);
1058     while(register_memory_size>= MAX_REG_MEM)
1059     {
1060         status = deregisterMemory(mptr, &current);
1061         if (status != GNI_RC_SUCCESS) break;
1062     }
1063     if(register_memory_size>= MAX_REG_MEM) return status;
1064
1065     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1066     while(1)
1067     {
1068         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1069         if(status == GNI_RC_SUCCESS)
1070         {
1071             break;
1072         }
1073         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1074         {
1075             CmiAbort("Memory registor for mempool fails\n");
1076         }
1077         else
1078         {
1079             status = deregisterMemory(mptr, &current);
1080             if (status != GNI_RC_SUCCESS) break;
1081         }
1082     }; 
1083     return status;
1084 }
1085
1086 inline 
1087 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1088 {
1089     static int rank = -1;
1090     int i;
1091     gni_return_t status;
1092     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1093     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1094     mempool_type *mptr;
1095
1096     status = registerFromMempool(mptr1, msg, size, t, cqh);
1097     if (status == GNI_RC_SUCCESS) return status;
1098 #if CMK_SMP 
1099     for (i=0; i<CmiMyNodeSize()+1; i++) {
1100       rank = (rank+1)%(CmiMyNodeSize()+1);
1101       mptr = CpvAccessOther(mempool, rank);
1102       if (mptr == mptr1) continue;
1103       status = registerFromMempool(mptr, msg, size, t, cqh);
1104       if (status == GNI_RC_SUCCESS) return status;
1105     }
1106 #endif
1107     return  GNI_RC_ERROR_RESOURCE;
1108 }
1109
1110 inline
1111 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1112 {
1113     MSG_LIST        *msg_tmp;
1114     MallocMsgList(msg_tmp);
1115     msg_tmp->destNode = destNode;
1116     msg_tmp->size   = size;
1117     msg_tmp->msg    = msg;
1118     msg_tmp->tag    = tag;
1119 #if CMK_WITH_STATS
1120     SMSG_CREATION(msg_tmp)
1121 #endif
1122 #if !CMK_SMP
1123     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1124         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1125         queue->smsg_head_index = destNode;
1126         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1127     }else
1128     {
1129         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1130         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1131     }
1132 #else
1133 #if ONE_SEND_QUEUE
1134     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1135 #else
1136 #if SMP_LOCKS
1137     CmiLock(queue->smsg_msglist_index[destNode].lock);
1138     if(queue->smsg_msglist_index[destNode].pushed == 0)
1139     {
1140         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1141     }
1142     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1143     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1144 #else
1145     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1146 #endif
1147 #endif
1148 #endif
1149 #if PRINT_SYH
1150     buffered_smsg_counter++;
1151 #endif
1152 }
1153
1154 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1155 {
1156     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1157 }
1158
1159 inline
1160 static void setup_smsg_connection(int destNode)
1161 {
1162     mdh_addr_list_t  *new_entry = 0;
1163     gni_post_descriptor_t *pd;
1164     gni_smsg_attr_t      *smsg_attr;
1165     gni_return_t status = GNI_RC_NOT_DONE;
1166     RDMA_REQUEST        *rdma_request_msg;
1167     
1168     if(smsg_available_slot == smsg_expand_slots)
1169     {
1170         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1171         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1172         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1173
1174         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1175             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1176             GNI_MEM_READWRITE,   
1177             -1,
1178             &(new_entry->mdh));
1179         smsg_available_slot = 0; 
1180         new_entry->next = smsg_dynamic_list;
1181         smsg_dynamic_list = new_entry;
1182     }
1183     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1184     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1185     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1186     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1187     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1188     smsg_attr->buff_size = smsg_memlen;
1189     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1190     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1191     smsg_local_attr_vec[destNode] = smsg_attr;
1192     smsg_available_slot++;
1193     MallocPostDesc(pd);
1194     pd->type            = GNI_POST_FMA_PUT;
1195     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1196     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1197     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1198     pd->length          = sizeof(gni_smsg_attr_t);
1199     pd->local_addr      = (uint64_t) smsg_attr;
1200     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1201     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1202     pd->src_cq_hndl     = rdma_tx_cqh;
1203     pd->rdma_mode       = 0;
1204     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1205     print_smsg_attr(smsg_attr);
1206     if(status == GNI_RC_ERROR_RESOURCE )
1207     {
1208         MallocRdmaRequest(rdma_request_msg);
1209         rdma_request_msg->destNode = destNode;
1210         rdma_request_msg->pd = pd;
1211         /* buffer this request */
1212     }
1213 #if PRINT_SYH
1214     if(status != GNI_RC_SUCCESS)
1215        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1216     else
1217         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1218 #endif
1219 }
1220
1221 /* useDynamicSMSG */
1222 inline 
1223 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1224 {
1225     gni_return_t status = GNI_RC_NOT_DONE;
1226
1227     if(mailbox_list->offset == mailbox_list->size)
1228     {
1229         dynamic_smsg_mailbox_t *new_mailbox_entry;
1230         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1231         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1232         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1233         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1234         new_mailbox_entry->offset = 0;
1235         
1236         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1237             new_mailbox_entry->size, smsg_rx_cqh,
1238             GNI_MEM_READWRITE,   
1239             -1,
1240             &(new_mailbox_entry->mem_hndl));
1241
1242         GNI_RC_CHECK("register", status);
1243         new_mailbox_entry->next = mailbox_list;
1244         mailbox_list = new_mailbox_entry;
1245     }
1246     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1247     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1248     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1249     local_smsg_attr->mbox_offset = mailbox_list->offset;
1250     mailbox_list->offset += smsg_memlen;
1251     local_smsg_attr->buff_size = smsg_memlen;
1252     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1253     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1254 }
1255
1256 /* useDynamicSMSG */
1257 inline 
1258 static int connect_to(int destNode)
1259 {
1260     gni_return_t status = GNI_RC_NOT_DONE;
1261     CmiAssert(smsg_connected_flag[destNode] == 0);
1262     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1263     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1264     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1265     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1266     
1267     CMI_GNI_LOCK(global_gni_lock)
1268     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1269     CMI_GNI_UNLOCK(global_gni_lock)
1270     if (status == GNI_RC_ERROR_RESOURCE) {
1271       /* possibly destNode is making connection at the same time */
1272       free(smsg_attr_vector_local[destNode]);
1273       smsg_attr_vector_local[destNode] = NULL;
1274       free(smsg_attr_vector_remote[destNode]);
1275       smsg_attr_vector_remote[destNode] = NULL;
1276       mailbox_list->offset -= smsg_memlen;
1277       return 0;
1278     }
1279     GNI_RC_CHECK("GNI_Post", status);
1280     smsg_connected_flag[destNode] = 1;
1281     return 1;
1282 }
1283
1284 inline 
1285 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1286 {
1287     unsigned int          remote_address;
1288     uint32_t              remote_id;
1289     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1290     gni_smsg_attr_t       *smsg_attr;
1291     gni_post_descriptor_t *pd;
1292     gni_post_state_t      post_state;
1293     char                  *real_data; 
1294
1295     if (useDynamicSMSG) {
1296         switch (smsg_connected_flag[destNode]) {
1297         case 0: 
1298             connect_to(destNode);         /* continue to case 1 */
1299         case 1:                           /* pending connection, do nothing */
1300             status = GNI_RC_NOT_DONE;
1301             if(inbuff ==0)
1302                 buffer_small_msgs(queue, msg, size, destNode, tag);
1303             return status;
1304         }
1305     }
1306 #if CMK_SMP
1307 #if ! ONE_SEND_QUEUE
1308     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1309 #endif
1310     {
1311 #else
1312     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1313     {
1314 #endif
1315         //CMI_GNI_LOCK(smsg_mailbox_lock)
1316         CMI_GNI_LOCK(default_tx_cq_lock)
1317 #if CMK_SMP_TRACE_COMMTHREAD
1318         int oldpe = -1;
1319         int oldeventid = -1;
1320         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1321         { 
1322             START_EVENT();
1323             if ( tag == SMALL_DATA_TAG)
1324                 real_data = (char*)msg; 
1325             else 
1326                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1327             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1328             TRACE_COMM_SET_COMM_MSGID(real_data);
1329         }
1330 #endif
1331 #if REMOTE_EVENT
1332         if (tag == LMSG_INIT_TAG) {
1333             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1334             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1335                 control_msg_tmp->ack_index = AckPool_getslot((void*)control_msg_tmp->source_addr);
1336         }
1337         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1338 #endif
1339 #if     CMK_WITH_STATS
1340         SMSG_TRY_SEND(tag)
1341 #endif
1342 #if CMK_WITH_STATS
1343     double              creation_time;
1344     if (ptr == NULL)
1345         creation_time = CmiWallTimer();
1346     else
1347         creation_time = ptr->creation_time;
1348 #endif
1349
1350     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1351 #if CMK_SMP_TRACE_COMMTHREAD
1352         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1353 #endif
1354         CMI_GNI_UNLOCK(default_tx_cq_lock)
1355         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1356         if(status == GNI_RC_SUCCESS)
1357         {
1358 #if     CMK_WITH_STATS
1359             SMSG_SENT_DONE(creation_time,tag) 
1360 #endif
1361 #if CMK_SMP_TRACE_COMMTHREAD
1362             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1363             { 
1364                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1365             }
1366 #endif
1367         }else
1368             status = GNI_RC_ERROR_RESOURCE;
1369     }
1370     if(status != GNI_RC_SUCCESS && inbuff ==0)
1371         buffer_small_msgs(queue, msg, size, destNode, tag);
1372     return status;
1373 }
1374
1375 inline 
1376 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1377 {
1378     /* construct a control message and send */
1379     CONTROL_MSG         *control_msg_tmp;
1380     MallocControlMsg(control_msg_tmp);
1381     control_msg_tmp->source_addr = (uint64_t)msg;
1382     control_msg_tmp->seq_id    = seqno;
1383     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1384 #if REMOTE_EVENT
1385     control_msg_tmp->ack_index    =  -1;
1386 #endif
1387 #if     USE_LRTS_MEMPOOL
1388     if(size < BIG_MSG)
1389     {
1390         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1391     }
1392     else
1393     {
1394         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1395         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1396         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1397     }
1398 #else
1399     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1400 #endif
1401     return control_msg_tmp;
1402 }
1403
1404 #define BLOCKING_SEND_CONTROL    0
1405
1406 // Large message, send control to receiver, receiver register memory and do a GET, 
1407 // return 1 - send no success
1408 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1409 {
1410     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1411     uint32_t            vmdh_index  = -1;
1412     int                 size;
1413     int                 offset = 0;
1414     uint64_t            source_addr;
1415     int                 register_size; 
1416     void                *msg;
1417
1418     size    =   control_msg_tmp->total_length;
1419     source_addr = control_msg_tmp->source_addr;
1420     register_size = control_msg_tmp->length;
1421
1422 #if  USE_LRTS_MEMPOOL
1423     if( control_msg_tmp->seq_id == 0 ){
1424 #if BLOCKING_SEND_CONTROL
1425         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1426             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1427                 LrtsAdvanceCommunication(0);
1428         }
1429 #endif
1430         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1431         {
1432             msg = (void*)source_addr;
1433             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1434             {
1435                 if(!inbuff)
1436                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1437                 return GNI_RC_ERROR_NOMEM;
1438             }
1439             //register the corresponding mempool
1440             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1441             if(status == GNI_RC_SUCCESS)
1442             {
1443                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1444             }
1445         }else
1446         {
1447             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1448             status = GNI_RC_SUCCESS;
1449         }
1450         if(NoMsgInSend(source_addr))
1451             register_size = GetMempoolsize((void*)(source_addr));
1452         else
1453             register_size = 0;
1454     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1455     {
1456         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1457         source_addr += offset;
1458         size = control_msg_tmp->length;
1459 #if BLOCKING_SEND_CONTROL
1460         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1461             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1462                 LrtsAdvanceCommunication(0);
1463         }
1464 #endif
1465         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1466             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1467             {
1468                 if(!inbuff)
1469                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1470                 return GNI_RC_ERROR_NOMEM;
1471             }
1472             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1473             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1474         }
1475         else
1476         {
1477             status = GNI_RC_SUCCESS;
1478         }
1479         register_size = 0;  
1480     }
1481
1482     if(status == GNI_RC_SUCCESS)
1483     {
1484         status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1485         if(status == GNI_RC_SUCCESS)
1486         {
1487             buffered_send_msg += register_size;
1488             if(control_msg_tmp->seq_id == 0)
1489             {
1490                 IncreaseMsgInSend(source_addr);
1491             }
1492             FreeControlMsg(control_msg_tmp);
1493             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1494         }else
1495             status = GNI_RC_ERROR_RESOURCE;
1496
1497     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1498     {
1499         CmiAbort("Memory registor for large msg\n");
1500     }else 
1501     {
1502         status = GNI_RC_ERROR_NOMEM; 
1503         if(!inbuff)
1504             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1505     }
1506     return status;
1507 #else
1508     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1509     if(status == GNI_RC_SUCCESS)
1510     {
1511         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1512         if(status == GNI_RC_SUCCESS)
1513         {
1514             FreeControlMsg(control_msg_tmp);
1515         }
1516     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1517     {
1518         CmiAbort("Memory registor for large msg\n");
1519     }else 
1520     {
1521         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1522     }
1523     return status;
1524 #endif
1525 }
1526
1527 inline void LrtsPrepareEnvelope(char *msg, int size)
1528 {
1529     CmiSetMsgSize(msg, size);
1530 }
1531
1532 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1533 {
1534     gni_return_t        status  =   GNI_RC_SUCCESS;
1535     uint8_t tag;
1536     CONTROL_MSG         *control_msg_tmp;
1537     int                 oob = ( mode & OUT_OF_BAND);
1538     SMSG_QUEUE          *queue;
1539
1540     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1541 #if CMK_USE_OOB
1542     queue = oob? &smsg_oob_queue : &smsg_queue;
1543 #else
1544     queue = &smsg_queue;
1545 #endif
1546
1547     LrtsPrepareEnvelope(msg, size);
1548
1549 #if PRINT_SYH
1550     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1551 #endif 
1552 #if CMK_SMP 
1553     if(size <= SMSG_MAX_MSG)
1554         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1555     else if (size < BIG_MSG) {
1556         control_msg_tmp =  construct_control_msg(size, msg, 0);
1557         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1558     }
1559     else {
1560           CmiSetMsgSeq(msg, 0);
1561           control_msg_tmp =  construct_control_msg(size, msg, 1);
1562           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1563     }
1564 #else   //non-smp, smp(worker sending)
1565     if(size <= SMSG_MAX_MSG)
1566     {
1567         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1568             CmiFree(msg);
1569     }
1570     else if (size < BIG_MSG) {
1571         control_msg_tmp =  construct_control_msg(size, msg, 0);
1572         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1573     }
1574     else {
1575 #if     USE_LRTS_MEMPOOL
1576         CmiSetMsgSeq(msg, 0);
1577         control_msg_tmp =  construct_control_msg(size, msg, 1);
1578         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1579 #else
1580         control_msg_tmp =  construct_control_msg(size, msg, 0);
1581         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1582 #endif
1583     }
1584 #endif
1585     return 0;
1586 }
1587
1588 static void    PumpDatagramConnection();
1589 static      int         event_SetupConnect = 111;
1590 static      int         event_PumpSmsg = 222 ;
1591 static      int         event_PumpTransaction = 333;
1592 static      int         event_PumpRdmaTransaction = 444;
1593 static      int         event_SendBufferSmsg = 444;
1594 static      int         event_SendFmaRdmaMsg = 555;
1595 static      int         event_AdvanceCommunication = 666;
1596
1597 static void registerUserTraceEvents() {
1598 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1599     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1600     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1601     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1602     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1603     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1604     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1605     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1606 #endif
1607 }
1608
1609 static void ProcessDeadlock()
1610 {
1611     static CmiUInt8 *ptr = NULL;
1612     static CmiUInt8  last = 0, mysum, sum;
1613     static int count = 0;
1614     gni_return_t status;
1615     int i;
1616
1617 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1618 //sweep_mempool(CpvAccess(mempool));
1619     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1620     mysum = smsg_send_count + smsg_recv_count;
1621     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1622     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1623     GNI_RC_CHECK("PMI_Allgather", status);
1624     sum = 0;
1625     for (i=0; i<mysize; i++)  sum+= ptr[i];
1626     if (last == 0 || sum == last) 
1627         count++;
1628     else
1629         count = 0;
1630     last = sum;
1631     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1632     if (count == 2) { 
1633         /* detected twice, it is a real deadlock */
1634         if (myrank == 0)  {
1635             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1636             CmiAbort("Fatal> Deadlock detected.");
1637         }
1638
1639     }
1640     _detected_hang = 0;
1641 }
1642
1643 static void CheckProgress()
1644 {
1645     if (smsg_send_count == last_smsg_send_count &&
1646         smsg_recv_count == last_smsg_recv_count ) 
1647     {
1648         _detected_hang = 1;
1649 #if !CMK_SMP
1650         if (_detected_hang) ProcessDeadlock();
1651 #endif
1652
1653     }
1654     else {
1655         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1656         last_smsg_send_count = smsg_send_count;
1657         last_smsg_recv_count = smsg_recv_count;
1658         _detected_hang = 0;
1659     }
1660 }
1661
1662 static void set_limit()
1663 {
1664     //if (!user_set_flag && CmiMyRank() == 0) {
1665     if (CmiMyRank() == 0) {
1666         int mynode = CmiPhysicalNodeID(CmiMyPe());
1667         int numpes = CmiNumPesOnPhysicalNode(mynode);
1668         int numprocesses = numpes / CmiMyNodeSize();
1669         MAX_REG_MEM  = _totalmem / numprocesses;
1670         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1671         if (CmiMyPe() == 0)
1672            printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM, MAX_BUFF_SEND);
1673         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1674         {
1675              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1676              CmiAbort("memory registration\n");
1677         }
1678     }
1679 }
1680
1681 void LrtsPostCommonInit(int everReturn)
1682 {
1683 #if CMK_DIRECT
1684     CmiDirectInit();
1685 #endif
1686 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1687     CpvInitialize(double, projTraceStart);
1688     /* only PE 0 needs to care about registration (to generate sts file). */
1689     //if (CmiMyPe() == 0) 
1690     {
1691         registerMachineUserEventsFunction(&registerUserTraceEvents);
1692     }
1693 #endif
1694
1695 #if CMK_SMP
1696     CmiIdleState *s=CmiNotifyGetState();
1697     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1698     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1699 #else
1700     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1701     if (useDynamicSMSG)
1702     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1703 #endif
1704
1705 #if ! LARGEPAGE
1706     if (_checkProgress)
1707 #if CMK_SMP
1708     if (CmiMyRank() == 0)
1709 #endif
1710     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1711 #endif
1712  
1713 #if !LARGEPAGE
1714     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1715 #endif
1716 }
1717
1718 /* this is called by worker thread */
1719 void LrtsPostNonLocal(){
1720 #if CMK_SMP_TRACE_COMMTHREAD
1721     double startT, endT;
1722 #endif
1723 #if MULTI_THREAD_SEND
1724     if(mysize == 1) return;
1725 #if CMK_SMP_TRACE_COMMTHREAD
1726     traceEndIdle();
1727 #endif
1728
1729 #if CMK_SMP_TRACE_COMMTHREAD
1730     startT = CmiWallTimer();
1731 #endif
1732
1733 #if CMK_WORKER_SINGLE_TASK
1734     if (CmiMyRank() % 6 == 0)
1735 #endif
1736     PumpNetworkSmsg();
1737
1738 #if CMK_WORKER_SINGLE_TASK
1739     if (CmiMyRank() % 6 == 1)
1740 #endif
1741     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1742
1743 #if CMK_WORKER_SINGLE_TASK
1744     if (CmiMyRank() % 6 == 2)
1745 #endif
1746     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1747
1748 #if REMOTE_EVENT
1749 #if CMK_WORKER_SINGLE_TASK
1750     if (CmiMyRank() % 6 == 3)
1751 #endif
1752     PumpRemoteTransactions();
1753 #endif
1754
1755 #if CMK_USE_OOB
1756     if (SendBufferMsg(&smsg_oob_queue) == 1)
1757 #endif
1758     {
1759 #if CMK_WORKER_SINGLE_TASK
1760     if (CmiMyRank() % 6 == 4)
1761 #endif
1762         SendBufferMsg(&smsg_queue);
1763     }
1764
1765 #if CMK_WORKER_SINGLE_TASK
1766     if (CmiMyRank() % 6 == 5)
1767 #endif
1768     SendRdmaMsg();
1769
1770 #if CMK_SMP_TRACE_COMMTHREAD
1771     endT = CmiWallTimer();
1772     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
1773 #endif
1774 #if CMK_SMP_TRACE_COMMTHREAD
1775     traceBeginIdle();
1776 #endif
1777 #endif
1778 }
1779
1780 /* useDynamicSMSG */
1781 static void    PumpDatagramConnection()
1782 {
1783     uint32_t          remote_address;
1784     uint32_t          remote_id;
1785     gni_return_t status;
1786     gni_post_state_t  post_state;
1787     uint64_t          datagram_id;
1788     int i;
1789
1790    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1791    {
1792        if (datagram_id >= mysize) {           /* bound endpoint */
1793            int pe = datagram_id - mysize;
1794            CMI_GNI_LOCK(global_gni_lock)
1795            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1796            CMI_GNI_UNLOCK(global_gni_lock)
1797            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1798            {
1799                CmiAssert(remote_id == pe);
1800                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1801                GNI_RC_CHECK("Dynamic SMSG Init", status);
1802 #if PRINT_SYH
1803                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1804 #endif
1805                CmiAssert(smsg_connected_flag[pe] == 1);
1806                smsg_connected_flag[pe] = 2;
1807            }
1808        }
1809        else {         /* unbound ep */
1810            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1811            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1812            {
1813                CmiAssert(remote_id<mysize);
1814                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1815                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1816                GNI_RC_CHECK("Dynamic SMSG Init", status);
1817 #if PRINT_SYH
1818                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1819 #endif
1820                smsg_connected_flag[remote_id] = 2;
1821
1822                alloc_smsg_attr(&send_smsg_attr);
1823                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1824                GNI_RC_CHECK("post unbound datagram", status);
1825            }
1826        }
1827    }
1828 }
1829
1830 /* pooling CQ to receive network message */
1831 static void PumpNetworkRdmaMsgs()
1832 {
1833     gni_cq_entry_t      event_data;
1834     gni_return_t        status;
1835
1836 }
1837
1838 inline 
1839 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
1840 {
1841     RDMA_REQUEST        *rdma_request_msg;
1842     MallocRdmaRequest(rdma_request_msg);
1843     rdma_request_msg->destNode = inst_id;
1844     rdma_request_msg->pd = pd;
1845 #if REMOTE_EVENT
1846     rdma_request_msg->ack_index = ack_index;
1847 #endif
1848 #if CMK_SMP
1849     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1850 #else
1851     if(sendRdmaBuf == 0)
1852     {
1853         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1854     }else{
1855         sendRdmaTail->next = rdma_request_msg;
1856         sendRdmaTail =  rdma_request_msg;
1857     }
1858 #endif
1859
1860 }
1861
1862 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1863
1864 static void PumpNetworkSmsg()
1865 {
1866     uint64_t            inst_id;
1867     int                 ret;
1868     gni_cq_entry_t      event_data;
1869     gni_return_t        status, status2;
1870     void                *header;
1871     uint8_t             msg_tag;
1872     int                 msg_nbytes;
1873     void                *msg_data;
1874     gni_mem_handle_t    msg_mem_hndl;
1875     gni_smsg_attr_t     *smsg_attr;
1876     gni_smsg_attr_t     *remote_smsg_attr;
1877     int                 init_flag;
1878     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1879     uint64_t            source_addr;
1880     SMSG_QUEUE         *queue = &smsg_queue;
1881 #if     CMK_DIRECT
1882     cmidirectMsg        *direct_msg;
1883 #endif
1884     while(1)
1885     {
1886         CMI_GNI_LOCK(smsg_rx_cq_lock)
1887         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1888         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
1889         if(status != GNI_RC_SUCCESS)
1890             break;
1891         inst_id = GNI_CQ_GET_INST_ID(event_data);
1892 #if REMOTE_EVENT
1893         inst_id = ACK_GET_RANK(inst_id);
1894 #endif
1895         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1896 #if PRINT_SYH
1897         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1898 #endif
1899         if (useDynamicSMSG) {
1900             /* subtle: smsg may come before connection is setup */
1901             while (smsg_connected_flag[inst_id] != 2) 
1902                PumpDatagramConnection();
1903         }
1904         msg_tag = GNI_SMSG_ANY_TAG;
1905         while(1) {
1906             CMI_GNI_LOCK(smsg_mailbox_lock)
1907             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1908             if (status != GNI_RC_SUCCESS)
1909             {
1910                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1911                 break;
1912             }
1913 #if PRINT_SYH
1914             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1915 #endif
1916             /* copy msg out and then put into queue (small message) */
1917             switch (msg_tag) {
1918             case SMALL_DATA_TAG:
1919             {
1920                 START_EVENT();
1921                 msg_nbytes = CmiGetMsgSize(header);
1922                 msg_data    = CmiAlloc(msg_nbytes);
1923                 memcpy(msg_data, (char*)header, msg_nbytes);
1924                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1925                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1926                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1927                 handleOneRecvedMsg(msg_nbytes, msg_data);
1928                 break;
1929             }
1930             case LMSG_INIT_TAG:
1931             {
1932 #if MULTI_THREAD_SEND
1933                 MallocControlMsg(control_msg_tmp);
1934                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
1935                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1936                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1937                 getLargeMsgRequest(control_msg_tmp, inst_id);
1938                 FreeControlMsg(control_msg_tmp);
1939 #else
1940                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1941                 getLargeMsgRequest(header, inst_id);
1942                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1943 #endif
1944                 break;
1945             }
1946             case ACK_TAG:   //msg fit into mempool
1947             {
1948                 /* Get is done, release message . Now put is not used yet*/
1949                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1950                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1951                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1952 #if ! USE_LRTS_MEMPOOL
1953                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1954 #else
1955                 DecreaseMsgInSend(msg);
1956 #endif
1957                 if(NoMsgInSend(msg))
1958                     buffered_send_msg -= GetMempoolsize(msg);
1959                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1960                 CmiFree(msg);
1961                 break;
1962             }
1963             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1964             {
1965 #if MULTI_THREAD_SEND
1966                 MallocControlMsg(header_tmp);
1967                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
1968                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1969 #else
1970                 header_tmp = (CONTROL_MSG *) header;
1971 #endif
1972                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1973                 void *msg = (void*)(header_tmp->source_addr);
1974                 int cur_seq = CmiGetMsgSeq(msg);
1975                 int offset = ONE_SEG*(cur_seq+1);
1976                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1977                 buffered_send_msg -= header_tmp->length;
1978                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1979                 if (remain_size < 0) remain_size = 0;
1980                 CmiSetMsgSize(msg, remain_size);
1981                 if(remain_size <= 0) //transaction done
1982                 {
1983                     CmiFree(msg);
1984                 }else if (header_tmp->total_length > offset)
1985                 {
1986                     CmiSetMsgSeq(msg, cur_seq+1);
1987                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1988                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1989                     //send next seg
1990                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
1991                          // pipelining
1992                     if (header_tmp->seq_id == 1) {
1993                       int i;
1994                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1995                         int seq = cur_seq+i+2;
1996                         CmiSetMsgSeq(msg, seq-1);
1997                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1998                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1999                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2000                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2001                       }
2002                     }
2003                 }
2004 #if MULTI_THREAD_SEND
2005                 FreeControlMsg(header_tmp);
2006 #else
2007                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2008 #endif
2009                 break;
2010             }
2011 #if CMK_PERSISTENT_COMM
2012             case PUT_DONE_TAG: //persistent message
2013                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
2014                 int size = ((CONTROL_MSG *) header)->length;
2015                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2016                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2017                 CmiReference(msg);
2018                 handleOneRecvedMsg(size, msg); 
2019                 break;
2020 #endif
2021 #if CMK_DIRECT
2022             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2023                 //create a trigger message
2024                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2025                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2026                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2027                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2028                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2029                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2030                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2031                 break;
2032 #endif
2033             default: {
2034                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2035                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2036                 printf("weird tag problem\n");
2037                 CmiAbort("Unknown tag\n");
2038                      }
2039             }               // end switch
2040 #if PRINT_SYH
2041             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2042 #endif
2043             smsg_recv_count ++;
2044             msg_tag = GNI_SMSG_ANY_TAG;
2045         } //endwhile getNext
2046     }   //end while GetEvent
2047     if(status == GNI_RC_ERROR_RESOURCE)
2048     {
2049         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2050         GNI_RC_CHECK("Smsg_rx_cq full", status);
2051     }
2052 }
2053
2054 static void printDesc(gni_post_descriptor_t *pd)
2055 {
2056     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2057 }
2058
2059 #if CQWRITE
2060 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2061 {
2062     gni_post_descriptor_t *pd;
2063     gni_return_t        status = GNI_RC_SUCCESS;
2064     
2065     MallocPostDesc(pd);
2066     pd->type = GNI_POST_CQWRITE;
2067     pd->cq_mode = GNI_CQMODE_SILENT;
2068     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2069     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2070     pd->cqwrite_value = data;
2071     pd->remote_mem_hndl = mem_hndl;
2072     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2073     GNI_RC_CHECK("GNI_PostCqWrite", status);
2074 }
2075 #endif
2076
2077 // for BIG_MSG called on receiver side for receiving control message
2078 // LMSG_INIT_TAG
2079 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2080 {
2081 #if     USE_LRTS_MEMPOOL
2082     CONTROL_MSG         *request_msg;
2083     gni_return_t        status = GNI_RC_SUCCESS;
2084     void                *msg_data;
2085     gni_post_descriptor_t *pd;
2086     gni_mem_handle_t    msg_mem_hndl;
2087     int source, size, transaction_size, offset = 0;
2088     size_t     register_size = 0;
2089
2090     // initial a get to transfer data from the sender side */
2091     request_msg = (CONTROL_MSG *) header;
2092     size = request_msg->total_length;
2093     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2094     MallocPostDesc(pd);
2095     if(request_msg->seq_id < 2)   {
2096         msg_data = CmiAlloc(size);
2097         CmiSetMsgSeq(msg_data, 0);
2098         _MEMCHECK(msg_data);
2099 #if CMK_SMP_TRACE_COMMTHREAD
2100         pd->second_operand = 1000000 * CmiWallTimer(); //microsecond
2101 #endif
2102     }
2103     else {
2104         offset = ONE_SEG*(request_msg->seq_id-1);
2105         msg_data = (char*)request_msg->dest_addr + offset;
2106     }
2107    
2108     pd->cqwrite_value = request_msg->seq_id;
2109     if( request_msg->seq_id == 0)
2110     {
2111         pd->local_mem_hndl= GetMemHndl(msg_data);
2112         transaction_size = ALIGN64(size);
2113         if(IsMemHndlZero(pd->local_mem_hndl))
2114         {   
2115             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)), rdma_rx_cqh);
2116             if(status == GNI_RC_SUCCESS)
2117             {
2118                 pd->local_mem_hndl = GetMemHndl(msg_data);
2119             }
2120             else
2121             {
2122                 SetMemHndlZero(pd->local_mem_hndl);
2123             }
2124         }
2125         if(NoMsgInRecv( (void*)(msg_data)))
2126             register_size = GetMempoolsize((void*)(msg_data));
2127         else
2128             register_size = 0;
2129     }
2130     else{
2131         transaction_size = ALIGN64(request_msg->length);
2132         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl), NULL); 
2133         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2134         {
2135             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2136         }
2137     }
2138     pd->first_operand = ALIGN64(size);                   //  total length
2139
2140     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2141         pd->type            = GNI_POST_FMA_GET;
2142     else
2143         pd->type            = GNI_POST_RDMA_GET;
2144     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2145     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2146     pd->length          = transaction_size;
2147     pd->local_addr      = (uint64_t) msg_data;
2148     pd->remote_addr     = request_msg->source_addr + offset;
2149     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2150     pd->src_cq_hndl     = rdma_tx_cqh;
2151     pd->rdma_mode       = 0;
2152     pd->amo_cmd         = 0;
2153
2154     //memory registration success
2155     if(status == GNI_RC_SUCCESS)
2156     {
2157         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2158         CMI_GNI_LOCK(lock)
2159 #if REMOTE_EVENT
2160         if( request_msg->seq_id == 0)
2161         {
2162             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2163             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2164             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2165         }
2166         else {
2167             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, myrank);
2168             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2169         }
2170 #endif
2171         if(pd->type == GNI_POST_RDMA_GET) 
2172         {
2173             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2174         }
2175         else
2176         {
2177             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2178         }
2179         CMI_GNI_UNLOCK(lock)
2180
2181         if(status == GNI_RC_SUCCESS )
2182         {
2183             if(pd->cqwrite_value == 0)
2184             {
2185 #if MACHINE_DEBUG_LOG
2186                 buffered_recv_msg += register_size;
2187                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2188 #endif
2189                 IncreaseMsgInRecv(msg_data);
2190 #if CMK_SMP_TRACE_COMMTHREAD
2191                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2192 #endif
2193             }
2194         }
2195     }else
2196     {
2197         SetMemHndlZero((pd->local_mem_hndl));
2198     }
2199     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2200     {
2201 #if REMOTE_EVENT
2202         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2203 #else
2204         bufferRdmaMsg(inst_id, pd, -1); 
2205 #endif
2206     }else {
2207          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
2208         GNI_RC_CHECK("GetLargeAFter posting", status);
2209     }
2210 #else
2211     CONTROL_MSG         *request_msg;
2212     gni_return_t        status;
2213     void                *msg_data;
2214     gni_post_descriptor_t *pd;
2215     RDMA_REQUEST        *rdma_request_msg;
2216     gni_mem_handle_t    msg_mem_hndl;
2217     //int source;
2218     // initial a get to transfer data from the sender side */
2219     request_msg = (CONTROL_MSG *) header;
2220     msg_data = CmiAlloc(request_msg->length);
2221     _MEMCHECK(msg_data);
2222
2223     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2224
2225     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2226     {
2227         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2228     }
2229
2230     MallocPostDesc(pd);
2231     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2232         pd->type            = GNI_POST_FMA_GET;
2233     else
2234         pd->type            = GNI_POST_RDMA_GET;
2235     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2236     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2237     pd->length          = ALIGN64(request_msg->length);
2238     pd->local_addr      = (uint64_t) msg_data;
2239     pd->remote_addr     = request_msg->source_addr;
2240     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2241     pd->src_cq_hndl     = rdma_tx_cqh;
2242     pd->rdma_mode       = 0;
2243     pd->amo_cmd         = 0;
2244
2245     //memory registration successful
2246     if(status == GNI_RC_SUCCESS)
2247     {
2248         pd->local_mem_hndl  = msg_mem_hndl;
2249        
2250         if(pd->type == GNI_POST_RDMA_GET) 
2251         {
2252             CMI_GNI_LOCK(rdma_tx_cq_lock)
2253             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2254             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2255         }
2256         else
2257         {
2258             CMI_GNI_LOCK(default_tx_cq_lock)
2259             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2260             CMI_GNI_UNLOCK(default_tx_cq_lock)
2261         }
2262
2263     }else
2264     {
2265         SetMemHndlZero(pd->local_mem_hndl);
2266     }
2267     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2268     {
2269         MallocRdmaRequest(rdma_request_msg);
2270         rdma_request_msg->next = 0;
2271         rdma_request_msg->destNode = inst_id;
2272         rdma_request_msg->pd = pd;
2273         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2274     }else {
2275         GNI_RC_CHECK("AFter posting", status);
2276     }
2277 #endif
2278 }
2279
2280 #if CQWRITE
2281 static void PumpCqWriteTransactions()
2282 {
2283
2284     gni_cq_entry_t          ev;
2285     gni_return_t            status;
2286     void                    *msg;   
2287     while(1) {
2288         //CMI_GNI_LOCK(my_cq_lock) 
2289         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2290         //CMI_GNI_UNLOCK(my_cq_lock)
2291         if(status != GNI_RC_SUCCESS) break;
2292         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2293
2294         DecreaseMsgInSend(msg);
2295 #if ! USE_LRTS_MEMPOOL
2296        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2297 #else
2298         DecreaseMsgInSend(msg);
2299 #endif
2300         if(NoMsgInSend(msg))
2301             buffered_send_msg -= GetMempoolsize(msg);
2302         CmiFree(msg);
2303     };
2304     if(status == GNI_RC_ERROR_RESOURCE)
2305     {
2306         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2307     }
2308 }
2309 #endif
2310
2311 #if REMOTE_EVENT
2312 static void PumpRemoteTransactions()
2313 {
2314     gni_cq_entry_t          ev;
2315     gni_return_t            status;
2316     void                    *msg;   
2317     int                     slot;
2318
2319     while(1) {
2320         CMI_GNI_LOCK(global_gni_lock)
2321         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2322         CMI_GNI_UNLOCK(global_gni_lock)
2323         if(status != GNI_RC_SUCCESS) {
2324             break;
2325         }
2326
2327         slot = GNI_CQ_GET_INST_ID(ev);
2328         slot = ACK_GET_INDEX(slot);
2329         //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
2330
2331         //CMI_GNI_LOCK(ackpool_lock);
2332         msg = GetAckAddress(slot);
2333         //CMI_GNI_UNLOCK(ackpool_lock);
2334
2335         DecreaseMsgInSend(msg);
2336 #if ! USE_LRTS_MEMPOOL
2337        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2338 #else
2339         DecreaseMsgInSend(msg);
2340 #endif
2341         if(NoMsgInSend(msg))
2342             buffered_send_msg -= GetMempoolsize(msg);
2343         CmiFree(msg);
2344         AckPool_freeslot(slot);
2345     }
2346     if(status == GNI_RC_ERROR_RESOURCE)
2347     {
2348         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2349     }
2350 }
2351 #endif
2352
2353 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2354 {
2355     gni_cq_entry_t          ev;
2356     gni_return_t            status;
2357     uint64_t                type, inst_id;
2358     gni_post_descriptor_t   *tmp_pd;
2359     MSG_LIST                *ptr;
2360     CONTROL_MSG             *ack_msg_tmp;
2361     ACK_MSG                 *ack_msg;
2362     uint8_t                 msg_tag;
2363 #if CMK_DIRECT
2364     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2365 #endif
2366     SMSG_QUEUE         *queue = &smsg_queue;
2367
2368     while(1) {
2369         CMI_GNI_LOCK(my_cq_lock) 
2370         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2371         CMI_GNI_UNLOCK(my_cq_lock)
2372         if(status != GNI_RC_SUCCESS) break;
2373         
2374         type = GNI_CQ_GET_TYPE(ev);
2375         if (type == GNI_CQ_EVENT_TYPE_POST)
2376         {
2377             inst_id     = GNI_CQ_GET_INST_ID(ev);
2378 #if PRINT_SYH
2379             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2380 #endif
2381             CMI_GNI_LOCK(my_cq_lock)
2382             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2383             CMI_GNI_UNLOCK(my_cq_lock)
2384
2385             switch (tmp_pd->type) {
2386 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2387             case GNI_POST_RDMA_PUT:
2388 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2389                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2390 #endif
2391             case GNI_POST_FMA_PUT:
2392                 if(tmp_pd->amo_cmd == 1) {
2393                     //sender ACK to receiver to trigger it is done
2394                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2395                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2396                     msg_tag = DIRECT_PUT_DONE_TAG;
2397                 }
2398                 else {
2399                     CmiFree((void *)tmp_pd->local_addr);
2400                     MallocControlMsg(ack_msg_tmp);
2401                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2402                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2403                     msg_tag = PUT_DONE_TAG;
2404                 }
2405                 break;
2406 #endif
2407             case GNI_POST_RDMA_GET:
2408             case GNI_POST_FMA_GET:  {
2409 #if  ! USE_LRTS_MEMPOOL
2410                 MallocControlMsg(ack_msg_tmp);
2411                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2412                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2413                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2414                 msg_tag = ACK_TAG;  
2415 #else
2416                 int seq_id = tmp_pd->cqwrite_value;
2417                 if(seq_id > 0)      // BIG_MSG
2418                 {
2419                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2420                     MallocControlMsg(ack_msg_tmp);
2421                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2422                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2423                     ack_msg_tmp->seq_id = seq_id;
2424                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2425                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2426                     ack_msg_tmp->length = tmp_pd->length;
2427                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2428                     msg_tag = BIG_MSG_TAG; 
2429                 } 
2430                 else
2431                 {
2432                     msg_tag = ACK_TAG; 
2433 #if  !REMOTE_EVENT && !CQWRITE
2434                     MallocAckMsg(ack_msg);
2435                     ack_msg->source_addr = tmp_pd->remote_addr;
2436 #endif
2437                 }
2438 #endif
2439                 break;
2440             }
2441             case  GNI_POST_CQWRITE:
2442                    FreePostDesc(tmp_pd);
2443                    continue;
2444             default:
2445                 CmiPrintf("type=%d\n", tmp_pd->type);
2446                 CmiAbort("PumpLocalTransactions: unknown type!");
2447             }      /* end of switch */
2448
2449 #if CMK_DIRECT
2450             if (tmp_pd->amo_cmd == 1) {
2451                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2452                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2453             }
2454             else
2455 #endif
2456             if (msg_tag == ACK_TAG) {
2457 #if !REMOTE_EVENT
2458 #if   !CQWRITE
2459                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2460                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2461 #else
2462                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2463 #endif
2464 #endif
2465             }
2466             else {
2467                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2468                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2469             }
2470 #if CMK_PERSISTENT_COMM
2471             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2472 #endif
2473             {
2474                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2475 #if PRINT_SYH
2476                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2477 #endif
2478                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->second_operand/1000000.0), (double)((tmp_pd->second_operand+1)/1000000.0), (double)((tmp_pd->second_operand+1)/1000000.0), (void*)tmp_pd->local_addr); 
2479                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2480
2481                     START_EVENT();
2482                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2483                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2484 #if MACHINE_DEBUG_LOG
2485                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2486                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2487                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2488 #endif
2489                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2490                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2491                 }else if(msg_tag == BIG_MSG_TAG){
2492                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2493                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2494                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2495                         START_EVENT();
2496 #if PRINT_SYH
2497                         printf("Pipeline msg done [%d]\n", myrank);
2498 #endif
2499 #if                 CMK_SMP_TRACE_COMMTHREAD
2500                         if( tmp_pd->cqwrite_value == 1)
2501                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->second_operand/1000000.0), (double)((tmp_pd->second_operand+1)/1000000.0), (double)((tmp_pd->second_operand+2)/1000000.0), (void*)tmp_pd->local_addr); 
2502 #endif
2503                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2504                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2505                     }
2506                 }
2507             }
2508             FreePostDesc(tmp_pd);
2509         }
2510     } //end while
2511     if(status == GNI_RC_ERROR_RESOURCE)
2512     {
2513         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2514         GNI_RC_CHECK("Smsg_tx_cq full", status);
2515     }
2516 }
2517
2518 static void  SendRdmaMsg()
2519 {
2520     gni_return_t            status = GNI_RC_SUCCESS;
2521     gni_mem_handle_t        msg_mem_hndl;
2522     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2523     RDMA_REQUEST            *pre = 0;
2524     uint64_t                register_size = 0;
2525     void                    *msg;
2526     int                     i;
2527 #if CMK_SMP
2528     int len = PCQueueLength(sendRdmaBuf);
2529     for (i=0; i<len; i++)
2530     {
2531         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2532         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2533         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2534         if (ptr == NULL) break;
2535 #else
2536     ptr = sendRdmaBuf;
2537     while (ptr!=0)
2538     {
2539 #endif 
2540         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2541         gni_post_descriptor_t *pd = ptr->pd;
2542         status = GNI_RC_SUCCESS;
2543         
2544         if(pd->cqwrite_value == 0)
2545         {
2546             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2547             {
2548                 msg = (void*)(pd->local_addr);
2549                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2550                 if(status == GNI_RC_SUCCESS)
2551                 {
2552                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2553                 }
2554             }else
2555             {
2556                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2557             }
2558             if(NoMsgInRecv( (void*)(pd->local_addr)))
2559                 register_size = GetMempoolsize((void*)(pd->local_addr));
2560             else
2561                 register_size = 0;
2562         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2563         {
2564             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl), NULL); 
2565         }
2566         if(status == GNI_RC_SUCCESS)        //mem register good
2567         {
2568             CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET? rdma_tx_cq_lock:default_tx_cq_lock;
2569             CMI_GNI_LOCK(lock);
2570 #if REMOTE_EVENT
2571             if( pd->cqwrite_value == 0)
2572             {
2573                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2574                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2575                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2576             }
2577             else {
2578                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, myrank);
2579                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2580             }
2581 #endif
2582             if(pd->type == GNI_POST_RDMA_GET) 
2583             {
2584                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2585             }
2586             else
2587             {
2588                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2589             }
2590             CMI_GNI_UNLOCK(lock);
2591             
2592             if(status == GNI_RC_SUCCESS)    //post good
2593             {
2594 #if !CMK_SMP
2595                 tmp_ptr = ptr;
2596                 if(pre != 0) {
2597                     pre->next = ptr->next;
2598                 }
2599                 else {
2600                     sendRdmaBuf = ptr->next;
2601                 }
2602                 ptr = ptr->next;
2603                 FreeRdmaRequest(tmp_ptr);
2604 #endif
2605                 if(pd->cqwrite_value == 0)
2606                 {
2607                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2608                 }
2609 #if MACHINE_DEBUG_LOG
2610                 buffered_recv_msg += register_size;
2611                 MACHSTATE(8, "GO request from buffered\n"); 
2612 #endif
2613             }else           // cannot post
2614             {
2615 #if CMK_SMP
2616                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2617 #else
2618                 pre = ptr;
2619                 ptr = ptr->next;
2620 #endif
2621                 break;
2622             }
2623         } else          //memory registration fails
2624         {
2625 #if CMK_SMP
2626             PCQueuePush(sendRdmaBuf, (char*)ptr);
2627 #else
2628             pre = ptr;
2629             ptr = ptr->next;
2630 #endif
2631         }
2632     } //end while
2633 #if ! CMK_SMP
2634     if(ptr == 0)
2635         sendRdmaTail = pre;
2636 #endif
2637 }
2638
2639 // return 1 if all messages are sent
2640 static int SendBufferMsg(SMSG_QUEUE *queue)
2641 {
2642     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2643     CONTROL_MSG         *control_msg_tmp;
2644     gni_return_t        status;
2645     int                 done = 1;
2646     uint64_t            register_size;
2647     void                *register_addr;
2648     int                 index_previous = -1;
2649 #if CMI_EXERT_SEND_CAP
2650     int                 sent_cnt = 0;
2651 #endif
2652
2653 #if CMK_SMP
2654     int          index = 0;
2655 #if ONE_SEND_QUEUE
2656     memset(destpe_avail, 0, mysize * sizeof(char));
2657     for (index=0; index<1; index++)
2658     {
2659         int i, len = PCQueueLength(queue->sendMsgBuf);
2660         for (i=0; i<len; i++) 
2661         {
2662             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2663             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2664             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2665             if(ptr == NULL) break;
2666             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2667                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2668                 continue;
2669             }
2670 #else
2671 #if SMP_LOCKS
2672     int nonempty = PCQueueLength(nonEmptyQueues);
2673     for(index =0; index<nonempty; index++)
2674     {
2675         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2676         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2677         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2678         if(current_list == NULL) break; 
2679         PCQueue current_queue= current_list-> sendSmsgBuf;
2680         CmiLock(current_list->lock);
2681         int i, len = PCQueueLength(current_queue);
2682         current_list->pushed = 0;
2683         CmiUnlock(current_list->lock);
2684 #else
2685     for(index =0; index<mysize; index++)
2686     {
2687         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2688         int i, len = PCQueueLength(current_queue);
2689 #endif
2690         for (i=0; i<len; i++) 
2691         {
2692             CMI_PCQUEUEPOP_LOCK(current_queue)
2693             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2694             CMI_PCQUEUEPOP_UNLOCK(current_queue)
2695             if (ptr == 0) break;
2696 #endif
2697 #else
2698     int index = queue->smsg_head_index;
2699     while(index != -1)
2700     {
2701         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2702         pre = 0;
2703         while(ptr != 0)
2704         {
2705 #endif
2706             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2707             status = GNI_RC_ERROR_RESOURCE;
2708             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2709                 /* connection not exists yet */
2710             }
2711             else
2712             switch(ptr->tag)
2713             {
2714             case SMALL_DATA_TAG:
2715                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2716                 if(status == GNI_RC_SUCCESS)
2717                 {
2718                     CmiFree(ptr->msg);
2719                 }
2720                 break;
2721             case LMSG_INIT_TAG:
2722                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2723                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
2724                 break;
2725             case ACK_TAG:
2726                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2727                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2728                 break;
2729             case BIG_MSG_TAG:
2730                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2731                 if(status == GNI_RC_SUCCESS)
2732                 {
2733                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2734                 }
2735                 break;
2736 #if CMK_DIRECT
2737             case DIRECT_PUT_DONE_TAG:
2738                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
2739                 if(status == GNI_RC_SUCCESS)
2740                 {
2741                     free((CMK_DIRECT_HEADER*)ptr->msg);
2742                 }
2743                 break;
2744
2745 #endif
2746             default:
2747                 printf("Weird tag\n");
2748                 CmiAbort("should not happen\n");
2749             }       // end switch
2750             if(status == GNI_RC_SUCCESS)
2751             {
2752 #if PRINT_SYH
2753                 buffered_smsg_counter--;
2754                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2755 #endif
2756 #if !CMK_SMP
2757                 tmp_ptr = ptr;
2758                 if(pre)
2759                 {
2760                     ptr = pre ->next = ptr->next;
2761                 }else
2762                 {
2763                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2764                 }
2765                 FreeMsgList(tmp_ptr);
2766 #else
2767                 FreeMsgList(ptr);
2768 #endif
2769 #if CMI_EXERT_SEND_CAP
2770                 sent_cnt++;
2771                 if(sent_cnt == SEND_CAP)
2772                     break;
2773 #endif
2774             }else {
2775 #if CMK_SMP
2776 #if ONE_SEND_QUEUE
2777                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2778 #else
2779                 PCQueuePush(current_queue, (char*)ptr);
2780 #endif
2781 #else
2782                 pre = ptr;
2783                 ptr=ptr->next;
2784 #endif
2785                 done = 0;
2786                 if(status == GNI_RC_ERROR_RESOURCE)
2787                 {
2788 #if CMK_SMP && ONE_SEND_QUEUE 
2789                     destpe_avail[ptr->destNode] = 1;
2790 #else
2791                     break;
2792 #endif
2793                 }
2794             } 
2795         } //end while
2796 #if !CMK_SMP
2797         if(ptr == 0)
2798             queue->smsg_msglist_index[index].tail = pre;
2799         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2800         {
2801             if(index_previous != -1)
2802                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2803             else
2804                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2805         }
2806         else {
2807             index_previous = index;
2808         }
2809         index = queue->smsg_msglist_index[index].next;
2810 #else
2811 #if !ONE_SEND_QUEUE && SMP_LOCKS
2812         CmiLock(current_list->lock);
2813         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2814         {
2815             current_list->pushed = 1;
2816             PCQueuePush(nonEmptyQueues, current_list);
2817         }
2818         CmiUnlock(current_list->lock); 
2819 #endif
2820 #endif
2821
2822 #if CMI_EXERT_SEND_CAP
2823         if(sent_cnt == SEND_CAP)
2824                 break;
2825 #endif
2826     }   // end pooling for all cores
2827     return done;
2828 }
2829
2830 static void ProcessDeadlock();
2831 void LrtsAdvanceCommunication(int whileidle)
2832 {
2833     static int count = 0;
2834     /*  Receive Msg first */
2835 #if CMK_SMP_TRACE_COMMTHREAD
2836     double startT, endT;
2837 #endif
2838     if (useDynamicSMSG && whileidle)
2839     {
2840 #if CMK_SMP_TRACE_COMMTHREAD
2841         startT = CmiWallTimer();
2842 #endif
2843         PumpDatagramConnection();
2844 #if CMK_SMP_TRACE_COMMTHREAD
2845         endT = CmiWallTimer();
2846         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
2847 #endif
2848     }
2849
2850 #if CMK_SMP_TRACE_COMMTHREAD
2851     startT = CmiWallTimer();
2852 #endif
2853     PumpNetworkSmsg();
2854     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2855 #if CMK_SMP_TRACE_COMMTHREAD
2856     endT = CmiWallTimer();
2857     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
2858 #endif
2859
2860 #if CMK_SMP_TRACE_COMMTHREAD
2861     startT = CmiWallTimer();
2862 #endif
2863     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2864     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
2865 #if CMK_SMP_TRACE_COMMTHREAD
2866     endT = CmiWallTimer();
2867     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
2868 #endif
2869
2870 #if CMK_SMP_TRACE_COMMTHREAD
2871     startT = CmiWallTimer();
2872 #endif
2873     PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock);
2874
2875 #if CQWRITE
2876     PumpCqWriteTransactions();
2877 #endif
2878
2879 #if REMOTE_EVENT
2880     PumpRemoteTransactions();
2881 #endif
2882
2883     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
2884 #if CMK_SMP_TRACE_COMMTHREAD
2885     endT = CmiWallTimer();
2886     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
2887 #endif
2888  
2889     /* Send buffered Message */
2890 #if CMK_SMP_TRACE_COMMTHREAD
2891     startT = CmiWallTimer();
2892 #endif
2893 #if CMK_USE_OOB
2894     if (SendBufferMsg(&smsg_oob_queue) == 1)
2895 #endif
2896     {
2897     SendBufferMsg(&smsg_queue);
2898     }
2899     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2900 #if CMK_SMP_TRACE_COMMTHREAD
2901     endT = CmiWallTimer();
2902     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
2903 #endif
2904
2905 #if CMK_SMP_TRACE_COMMTHREAD
2906     startT = CmiWallTimer();
2907 #endif
2908     SendRdmaMsg();
2909     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2910 #if CMK_SMP_TRACE_COMMTHREAD
2911     endT = CmiWallTimer();
2912     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
2913 #endif
2914
2915 #if CMK_SMP && ! LARGEPAGE
2916     if (_detected_hang)  ProcessDeadlock();
2917 #endif
2918 }
2919
2920 /* useDynamicSMSG */
2921 static void _init_dynamic_smsg()
2922 {
2923     gni_return_t status;
2924     uint32_t     vmdh_index = -1;
2925     int i;
2926
2927     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2928     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2929     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2930     for(i=0; i<mysize; i++) {
2931         smsg_connected_flag[i] = 0;
2932         smsg_attr_vector_local[i] = NULL;
2933         smsg_attr_vector_remote[i] = NULL;
2934     }
2935     if(mysize <=512)
2936     {
2937         SMSG_MAX_MSG = 4096;
2938     }else if (mysize <= 4096)
2939     {
2940         SMSG_MAX_MSG = 4096/mysize * 1024;
2941     }else if (mysize <= 16384)
2942     {
2943         SMSG_MAX_MSG = 512;
2944     }else {
2945         SMSG_MAX_MSG = 256;
2946     }
2947
2948     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2949     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2950     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2951     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2952     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2953
2954     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2955     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2956     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2957     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2958     mailbox_list->offset = 0;
2959     mailbox_list->next = 0;
2960     
2961     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2962         mailbox_list->size, smsg_rx_cqh,
2963         GNI_MEM_READWRITE,   
2964         vmdh_index,
2965         &(mailbox_list->mem_hndl));
2966     GNI_RC_CHECK("MEMORY registration for smsg", status);
2967
2968     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
2969     GNI_RC_CHECK("Unbound EP", status);
2970     
2971     alloc_smsg_attr(&send_smsg_attr);
2972
2973     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2974     GNI_RC_CHECK("post unbound datagram", status);
2975
2976       /* always pre-connect to proc 0 */
2977     //if (myrank != 0) connect_to(0);
2978 }
2979
2980 static void _init_static_smsg()
2981 {
2982     gni_smsg_attr_t      *smsg_attr;
2983     gni_smsg_attr_t      remote_smsg_attr;
2984     gni_smsg_attr_t      *smsg_attr_vec;
2985     gni_mem_handle_t     my_smsg_mdh_mailbox;
2986     int      ret, i;
2987     gni_return_t status;
2988     uint32_t              vmdh_index = -1;
2989     mdh_addr_t            base_infor;
2990     mdh_addr_t            *base_addr_vec;
2991     char *env;
2992
2993     if(mysize <=512)
2994     {
2995         SMSG_MAX_MSG = 1024;
2996     }else if (mysize <= 4096)
2997     {
2998         SMSG_MAX_MSG = 1024;
2999     }else if (mysize <= 16384)
3000     {
3001         SMSG_MAX_MSG = 512;
3002     }else {
3003         SMSG_MAX_MSG = 256;
3004     }
3005     
3006     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3007     if (env) SMSG_MAX_MSG = atoi(env);
3008     CmiAssert(SMSG_MAX_MSG > 0);
3009
3010     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3011     
3012     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3013     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3014     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3015     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3016     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3017     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3018     CmiAssert(ret == 0);
3019     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3020     
3021     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3022             smsg_memlen*(mysize), smsg_rx_cqh,
3023             GNI_MEM_READWRITE,   
3024             vmdh_index,
3025             &my_smsg_mdh_mailbox);
3026     register_memory_size += smsg_memlen*(mysize);
3027     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3028
3029     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3030     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3031
3032     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3033     base_infor.mdh =  my_smsg_mdh_mailbox;
3034     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3035
3036     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3037  
3038     for(i=0; i<mysize; i++)
3039     {
3040         if(i==myrank)
3041             continue;
3042         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3043         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3044         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3045         smsg_attr[i].mbox_offset = i*smsg_memlen;
3046         smsg_attr[i].buff_size = smsg_memlen;
3047         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3048         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3049     }
3050
3051     for(i=0; i<mysize; i++)
3052     {
3053         if (myrank == i) continue;
3054
3055         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3056         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3057         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3058         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3059         remote_smsg_attr.buff_size = smsg_memlen;
3060         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3061         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3062
3063         /* initialize the smsg channel */
3064         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3065         GNI_RC_CHECK("SMSG Init", status);
3066     } //end initialization
3067
3068     free(base_addr_vec);
3069     free(smsg_attr);
3070
3071     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3072     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3073
3074
3075 inline
3076 static void _init_send_queue(SMSG_QUEUE *queue)
3077 {
3078      int i;
3079 #if ONE_SEND_QUEUE
3080      queue->sendMsgBuf = PCQueueCreate();
3081      destpe_avail = (char*)malloc(mysize * sizeof(char));
3082 #else
3083      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3084 #if CMK_SMP && SMP_LOCKS
3085      nonEmptyQueues = PCQueueCreate();
3086 #endif
3087      for(i =0; i<mysize; i++)
3088      {
3089 #if CMK_SMP
3090          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3091 #if SMP_LOCKS
3092          queue->smsg_msglist_index[i].pushed = 0;
3093          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3094 #endif
3095 #else
3096          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3097          queue->smsg_msglist_index[i].next = -1;
3098          queue->smsg_head_index = -1;
3099 #endif
3100         
3101      }
3102 #endif
3103 }
3104
3105 inline
3106 static void _init_smsg()
3107 {
3108     if(mysize > 1) {
3109         if (useDynamicSMSG)
3110             _init_dynamic_smsg();
3111         else
3112             _init_static_smsg();
3113     }
3114
3115     _init_send_queue(&smsg_queue);
3116 #if CMK_USE_OOB
3117     _init_send_queue(&smsg_oob_queue);
3118 #endif
3119 }
3120
3121 static void _init_static_msgq()
3122 {
3123     gni_return_t status;
3124     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3125     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3126     msgq_attrs.smsg_q_sz = 1;
3127     msgq_attrs.rcv_pool_sz = 1;
3128     msgq_attrs.num_msgq_eps = 2;
3129     msgq_attrs.nloc_insts = 8;
3130     msgq_attrs.modes = 0;
3131     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3132
3133     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3134     GNI_RC_CHECK("MSGQ Init", status);
3135
3136
3137 }
3138
3139
3140 static CmiUInt8 total_mempool_size = 0;
3141 static CmiUInt8 total_mempool_calls = 0;
3142
3143 #if USE_LRTS_MEMPOOL
3144 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3145 {
3146     void *pool;
3147     int ret;
3148     gni_return_t status = GNI_RC_SUCCESS;
3149
3150     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3151     if (*size < default_size) *size = default_size;
3152 #if LARGEPAGE
3153     // round up to be multiple of _tlbpagesize
3154     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3155     *size = ALIGNHUGEPAGE(*size);
3156 #endif
3157     total_mempool_size += *size;
3158     total_mempool_calls += 1;
3159 #if   !LARGEPAGE
3160     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3161     {
3162         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3163         CmiAbort("alloc_mempool_block");
3164     }
3165 #endif
3166 #if LARGEPAGE
3167     pool = my_get_huge_pages(*size);
3168     ret = pool==NULL;
3169 #else
3170     ret = posix_memalign(&pool, ALIGNBUF, *size);
3171 #endif
3172     if (ret != 0) {
3173 #if CMK_SMP && STEAL_MEMPOOL
3174       pool = steal_mempool_block(size, mem_hndl);
3175       if (pool != NULL) return pool;
3176 #endif
3177       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3178       if (ret == ENOMEM)
3179         CmiAbort("alloc_mempool_block: out of memory.");
3180       else
3181         CmiAbort("alloc_mempool_block: posix_memalign failed");
3182     }
3183 #if LARGEPAGE
3184     CmiMemLock();
3185     register_count++;
3186     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3187     CmiMemUnlock();
3188     if(status != GNI_RC_SUCCESS) {
3189         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3190 sweep_mempool(CpvAccess(mempool));
3191     }
3192     GNI_RC_CHECK("MEMORY_REGISTER", status);
3193 #else
3194     SetMemHndlZero((*mem_hndl));
3195 #endif
3196     return pool;
3197 }
3198
3199 // ptr is a block head pointer
3200 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3201 {
3202     if(!(IsMemHndlZero(mem_hndl)))
3203     {
3204         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3205     }
3206 #if LARGEPAGE
3207     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3208 #else
3209     free(ptr);
3210 #endif
3211 }
3212 #endif
3213
3214 void LrtsPreCommonInit(int everReturn){
3215 #if USE_LRTS_MEMPOOL
3216     CpvInitialize(mempool_type*, mempool);
3217     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3218     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3219 #endif
3220 }
3221
3222 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3223 {
3224     register int            i;
3225     int                     rc;
3226     int                     device_id = 0;
3227     unsigned int            remote_addr;
3228     gni_cdm_handle_t        cdm_hndl;
3229     gni_return_t            status = GNI_RC_SUCCESS;
3230     uint32_t                vmdh_index = -1;
3231     uint8_t                 ptag;
3232     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3233     int                     first_spawned;
3234     int                     physicalID;
3235     char                   *env;
3236
3237     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3238     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3239     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3240    
3241     status = PMI_Init(&first_spawned);
3242     GNI_RC_CHECK("PMI_Init", status);
3243
3244     status = PMI_Get_size(&mysize);
3245     GNI_RC_CHECK("PMI_Getsize", status);
3246
3247     status = PMI_Get_rank(&myrank);
3248     GNI_RC_CHECK("PMI_getrank", status);
3249
3250     //physicalID = CmiPhysicalNodeID(myrank);
3251     
3252     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3253
3254     *myNodeID = myrank;
3255     *numNodes = mysize;
3256   
3257 #if MULTI_THREAD_SEND
3258     /* Currently, we only consider the case that comm. thread will only recv msgs */
3259     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3260 #endif
3261
3262     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3263     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3264     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3265
3266     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3267     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3268     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3269
3270     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3271     if (env) useDynamicSMSG = 1;
3272     if (!useDynamicSMSG)
3273       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3274     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3275     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3276     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3277     
3278     if(myrank == 0)
3279     {
3280         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3281         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3282     }
3283 #ifdef USE_ONESIDED
3284     onesided_init(NULL, &onesided_hnd);
3285
3286     // this is a GNI test, so use the libonesided bypass functionality
3287     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3288     local_addr = gniGetNicAddress();
3289 #else
3290     ptag = get_ptag();
3291     cookie = get_cookie();
3292 #if 0
3293     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3294 #endif
3295     //Create and attach to the communication  domain */
3296     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3297     GNI_RC_CHECK("GNI_CdmCreate", status);
3298     //* device id The device id is the minor number for the device
3299     //that is assigned to the device by the system when the device is created.
3300     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3301     //where X is the device number 0 default 
3302     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3303     GNI_RC_CHECK("GNI_CdmAttach", status);
3304     local_addr = get_gni_nic_address(0);
3305 #endif
3306     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3307     _MEMCHECK(MPID_UGNI_AllAddr);
3308     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3309     /* create the local completion queue */
3310     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3311     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3312     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3313     
3314     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3315     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3316     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3317
3318     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3319     GNI_RC_CHECK("Create CQ (rx)", status);
3320     
3321     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3322     GNI_RC_CHECK("Create Post CQ (rx)", status);
3323     
3324     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3325     //GNI_RC_CHECK("Create BTE CQ", status);
3326
3327     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3328     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3329     _MEMCHECK(ep_hndl_array);
3330 #if MULTI_THREAD_SEND 
3331     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3332     //default_tx_cq_lock = CmiCreateLock();
3333     rdma_tx_cq_lock = CmiCreateLock();
3334     smsg_rx_cq_lock = CmiCreateLock();
3335     //global_gni_lock  = CmiCreateLock();
3336     //rx_cq_lock  = CmiCreateLock();
3337 #endif
3338     for (i=0; i<mysize; i++) {
3339         if(i == myrank) continue;
3340         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3341         GNI_RC_CHECK("GNI_EpCreate ", status);   
3342         remote_addr = MPID_UGNI_AllAddr[i];
3343         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3344         GNI_RC_CHECK("GNI_EpBind ", status);   
3345     }
3346
3347     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3348     _init_smsg();
3349     PMI_Barrier();
3350
3351 #if     USE_LRTS_MEMPOOL
3352     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3353     if (env) {
3354         _totalmem = CmiReadSize(env);
3355         if (myrank == 0)
3356             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3357     }
3358
3359     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3360     if (env) _mempool_size = CmiReadSize(env);
3361     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3362         _mempool_size = CmiReadSize(env);
3363
3364
3365     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3366     if (env) {
3367         MAX_REG_MEM = CmiReadSize(env);
3368         user_set_flag = 1;
3369     }
3370     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3371         MAX_REG_MEM = CmiReadSize(env);
3372         user_set_flag = 1;
3373     }
3374
3375     env = getenv("CHARM_UGNI_SEND_MAX");
3376     if (env) {
3377         MAX_BUFF_SEND = CmiReadSize(env);
3378         user_set_flag = 1;
3379     }
3380     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3381         MAX_BUFF_SEND = CmiReadSize(env);
3382         user_set_flag = 1;
3383     }
3384
3385     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3386     if (env) {
3387         _mempool_size_limit = CmiReadSize(env);
3388     }
3389
3390     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3391     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3392
3393     if (myrank==0) {
3394         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3395         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3396         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3397             /* memblock can expand to BIG_MSG * 2 size */
3398             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3399             CmiAbort("mempool maximum size is too small. \n");
3400         }
3401 #if MULTI_THREAD_SEND
3402         printf("Charm++> worker thread sending messages\n");
3403 #elif COMM_THREAD_SEND
3404         printf("Charm++> only comm thread send/recv messages\n");
3405 #endif
3406     }
3407
3408 #endif     /* end of USE_LRTS_MEMPOOL */
3409
3410     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3411     if (env) {
3412         BIG_MSG = CmiReadSize(env);
3413         if (BIG_MSG < ONE_SEG)
3414           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3415     }
3416     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3417     if (env) {
3418         BIG_MSG_PIPELINE = atoi(env);
3419     }
3420
3421     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3422     if (env) _checkProgress = 0;
3423     if (mysize == 1) _checkProgress = 0;
3424
3425     
3426     /*
3427     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3428     if (env) 
3429         _tlbpagesize = CmiReadSize(env);
3430     */
3431     /* real gethugepagesize() is only available when hugetlb module linked */
3432     _tlbpagesize = gethugepagesize();
3433     if (myrank == 0) {
3434         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3435     }
3436
3437 #if LARGEPAGE
3438     if (_tlbpagesize == 4096) {
3439         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3440     }
3441 #endif
3442
3443     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3444
3445     /* init DMA buffer for medium message */
3446
3447     //_init_DMA_buffer();
3448     
3449     free(MPID_UGNI_AllAddr);
3450 #if CMK_SMP
3451     sendRdmaBuf = PCQueueCreate();
3452 #else
3453     sendRdmaBuf = 0;
3454 #endif
3455 #if MACHINE_DEBUG_LOG
3456     char ln[200];
3457     sprintf(ln,"debugLog.%d",myrank);
3458     debugLog=fopen(ln,"w");
3459 #endif
3460
3461 //    NTK_Init();
3462 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3463
3464 #if  REMOTE_EVENT
3465     AckPool_init();
3466 #endif
3467
3468 #if CMK_WITH_STATS
3469     init_comm_stats();
3470 #endif
3471 }
3472
3473 void* LrtsAlloc(int n_bytes, int header)
3474 {
3475     void *ptr = NULL;
3476 #if 0
3477     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3478 #endif
3479     if(n_bytes <= SMSG_MAX_MSG)
3480     {
3481         int totalsize = n_bytes+header;
3482         ptr = malloc(totalsize);
3483     }
3484     else {
3485         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3486 #if     USE_LRTS_MEMPOOL
3487         n_bytes = ALIGN64(n_bytes);
3488         if(n_bytes < BIG_MSG)</