46474d286577d0851b3a18b71ff3665e840f5dfa
[charm.git] / src / arch / gni / machine.c
1
2 /** @file
3  * GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #define DIRECT_SEQ 0xFFFFFFE 
48 #include "cmidirect.h"
49 #endif
50
51 #if REGULARPAGE 
52 #define     LARGEPAGE              0
53 #else
54 #define     LARGEPAGE              1
55 #endif
56
57 #if CMK_SMP
58 #define MULTI_THREAD_SEND          0
59 #define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
60 #endif
61
62 #if MULTI_THREAD_SEND
63 #define CMK_WORKER_SINGLE_TASK     1
64 #endif
65
66 #if !CMK_CRAYXC
67 #define REMOTE_EVENT               1
68 #endif
69
70 #define CQWRITE                    0
71
72 #define CMI_EXERT_SEND_LARGE_CAP   0
73 #define CMI_EXERT_RECV_RDMA_CAP    0
74
75
76 #define CMI_SENDBUFFERSMSG_CAP            0
77 #define CMI_PUMPNETWORKSMSG_CAP           0
78 #define CMI_PUMPREMOTETRANSACTIONS_CAP    0
79 #define CMI_PUMPLOCALTRANSACTIONS_CAP     0
80
81 #if CMI_SENDBUFFERSMSG_CAP
82 int     SendBufferMsg_cap  = 20;
83 #endif
84
85 #if CMI_PUMPNETWORKSMSG_CAP
86 int     PumpNetworkSmsg_cap = 20;
87 #endif
88
89 #if CMI_PUMPREMOTETRANSACTIONS_CAP
90 int     PumpRemoteTransactions_cap = 20;
91 #endif
92
93 #if CMI_PUMPREMOTETRANSACTIONS_CAP
94 int     PumpLocalTransactions_cap = 15;
95 #endif
96
97 #if CMI_EXERT_SEND_LARGE_CAP
98 static int SEND_large_cap = 20;
99 static int SEND_large_pending = 0;
100 #endif
101
102 #if CMI_EXERT_RECV_RDMA_CAP
103 static int   RDMA_cap =   10;
104 static int   RDMA_pending = 0;
105 #endif
106
107 #define USE_LRTS_MEMPOOL                  1
108
109 #define PRINT_SYH                         0
110
111 // Trace communication thread
112 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
113 #define TRACE_THRESHOLD     0.00001
114 #define CMI_MPI_TRACE_MOREDETAILED 0
115 #undef CMI_MPI_TRACE_USEREVENTS
116 #define CMI_MPI_TRACE_USEREVENTS 1
117 #else
118 #undef CMK_SMP_TRACE_COMMTHREAD
119 #define CMK_SMP_TRACE_COMMTHREAD 0
120 #endif
121
122 #define CMK_TRACE_COMMOVERHEAD 0
123 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
124 #undef CMI_MPI_TRACE_USEREVENTS
125 #define CMI_MPI_TRACE_USEREVENTS 1
126 #else
127 #undef CMK_TRACE_COMMOVERHEAD
128 #define CMK_TRACE_COMMOVERHEAD 0
129 #endif
130
131 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
132 CpvStaticDeclare(double, projTraceStart);
133 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
134 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
135 #define  EVENT_TIME()   CpvAccess(projTraceStart)
136 #else
137 #define  START_EVENT()
138 #define  END_EVENT(x)
139 #define  EVENT_TIME()   (0.0)
140 #endif
141
142 #if USE_LRTS_MEMPOOL
143
144 #define oneMB (1024ll*1024)
145 #define oneGB (1024ll*1024*1024)
146
147 static CmiInt8 _mempool_size = 8*oneMB;
148 static CmiInt8 _expand_mem =  4*oneMB;
149 static CmiInt8 _mempool_size_limit = 0;
150
151 static CmiInt8 _totalmem = 0.8*oneGB;
152
153 #if LARGEPAGE
154 static CmiInt8 BIG_MSG  =  16*oneMB;
155 static CmiInt8 ONE_SEG  =  4*oneMB;
156 #else
157 static CmiInt8 BIG_MSG  =  4*oneMB;
158 static CmiInt8 ONE_SEG  =  2*oneMB;
159 #endif
160 #if MULTI_THREAD_SEND
161 static int BIG_MSG_PIPELINE = 1;
162 #else
163 static int BIG_MSG_PIPELINE = 4;
164 #endif
165
166 // dynamic flow control
167 static CmiInt8 buffered_send_msg = 0;
168 static CmiInt8 register_memory_size = 0;
169
170 #if LARGEPAGE
171 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
172 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
173 static CmiInt8  register_count = 0;
174 #else
175 #if CMK_SMP && COMM_THREAD_SEND 
176 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
177 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
178 #else
179 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
180 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
181 #endif
182
183
184 #endif
185
186 #endif     /* end USE_LRTS_MEMPOOL */
187
188 #if MULTI_THREAD_SEND 
189 #define     CMI_GNI_LOCK(x)       CmiLock(x);
190 #define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
191 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
192 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
193 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
194 #else
195 #define     CMI_GNI_LOCK(x)
196 #define     CMI_GNI_TRYLOCK(x)         (0)
197 #define     CMI_GNI_UNLOCK(x)
198 #define     CMI_PCQUEUEPOP_LOCK(Q)   
199 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
200 #endif
201
202 static int _tlbpagesize = 4096;
203
204 //static int _smpd_count  = 0;
205
206 static int   user_set_flag  = 0;
207
208 static int _checkProgress = 1;             /* check deadlock */
209 static int _detected_hang = 0;
210
211 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
212
213 // dynamic SMSG
214 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
215
216 static int avg_smsg_connection = 32;
217 static int                 *smsg_connected_flag= 0;
218 static gni_smsg_attr_t     **smsg_attr_vector_local;
219 static gni_smsg_attr_t     **smsg_attr_vector_remote;
220 static gni_ep_handle_t     ep_hndl_unbound;
221 static gni_smsg_attr_t     send_smsg_attr;
222 static gni_smsg_attr_t     recv_smsg_attr;
223
224 typedef struct _dynamic_smsg_mailbox{
225    void     *mailbox_base;
226    int      size;
227    int      offset;
228    gni_mem_handle_t  mem_hndl;
229    struct      _dynamic_smsg_mailbox  *next;
230 }dynamic_smsg_mailbox_t;
231
232 static dynamic_smsg_mailbox_t  *mailbox_list;
233
234 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
235 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
236
237 #if PRINT_SYH
238 int         lrts_send_msg_id = 0;
239 int         lrts_local_done_msg = 0;
240 int         lrts_send_rdma_success = 0;
241 #endif
242
243 #include "machine.h"
244
245 #include "pcqueue.h"
246
247 #include "mempool.h"
248
249 #if CMK_PERSISTENT_COMM
250 #define PERSISTENT_GET_BASE 0 
251 #if !PERSISTENT_GET_BASE
252 #define CMK_PERSISTENT_COMM_PUT 1 
253 #endif
254 #include "machine-persistent.h"
255 #define  POST_HIGHPRIORITY_RDMA    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
256 #else  
257 #define  POST_HIGHPRIORITY_RDMA   
258 #endif
259
260 #if REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM_PUT) 
261 #define  PUMP_REMOTE_HIGHPRIORITY    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
262 #else
263 #define  PUMP_REMOTE_HIGHPRIORITY
264 #endif
265
266 //#define  USE_ONESIDED 1
267 #ifdef USE_ONESIDED
268 //onesided implementation is wrong, since no place to restore omdh
269 #include "onesided.h"
270 onesided_hnd_t   onesided_hnd;
271 onesided_md_t    omdh;
272 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
273
274 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
275
276 #else
277 uint8_t   onesided_hnd, omdh;
278
279 #if REMOTE_EVENT || CQWRITE 
280 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
281     if(register_memory_size+size>= MAX_REG_MEM) { \
282         status = GNI_RC_ERROR_NOMEM;} \
283     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
284         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
285 #else
286 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
287         if (register_memory_size + size >= MAX_REG_MEM) { \
288             status = GNI_RC_ERROR_NOMEM; \
289         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
290             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
291 #endif
292
293 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
294     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
295              register_memory_size -= size; \
296          else CmiAbort("MEM_DEregister");  \
297     } while (0)
298 #endif
299
300 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
301 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
302 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
303 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
304 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
305 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
306 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
307 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
308 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
309 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
310 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
311 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
312 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
313 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
314
315 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
316 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
317
318 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
319 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
320 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
321 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
322
323 #define ALIGNBUF                64
324
325 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
326 /* If SMSG is not used */
327
328 #define FMA_PER_CORE  1024
329 #define FMA_BUFFER_SIZE 1024
330
331 /* If SMSG is used */
332 static int  SMSG_MAX_MSG = 1024;
333 #define SMSG_MAX_CREDIT    72
334
335 #define MSGQ_MAXSIZE       2048
336
337 /* large message transfer with FMA or BTE */
338 #if ! REMOTE_EVENT
339 #define LRTS_GNI_RDMA_THRESHOLD  1024 
340 #else
341    /* remote events only work with RDMA */
342 #define LRTS_GNI_RDMA_THRESHOLD  0 
343 #endif
344
345 #if CMK_SMP
346 static int  REMOTE_QUEUE_ENTRIES=163840; 
347 static int LOCAL_QUEUE_ENTRIES=163840; 
348 #else
349 static int  REMOTE_QUEUE_ENTRIES=20480;
350 static int LOCAL_QUEUE_ENTRIES=20480; 
351 #endif
352
353 #define BIG_MSG_TAG             0x26
354 #define PUT_DONE_TAG            0x28
355 #define DIRECT_PUT_DONE_TAG     0x29
356 #define ACK_TAG                 0x30
357 /* SMSG is data message */
358 #define SMALL_DATA_TAG          0x31
359 /* SMSG is a control message to initialize a BTE */
360 #define LMSG_INIT_TAG           0x33 
361 #define LMSG_PERSISTENT_INIT_TAG           0x34 
362 #define LMSG_OOB_INIT_TAG       0x35
363
364 #define DEBUG
365 #ifdef GNI_RC_CHECK
366 #undef GNI_RC_CHECK
367 #endif
368 #ifdef DEBUG
369 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
370 #else
371 #define GNI_RC_CHECK(msg,rc)
372 #endif
373
374 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
375 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
376 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
377
378 static int useStaticMSGQ = 0;
379 static int useStaticFMA = 0;
380 static int mysize, myrank;
381 static gni_nic_handle_t   nic_hndl;
382
383 typedef struct {
384     gni_mem_handle_t mdh;
385     uint64_t addr;
386 } mdh_addr_t ;
387 // this is related to dynamic SMSG
388
389 typedef struct mdh_addr_list{
390     gni_mem_handle_t mdh;
391    void *addr;
392     struct mdh_addr_list *next;
393 }mdh_addr_list_t;
394
395 static unsigned int         smsg_memlen;
396 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
397 mdh_addr_t          setup_mem;
398 mdh_addr_t          *smsg_connection_vec = 0;
399 gni_mem_handle_t    smsg_connection_memhndl;
400 static int          smsg_expand_slots = 10;
401 static int          smsg_available_slot = 0;
402 static void         *smsg_mailbox_mempool = 0;
403 mdh_addr_list_t     *smsg_dynamic_list = 0;
404
405 static void             *smsg_mailbox_base;
406 gni_msgq_attr_t         msgq_attrs;
407 gni_msgq_handle_t       msgq_handle;
408 gni_msgq_ep_attr_t      msgq_ep_attrs;
409 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
410
411 /* =====Beginning of Declarations of Machine Specific Variables===== */
412 static int cookie;
413 static int modes = 0;
414 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
415 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
416 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
417 static gni_cq_handle_t       highprior_rdma_tx_cqh = NULL;      // rdma - local event
418 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
419 static gni_cq_handle_t       highpriority_rx_cqh = NULL;      // mempool - remote event
420 static gni_ep_handle_t       *ep_hndl_array;
421
422 static CmiNodeLock           *ep_lock_array;
423 static CmiNodeLock           default_tx_cq_lock; 
424 static CmiNodeLock           rdma_tx_cq_lock; 
425 static CmiNodeLock           global_gni_lock; 
426 static CmiNodeLock           rx_cq_lock;
427 static CmiNodeLock           smsg_mailbox_lock;
428 static CmiNodeLock           smsg_rx_cq_lock;
429 static CmiNodeLock           *mempool_lock;
430 //#define     CMK_WITH_STATS      1
431 typedef struct msg_list
432 {
433     uint32_t destNode;
434     uint32_t size;
435     void *msg;
436     uint8_t tag;
437 #if CMK_WITH_STATS
438     double  creation_time;
439 #endif
440 }MSG_LIST;
441
442
443 typedef struct control_msg
444 {
445     uint64_t            source_addr;    /* address from the start of buffer  */
446     uint64_t            dest_addr;      /* address from the start of buffer */
447     int                 total_length;   /* total length */
448     int                 length;         /* length of this packet */
449 #if REMOTE_EVENT
450     int                 ack_index;      /* index from integer to address */
451 #endif
452     int     seq_id;         //big message   0 meaning single message
453     gni_mem_handle_t    source_mem_hndl;
454 #if PERSISTENT_GET_BASE
455     gni_mem_handle_t    dest_mem_hndl;
456 #endif
457     struct control_msg  *next;
458 } CONTROL_MSG;
459
460 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
461
462 typedef struct ack_msg
463 {
464     uint64_t            source_addr;    /* address from the start of buffer  */
465 #if ! USE_LRTS_MEMPOOL
466     gni_mem_handle_t    source_mem_hndl;
467     int                 length;          /* total length */
468 #endif
469     struct ack_msg     *next;
470 } ACK_MSG;
471
472 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
473
474 #if CMK_DIRECT
475 typedef struct{
476     uint64_t    handler_addr;
477 }CMK_DIRECT_HEADER;
478
479 typedef struct {
480     char core[CmiMsgHeaderSizeBytes];
481     uint64_t handler;
482 }cmidirectMsg;
483
484 //SYH
485 CpvDeclare(int, CmiHandleDirectIdx);
486 void CmiHandleDirectMsg(cmidirectMsg* msg)
487 {
488
489     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
490    (*(_handle->callbackFnPtr))(_handle->callbackData);
491    CmiFree(msg);
492 }
493
494 void CmiDirectInit()
495 {
496     CpvInitialize(int,  CmiHandleDirectIdx);
497     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
498 }
499
500 #endif
501 typedef struct  rmda_msg
502 {
503     int                   destNode;
504 #if REMOTE_EVENT
505     int                   ack_index;
506 #endif
507     gni_post_descriptor_t *pd;
508 }RDMA_REQUEST;
509
510
511 #if CMK_SMP
512 #define SMP_LOCKS                       1
513 #else
514 #define SMP_LOCKS                       0
515 #endif
516 #define ONE_SEND_QUEUE                  0
517
518 typedef PCQueue BufferList;
519 typedef struct  msg_list_index
520 {
521     PCQueue       sendSmsgBuf;
522 #if  SMP_LOCKS
523     CmiNodeLock   lock;
524     int           pushed;
525     int           destpe;
526 #endif
527 } MSG_LIST_INDEX;
528 char                *destpe_avail;
529 PCQueue sendRdmaBuf;
530 PCQueue sendHighPriorBuf;
531 // buffered send queue
532 #if ! ONE_SEND_QUEUE
533 typedef struct smsg_queue
534 {
535     MSG_LIST_INDEX   *smsg_msglist_index;
536     int               smsg_head_index;
537 #if  SMP_LOCKS
538     PCQueue     nonEmptyQueues;
539 #endif
540 } SMSG_QUEUE;
541 #else
542 typedef struct smsg_queue
543 {
544     PCQueue       sendMsgBuf;
545 }  SMSG_QUEUE;
546 #endif
547
548 SMSG_QUEUE                  smsg_queue;
549 #if CMK_USE_OOB
550 SMSG_QUEUE                  smsg_oob_queue;
551 #define SEND_OOB_SMSG(x)            SendBufferMsg(&x, NULL);
552 #define PUMP_LOCAL_HIGHPRIORITY    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh,  rdma_tx_cq_lock)); 
553 #else
554 #define SEND_OOB_SMSG(x)            
555 #define PUMP_LOCAL_HIGHPRIORITY     
556 #endif
557
558 #define FreeMsgList(d)   free(d);
559 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
560
561 #define FreeControlMsg(d)      free(d);
562 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
563
564 #define FreeAckMsg(d)      free(d);
565 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
566
567 #define FreeRdmaRequest(d)       free(d);
568 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
569 /* reuse gni_post_descriptor_t */
570 static gni_post_descriptor_t *post_freelist=0;
571
572 #define FreePostDesc(d)     free(d);
573 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
574
575
576 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
577 static int      buffered_smsg_counter = 0;
578
579 /* SmsgSend return success but message sent is not confirmed by remote side */
580 static MSG_LIST *buffered_fma_head = 0;
581 static MSG_LIST *buffered_fma_tail = 0;
582
583 /* functions  */
584 #define IsFree(a,ind)  !( a& (1<<(ind) ))
585 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
586 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
587
588 CpvDeclare(mempool_type*, mempool);
589
590 #if CMK_PERSISTENT_COMM_PUT
591 CpvDeclare(mempool_type*, persistent_mempool);
592 #endif
593
594 #if CMK_SMSGS_FREE_AFTER_EVENT
595 /* 
596   SMSGS pool 
597   the pool is to buffer sending smsgs until it can be free'ed .
598 */
599
600 #if CMK_SMP
601 #define SMSG_INIT_SIZE                4096
602 #else
603 #define SMSG_INIT_SIZE                1024
604 #endif
605
606 struct SmsgsStruct {
607 void *addr;
608 int next;
609 int type;               /* 1: charm   2: control msg */
610 };
611
612 typedef struct SmsgsPool {
613     struct SmsgsStruct   *indexes;
614     int                   size;
615     int                   freehead;
616     CmiNodeLock           lock;
617 } SmsgsPool;
618
619 static SmsgsPool  smsgsPool;
620
621 #define  GetSmsgsIndexType(pool, s)             (pool.indexes[s].type)
622 #define  GetSmsgsIndexAddress(pool, s)          (pool.indexes[s].addr)
623
624 static void SmsgsPool_init(SmsgsPool *pool)
625 {
626     int i;
627     pool->size = SMSG_INIT_SIZE;
628     pool->indexes = (struct SmsgsStruct *)malloc(pool->size*sizeof(struct SmsgsStruct));
629     for (i=0; i<pool->size-1; i++) {
630         pool->indexes[i].next = i+1;
631     }
632     pool->indexes[i].next = -1;
633     pool->freehead = 0;
634 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM_PUT
635     pool->lock  = CmiCreateLock();
636 #else
637     pool->lock  = 0;
638 #endif
639 }
640
641 static int SmsgsPool_getslot(SmsgsPool *pool, void *addr, int type)
642 {
643     int s, i;
644 #if MULTI_THREAD_SEND  
645     CmiLock(pool->lock);
646 #endif
647     s = pool->freehead;
648     if (s == -1) {
649         int newsize = pool->size * 2;
650         //printf("[%d] SmsgsPool_getslot %p expand to: %d\n", myrank, pool, newsize);
651         struct SmsgsStruct *old_pool = pool->indexes;
652         pool->indexes = (struct SmsgsStruct *)malloc(newsize*sizeof(struct SmsgsStruct));
653         memcpy(pool->indexes, old_pool, pool->size*sizeof(struct SmsgsStruct));
654         for (i=pool->size; i<newsize-1; i++) {
655             pool->indexes[i].next = i+1;
656         }
657         pool->indexes[i].next = -1;
658         pool->freehead = pool->size;
659         s = pool->size;
660         pool->size = newsize;
661         free(old_pool);
662     }
663     pool->freehead = pool->indexes[s].next;
664     pool->indexes[s].addr = addr;
665     pool->indexes[s].type = type;
666 #if MULTI_THREAD_SEND
667     CmiUnlock(pool->lock);
668 #endif
669     return s;
670 }
671
672 static void SmsgsPool_freeslot(SmsgsPool *pool, int s)
673 {
674     CmiAssert(s>=0 && s<pool->size);
675 #if MULTI_THREAD_SEND
676     CmiLock(pool->lock);
677 #endif
678     pool->indexes[s].next = pool->freehead;
679     pool->freehead = s;
680 #if MULTI_THREAD_SEND
681     CmiUnlock(pool->lock);
682 #endif
683 }
684
685 #endif  /* CMK_SMSGS_FREE_AFTER_EVENT */
686
687
688 #if REMOTE_EVENT
689 /* ack pool for remote events */
690
691 static int  SHIFT   =           18;
692 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
693 #define RANK_MASK               ((1<<SHIFT) - 1)
694 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
695
696 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
697 #define GET_RANK(evt)           ((evt) & RANK_MASK)
698 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
699
700 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
701 #define DIRECT_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
702
703 #if CMK_SMP
704 #define INIT_SIZE                4096
705 #else
706 #define INIT_SIZE                1024
707 #endif
708
709 struct IndexStruct {
710 void *addr;
711 int next;
712 int type;     // 1: ACK   2: Persistent
713 };
714
715 typedef struct IndexPool {
716     struct IndexStruct   *indexes;
717     int                   size;
718     int                   freehead;
719     CmiNodeLock           lock;
720 } IndexPool;
721
722 static IndexPool  ackPool;
723 #if CMK_PERSISTENT_COMM_PUT
724 static IndexPool  persistPool;
725 #else
726 #define persistPool ackPool 
727 #endif
728
729 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
730 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
731
732 static void IndexPool_init(IndexPool *pool)
733 {
734     int i;
735     if ((1<<SHIFT) < mysize) 
736         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
737     pool->size = INIT_SIZE;
738     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
739     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
740     for (i=0; i<pool->size-1; i++) {
741         pool->indexes[i].next = i+1;
742         pool->indexes[i].type = 0;
743     }
744     pool->indexes[i].next = -1;
745     pool->indexes[i].type = 0;
746     pool->freehead = 0;
747 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM_PUT
748     pool->lock  = CmiCreateLock();
749 #else
750     pool->lock  = 0;
751 #endif
752 }
753
754 static int IndexPool_getslot(IndexPool *pool, void *addr, int type)
755 {
756     int s, i;
757 #if MULTI_THREAD_SEND  
758     CmiLock(pool->lock);
759 #endif
760     CmiAssert(type == 1 || type == 2);
761     s = pool->freehead;
762     if (s == -1) {
763         int newsize = pool->size * 2;
764         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
765         if (newsize > (1<<(32-SHIFT-1))) {
766             static int warned = 0;
767             if (!warned)
768               printf("[%d] Warning: IndexPool_getslot %p overflow when expanding to: %d\n", myrank, pool, newsize);
769             warned = 1;
770             return -1;
771             CmiAbort("IndexPool for remote events overflows, try compile Charm++ with remote event disabled.");
772         }
773         struct IndexStruct *old_ackpool = pool->indexes;
774         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
775         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
776         for (i=pool->size; i<newsize-1; i++) {
777             pool->indexes[i].next = i+1;
778             pool->indexes[i].type = 0;
779         }
780         pool->indexes[i].next = -1;
781         pool->indexes[i].type = 0;
782         pool->freehead = pool->size;
783         s = pool->size;
784         pool->size = newsize;
785         free(old_ackpool);
786     }
787     pool->freehead = pool->indexes[s].next;
788     pool->indexes[s].addr = addr;
789     CmiAssert(pool->indexes[s].type == 0);
790     pool->indexes[s].type = type;
791 #if MULTI_THREAD_SEND
792     CmiUnlock(pool->lock);
793 #endif
794     return s;
795 }
796
797 static void IndexPool_freeslot(IndexPool *pool, int s)
798 {
799     CmiAssert(s>=0 && s<pool->size);
800 #if MULTI_THREAD_SEND
801     CmiLock(pool->lock);
802 #endif
803     pool->indexes[s].next = pool->freehead;
804     pool->indexes[s].type = 0;
805     pool->freehead = s;
806 #if MULTI_THREAD_SEND
807     CmiUnlock(pool->lock);
808 #endif
809 }
810
811
812 #endif   /* REMOTE_EVENT */
813
814 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
815 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
816 #define CHARM_MAGIC_NUMBER               126
817
818 #if CMK_ERROR_CHECKING
819 extern unsigned char computeCheckSum(unsigned char *data, int len);
820 static int checksum_flag = 0;
821 #define CMI_SET_CHECKSUM(msg, len)      \
822         if (checksum_flag)  {   \
823           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
824           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
825         }
826 #define CMI_CHECK_CHECKSUM(msg, len)    \
827         if (checksum_flag)      \
828           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
829             CmiAbort("Fatal error: checksum doesn't agree!\n");
830 #else
831 #define CMI_SET_CHECKSUM(msg, len)
832 #define CMI_CHECK_CHECKSUM(msg, len)
833 #endif
834 /* =====End of Definitions of Message-Corruption Related Macros=====*/
835
836 static int print_stats = 0;
837 static int stats_off = 0;
838 void CmiTurnOnStats()
839 {
840     stats_off = 0;
841     //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
842 }
843
844 void CmiTurnOffStats()
845 {
846     stats_off = 1;
847 }
848
849 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
850
851 #if CMK_WITH_STATS
852 FILE *counterLog = NULL;
853 typedef struct comm_thread_stats
854 {
855     uint64_t  smsg_data_count;
856     uint64_t  lmsg_init_count;
857     uint64_t  ack_count;
858     uint64_t  big_msg_ack_count;
859     uint64_t  smsg_count;
860     uint64_t  direct_put_done_count;
861     uint64_t  put_done_count;
862     //times of calling SmsgSend
863     uint64_t  try_smsg_data_count;
864     uint64_t  try_lmsg_init_count;
865     uint64_t  try_ack_count;
866     uint64_t  try_big_msg_ack_count;
867     uint64_t  try_direct_put_done_count;
868     uint64_t  try_put_done_count;
869     uint64_t  try_smsg_count;
870     
871     double    max_time_in_send_buffered_smsg;
872     double    all_time_in_send_buffered_smsg;
873
874     uint64_t  rdma_get_count, rdma_put_count;
875     uint64_t  try_rdma_get_count, try_rdma_put_count;
876     double    max_time_from_control_to_rdma_init;
877     double    all_time_from_control_to_rdma_init;
878
879     double    max_time_from_rdma_init_to_rdma_done;
880     double    all_time_from_rdma_init_to_rdma_done;
881
882     int      count_in_PumpNetwork;
883     double   time_in_PumpNetwork;
884     double   max_time_in_PumpNetwork;
885     int      count_in_SendBufferMsg_smsg;
886     double   time_in_SendBufferMsg_smsg;
887     double   max_time_in_SendBufferMsg_smsg;
888     int      count_in_SendRdmaMsg;
889     double   time_in_SendRdmaMsg;
890     double   max_time_in_SendRdmaMsg;
891     int      count_in_PumpRemoteTransactions;
892     double   time_in_PumpRemoteTransactions;
893     double   max_time_in_PumpRemoteTransactions;
894     int      count_in_PumpLocalTransactions_rdma;
895     double   time_in_PumpLocalTransactions_rdma;
896     double   max_time_in_PumpLocalTransactions_rdma;
897     int      count_in_PumpDatagramConnection;
898     double   time_in_PumpDatagramConnection;
899     double   max_time_in_PumpDatagramConnection;
900 } Comm_Thread_Stats;
901
902 static Comm_Thread_Stats   comm_stats;
903
904 static char *counters_dirname = "counters";
905
906 static void init_comm_stats()
907 {
908   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
909   if (print_stats){
910       char ln[200];
911       int code = mkdir(counters_dirname, 00777); 
912       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
913       counterLog=fopen(ln,"w");
914       if (counterLog == NULL) CmiAbort("Counter files open failed");
915   }
916 }
917
918 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
919
920 #define SMSG_SENT_DONE(creation_time, tag)  \
921         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
922             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++;  \
923             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
924             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
925             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
926             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
927             comm_stats.smsg_count++; \
928             double inbuff_time = CmiWallTimer() - creation_time;   \
929             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
930             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
931         }
932
933 #define SMSG_TRY_SEND(tag)  \
934         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
935             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
936             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
937             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
938             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
939             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
940             comm_stats.try_smsg_count++; \
941         }
942
943 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
944
945 #define  RDMA_TRANS_DONE(x)      \
946          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
947              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
948              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
949          }
950
951 #define  RDMA_TRANS_INIT(type, x)      \
952          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
953              double rdma_trans_time = CmiWallTimer() - x ; \
954              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
955              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
956          }
957
958 #define STATS_PUMPNETWORK_TIME(x)   \
959         { double t = CmiWallTimer(); \
960           x;        \
961           t = CmiWallTimer() - t;          \
962           comm_stats.count_in_PumpNetwork++;        \
963           comm_stats.time_in_PumpNetwork += t;   \
964           if (t>comm_stats.max_time_in_PumpNetwork)      \
965               comm_stats.max_time_in_PumpNetwork = t;    \
966         }
967
968 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
969         { double t = CmiWallTimer(); \
970           x;        \
971           t = CmiWallTimer() - t;          \
972           comm_stats.count_in_PumpRemoteTransactions ++;        \
973           comm_stats.time_in_PumpRemoteTransactions += t;   \
974           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
975               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
976         }
977
978 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
979         { double t = CmiWallTimer(); \
980           x;        \
981           t = CmiWallTimer() - t;          \
982           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
983           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
984           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
985               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
986         }
987
988 #define STATS_SEND_SMSGS_TIME(x)   \
989         { double t = CmiWallTimer(); \
990           x;        \
991           t = CmiWallTimer() - t;          \
992           comm_stats.count_in_SendBufferMsg_smsg ++;        \
993           comm_stats.time_in_SendBufferMsg_smsg += t;   \
994           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
995               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
996         }
997
998 #define STATS_SENDRDMAMSG_TIME(x)   \
999         { double t = CmiWallTimer(); \
1000           x;        \
1001           t = CmiWallTimer() - t;          \
1002           comm_stats.count_in_SendRdmaMsg ++;        \
1003           comm_stats.time_in_SendRdmaMsg += t;   \
1004           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
1005               comm_stats.max_time_in_SendRdmaMsg = t;    \
1006         }
1007
1008 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
1009         { double t = CmiWallTimer(); \
1010           x;        \
1011           t = CmiWallTimer() - t;          \
1012           comm_stats.count_in_PumpDatagramConnection ++;        \
1013           comm_stats.time_in_PumpDatagramConnection += t;   \
1014           if (t>comm_stats.max_time_in_PumpDatagramConnection)      \
1015               comm_stats.max_time_in_PumpDatagramConnection = t;    \
1016         }
1017
1018 static void print_comm_stats()
1019 {
1020     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
1021     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
1022             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
1023             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
1024     
1025     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
1026             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
1027             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
1028
1029     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
1030     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
1031     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
1032
1033
1034     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
1035     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
1036     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
1037     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
1038     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
1039     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
1040     if (useDynamicSMSG)
1041     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
1042
1043     fclose(counterLog);
1044 }
1045
1046 #else
1047 #define STATS_PUMPNETWORK_TIME(x)                  x
1048 #define STATS_SEND_SMSGS_TIME(x)                   x
1049 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
1050 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
1051 #define STATS_SENDRDMAMSG_TIME(x)                  x
1052 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
1053 #endif
1054
1055 static void
1056 allgather(void *in,void *out, int len)
1057 {
1058     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1059     int i,rc;
1060     int my_rank;
1061     char *tmp_buf,*out_ptr;
1062
1063     if(!already_called) {
1064
1065         rc = PMI_Get_size(&job_size);
1066         CmiAssert(rc == PMI_SUCCESS);
1067         rc = PMI_Get_rank(&my_rank);
1068         CmiAssert(rc == PMI_SUCCESS);
1069
1070         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1071         CmiAssert(ivec_ptr != NULL);
1072
1073         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1074         CmiAssert(rc == PMI_SUCCESS);
1075
1076         already_called = 1;
1077
1078     }
1079
1080     tmp_buf = (char *)malloc(job_size * len);
1081     CmiAssert(tmp_buf);
1082
1083     rc = PMI_Allgather(in,tmp_buf,len);
1084     CmiAssert(rc == PMI_SUCCESS);
1085
1086     out_ptr = out;
1087
1088     for(i=0;i<job_size;i++) {
1089
1090         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1091
1092     }
1093
1094     free(tmp_buf);
1095 }
1096
1097 static void
1098 allgather_2(void *in,void *out, int len)
1099 {
1100     //PMI_Allgather is out of order
1101     int i,rc, extend_len;
1102     int  rank_index;
1103     char *out_ptr, *out_ref;
1104     char *in2;
1105
1106     extend_len = sizeof(int) + len;
1107     in2 = (char*)malloc(extend_len);
1108
1109     memcpy(in2, &myrank, sizeof(int));
1110     memcpy(in2+sizeof(int), in, len);
1111
1112     out_ptr = (char*)malloc(mysize*extend_len);
1113
1114     rc = PMI_Allgather(in2, out_ptr, extend_len);
1115     GNI_RC_CHECK("allgather", rc);
1116
1117     out_ref = out;
1118
1119     for(i=0;i<mysize;i++) {
1120         //rank index 
1121         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1122         //copy to the rank index slot
1123         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1124     }
1125
1126     free(out_ptr);
1127     free(in2);
1128
1129 }
1130
1131 static unsigned int get_gni_nic_address(int device_id)
1132 {
1133     unsigned int address, cpu_id;
1134     gni_return_t status;
1135     int i, alps_dev_id=-1,alps_address=-1;
1136     char *token, *p_ptr;
1137
1138     p_ptr = getenv("PMI_GNI_DEV_ID");
1139     if (!p_ptr) {
1140         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1141        
1142         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1143     } else {
1144         while ((token = strtok(p_ptr,":")) != NULL) {
1145             alps_dev_id = atoi(token);
1146             if (alps_dev_id == device_id) {
1147                 break;
1148             }
1149             p_ptr = NULL;
1150         }
1151         CmiAssert(alps_dev_id != -1);
1152         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1153         CmiAssert(p_ptr != NULL);
1154         i = 0;
1155         while ((token = strtok(p_ptr,":")) != NULL) {
1156             if (i == alps_dev_id) {
1157                 alps_address = atoi(token);
1158                 break;
1159             }
1160             p_ptr = NULL;
1161             ++i;
1162         }
1163         CmiAssert(alps_address != -1);
1164         address = alps_address;
1165     }
1166     return address;
1167 }
1168
1169 static uint8_t get_ptag(void)
1170 {
1171     char *p_ptr, *token;
1172     uint8_t ptag;
1173
1174     p_ptr = getenv("PMI_GNI_PTAG");
1175     CmiAssert(p_ptr != NULL);
1176     token = strtok(p_ptr, ":");
1177     ptag = (uint8_t)atoi(token);
1178     return ptag;
1179         
1180 }
1181
1182 static uint32_t get_cookie(void)
1183 {
1184     uint32_t cookie;
1185     char *p_ptr, *token;
1186
1187     p_ptr = getenv("PMI_GNI_COOKIE");
1188     CmiAssert(p_ptr != NULL);
1189     token = strtok(p_ptr, ":");
1190     cookie = (uint32_t)atoi(token);
1191
1192     return cookie;
1193 }
1194
1195 #if LARGEPAGE
1196
1197 /* directly mmap memory from hugetlbfs for large pages */
1198
1199 #include <sys/stat.h>
1200 #include <fcntl.h>
1201 #include <sys/mman.h>
1202 #include <hugetlbfs.h>
1203
1204 // size must be _tlbpagesize aligned
1205 void *my_get_huge_pages(size_t size)
1206 {
1207     char filename[512];
1208     int fd;
1209     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1210     void *ptr = NULL;
1211
1212     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1213     fd = open(filename, O_RDWR | O_CREAT, mode);
1214     if (fd == -1) {
1215         CmiAbort("my_get_huge_pages: open filed");
1216     }
1217     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1218     if (ptr == MAP_FAILED) ptr = NULL;
1219 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1220     close(fd);
1221     unlink(filename);
1222     return ptr;
1223 }
1224
1225 void my_free_huge_pages(void *ptr, int size)
1226 {
1227 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1228     int ret = munmap(ptr, size);
1229     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1230 }
1231
1232 #endif
1233
1234 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1235 /* TODO: add any that are related */
1236 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1237
1238
1239 #include "machine-lrts.h"
1240 #include "machine-common-core.c"
1241
1242
1243 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
1244 static void SendRdmaMsg(PCQueue );
1245 static void PumpNetworkSmsg();
1246 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1247 #if CQWRITE
1248 static void PumpCqWriteTransactions();
1249 #endif
1250 #if REMOTE_EVENT
1251 static void PumpRemoteTransactions(gni_cq_handle_t);
1252 #endif
1253
1254 #if MACHINE_DEBUG_LOG
1255 static CmiInt8 buffered_recv_msg = 0;
1256 int         lrts_smsg_success = 0;
1257 int         lrts_received_msg = 0;
1258 #endif
1259
1260 static void sweep_mempool(mempool_type *mptr)
1261 {
1262     int n = 0;
1263     block_header *current = &(mptr->block_head);
1264
1265     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1266     while( current!= NULL) {
1267         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1268         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1269     }
1270     printf("[n %d] sweep_mempool slot END.\n", myrank);
1271 }
1272
1273
1274 static INLINE_KEYWORD  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1275 {
1276     block_header *current = *from;
1277
1278     //while(register_memory_size>= MAX_REG_MEM)
1279     //{
1280         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1281             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1282
1283         *from = current;
1284         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1285         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1286         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1287     //}
1288     return GNI_RC_SUCCESS;
1289 }
1290
1291 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1292 {
1293     gni_return_t status = GNI_RC_SUCCESS;
1294     //int size = GetMempoolsize(msg);
1295     //void *blockaddr = GetMempoolBlockPtr(msg);
1296     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1297    
1298     block_header *current = &(mptr->block_head);
1299     while(register_memory_size>= MAX_REG_MEM)
1300     {
1301         status = deregisterMemory(mptr, &current);
1302         if (status != GNI_RC_SUCCESS) break;
1303     }
1304     if(register_memory_size>= MAX_REG_MEM) return status;
1305
1306     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1307     while(1)
1308     {
1309         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1310         if(status == GNI_RC_SUCCESS)
1311         {
1312             break;
1313         }
1314         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1315         {
1316             GNI_RC_CHECK("registerFromMempool", status);
1317         }
1318         else
1319         {
1320             status = deregisterMemory(mptr, &current);
1321             if (status != GNI_RC_SUCCESS) break;
1322         }
1323     }; 
1324     return status;
1325 }
1326
1327 INLINE_KEYWORD
1328 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1329 {
1330     static int rank = -1;
1331     int i;
1332     gni_return_t status;
1333     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1334     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1335     mempool_type *mptr;
1336
1337     status = registerFromMempool(mptr1, msg, size, t, cqh);
1338     if (status == GNI_RC_SUCCESS) return status;
1339 #if CMK_SMP 
1340     for (i=0; i<CmiMyNodeSize()+1; i++) {
1341       rank = (rank+1)%(CmiMyNodeSize()+1);
1342       mptr = CpvAccessOther(mempool, rank);
1343       if (mptr == mptr1) continue;
1344       status = registerFromMempool(mptr, msg, size, t, cqh);
1345       if (status == GNI_RC_SUCCESS) return status;
1346     }
1347 #endif
1348     return  GNI_RC_ERROR_RESOURCE;
1349 }
1350
1351 INLINE_KEYWORD
1352 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1353 {
1354     MSG_LIST        *msg_tmp;
1355     MallocMsgList(msg_tmp);
1356     msg_tmp->destNode = destNode;
1357     msg_tmp->size   = size;
1358     msg_tmp->msg    = msg;
1359     msg_tmp->tag    = tag;
1360 #if CMK_WITH_STATS
1361     SMSG_CREATION(msg_tmp)
1362 #endif
1363
1364 #if ONE_SEND_QUEUE
1365     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1366 #else
1367 #if SMP_LOCKS
1368     CmiLock(queue->smsg_msglist_index[destNode].lock);
1369     if(queue->smsg_msglist_index[destNode].pushed == 0)
1370     {
1371         PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1372     }
1373     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1374     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1375 #else
1376     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1377 #endif
1378 #endif
1379
1380 #if PRINT_SYH
1381     buffered_smsg_counter++;
1382 #endif
1383 }
1384
1385 INLINE_KEYWORD static void print_smsg_attr(gni_smsg_attr_t     *a)
1386 {
1387     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1388 }
1389 #if 0
1390 INLINE_KEYWORD
1391 static void setup_smsg_connection(int destNode)
1392 {
1393     mdh_addr_list_t  *new_entry = 0;
1394     gni_post_descriptor_t *pd;
1395     gni_smsg_attr_t      *smsg_attr;
1396     gni_return_t status = GNI_RC_NOT_DONE;
1397     RDMA_REQUEST        *rdma_request_msg;
1398     
1399     if(smsg_available_slot == smsg_expand_slots)
1400     {
1401         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1402         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1403         memset(new_entry->addr, 0, smsg_memlen*smsg_expand_slots);
1404
1405         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1406             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1407             GNI_MEM_READWRITE,   
1408             -1,
1409             &(new_entry->mdh));
1410         smsg_available_slot = 0; 
1411         new_entry->next = smsg_dynamic_list;
1412         smsg_dynamic_list = new_entry;
1413     }
1414     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1415     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1416     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1417     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1418     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1419     smsg_attr->buff_size = smsg_memlen;
1420     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1421     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1422     smsg_local_attr_vec[destNode] = smsg_attr;
1423     smsg_available_slot++;
1424     MallocPostDesc(pd);
1425     pd->type            = GNI_POST_FMA_PUT;
1426     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1427     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1428     pd->length          = sizeof(gni_smsg_attr_t);
1429     pd->local_addr      = (uint64_t) smsg_attr;
1430     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1431     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1432     pd->src_cq_hndl     = 0;
1433
1434     pd->rdma_mode       = 0;
1435     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1436     print_smsg_attr(smsg_attr);
1437     if(status == GNI_RC_ERROR_RESOURCE )
1438     {
1439         MallocRdmaRequest(rdma_request_msg);
1440         rdma_request_msg->destNode = destNode;
1441         rdma_request_msg->pd = pd;
1442         /* buffer this request */
1443     }
1444 #if PRINT_SYH
1445     if(status != GNI_RC_SUCCESS)
1446        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1447     else
1448         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1449 #endif
1450 }
1451 #endif
1452 /* useDynamicSMSG */
1453 INLINE_KEYWORD
1454 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1455 {
1456     gni_return_t status = GNI_RC_NOT_DONE;
1457
1458     if(mailbox_list->offset == mailbox_list->size)
1459     {
1460         dynamic_smsg_mailbox_t *new_mailbox_entry;
1461         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1462         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1463         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1464         memset(new_mailbox_entry->mailbox_base, 0, new_mailbox_entry->size);
1465         new_mailbox_entry->offset = 0;
1466         
1467         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1468             new_mailbox_entry->size, smsg_rx_cqh,
1469             GNI_MEM_READWRITE,   
1470             -1,
1471             &(new_mailbox_entry->mem_hndl));
1472
1473         GNI_RC_CHECK("register", status);
1474         new_mailbox_entry->next = mailbox_list;
1475         mailbox_list = new_mailbox_entry;
1476     }
1477     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1478     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1479     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1480     local_smsg_attr->mbox_offset = mailbox_list->offset;
1481     mailbox_list->offset += smsg_memlen;
1482     local_smsg_attr->buff_size = smsg_memlen;
1483     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1484     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1485 }
1486
1487 /* useDynamicSMSG */
1488 INLINE_KEYWORD
1489 static int connect_to(int destNode)
1490 {
1491     gni_return_t status = GNI_RC_NOT_DONE;
1492     CmiAssert(smsg_connected_flag[destNode] == 0);
1493     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1494     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1495     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1496     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1497     
1498     CMI_GNI_LOCK(global_gni_lock)
1499     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1500     CMI_GNI_UNLOCK(global_gni_lock)
1501     if (status == GNI_RC_ERROR_RESOURCE) {
1502       /* possibly destNode is making connection at the same time */
1503       free(smsg_attr_vector_local[destNode]);
1504       smsg_attr_vector_local[destNode] = NULL;
1505       free(smsg_attr_vector_remote[destNode]);
1506       smsg_attr_vector_remote[destNode] = NULL;
1507       mailbox_list->offset -= smsg_memlen;
1508 #if PRINT_SYH
1509     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1510 #endif
1511       return 0;
1512     }
1513     GNI_RC_CHECK("GNI_Post", status);
1514     smsg_connected_flag[destNode] = 1;
1515 #if PRINT_SYH
1516     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1517 #endif
1518     return 1;
1519 }
1520
1521 INLINE_KEYWORD
1522 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr, int ischarm )
1523 {
1524     unsigned int          remote_address;
1525     uint32_t              remote_id;
1526     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1527     gni_smsg_attr_t       *smsg_attr;
1528     gni_post_descriptor_t *pd;
1529     gni_post_state_t      post_state;
1530     char                  *real_data; 
1531     int                   msgid = 0;
1532
1533     if (useDynamicSMSG) {
1534         switch (smsg_connected_flag[destNode]) {
1535         case 0: 
1536             connect_to(destNode);         /* continue to case 1 */
1537         case 1:                           /* pending connection, do nothing */
1538             status = GNI_RC_NOT_DONE;
1539             if(inbuff ==0)
1540                 buffer_small_msgs(queue, msg, size, destNode, tag);
1541             return status;
1542         }
1543     }
1544 #if ! ONE_SEND_QUEUE
1545     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1546 #endif
1547     {
1548         //CMI_GNI_LOCK(smsg_mailbox_lock)
1549         CMI_GNI_LOCK(default_tx_cq_lock)
1550 #if CMK_SMP_TRACE_COMMTHREAD
1551         int oldpe = -1;
1552         int oldeventid = -1;
1553         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG)
1554         { 
1555             START_EVENT();
1556             if ( tag == SMALL_DATA_TAG)
1557                 real_data = (char*)msg; 
1558             else 
1559                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1560             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1561             TRACE_COMM_SET_COMM_MSGID(real_data);
1562         }
1563 #endif
1564 #if REMOTE_EVENT
1565         if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG) {
1566             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1567             if (control_msg_tmp->seq_id <= 0 && control_msg_tmp->ack_index == -1)
1568             {
1569                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1570                 if (control_msg_tmp->ack_index == -1) {    /* table overflow */
1571                     status = GNI_RC_NOT_DONE;
1572                     if (inbuff ==0)
1573                         buffer_small_msgs(queue, msg, size, destNode, tag);
1574                     return status;
1575                 }
1576             }
1577         }
1578 #endif
1579 #if     CMK_WITH_STATS
1580         SMSG_TRY_SEND(tag)
1581 #endif
1582 #if CMK_WITH_STATS
1583         double              creation_time;
1584         if (ptr == NULL)
1585             creation_time = CmiWallTimer();
1586         else
1587             creation_time = ptr->creation_time;
1588 #endif
1589
1590 #if CMK_SMSGS_FREE_AFTER_EVENT
1591         msgid = SmsgsPool_getslot(&smsgsPool, msg, ischarm?1:2);
1592 #endif
1593         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, msgid, tag);
1594 #if CMK_SMP_TRACE_COMMTHREAD
1595         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1596 #endif
1597         CMI_GNI_UNLOCK(default_tx_cq_lock)
1598         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1599         if(status == GNI_RC_SUCCESS)
1600         {
1601 #if     CMK_WITH_STATS
1602             SMSG_SENT_DONE(creation_time,tag) 
1603 #endif
1604 #if CMK_SMP_TRACE_COMMTHREAD
1605             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG )
1606             { 
1607                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1608             }
1609 #endif
1610         }else {
1611             status = GNI_RC_ERROR_RESOURCE;
1612 #if CMK_SMSGS_FREE_AFTER_EVENT
1613             SmsgsPool_freeslot(&smsgsPool, msgid);
1614 #endif
1615         }
1616     }
1617     if(status != GNI_RC_SUCCESS && inbuff ==0)
1618         buffer_small_msgs(queue, msg, size, destNode, tag);
1619     return status;
1620 }
1621
1622 INLINE_KEYWORD
1623 static CONTROL_MSG* construct_control_msg(int size, char *msg, int seqno)
1624 {
1625     /* construct a control message and send */
1626     CONTROL_MSG         *control_msg_tmp;
1627     MallocControlMsg(control_msg_tmp);
1628     control_msg_tmp->source_addr = (uint64_t)msg;
1629     control_msg_tmp->seq_id    = seqno;
1630     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1631 #if REMOTE_EVENT
1632     control_msg_tmp->ack_index    =  -1;
1633 #endif
1634 #if     USE_LRTS_MEMPOOL
1635     if(size < BIG_MSG)
1636     {
1637         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1638     }
1639     else
1640     {
1641         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1642         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1643         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1644     }
1645 #else
1646     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1647 #endif
1648     return control_msg_tmp;
1649 }
1650
1651 #define BLOCKING_SEND_CONTROL    0
1652
1653 // Large message, send control to receiver, receiver register memory and do a GET, 
1654 // return 1 - send no success
1655 INLINE_KEYWORD static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
1656 {
1657     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1658     uint32_t            vmdh_index  = -1;
1659     int                 size;
1660     int                 offset = 0;
1661     uint64_t            source_addr;
1662     int                 register_size; 
1663     void                *msg;
1664
1665     size    =   control_msg_tmp->total_length;
1666     source_addr = control_msg_tmp->source_addr;
1667     register_size = control_msg_tmp->length;
1668
1669 #if  USE_LRTS_MEMPOOL
1670     if( control_msg_tmp->seq_id <=0 ){
1671 #if BLOCKING_SEND_CONTROL
1672         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1673             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1674                 LrtsAdvanceCommunication(0);
1675         }
1676 #endif
1677         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1678         {
1679             msg = (void*)source_addr;
1680             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1681             {
1682                 if(!inbuff)
1683                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1684                 return GNI_RC_ERROR_NOMEM;
1685             }
1686             //register the corresponding mempool
1687             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1688             if(status == GNI_RC_SUCCESS)
1689             {
1690                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1691             }
1692         }else
1693         {
1694             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1695             status = GNI_RC_SUCCESS;
1696         }
1697         if(NoMsgInSend(source_addr))
1698             register_size = GetMempoolsize((void*)(source_addr));
1699         else
1700             register_size = 0;
1701     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1702     {
1703         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1704         source_addr += offset;
1705         size = control_msg_tmp->length;
1706 #if BLOCKING_SEND_CONTROL
1707         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1708             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1709                 LrtsAdvanceCommunication(0);
1710         }
1711 #endif
1712         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1713             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1714             {
1715                 if(!inbuff)
1716                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1717                 return GNI_RC_ERROR_NOMEM;
1718             }
1719             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1720             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1721         }
1722         else
1723         {
1724             status = GNI_RC_SUCCESS;
1725         }
1726         register_size = 0;  
1727     }
1728
1729 #if CMI_EXERT_SEND_LARGE_CAP
1730     if(SEND_large_pending >= SEND_large_cap)
1731     {
1732         status = GNI_RC_ERROR_NOMEM;
1733     }
1734 #endif
1735  
1736     if(status == GNI_RC_SUCCESS)
1737     {
1738        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr, 0); 
1739         if(status == GNI_RC_SUCCESS)
1740         {
1741 #if CMI_EXERT_SEND_LARGE_CAP
1742             SEND_large_pending++;
1743 #endif
1744             buffered_send_msg += register_size;
1745             if(control_msg_tmp->seq_id == 0)
1746             {
1747                 IncreaseMsgInSend(source_addr);
1748             }
1749 #if ! CMK_SMSGS_FREE_AFTER_EVENT
1750             FreeControlMsg(control_msg_tmp);
1751 #endif
1752             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag); 
1753         }else
1754             status = GNI_RC_ERROR_RESOURCE;
1755
1756     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1757     {
1758         CmiAbort("Memory registor for large msg\n");
1759     }else 
1760     {
1761         status = GNI_RC_ERROR_NOMEM; 
1762         if(!inbuff)
1763             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1764     }
1765     return status;
1766 #else
1767     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1768     if(status == GNI_RC_SUCCESS)
1769     {
1770         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL, 0);
1771 #if ! CMK_SMSGS_FREE_AFTER_EVENT
1772         if(status == GNI_RC_SUCCESS)
1773         {
1774             FreeControlMsg(control_msg_tmp);
1775         }
1776 #endif
1777     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1778     {
1779         CmiAbort("Memory registor for large msg\n");
1780     }else 
1781     {
1782         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1783     }
1784     return status;
1785 #endif
1786 }
1787
1788 INLINE_KEYWORD void LrtsBeginIdle() {}
1789
1790 INLINE_KEYWORD void LrtsStillIdle() {}
1791
1792 INLINE_KEYWORD void LrtsNotifyIdle() {}
1793
1794 INLINE_KEYWORD void LrtsPrepareEnvelope(char *msg, int size)
1795 {
1796     CmiSetMsgSize(msg, size);
1797     CMI_SET_CHECKSUM(msg, size);
1798 }
1799
1800 CmiCommHandle LrtsSendFunc(int destNode, int destPE, int size, char *msg, int mode)
1801 {
1802     gni_return_t        status  =   GNI_RC_SUCCESS;
1803     uint8_t tag;
1804     CONTROL_MSG         *control_msg_tmp;
1805     int                 oob = ( mode & OUT_OF_BAND);
1806     SMSG_QUEUE          *queue;
1807
1808     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1809 #if CMK_USE_OOB
1810     queue = oob? &smsg_oob_queue : &smsg_queue;
1811     tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
1812 #else
1813     queue = &smsg_queue;
1814     tag = LMSG_INIT_TAG;
1815 #endif
1816
1817     LrtsPrepareEnvelope(msg, size);
1818
1819 #if PRINT_SYH
1820     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1821 #endif 
1822
1823 #if CMK_SMP 
1824     if(size <= SMSG_MAX_MSG)
1825         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1826     else if (size < BIG_MSG) {
1827         control_msg_tmp =  construct_control_msg(size, msg, 0);
1828         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1829     }
1830     else {
1831         CmiSetMsgSeq(msg, 0);
1832         control_msg_tmp =  construct_control_msg(size, msg, 1);
1833         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1834     }
1835 #else   //non-smp, smp(worker sending)
1836     if(size <= SMSG_MAX_MSG)
1837     {
1838         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL, 1))
1839         {  
1840 #if !CMK_SMSGS_FREE_AFTER_EVENT
1841             CmiFree(msg); 
1842 #endif
1843         }
1844     }
1845     else if (size < BIG_MSG) {
1846         control_msg_tmp =  construct_control_msg(size, msg, 0);
1847         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1848     }
1849     else {
1850 #if     USE_LRTS_MEMPOOL
1851         CmiSetMsgSeq(msg, 0);
1852         control_msg_tmp =  construct_control_msg(size, msg, 1);
1853         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1854 #else
1855         control_msg_tmp =  construct_control_msg(size, msg, 0);
1856         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1857 #endif
1858     }
1859 #endif
1860     return 0;
1861 }
1862
1863 #if 0
1864 // this is no different from the common code
1865 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1866 {
1867   int i;
1868 #if CMK_BROADCAST_USE_CMIREFERENCE
1869   for(i=0;i<npes;i++) {
1870     if (pes[i] == CmiMyPe())
1871       CmiSyncSend(pes[i], len, msg);
1872     else {
1873       CmiReference(msg);
1874       CmiSyncSendAndFree(pes[i], len, msg);
1875     }
1876   }
1877 #else
1878   for(i=0;i<npes;i++) {
1879     CmiSyncSend(pes[i], len, msg);
1880   }
1881 #endif
1882 }
1883
1884 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1885 {
1886   /* A better asynchronous implementation may be wanted, but at least it works */
1887   CmiSyncListSendFn(npes, pes, len, msg);
1888   return (CmiCommHandle) 0;
1889 }
1890
1891 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1892 {
1893   if (npes == 1) {
1894       CmiSyncSendAndFree(pes[0], len, msg);
1895       return;
1896   }
1897 #if CMK_PERSISTENT_COMM
1898   if (CpvAccess(phs) && len > PERSIST_MIN_SIZE 
1899 #if CMK_SMP
1900             && IS_PERSISTENT_MEMORY(msg)
1901 #endif
1902      ){
1903       int i;
1904       for(i=0;i<npes;i++) {
1905         if (pes[i] == CmiMyPe())
1906           CmiSyncSend(pes[i], len, msg);
1907         else {
1908           CmiReference(msg);
1909           CmiSyncSendAndFree(pes[i], len, msg);
1910         }
1911       }
1912       CmiFree(msg);
1913       return;
1914   }
1915 #endif
1916   
1917 #if CMK_BROADCAST_USE_CMIREFERENCE
1918   CmiSyncListSendFn(npes, pes, len, msg);
1919   CmiFree(msg);
1920 #else
1921   int i;
1922   for(i=0;i<npes-1;i++) {
1923     CmiSyncSend(pes[i], len, msg);
1924   }
1925   if (npes>0)
1926     CmiSyncSendAndFree(pes[npes-1], len, msg);
1927   else 
1928     CmiFree(msg);
1929 #endif
1930 }
1931 #endif
1932
1933 static void    PumpDatagramConnection();
1934 static      int         event_SetupConnect = 111;
1935 static      int         event_PumpSmsg = 222 ;
1936 static      int         event_PumpTransaction = 333;
1937 static      int         event_PumpRdmaTransaction = 444;
1938 static      int         event_SendBufferSmsg = 484;
1939 static      int         event_SendFmaRdmaMsg = 555;
1940 static      int         event_AdvanceCommunication = 666;
1941
1942 static void registerUserTraceEvents() {
1943 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1944     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1945     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1946     event_PumpTransaction = traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
1947     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA remote event" , -1);
1948     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1949     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1950     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1951 #endif
1952 }
1953
1954 static void ProcessDeadlock()
1955 {
1956     static CmiUInt8 *ptr = NULL;
1957     static CmiUInt8  last = 0, mysum, sum;
1958     static int count = 0;
1959     gni_return_t status;
1960     int i;
1961
1962 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1963 //sweep_mempool(CpvAccess(mempool));
1964     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1965     mysum = smsg_send_count + smsg_recv_count;
1966     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1967     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1968     GNI_RC_CHECK("PMI_Allgather", status);
1969     sum = 0;
1970     for (i=0; i<mysize; i++)  sum+= ptr[i];
1971     if (last == 0 || sum == last) 
1972         count++;
1973     else
1974         count = 0;
1975     last = sum;
1976     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1977     if (count == 2) { 
1978         /* detected twice, it is a real deadlock */
1979         if (myrank == 0)  {
1980             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1981             CmiAbort("Fatal> Deadlock detected.");
1982         }
1983
1984     }
1985     _detected_hang = 0;
1986 }
1987
1988 static void CheckProgress()
1989 {
1990     if (smsg_send_count == last_smsg_send_count &&
1991         smsg_recv_count == last_smsg_recv_count ) 
1992     {
1993         _detected_hang = 1;
1994 #if !CMK_SMP
1995         if (_detected_hang) ProcessDeadlock();
1996 #endif
1997
1998     }
1999     else {
2000         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
2001         last_smsg_send_count = smsg_send_count;
2002         last_smsg_recv_count = smsg_recv_count;
2003         _detected_hang = 0;
2004     }
2005 }
2006
2007 static void set_limit()
2008 {
2009     //if (!user_set_flag && CmiMyRank() == 0) {
2010     if (CmiMyRank() == 0) {
2011         int mynode = CmiPhysicalNodeID(CmiMyPe());
2012         int numpes = CmiNumPesOnPhysicalNode(mynode);
2013         int numprocesses = numpes / CmiMyNodeSize();
2014         MAX_REG_MEM  = _totalmem / numprocesses;
2015         MAX_BUFF_SEND = MAX_REG_MEM / 2;
2016         if (CmiMyPe() == 0)
2017            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
2018         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
2019         {
2020              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
2021              CmiAbort("memory registration\n");
2022         }
2023     }
2024 }
2025
2026 void LrtsPostCommonInit(int everReturn)
2027 {
2028 #if CMK_DIRECT
2029     CmiDirectInit();
2030 #endif
2031 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
2032     CpvInitialize(double, projTraceStart);
2033     /* only PE 0 needs to care about registration (to generate sts file). */
2034     //if (CmiMyPe() == 0) 
2035     {
2036         registerMachineUserEventsFunction(&registerUserTraceEvents);
2037     }
2038 #endif
2039
2040 #if !CMK_SMP
2041     if (useDynamicSMSG)
2042     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
2043 #endif
2044
2045 #if ! LARGEPAGE
2046     if (_checkProgress)
2047 #if CMK_SMP
2048     if (CmiMyRank() == 0)
2049 #endif
2050     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
2051 #endif
2052  
2053 #if !LARGEPAGE
2054     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2055 #endif
2056 }
2057
2058 /* this is called by worker thread */
2059 void LrtsPostNonLocal()
2060 {
2061 #if 1
2062
2063 #if CMK_SMP_TRACE_COMMTHREAD
2064     double startT, endT;
2065 #endif
2066
2067 #if MULTI_THREAD_SEND
2068     if(mysize == 1) return;
2069
2070     if (CmiMyRank() % 6 != 3) return;
2071
2072 #if CMK_SMP_TRACE_COMMTHREAD
2073     traceEndIdle();
2074     startT = CmiWallTimer();
2075 #endif
2076
2077     CmiMachineProgressImpl();
2078
2079 #if CMK_SMP_TRACE_COMMTHREAD
2080     endT = CmiWallTimer();
2081     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2082     traceBeginIdle();
2083 #endif
2084
2085 #endif
2086 #endif
2087 }
2088
2089 /* Network progress function is used to poll the network when for
2090    messages. This flushes receive buffers on some  implementations*/
2091 #if CMK_MACHINE_PROGRESS_DEFINED
2092 void CmiMachineProgressImpl() {
2093 #if ! CMK_SMP || MULTI_THREAD_SEND
2094
2095     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
2096     SEND_OOB_SMSG(smsg_oob_queue)
2097     PUMP_REMOTE_HIGHPRIORITY
2098     PUMP_LOCAL_HIGHPRIORITY
2099     POST_HIGHPRIORITY_RDMA
2100
2101 #if 0
2102 #if CMK_WORKER_SINGLE_TASK
2103     if (CmiMyRank() % 6 == 0)
2104 #endif
2105     PumpNetworkSmsg();
2106
2107 #if CMK_WORKER_SINGLE_TASK
2108     if (CmiMyRank() % 6 == 1)
2109 #endif
2110     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2111
2112 #if CMK_WORKER_SINGLE_TASK
2113     if (CmiMyRank() % 6 == 2)
2114 #endif
2115     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
2116
2117 #if REMOTE_EVENT
2118 #if CMK_WORKER_SINGLE_TASK
2119     if (CmiMyRank() % 6 == 3)
2120 #endif
2121     PumpRemoteTransactions(rdma_rx_cqh);         // rdma_rx_cqh
2122 #endif
2123
2124 #if CMK_WORKER_SINGLE_TASK
2125     if (CmiMyRank() % 6 == 4)
2126 #endif
2127     {
2128 #if CMK_USE_OOB
2129     SendBufferMsg(&smsg_oob_queue, NULL);
2130     SendBufferMsg(&smsg_queue, &smsg_oob_queue);
2131 #else
2132     SendBufferMsg(&smsg_queue, NULL);
2133 #endif
2134     }
2135
2136 #if CMK_WORKER_SINGLE_TASK
2137     if (CmiMyRank() % 6 == 5)
2138 #endif
2139 #if CMK_SMP
2140     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
2141 #else
2142     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
2143 #endif
2144
2145 #endif
2146 #endif
2147 }
2148 #endif
2149
2150
2151 /* useDynamicSMSG */
2152 static void    PumpDatagramConnection()
2153 {
2154     uint32_t          remote_address;
2155     uint32_t          remote_id;
2156     gni_return_t status;
2157     gni_post_state_t  post_state;
2158     uint64_t          datagram_id;
2159     int i;
2160
2161    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2162    {
2163        if (datagram_id >= mysize) {           /* bound endpoint */
2164            int pe = datagram_id - mysize;
2165            CMI_GNI_LOCK(global_gni_lock)
2166            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2167            CMI_GNI_UNLOCK(global_gni_lock)
2168            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2169            {
2170                CmiAssert(remote_id == pe);
2171                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2172                GNI_RC_CHECK("Dynamic SMSG Init", status);
2173 #if PRINT_SYH
2174                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2175 #endif
2176                CmiAssert(smsg_connected_flag[pe] == 1);
2177                smsg_connected_flag[pe] = 2;
2178            }
2179        }
2180        else {         /* unbound ep */
2181            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2182            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2183            {
2184                CmiAssert(remote_id<mysize);
2185                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2186                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2187                GNI_RC_CHECK("Dynamic SMSG Init", status);
2188 #if PRINT_SYH
2189                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2190 #endif
2191                smsg_connected_flag[remote_id] = 2;
2192
2193                alloc_smsg_attr(&send_smsg_attr);
2194                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2195                GNI_RC_CHECK("post unbound datagram", status);
2196            }
2197        }
2198    }
2199 }
2200
2201 /* pooling CQ to receive network message */
2202 static void PumpNetworkRdmaMsgs()
2203 {
2204     gni_cq_entry_t      event_data;
2205     gni_return_t        status;
2206
2207 }
2208
2209 INLINE_KEYWORD
2210 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2211 {
2212     RDMA_REQUEST        *rdma_request_msg;
2213     MallocRdmaRequest(rdma_request_msg);
2214     rdma_request_msg->destNode = inst_id;
2215     rdma_request_msg->pd = pd;
2216 #if REMOTE_EVENT
2217     rdma_request_msg->ack_index = ack_index;
2218 #endif
2219     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2220 }
2221
2222 static void getLargeMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2223 static void getPersistentMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2224 static void PRINT_CONTROL(void *header)
2225 {
2226     CONTROL_MSG *control_msg = (CONTROL_MSG *) header;
2227
2228     printf(" length=%d , seq_id = %d, addr = %lld:%lld:%lld  \n", control_msg->length, control_msg->seq_id, control_msg->source_addr, (control_msg->source_mem_hndl).qword1, (control_msg->source_mem_hndl).qword2 );
2229 #if PERSISTENT_GET_BASE
2230     printf(" memhdl = %lld:%lld:%lld \n", control_msg->dest_addr, (control_msg->dest_mem_hndl).qword1, (control_msg->dest_mem_hndl).qword2);
2231 #endif
2232 }
2233 static void PumpNetworkSmsg()
2234 {
2235     uint64_t            inst_id;
2236     gni_cq_entry_t      event_data;
2237     gni_return_t        status;
2238     void                *header;
2239     uint8_t             msg_tag;
2240     int                 msg_nbytes;
2241     void                *msg_data;
2242     gni_mem_handle_t    msg_mem_hndl;
2243     gni_smsg_attr_t     *smsg_attr;
2244     gni_smsg_attr_t     *remote_smsg_attr;
2245     int                 init_flag;
2246     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2247     uint64_t            source_addr;
2248     SMSG_QUEUE         *queue = &smsg_queue;
2249     PCQueue             tmp_queue;
2250 #if  CMK_DIRECT
2251     cmidirectMsg        *direct_msg;
2252 #endif
2253 #if CMI_PUMPNETWORKSMSG_CAP 
2254     int                  recv_cnt = 0;
2255     while(recv_cnt< PumpNetworkSmsg_cap) {
2256 #else
2257     while(1) {
2258 #endif
2259         CMI_GNI_LOCK(smsg_rx_cq_lock)
2260         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2261         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2262         if(status != GNI_RC_SUCCESS) break;
2263
2264         inst_id = GNI_CQ_GET_INST_ID(event_data);
2265 #if REMOTE_EVENT
2266         inst_id = GET_RANK(inst_id);      /* important */
2267 #endif
2268         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2269 #if PRINT_SYH
2270         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2271 #endif
2272         if (useDynamicSMSG) {
2273             /* subtle: smsg may come before connection is setup */
2274             while (smsg_connected_flag[inst_id] != 2) 
2275                PumpDatagramConnection();
2276         }
2277         msg_tag = GNI_SMSG_ANY_TAG;
2278         while(1) {
2279             CMI_GNI_LOCK(smsg_mailbox_lock)
2280             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2281             if (status != GNI_RC_SUCCESS)
2282             {
2283                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2284                 break;
2285             }
2286 #if         CMI_PUMPNETWORKSMSG_CAP
2287             recv_cnt++; 
2288 #endif
2289 #if PRINT_SYH
2290             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2291 #endif
2292             /* copy msg out and then put into queue (small message) */
2293             switch (msg_tag) {
2294             case SMALL_DATA_TAG:
2295             {
2296                 msg_nbytes = CmiGetMsgSize(header);
2297                 CmiAssert(msg_nbytes > 0);
2298                 msg_data    = CmiAlloc(msg_nbytes);
2299                 memcpy(msg_data, (char*)header, msg_nbytes);
2300                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2301                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2302                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2303                 handleOneRecvedMsg(msg_nbytes, msg_data);
2304                 break;
2305             }
2306             case LMSG_PERSISTENT_INIT_TAG:
2307             {   CMI_GNI_UNLOCK(smsg_mailbox_lock)
2308                 getPersistentMsgRequest(header, inst_id, msg_tag, sendRdmaBuf);
2309                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2310                 break;
2311             }
2312             case LMSG_INIT_TAG:
2313             case LMSG_OOB_INIT_TAG:
2314             {
2315                 tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf; 
2316 #if MULTI_THREAD_SEND
2317                 MallocControlMsg(control_msg_tmp);
2318                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2319                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2320                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2321                 getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
2322                 FreeControlMsg(control_msg_tmp);
2323 #else
2324                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2325                 getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
2326                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2327 #endif
2328                 break;
2329             }
2330 #if !REMOTE_EVENT && !CQWRITE
2331             case ACK_TAG:   //msg fit into mempool
2332             {
2333                 /* Get is done, release message . Now put is not used yet*/
2334                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2335                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2336                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2337 #if ! USE_LRTS_MEMPOOL
2338                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2339 #else
2340                 DecreaseMsgInSend(msg);
2341 #endif
2342                 if(NoMsgInSend(msg))
2343                     buffered_send_msg -= GetMempoolsize(msg);
2344                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2345                 CmiFree(msg);
2346 #if CMI_EXERT_SEND_LARGE_CAP
2347                 SEND_large_pending--;
2348 #endif
2349                 break;
2350             }
2351 #endif
2352             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2353             {
2354 #if MULTI_THREAD_SEND
2355                 MallocControlMsg(header_tmp);
2356                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2357                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2358 #else
2359                 header_tmp = (CONTROL_MSG *) header;
2360 #endif
2361                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2362 #if CMI_EXERT_SEND_LARGE_CAP
2363                 SEND_large_pending--;
2364 #endif
2365                 void *msg = (void*)(header_tmp->source_addr);
2366                 int cur_seq = CmiGetMsgSeq(msg);
2367                 int offset = ONE_SEG*(cur_seq+1);
2368                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2369                 buffered_send_msg -= header_tmp->length;
2370                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2371                 if (remain_size < 0) remain_size = 0;
2372                 CmiSetMsgSize(msg, remain_size);
2373                 if(remain_size <= 0) //transaction done
2374                 {
2375                     CmiFree(msg);
2376                 }else if (header_tmp->total_length > offset)
2377                 {
2378                     CmiSetMsgSeq(msg, cur_seq+1);
2379                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2380                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2381                     //send next seg
2382                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2383                          // pipelining
2384                     if (header_tmp->seq_id == 1) {
2385                       int i;
2386                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2387                         int seq = cur_seq+i+2;
2388                         CmiSetMsgSeq(msg, seq-1);
2389                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2390                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2391                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2392                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2393                       }
2394                     }
2395                 }
2396 #if MULTI_THREAD_SEND
2397                 FreeControlMsg(header_tmp);
2398 #else
2399                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2400 #endif
2401                 break;
2402             }
2403 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE
2404             case PUT_DONE_TAG:  {   //persistent message
2405                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2406                 int size = ((CONTROL_MSG *) header)->length;
2407                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2408                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2409                 CmiReference(msg);
2410                 CMI_CHECK_CHECKSUM(msg, size);
2411                 handleOneRecvedMsg(size, msg); 
2412 #if PRINT_SYH
2413                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2414 #endif
2415                 break;
2416             }
2417 #endif
2418 #if CMK_DIRECT
2419             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2420                 //create a trigger message
2421                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2422                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2423                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2424                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2425                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2426                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2427                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2428                 break;
2429 #endif
2430             default:
2431                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2432                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2433                 printf("weird tag problem %d \n", msg_tag);
2434                 CmiAbort("Unknown tag\n");
2435             }               // end switch
2436 #if PRINT_SYH
2437             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2438 #endif
2439             smsg_recv_count ++;
2440             msg_tag = GNI_SMSG_ANY_TAG;
2441         } //endwhile GNI_SmsgGetNextWTag
2442     }   //end while GetEvent
2443     if(status == GNI_RC_ERROR_RESOURCE)
2444     {
2445         printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
2446         GNI_RC_CHECK("Smsg_rx_cq full", status);
2447     }
2448 }
2449
2450 static void printDesc(gni_post_descriptor_t *pd)
2451 {
2452     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2453 }
2454
2455 #if CQWRITE
2456 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2457 {
2458     gni_post_descriptor_t *pd;
2459     gni_return_t        status = GNI_RC_SUCCESS;
2460     
2461     MallocPostDesc(pd);
2462     pd->type = GNI_POST_CQWRITE;
2463     pd->cq_mode = GNI_CQMODE_SILENT;
2464     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2465     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2466     pd->cqwrite_value = data;
2467     pd->remote_mem_hndl = mem_hndl;
2468     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2469     GNI_RC_CHECK("GNI_PostCqWrite", status);
2470 }
2471 #endif
2472
2473 // register memory for a message
2474 // return mem handle
2475 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2476 {
2477     gni_return_t status = GNI_RC_SUCCESS;
2478
2479     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2480
2481 #if CMK_PERSISTENT_COMM_PUT
2482       // persistent message is always registered
2483       // BIG_MSG small pieces do not have malloc chunk header
2484     if (IS_PERSISTENT_MEMORY(msg)) {
2485         *memh = GetMemHndl(msg);
2486         return GNI_RC_SUCCESS;
2487     }
2488 #endif
2489     if(seqno == 0 
2490 #if CMK_PERSISTENT_COMM_PUT
2491          || seqno == PERSIST_SEQ
2492 #endif
2493       )
2494     {
2495         if(IsMemHndlZero((GetMemHndl(msg))))
2496         {
2497             msg = (void*)(msg);
2498             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2499             if(status == GNI_RC_SUCCESS)
2500                 *memh = GetMemHndl(msg);
2501         }
2502         else {
2503             *memh = GetMemHndl(msg);
2504         }
2505     }
2506     else {
2507         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2508         status = registerMemory(msg, size, memh, NULL); 
2509     }
2510     return status;
2511 }
2512
2513 static void getPersistentMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2514 {
2515 #if   PERSISTENT_GET_BASE
2516     CONTROL_MSG         *request_msg;
2517     gni_return_t        status;
2518     gni_post_descriptor_t *pd;
2519     request_msg = (CONTROL_MSG *) header;
2520
2521     MallocPostDesc(pd);
2522     pd->cqwrite_value = request_msg->seq_id;
2523     pd->first_operand = ALIGN64(request_msg->length); //  total length
2524     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2525         pd->type            = GNI_POST_FMA_GET;
2526     else
2527         pd->type            = GNI_POST_RDMA_GET;
2528     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
2529     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2530     pd->length          = ALIGN64(request_msg->length);
2531     pd->local_addr      = (uint64_t) request_msg->dest_addr;
2532     pd->local_mem_hndl  = request_msg->dest_mem_hndl;
2533     pd->remote_addr     = (uint64_t) request_msg->source_addr;
2534     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2535     pd->src_cq_hndl     = 0;
2536     pd->rdma_mode       = 0;
2537     pd->amo_cmd         = 0;
2538 #if REMOTE_EVENT
2539     bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2540 #else
2541     bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2542 #endif
2543
2544 #endif
2545 }
2546
2547 // for BIG_MSG called on receiver side for receiving control message
2548 // LMSG_INIT_TAG
2549 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2550 {
2551 #if     USE_LRTS_MEMPOOL
2552     CONTROL_MSG         *request_msg;
2553     gni_return_t        status = GNI_RC_SUCCESS;
2554     void                *msg_data;
2555     gni_post_descriptor_t *pd;
2556     gni_mem_handle_t    msg_mem_hndl;
2557     int                 size, transaction_size, offset = 0;
2558     size_t              register_size = 0;
2559
2560     // initial a get to transfer data from the sender side */
2561     request_msg = (CONTROL_MSG *) header;
2562     size = request_msg->total_length;
2563     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2564     MallocPostDesc(pd);
2565 #if CMK_WITH_STATS 
2566     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2567 #endif
2568     if(request_msg->seq_id < 2)   {
2569         MACHSTATE2(8, "%d seq id in get large msg requrest %d\n", CmiMyRank(), request_msg->seq_id);
2570 #if CMK_SMP_TRACE_COMMTHREAD 
2571         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2572 #endif
2573         msg_data = CmiAlloc(size);
2574         CmiSetMsgSeq(msg_data, 0);
2575         _MEMCHECK(msg_data);
2576     }
2577     else {
2578         offset = ONE_SEG*(request_msg->seq_id-1);
2579         msg_data = (char*)request_msg->dest_addr + offset;
2580     }
2581    
2582     pd->cqwrite_value = request_msg->seq_id;
2583
2584     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2585     SetMemHndlZero(pd->local_mem_hndl);
2586     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2587     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2588         if(NoMsgInRecv( (void*)(msg_data)))
2589             register_size = GetMempoolsize((void*)(msg_data));
2590     }
2591
2592     pd->first_operand = ALIGN64(size);                   //  total length
2593
2594     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2595         pd->type            = GNI_POST_FMA_GET;
2596     else
2597         pd->type            = GNI_POST_RDMA_GET;
2598     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2599     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2600     pd->length          = transaction_size;
2601     pd->local_addr      = (uint64_t) msg_data;
2602     pd->remote_addr     = request_msg->source_addr + offset;
2603     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2604
2605     if (tag == LMSG_OOB_INIT_TAG) 
2606         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2607     else
2608     {
2609 #if MULTI_THREAD_SEND
2610         pd->src_cq_hndl     = rdma_tx_cqh;
2611 #else
2612         pd->src_cq_hndl     = 0;
2613 #endif
2614     }
2615
2616     pd->rdma_mode       = 0;
2617     pd->amo_cmd         = 0;
2618 #if CMI_EXERT_RECV_RDMA_CAP
2619     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2620 #endif
2621     //memory registration success
2622     if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
2623     {
2624         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2625         CMI_GNI_LOCK(lock)
2626 #if REMOTE_EVENT
2627         if( request_msg->seq_id == 0)
2628         {
2629             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2630             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2631             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2632         }
2633 #endif
2634
2635 #if CMK_WITH_STATS
2636         RDMA_TRY_SEND(pd->type)
2637 #endif
2638         if(pd->type == GNI_POST_RDMA_GET) 
2639         {
2640             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2641         }
2642         else
2643         {
2644             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2645         }
2646         CMI_GNI_UNLOCK(lock)
2647
2648         if(status == GNI_RC_SUCCESS )
2649         {
2650 #if CMI_EXERT_RECV_RDMA_CAP
2651             RDMA_pending++;
2652 #endif
2653             if(pd->cqwrite_value == 0)
2654             {
2655 #if MACHINE_DEBUG_LOG
2656                 buffered_recv_msg += register_size;
2657                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2658 #endif
2659                 IncreaseMsgInRecv(msg_data);
2660 #if CMK_SMP_TRACE_COMMTHREAD 
2661                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2662 #endif
2663             }
2664 #if  CMK_WITH_STATS
2665             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2666             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2667 #endif
2668         }
2669     }else if (status != GNI_RC_SUCCESS)
2670     {
2671         SetMemHndlZero((pd->local_mem_hndl));
2672     }
2673         if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
2674     {
2675 #if REMOTE_EVENT
2676         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2677 #else
2678         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2679 #endif
2680     }else if (status != GNI_RC_SUCCESS) {
2681         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2682         GNI_RC_CHECK("GetLargeAFter posting", status);
2683     }
2684 #else
2685     CONTROL_MSG         *request_msg;
2686     gni_return_t        status;
2687     void                *msg_data;
2688     gni_post_descriptor_t *pd;
2689     RDMA_REQUEST        *rdma_request_msg;
2690     gni_mem_handle_t    msg_mem_hndl;
2691     //int source;
2692     // initial a get to transfer data from the sender side */
2693     request_msg = (CONTROL_MSG *) header;
2694     msg_data = CmiAlloc(request_msg->length);
2695     _MEMCHECK(msg_data);
2696
2697     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2698
2699     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2700     {
2701         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2702     }
2703
2704     MallocPostDesc(pd);
2705     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2706         pd->type            = GNI_POST_FMA_GET;
2707     else
2708         pd->type            = GNI_POST_RDMA_GET;
2709     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2710     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2711     pd->length          = ALIGN64(request_msg->length);
2712     pd->local_addr      = (uint64_t) msg_data;
2713     pd->remote_addr     = request_msg->source_addr;
2714     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2715     if (tag == LMSG_OOB_INIT_TAG) 
2716         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2717     else
2718     {
2719 #if MULTI_THREAD_SEND
2720         pd->src_cq_hndl     = rdma_tx_cqh;
2721 #else
2722         pd->src_cq_hndl     = 0;
2723 #endif
2724     }
2725     pd->rdma_mode       = 0;
2726     pd->amo_cmd         = 0;
2727
2728     //memory registration successful
2729     if(status == GNI_RC_SUCCESS)
2730     {
2731         pd->local_mem_hndl  = msg_mem_hndl;
2732        
2733         if(pd->type == GNI_POST_RDMA_GET) 
2734         {
2735             CMI_GNI_LOCK(rdma_tx_cq_lock)
2736             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2737             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2738         }
2739         else
2740         {
2741             CMI_GNI_LOCK(default_tx_cq_lock)
2742             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2743             CMI_GNI_UNLOCK(default_tx_cq_lock)
2744         }
2745
2746     }else
2747     {
2748         SetMemHndlZero(pd->local_mem_hndl);
2749     }
2750     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2751     {
2752         MallocRdmaRequest(rdma_request_msg);
2753         rdma_request_msg->next = 0;
2754         rdma_request_msg->destNode = inst_id;
2755         rdma_request_msg->pd = pd;
2756         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2757     }else {
2758         GNI_RC_CHECK("AFter posting", status);
2759     }
2760 #endif
2761 }
2762
2763 #if CQWRITE
2764 static void PumpCqWriteTransactions()
2765 {
2766
2767     gni_cq_entry_t          ev;
2768     gni_return_t            status;
2769     void                    *msg;  
2770     int                     msg_size;
2771     while(1) {
2772         //CMI_GNI_LOCK(my_cq_lock) 
2773         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2774         //CMI_GNI_UNLOCK(my_cq_lock)
2775         if(status != GNI_RC_SUCCESS) break;
2776         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2777 #if CMK_PERSISTENT_COMM_PUT
2778 #if PRINT_SYH
2779         printf(" %d CQ write event %p\n", myrank, msg);
2780 #endif
2781         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2782 #if PRINT_SYH
2783             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2784 #endif
2785             CmiReference(msg);
2786             msg_size = CmiGetMsgSize(msg);
2787             CMI_CHECK_CHECKSUM(msg, msg_size);
2788             handleOneRecvedMsg(msg_size, msg); 
2789             continue;
2790         }
2791 #endif
2792 #if ! USE_LRTS_MEMPOOL
2793        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2794 #else
2795         DecreaseMsgInSend(msg);
2796 #endif
2797         if(NoMsgInSend(msg))
2798             buffered_send_msg -= GetMempoolsize(msg);
2799         CmiFree(msg);
2800     };
2801     if(status == GNI_RC_ERROR_RESOURCE)
2802     {
2803         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2804     }
2805 }
2806 #endif
2807
2808 #if REMOTE_EVENT
2809 static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
2810 {
2811     gni_cq_entry_t          ev;
2812     gni_return_t            status;
2813     void                    *msg;   
2814     int                     inst_id, index, type, size;
2815
2816 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2817     int                     pump_count = 0;
2818 #endif
2819     while(1) {
2820 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2821         if (pump_count > PumpRemoteTransactions_cap) break;
2822 #endif
2823         CMI_GNI_LOCK(global_gni_lock)
2824 //        CMI_GNI_LOCK(rdma_tx_cq_lock)
2825         status = GNI_CqGetEvent(rx_cqh, &ev);
2826 //        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2827         CMI_GNI_UNLOCK(global_gni_lock)
2828
2829         if(status != GNI_RC_SUCCESS) break;
2830
2831 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2832         pump_count ++;
2833 #endif
2834
2835         inst_id = GNI_CQ_GET_INST_ID(ev);
2836         index = GET_INDEX(inst_id);
2837         type = GET_TYPE(inst_id);
2838         switch (type) {
2839         case 0:    // ACK
2840             CmiAssert(index>=0 && index<ackPool.size);
2841             CMI_GNI_LOCK(ackPool.lock);
2842             //CmiAssert(GetIndexType(ackPool, index) == 1);
2843             msg = GetIndexAddress(ackPool, index);
2844             CMI_GNI_UNLOCK(ackPool.lock);
2845 #if PRINT_SYH
2846             MACHSTATE4(8,"[%d] PumpRemoteTransactions: ack: %lld index: %d type: %d.\n", myrank, msg, index, type);
2847 #endif
2848 #if ! USE_LRTS_MEMPOOL
2849            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2850 #else
2851             DecreaseMsgInSend(msg);
2852 #endif
2853             if(NoMsgInSend(msg))
2854                 buffered_send_msg -= GetMempoolsize(msg);
2855             CmiFree(msg);
2856             IndexPool_freeslot(&ackPool, index);
2857 #if CMI_EXERT_SEND_LARGE_CAP
2858             SEND_large_pending--;
2859 #endif
2860             break;
2861 #if CMK_PERSISTENT_COMM_PUT
2862         case 1:  {    // PERSISTENT
2863             CmiLock(persistPool.lock);
2864             CmiAssert(GetIndexType(persistPool, index) == 2);
2865             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2866             CmiUnlock(persistPool.lock);
2867             msg = slot->destBuf[slot->addrIndex].destAddress;
2868             size = CmiGetMsgSize(msg);
2869             CmiReference(msg);
2870             CMI_CHECK_CHECKSUM(msg, size);
2871             handleOneRecvedMsg(size, msg); 
2872             break;
2873             }
2874 #endif
2875         default:
2876             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2877             CmiAbort("PumpRemoteTransactions: unknown type");
2878         }
2879     }
2880     if(status == GNI_RC_ERROR_RESOURCE)
2881     {
2882         printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
2883         GNI_RC_CHECK("PumpRemoteTransactions: rx_cqh full", status);
2884     }
2885 }
2886 #endif
2887
2888 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2889 {
2890     gni_cq_entry_t          ev;
2891     gni_return_t            status;
2892     uint64_t                type, inst_id;
2893     gni_post_descriptor_t   *tmp_pd;
2894     MSG_LIST                *ptr;
2895     CONTROL_MSG             *ack_msg_tmp;
2896     ACK_MSG                 *ack_msg;
2897     uint8_t                 msg_tag;
2898 #if CMK_DIRECT
2899     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2900 #endif
2901     SMSG_QUEUE         *queue = &smsg_queue;
2902 #if CMI_PUMPLOCALTRANSACTIONS_CAP
2903     int         pump_count = 0;
2904     while(pump_count < PumpLocalTransactions_cap) {
2905         pump_count++;
2906 #else
2907     while(1) {
2908 #endif
2909         CMI_GNI_LOCK(my_cq_lock) 
2910         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2911         CMI_GNI_UNLOCK(my_cq_lock)
2912         if(status != GNI_RC_SUCCESS) break;
2913         
2914         type = GNI_CQ_GET_TYPE(ev);
2915         if (type == GNI_CQ_EVENT_TYPE_POST)
2916         {
2917
2918 #if CMI_EXERT_RECV_RDMA_CAP
2919             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2920             RDMA_pending--;
2921 #endif
2922             inst_id     = GNI_CQ_GET_INST_ID(ev);
2923 #if PRINT_SYH
2924             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2925 #endif
2926             CMI_GNI_LOCK(my_cq_lock)
2927             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2928             CMI_GNI_UNLOCK(my_cq_lock)
2929
2930             switch (tmp_pd->type) {
2931 #if CMK_PERSISTENT_COMM_PUT  || CMK_DIRECT
2932             case GNI_POST_RDMA_PUT:
2933 #if CMK_PERSISTENT_COMM_PUT && ! USE_LRTS_MEMPOOL
2934                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2935 #endif
2936             case GNI_POST_FMA_PUT:
2937                 if(tmp_pd->amo_cmd == 1) {
2938 #if CMK_DIRECT
2939                     //sender ACK to receiver to trigger it is done
2940                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2941                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2942                     msg_tag = DIRECT_PUT_DONE_TAG;
2943 #endif
2944                 }
2945                 else {
2946                     CmiFree((void *)tmp_pd->local_addr);
2947 #if REMOTE_EVENT
2948                     FreePostDesc(tmp_pd);
2949                     continue;
2950 #elif CQWRITE
2951                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2952                     FreePostDesc(tmp_pd);
2953                     continue;
2954 #else
2955                     MallocControlMsg(ack_msg_tmp);
2956                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2957                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2958                     ack_msg_tmp->length  = tmp_pd->length;
2959                     msg_tag = PUT_DONE_TAG;
2960 #endif
2961                 }
2962                 break;
2963 #endif
2964             case GNI_POST_RDMA_GET:
2965             case GNI_POST_FMA_GET:  {
2966                 MACHSTATE2(8, "PumpLocal Get done %lld=>%lld\n", tmp_pd->local_addr, tmp_pd->remote_addr);
2967 #if  ! USE_LRTS_MEMPOOL
2968                 MallocControlMsg(ack_msg_tmp);
2969                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2970                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2971                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2972                 msg_tag = ACK_TAG;  
2973 #else
2974 #if CMK_WITH_STATS
2975                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2976 #endif
2977                 int seq_id = tmp_pd->cqwrite_value;
2978                 if(seq_id > 0)      // BIG_MSG
2979                 {
2980                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2981                     MallocControlMsg(ack_msg_tmp);
2982                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2983                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2984                     ack_msg_tmp->seq_id = seq_id;
2985                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2986                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2987                     ack_msg_tmp->length = tmp_pd->length;
2988                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2989                     msg_tag = BIG_MSG_TAG; 
2990                 } 
2991                 else
2992                 {
2993                     if(seq_id < 0)
2994                         CmiReference((void*)tmp_pd->local_addr);
2995                     msg_tag = ACK_TAG; 
2996 #if  !REMOTE_EVENT && !CQWRITE
2997                     MallocAckMsg(ack_msg);
2998                     ack_msg->source_addr = tmp_pd->remote_addr;
2999 #endif
3000                 }
3001 #endif
3002                 break;
3003             }
3004             case  GNI_POST_CQWRITE:
3005                    FreePostDesc(tmp_pd);
3006                    continue;
3007             default:
3008                 CmiPrintf("type=%d\n", tmp_pd->type);
3009                 CmiAbort("PumpLocalTransactions: unknown type!");
3010             }      /* end of switch */
3011
3012 #if CMK_DIRECT
3013             if (tmp_pd->amo_cmd == 1) {
3014                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL, 0);
3015 #if ! CMK_SMSGS_FREE_AFTER_EVENT
3016                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
3017 #endif
3018             }
3019             else
3020 #endif
3021             if (msg_tag == ACK_TAG) {
3022 #if !REMOTE_EVENT
3023 #if   !CQWRITE
3024                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL, 0); 
3025 #if !CMK_SMSGS_FREE_AFTER_EVENT
3026                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
3027 #endif
3028 #else
3029                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
3030 #endif
3031 #endif
3032             }
3033             else {
3034                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL, 0); 
3035 #if !CMK_SMSGS_FREE_AFTER_EVENT
3036                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
3037 #endif
3038             }
3039 #if CMK_PERSISTENT_COMM_PUT
3040             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
3041 #endif
3042             {
3043                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
3044 #if PRINT_SYH
3045                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
3046 #endif
3047                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
3048                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
3049
3050                     //CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
3051                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
3052 #if MACHINE_DEBUG_LOG
3053                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
3054                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
3055                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
3056 #endif
3057                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, CmiGetMsgSize(tmp_pd->local_addr));
3058                     handleOneRecvedMsg(CmiGetMsgSize(tmp_pd->local_addr), (void*)tmp_pd->local_addr);
3059                 }else if(msg_tag == BIG_MSG_TAG){
3060                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
3061                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
3062                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
3063                         START_EVENT();
3064 #if PRINT_SYH
3065                         printf("Pipeline msg done [%d]\n", myrank);
3066 #endif
3067 #if     CMK_SMP_TRACE_COMMTHREAD
3068                         if( tmp_pd->cqwrite_value == 1)
3069                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
3070 #endif
3071                         CMI_CHECK_CHECKSUM(msg, CmiGetMsgSize(msg));
3072                         handleOneRecvedMsg(CmiGetMsgSize(msg), msg);
3073                     }
3074                 }
3075             }
3076             FreePostDesc(tmp_pd);
3077         }
3078 #if CMK_SMSGS_FREE_AFTER_EVENT
3079         else if  (type == GNI_CQ_EVENT_TYPE_SMSG) {
3080             // a SmsgsSend is done
3081             int msgid = GNI_CQ_GET_MSG_ID(ev);
3082             int type = GetSmsgsIndexType(smsgsPool, msgid);
3083             void *addr = GetSmsgsIndexAddress(smsgsPool, msgid);
3084             switch (type) {
3085             case 1:
3086                 CmiFree(addr);
3087                 break;
3088             case 2:
3089                 free(addr);
3090                 break;
3091             default:
3092                 CmiAbort("Invalid SmsgsIndex");
3093             }
3094             SmsgsPool_freeslot(&smsgsPool, msgid);
3095         }
3096 #endif
3097     } //end while
3098     if(status == GNI_RC_ERROR_RESOURCE)
3099     {
3100         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
3101         GNI_RC_CHECK("Smsg_tx_cq full", status);
3102     }
3103 }
3104
3105 static void  SendRdmaMsg( BufferList sendqueue)
3106 {
3107     gni_return_t            status = GNI_RC_SUCCESS;
3108     gni_mem_handle_t        msg_mem_hndl;
3109     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
3110     RDMA_REQUEST            *pre = 0;
3111     uint64_t                register_size = 0;
3112     void                    *msg;
3113     int                     i;
3114
3115     int len = PCQueueLength(sendqueue);
3116     for (i=0; i<len; i++)
3117     {
3118 #if CMI_EXERT_RECV_RDMA_CAP
3119         if( RDMA_pending >= RDMA_cap) break;
3120 #endif
3121         CMI_PCQUEUEPOP_LOCK( sendqueue)
3122         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
3123         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
3124         if (ptr == NULL) break;
3125         
3126         gni_post_descriptor_t *pd = ptr->pd;
3127         
3128         msg = (void*)(pd->local_addr);
3129         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
3130         register_size = 0;
3131         if(pd->cqwrite_value == 0) {
3132             if(NoMsgInRecv(msg))
3133                 register_size = GetMempoolsize(msg);
3134         }
3135
3136         if(status == GNI_RC_SUCCESS)        //mem register good
3137         {
3138             int destNode = ptr->destNode;
3139             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
3140             CMI_GNI_LOCK(lock);
3141 #if REMOTE_EVENT
3142             if( pd->cqwrite_value == 0 || pd->cqwrite_value == -1) {
3143                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3144                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
3145                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3146             }
3147 #if CMK_PERSISTENT_COMM_PUT
3148             else if (pd->cqwrite_value == PERSIST_SEQ) {
3149                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3150                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
3151                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3152             }
3153 #endif
3154 #if CMK_DIRECT
3155             else if (pd->cqwrite_value == DIRECT_SEQ) {
3156                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3157                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, DIRECT_EVENT(ptr->ack_index));
3158                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3159             }
3160 #endif
3161
3162 #endif
3163 #if CMK_WITH_STATS
3164             RDMA_TRY_SEND(pd->type)
3165 #endif
3166 #if CMK_SMP_TRACE_COMMTHREAD
3167             if(IS_PUT(pd->type))
3168             {
3169                  START_EVENT();
3170                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
3171             }
3172 #endif
3173             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
3174             {
3175                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3176             }
3177             else
3178             {
3179                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3180             }
3181             CMI_GNI_UNLOCK(lock);
3182             
3183             if(status == GNI_RC_SUCCESS)    //post good
3184             {
3185                 MACHSTATE4(8, "post noempty-rdma  %d (%lld==%lld,%d) \n", ptr->destNode, pd->local_addr, pd->remote_addr,  register_memory_size); 
3186 #if CMI_EXERT_RECV_RDMA_CAP
3187                 RDMA_pending ++;
3188 #endif
3189                 if(pd->cqwrite_value <= 0)
3190                 {
3191 #if CMK_SMP_TRACE_COMMTHREAD 
3192                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3193 #endif
3194                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3195                 }
3196 #if  CMK_WITH_STATS
3197                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3198                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3199 #endif
3200 #if MACHINE_DEBUG_LOG
3201                 buffered_recv_msg += register_size;
3202                 MACHSTATE(8, "GO request from buffered\n"); 
3203 #endif
3204 #if PRINT_SYH
3205                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
3206 #endif
3207                 FreeRdmaRequest(ptr);
3208             }else           // cannot post
3209             {
3210                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3211 #if PRINT_SYH
3212                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3213 #endif
3214                 break;
3215             }
3216         } else          //memory registration fails
3217         {
3218             PCQueuePush(sendqueue, (char*)ptr);
3219         }
3220     } //end while
3221 }
3222
3223 static 
3224 INLINE_KEYWORD gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
3225 {
3226     CONTROL_MSG         *control_msg_tmp;
3227     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
3228
3229     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3230     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
3231             /* connection not exists yet */
3232 #if CMK_SMP
3233             /* non-smp case, connect is issued in send_smsg_message */
3234         if (smsg_connected_flag[ptr->destNode] == 0)
3235             connect_to(ptr->destNode); 
3236 #endif
3237     }
3238     else
3239     switch(ptr->tag)
3240     {
3241     case SMALL_DATA_TAG:
3242         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr, 1);  
3243 #if !CMK_SMSGS_FREE_AFTER_EVENT
3244         if(status == GNI_RC_SUCCESS)
3245         {
3246             CmiFree(ptr->msg);
3247         }
3248 #endif
3249         break;
3250     case LMSG_PERSISTENT_INIT_TAG:
3251     case LMSG_INIT_TAG:
3252     case LMSG_OOB_INIT_TAG:
3253         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3254         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
3255         break;
3256 #if !REMOTE_EVENT && !CQWRITE
3257     case ACK_TAG:
3258         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, 0);  
3259 #if !CMK_SMSGS_FREE_AFTER_EVENT
3260         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3261 #endif
3262         break;
3263 #endif
3264     case BIG_MSG_TAG:
3265         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, 0);  
3266 #if !CMK_SMSGS_FREE_AFTER_EVENT
3267         if(status == GNI_RC_SUCCESS)
3268         {
3269             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3270         }
3271 #endif
3272         break;
3273 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE 
3274     case PUT_DONE_TAG:
3275         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, 0);
3276 #if !CMK_SMSGS_FREE_AFTER_EVENT
3277         if(status == GNI_RC_SUCCESS)
3278         {
3279             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3280         }
3281 #endif
3282         break;
3283 #endif
3284 #if CMK_DIRECT
3285     case DIRECT_PUT_DONE_TAG:
3286         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr, 0);
3287 #if !CMK_SMSGS_FREE_AFTER_EVENT
3288         if(status == GNI_RC_SUCCESS)
3289         {
3290             free((CMK_DIRECT_HEADER*)ptr->msg);
3291         }
3292 #endif
3293         break;
3294 #endif
3295     default:
3296         printf("Weird tag\n");
3297         CmiAbort("should not happen\n");
3298     }       // end switch
3299     return status;
3300 }
3301
3302 // return 1 if all messages are sent
3303
3304 #if ONE_SEND_QUEUE
3305
3306 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3307 {
3308     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3309     CONTROL_MSG         *control_msg_tmp;
3310     gni_return_t        status;
3311     int                 done = 1;
3312     uint64_t            register_size;
3313     void                *register_addr;
3314     int                 index_previous = -1;
3315 #if     CMI_SENDBUFFERSMSG_CAP
3316     int                 sent_length = 0;
3317 #endif
3318     int          index = 0;
3319     memset(destpe_avail, 0, mysize * sizeof(char));
3320     for (index=0; index<1; index++)
3321     {
3322         int i, len = PCQueueLength(queue->sendMsgBuf);
3323         for (i=0; i<len; i++) 
3324         {
3325             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3326             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3327             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3328             if(ptr == NULL) break;
3329             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3330                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3331                 continue;
3332             }
3333             status = _sendOneBufferedSmsg(queue, ptr);
3334 #if CMI_SENDBUFFERSMSG_CAP
3335             sent_length++;
3336 #endif
3337             if(status == GNI_RC_SUCCESS)
3338             {
3339 #if PRINT_SYH
3340                 buffered_smsg_counter--;
3341                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3342 #endif
3343                 FreeMsgList(ptr);
3344             }else {
3345                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3346                 done = 0;
3347                 if(status == GNI_RC_ERROR_RESOURCE)
3348                 {
3349                     destpe_avail[ptr->destNode] = 1;
3350                 }
3351             } 
3352         } //end while
3353     }   // end pooling for all cores
3354     return done;
3355 }
3356
3357 #else   /*  ! ONE_SEND_QUEUE  */
3358
3359 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3360 {
3361     MSG_LIST            *ptr;
3362     gni_return_t        status;
3363     int                 done = 1;
3364 #if     CMI_SENDBUFFERSMSG_CAP
3365     int                 sent_length = 0;
3366 #endif
3367     int idx;
3368 #if SMP_LOCKS
3369     int          index = -1;
3370     int nonempty = PCQueueLength(queue->nonEmptyQueues);
3371     for(idx =0; idx<nonempty; idx++) 
3372     {
3373         index++;  if (index >= nonempty) index = 0;
3374 #if CMI_SENDBUFFERSMSG_CAP
3375         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3376 #endif
3377         CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
3378         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
3379         CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
3380         if(current_list == NULL) break; 
3381         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
3382             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3383             continue;
3384         }
3385         PCQueue current_queue= current_list->sendSmsgBuf;
3386         CmiLock(current_list->lock);
3387         int i, len = PCQueueLength(current_queue);
3388         current_list->pushed = 0;
3389         CmiUnlock(current_list->lock);
3390 #else      /* ! SMP_LOCKS */
3391     static int          index = -1;
3392     for(idx =0; idx<mysize; idx++) 
3393     {
3394         index++;  if (index == mysize) index = 0;
3395 #if CMI_SENDBUFFERSMSG_CAP
3396         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3397 #endif
3398         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
3399         //if (index == myrank) continue;
3400         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3401         int i, len = PCQueueLength(current_queue);
3402 #endif
3403         for (i=0; i<len; i++)  {
3404             CMI_PCQUEUEPOP_LOCK(current_queue)
3405             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3406             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3407             if (ptr == 0) break;
3408
3409             status = _sendOneBufferedSmsg(queue, ptr);
3410 #if CMI_SENDBUFFERSMSG_CAP
3411             sent_length++;
3412 #endif
3413             if(status == GNI_RC_SUCCESS)
3414             {
3415 #if PRINT_SYH
3416                 buffered_smsg_counter--;
3417                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3418 #endif
3419                 FreeMsgList(ptr);
3420             }else {
3421                 PCQueuePush(current_queue, (char*)ptr);
3422                 done = 0;
3423                 if(status == GNI_RC_ERROR_RESOURCE)
3424                 {
3425                     break;
3426                 }
3427             } 
3428         } //end for i
3429 #if SMP_LOCKS
3430         CmiLock(current_list->lock);
3431         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3432         {
3433             current_list->pushed = 1;
3434             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3435         }
3436         CmiUnlock(current_list->lock); 
3437 #endif
3438     }   // end pooling for all cores
3439     return done;
3440 }
3441
3442 #endif
3443
3444 static void ProcessDeadlock();
3445 void LrtsAdvanceCommunication(int whileidle)
3446 {
3447     static int count = 0;
3448     /*  Receive Msg first */
3449 #if CMK_SMP_TRACE_COMMTHREAD
3450     double startT, endT;
3451 #endif
3452     if (useDynamicSMSG && whileidle)
3453     {
3454 #if CMK_SMP_TRACE_COMMTHREAD
3455         startT = CmiWallTimer();
3456 #endif
3457         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3458 #if CMK_SMP_TRACE_COMMTHREAD
3459         endT = CmiWallTimer();
3460         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3461 #endif
3462     }
3463
3464     SEND_OOB_SMSG(smsg_oob_queue)
3465     PUMP_REMOTE_HIGHPRIORITY
3466     PUMP_LOCAL_HIGHPRIORITY
3467     POST_HIGHPRIORITY_RDMA
3468     // Receiving small messages and persistent
3469 #if CMK_SMP_TRACE_COMMTHREAD
3470     startT = CmiWallTimer();
3471 #endif
3472     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3473 #if CMK_SMP_TRACE_COMMTHREAD
3474     endT = CmiWallTimer();
3475     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3476 #endif
3477
3478     SEND_OOB_SMSG(smsg_oob_queue)
3479     PUMP_REMOTE_HIGHPRIORITY
3480     PUMP_LOCAL_HIGHPRIORITY
3481     POST_HIGHPRIORITY_RDMA
3482     
3483     ///* Send buffered Message */
3484 #if CMK_SMP_TRACE_COMMTHREAD
3485     startT = CmiWallTimer();
3486 #endif
3487 #if CMK_USE_OOB
3488     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
3489 #else
3490     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
3491 #endif
3492 #if CMK_SMP_TRACE_COMMTHREAD
3493     endT = CmiWallTimer();
3494     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3495 #endif
3496
3497     SEND_OOB_SMSG(smsg_oob_queue)
3498     PUMP_REMOTE_HIGHPRIORITY
3499     PUMP_LOCAL_HIGHPRIORITY
3500     POST_HIGHPRIORITY_RDMA
3501
3502     //Pump Get messages or PUT messages
3503 #if CMK_SMP_TRACE_COMMTHREAD
3504     startT = CmiWallTimer();
3505 #endif
3506     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3507 #if MULTI_THREAD_SEND
3508     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3509 #endif
3510 #if CMK_SMP_TRACE_COMMTHREAD
3511     endT = CmiWallTimer();
3512     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3513 #endif
3514     
3515     SEND_OOB_SMSG(smsg_oob_queue)
3516     PUMP_REMOTE_HIGHPRIORITY
3517     PUMP_LOCAL_HIGHPRIORITY
3518     POST_HIGHPRIORITY_RDMA
3519     //Pump Remote event
3520 #if CMK_SMP_TRACE_COMMTHREAD
3521     startT = CmiWallTimer();
3522 #endif
3523 #if CQWRITE
3524     PumpCqWriteTransactions();
3525 #endif
3526 #if REMOTE_EVENT
3527     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
3528 #endif
3529 #if CMK_SMP_TRACE_COMMTHREAD
3530     endT = CmiWallTimer();
3531     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3532 #endif
3533
3534     SEND_OOB_SMSG(smsg_oob_queue)
3535     PUMP_REMOTE_HIGHPRIORITY
3536     PUMP_LOCAL_HIGHPRIORITY
3537     POST_HIGHPRIORITY_RDMA
3538
3539 #if CMK_SMP_TRACE_COMMTHREAD
3540     startT = CmiWallTimer();
3541 #endif
3542     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3543 #if CMK_SMP_TRACE_COMMTHREAD
3544     endT = CmiWallTimer();
3545     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3546 #endif
3547
3548 #if CMK_SMP && ! LARGEPAGE
3549     if (_detected_hang)  ProcessDeadlock();
3550 #endif
3551 }
3552
3553 static void set_smsg_max()
3554 {
3555     char *env;
3556
3557     if(mysize <=512)
3558     {
3559         SMSG_MAX_MSG = 1024;
3560     }else if (mysize <= 4096)
3561     {
3562         SMSG_MAX_MSG = 1024;
3563     }else if (mysize <= 16384)
3564     {
3565         SMSG_MAX_MSG = 512;
3566     }else {
3567         SMSG_MAX_MSG = 256;
3568     }
3569
3570     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3571     if (env) SMSG_MAX_MSG = atoi(env);
3572     CmiAssert(SMSG_MAX_MSG > 0);
3573 }    
3574
3575 /* useDynamicSMSG */
3576 static void _init_dynamic_smsg()
3577 {
3578     gni_return_t status;
3579     uint32_t     vmdh_index = -1;
3580     int i;
3581
3582     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3583     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3584     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3585     for(i=0; i<mysize; i++) {
3586         smsg_connected_flag[i] = 0;
3587         smsg_attr_vector_local[i] = NULL;
3588         smsg_attr_vector_remote[i] = NULL;
3589     }
3590
3591     set_smsg_max();
3592
3593     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3594     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3595     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3596     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3597     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3598
3599     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3600     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3601     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3602     memset(mailbox_list->mailbox_base, 0, mailbox_list->size);
3603     mailbox_list->offset = 0;
3604     mailbox_list->next = 0;
3605     
3606     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3607         mailbox_list->size, smsg_rx_cqh,
3608         GNI_MEM_READWRITE,   
3609         vmdh_index,
3610         &(mailbox_list->mem_hndl));
3611     GNI_RC_CHECK("MEMORY registration for smsg", status);
3612
3613     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3614     GNI_RC_CHECK("Unbound EP", status);
3615     
3616     alloc_smsg_attr(&send_smsg_attr);
3617
3618     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3619     GNI_RC_CHECK("post unbound datagram", status);
3620
3621       /* always pre-connect to proc 0 */
3622     //if (myrank != 0) connect_to(0);
3623
3624     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3625     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3626 }
3627
3628 static void _init_static_smsg()
3629 {
3630     gni_smsg_attr_t      *smsg_attr;
3631     gni_smsg_attr_t      remote_smsg_attr;
3632     gni_smsg_attr_t      *smsg_attr_vec;
3633     gni_mem_handle_t     my_smsg_mdh_mailbox;
3634     int      ret, i;
3635     gni_return_t status;
3636     uint32_t              vmdh_index = -1;
3637     mdh_addr_t            base_infor;
3638     mdh_addr_t            *base_addr_vec;
3639
3640     set_smsg_max();
3641     
3642     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3643     
3644     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3645     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3646     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3647     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3648     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3649     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3650     CmiAssert(ret == 0);
3651     memset(smsg_mailbox_base, 0, smsg_memlen*(mysize));
3652     
3653     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3654             smsg_memlen*(mysize), smsg_rx_cqh,
3655             GNI_MEM_READWRITE,   
3656             vmdh_index,
3657             &my_smsg_mdh_mailbox);
3658     register_memory_size += smsg_memlen*(mysize);
3659     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3660
3661     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3662     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3663
3664     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3665     base_infor.mdh =  my_smsg_mdh_mailbox;
3666     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3667
3668     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3669  
3670     for(i=0; i<mysize; i++)
3671     {
3672         if(i==myrank)
3673             continue;
3674         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3675         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3676         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3677         smsg_attr[i].mbox_offset = i*smsg_memlen;
3678         smsg_attr[i].buff_size = smsg_memlen;
3679         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3680         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3681     }
3682
3683     for(i=0; i<mysize; i++)
3684     {
3685         if (myrank == i) continue;
3686
3687         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3688         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3689         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3690         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3691         remote_smsg_attr.buff_size = smsg_memlen;
3692         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3693         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3694
3695         /* initialize the smsg channel */
3696         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3697         GNI_RC_CHECK("SMSG Init", status);
3698     } //end initialization
3699
3700     free(base_addr_vec);
3701     free(smsg_attr);
3702
3703     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3704     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3705
3706
3707 INLINE_KEYWORD
3708 static void _init_send_queue(SMSG_QUEUE *queue)
3709 {
3710      int i;
3711 #if ONE_SEND_QUEUE
3712      queue->sendMsgBuf = PCQueueCreate();
3713      destpe_avail = (char*)malloc(mysize * sizeof(char));
3714 #else
3715      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3716 #if SMP_LOCKS
3717      queue->nonEmptyQueues = PCQueueCreate();
3718 #endif
3719      for(i =0; i<mysize; i++)
3720      {
3721          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3722 #if SMP_LOCKS
3723          queue->smsg_msglist_index[i].pushed = 0;
3724          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3725          queue->smsg_msglist_index[i].destpe = i;
3726 #endif
3727      }
3728 #endif
3729 }
3730
3731 INLINE_KEYWORD
3732 static void _init_smsg()
3733 {
3734     if(mysize > 1) {
3735         if (useDynamicSMSG)
3736             _init_dynamic_smsg();
3737         else
3738             _init_static_smsg();
3739     }
3740
3741     _init_send_queue(&smsg_queue);
3742 #if CMK_USE_OOB
3743     _init_send_queue(&smsg_oob_queue);
3744 #endif
3745 }
3746
3747 static void _init_static_msgq()
3748 {
3749     gni_return_t status;
3750     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3751     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3752     msgq_attrs.smsg_q_sz = 1;
3753     msgq_attrs.rcv_pool_sz = 1;
3754     msgq_attrs.num_msgq_eps = 2;
3755     msgq_attrs.nloc_insts = 8;
3756     msgq_attrs.modes = 0;
3757     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3758
3759     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3760     GNI_RC_CHECK("MSGQ Init", status);
3761
3762
3763 }
3764
3765
3766 static CmiUInt8 total_mempool_size = 0;
3767 static CmiUInt8 total_mempool_calls = 0;
3768
3769 #if USE_LRTS_MEMPOOL
3770
3771 INLINE_KEYWORD
3772 static void *_alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag, gni_cq_handle_t cqh)
3773 {
3774     void *pool;
3775     int ret;
3776     gni_return_t status = GNI_RC_SUCCESS;
3777
3778     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3779     if (*size < default_size) *size = default_size;
3780 #if LARGEPAGE
3781     // round up to be multiple of _tlbpagesize
3782     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3783     *size = ALIGNHUGEPAGE(*size);
3784 #endif
3785     total_mempool_size += *size;
3786     total_mempool_calls += 1;
3787 #if   !LARGEPAGE
3788     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3789     {
3790         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3791         CmiAbort("alloc_mempool_block");
3792     }
3793 #endif
3794 #if LARGEPAGE
3795     pool = my_get_huge_pages(*size);
3796     ret = pool==NULL;
3797 #else
3798     ret = posix_memalign(&pool, ALIGNBUF, *size);
3799 #endif
3800     if (ret != 0) {
3801       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3802       if (ret == ENOMEM)
3803         CmiAbort("alloc_mempool_block: out of memory.");
3804       else
3805         CmiAbort("alloc_mempool_block: posix_memalign failed");
3806     }
3807 #if LARGEPAGE
3808     CmiMemLock();
3809     register_count++;
3810     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, cqh, status);
3811     CmiMemUnlock();
3812     if(status != GNI_RC_SUCCESS) {
3813         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3814 sweep_mempool(CpvAccess(mempool));
3815     }
3816     GNI_RC_CHECK("MEMORY_REGISTER", status);
3817 #else
3818     SetMemHndlZero((*mem_hndl));
3819 #endif
3820     return pool;
3821 }
3822
3823 INLINE_KEYWORD
3824 static void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3825 {
3826     return _alloc_mempool_block(size, mem_hndl, expand_flag, rdma_rx_cqh);
3827 }
3828
3829 #if CMK_PERSISTENT_COMM_PUT
3830 INLINE_KEYWORD
3831 static void *alloc_persistent_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3832 {
3833     return _alloc_mempool_block(size, mem_hndl, expand_flag, highpriority_rx_cqh);
3834 }
3835 #endif
3836
3837 // ptr is a block head pointer
3838 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3839 {
3840     if(!(IsMemHndlZero(mem_hndl)))
3841     {
3842         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3843     }
3844 #if LARGEPAGE
3845     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3846 #else
3847     free(ptr);
3848 #endif
3849 }
3850 #endif
3851
3852 void LrtsPreCommonInit(int everReturn){
3853 #if USE_LRTS_MEMPOOL
3854     CpvInitialize(mempool_type*, mempool);
3855     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3856 #if CMK_PERSISTENT_COMM_PUT
3857     CpvInitialize(mempool_type*, persistent_mempool);
3858     CpvAccess(persistent_mempool) = mempool_init(_mempool_size, alloc_persistent_mempool_block, free_mempool_block, _mempool_size_limit);
3859 #endif
3860     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3861 #endif
3862 }
3863
3864 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3865 {
3866     register int            i;
3867     int                     rc;
3868     int                     device_id = 0;
3869     unsigned int            remote_addr;
3870     gni_cdm_handle_t        cdm_hndl;
3871     gni_return_t            status = GNI_RC_SUCCESS;
3872     uint32_t                vmdh_index = -1;
3873     uint8_t                 ptag;
3874     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3875     int                     first_spawned;
3876     int                     physicalID;
3877     char                   *env;
3878
3879     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3880     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3881     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3882   
3883     if(!CharmLibInterOperate) {
3884       status = PMI_Init(&first_spawned);
3885       GNI_RC_CHECK("PMI_Init", status);
3886     }
3887
3888     status = PMI_Get_size(&mysize);
3889     GNI_RC_CHECK("PMI_Getsize", status);
3890
3891     status = PMI_Get_rank(&myrank);
3892     GNI_RC_CHECK("PMI_getrank", status);
3893
3894     //physicalID = CmiPhysicalNodeID(myrank);
3895     
3896     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3897
3898     *myNodeID = myrank;
3899     *numNodes = mysize;
3900   
3901 #if MULTI_THREAD_SEND
3902     /* Currently, we only consider the case that comm. thread will only recv msgs */
3903     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3904 #endif
3905
3906 #if CMI_EXERT_SEND_LARGE_CAP
3907     CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
3908 #endif
3909
3910 #if CMI_EXERT_RECV_RDMA_CAP 
3911     CmiGetArgInt(*argv,"+useRecvRdmaCap", &RDMA_cap);
3912 #endif
3913   
3914 #if CMI_SENDBUFFERSMSG_CAP
3915     CmiGetArgInt(*argv,"+useSendBufferCap", &SendBufferMsg_cap);
3916 #endif
3917
3918 #if CMI_PUMPNETWORKSMSG_CAP 
3919     CmiGetArgInt(*argv,"+usePumpSmsgCap", &PumpNetworkSmsg_cap);
3920 #endif
3921
3922     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3923     
3924     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3925     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3926     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3927
3928     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3929     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3930     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3931
3932     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3933     if (env) useDynamicSMSG = 1;
3934     if (!useDynamicSMSG)
3935       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3936     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3937     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3938     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3939     
3940     if(myrank == 0)
3941     {
3942         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3943         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3944     }
3945 #ifdef USE_ONESIDED
3946     onesided_init(NULL, &onesided_hnd);
3947
3948     // this is a GNI test, so use the libonesided bypass functionality
3949     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3950     local_addr = gniGetNicAddress();
3951 #else
3952     ptag = get_ptag();
3953     cookie = get_cookie();
3954 #if 0
3955     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3956 #endif
3957     //Create and attach to the communication  domain */
3958     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3959     GNI_RC_CHECK("GNI_CdmCreate", status);
3960     //* device id The device id is the minor number for the device
3961     //that is assigned to the device by the system when the device is created.
3962     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3963     //where X is the device number 0 default 
3964     // GNI_CdmAttach adds about 1GB memory usage
3965     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3966     GNI_RC_CHECK("GNI_CdmAttach", status);
3967     local_addr = get_gni_nic_address(0);
3968 #endif
3969     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3970     _MEMCHECK(MPID_UGNI_AllAddr);
3971     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3972     /* create the local completion queue */
3973     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3974     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3975     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3976 #if MULTI_THREAD_SEND
3977     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3978     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3979 #endif
3980
3981 #if CMK_USE_OOB
3982     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highprior_rdma_tx_cqh);
3983     GNI_RC_CHECK("GNI_CqCreate high priority RDMA (tx)", status);
3984 #endif
3985     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3986
3987     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3988     GNI_RC_CHECK("Create CQ (rx)", status);
3989     
3990     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3991     GNI_RC_CHECK("Create Post CQ (rx)", status);
3992    
3993 #if CMK_PERSISTENT_COMM_PUT
3994     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highpriority_rx_cqh);
3995     GNI_RC_CHECK("Create Post CQ (rx)", status);
3996 #endif
3997     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3998     //GNI_RC_CHECK("Create BTE CQ", status);
3999
4000     /* create the endpoints. they need to be bound to allow later CQWrites to them */
4001     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
4002     _MEMCHECK(ep_hndl_array);
4003 #if MULTI_THREAD_SEND 
4004     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
4005     //default_tx_cq_lock = CmiCreateLock();
4006     rdma_tx_cq_lock = CmiCreateLock();
4007     smsg_rx_cq_lock = CmiCreateLock();
4008     //global_gni_lock  = CmiCreateLock();
4009     //rx_cq_lock  = CmiCreateLock();
4010 #endif
4011     for (i=0; i<mysize; i++) {
4012         if(i == myrank) continue;
4013         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
4014         GNI_RC_CHECK("GNI_EpCreate ", status);   
4015         remote_addr = MPID_UGNI_AllAddr[i];
4016         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
4017         GNI_RC_CHECK("GNI_EpBind ", status);   
4018     }
4019
4020     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
4021     _init_smsg();
4022     PMI_Barrier();
4023
4024 #if     USE_LRTS_MEMPOOL
4025     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
4026     if (env) {
4027         _totalmem = CmiReadSize(env);
4028         if (myrank == 0)
4029             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
4030     }
4031
4032     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
4033     if (env) _mempool_size = CmiReadSize(env);
4034     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
4035         _mempool_size = CmiReadSize(env);
4036
4037
4038     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
4039     if (env) {
4040         MAX_REG_MEM = CmiReadSize(env);
4041         user_set_flag = 1;
4042     }
4043     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
4044         MAX_REG_MEM = CmiReadSize(env);
4045         user_set_flag = 1;
4046     }
4047
4048     env = getenv("CHARM_UGNI_SEND_MAX");
4049     if (env) {
4050         MAX_BUFF_SEND = CmiReadSize(env);
4051         user_set_flag = 1;
4052     }
4053     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
4054         MAX_BUFF_SEND = CmiReadSize(env);
4055         user_set_flag = 1;
4056     }
4057
4058     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
4059     if (env) {
4060         _mempool_size_limit = CmiReadSize(env);
4061     }
4062
4063     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
4064     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
4065
4066     if (myrank==0) {
4067         printf("Charm++> memory pool init block size: %1.fMB, total memory pool limit %1.fMB (0 means no limit)\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
4068         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
4069         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
4070             /* memblock can expand to BIG_MSG * 2 size */
4071             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
4072             CmiAbort("mempool maximum size is too small. \n");
4073         }
4074 #if MULTI_THREAD_SEND
4075         printf("Charm++> worker thread sending messages\n");
4076 #elif COMM_THREAD_SEND
4077         printf("Charm++> only comm thread send/recv messages\n");
4078 #endif
4079     }
4080
4081 #endif     /* end of USE_LRTS_MEMPOOL */
4082
4083     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
4084     if (env) {
4085         BIG_MSG = CmiReadSize(env);
4086         if (BIG_MSG < ONE_SEG)
4087           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
4088     }
4089     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
4090     if (env) {
4091         BIG_MSG_PIPELINE = atoi(env);
4092     }
4093
4094     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
4095     if (env) _checkProgress = 0;
4096     if (mysize == 1) _checkProgress = 0;
4097
4098 #if CMI_EXERT_RECV_RDMA_CAP
4099     env = getenv("CHARM_UGNI_RDMA_MAX");
4100     if (env)  {
4101         RDMA_pending = atoi(env);
4102         if (myrank == 0)
4103             printf("Charm++> Max pending RDMA set to: %d\n", RDMA_pending);
4104     }
4105 #endif
4106     
4107     /*
4108     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
4109     if (env) 
4110         _tlbpagesize = CmiReadSize(env);
4111     */
4112     /* real gethugepagesize() is only available when hugetlb module linked */
4113     _tlbpagesize = gethugepagesize();
4114     if (myrank == 0) {
4115         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
4116     }
4117
4118 #if LARGEPAGE
4119     if (_tlbpagesize == 4096) {
4120         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
4121     }
4122 #endif
4123
4124       /* stats related arguments */
4125 #if CMK_WITH_STATS
4126     CmiGetArgStringDesc(*argv,"+gni_stats_root",&counters_dirname,"counter directory name, default counters");
4127
4128     print_stats = CmiGetArgFlag(*argv, "+print_stats");
4129     
4130     stats_off = CmiGetArgFlag(*argv, "+stats_off");
4131
4132     init_comm_stats();
4133 #endif
4134
4135     /* checksum flag */
4136     if (CmiGetArgFlag(*argv,"+checksum")) {
4137 #if CMK_ERROR_CHECKING
4138         checksum_flag = 1;
4139         if (myrank == 0) CmiPrintf("Charm++> CheckSum checking enabled! \n");
4140 #else
4141         if (myrank == 0) CmiPrintf("Charm++> +checksum ignored in optimized version! \n");
4142 #endif
4143     }
4144
4145     /* init DMA buffer for medium message */
4146
4147     //_init_DMA_buffer();
4148     
4149     free(MPID_UGNI_AllAddr);
4150
4151     sendRdmaBuf = PCQueueCreate();
4152     sendHighPriorBuf = PCQueueCreate();
4153
4154 //    NTK_Init();
4155 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
4156
4157 #if CMK_SMSGS_FREE_AFTER_EVENT
4158     SmsgsPool_init(&smsgsPool);
4159 #endif
4160
4161 #if  REMOTE_EVENT
4162     SHIFT = 1;
4163     while (1<<SHIFT < mysize) SHIFT++;
4164     CmiAssert(SHIFT < 31);
4165     IndexPool_init(&ackPool);
4166 #if CMK_PERSISTENT_COMM_PUT
4167     IndexPool_init(&persistPool);
4168 #endif
4169 #endif
4170 }
4171
4172 void* LrtsAlloc(int n_bytes, int header)
4173 {
4174     void *ptr = NULL;
4175 #if 0
4176     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
4177 #endif
4178     if(n_bytes <= SMSG_MAX_MSG)
4179     {
4180         int totalsize = n_bytes+header;
4181         ptr = memalign(ALIGNBUF, totalsize);
4182     }
4183     else {
4184         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
4185 #if     USE_LRTS_MEMPOOL
4186         n_bytes = ALIGN64(n_bytes);
4187         if(n_bytes < BIG_MSG)
4188         {
4189             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
4190             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;