eb86aba4418941c00019732f789af304a625e274
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #define     LARGEPAGE              0
51
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     1
59 #endif
60
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
64 #define CMI_EXERT_SEND_LARGE_CAP   0
65 #define CMI_EXERT_RECV_RDMA_CAP    0
66
67
68 #define CMI_SENDBUFFERSMSG_CAP     0
69 #define CMI_PUMPNETWORKSMSG_CAP    0
70
71 #if CMI_SENDBUFFERSMSG_CAP
72 int     SendBufferMsg_cap  = 10;
73 #endif
74
75 #if CMI_PUMPNETWORKSMSG_CAP
76 int     PumpNetworkSmsg_cap = 10;
77 #endif
78
79 #if CMI_EXERT_SEND_LARGE_CAP
80 static int SEND_large_cap = 100;
81 static int SEND_large_pending = 0;
82 #endif
83
84 #if CMI_EXERT_RECV_RDMA_CAP
85 static int   RDMA_cap =   100;
86 static int   RDMA_pending = 0;
87 #endif
88
89 #define USE_LRTS_MEMPOOL                  1
90
91 #define PRINT_SYH                         0
92
93 // Trace communication thread
94 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
95 #define TRACE_THRESHOLD     0.00001
96 #define CMI_MPI_TRACE_MOREDETAILED 0
97 #undef CMI_MPI_TRACE_USEREVENTS
98 #define CMI_MPI_TRACE_USEREVENTS 1
99 #else
100 #undef CMK_SMP_TRACE_COMMTHREAD
101 #define CMK_SMP_TRACE_COMMTHREAD 0
102 #endif
103
104 #define CMK_TRACE_COMMOVERHEAD 0
105 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
106 #undef CMI_MPI_TRACE_USEREVENTS
107 #define CMI_MPI_TRACE_USEREVENTS 1
108 #else
109 #undef CMK_TRACE_COMMOVERHEAD
110 #define CMK_TRACE_COMMOVERHEAD 0
111 #endif
112
113 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
114 CpvStaticDeclare(double, projTraceStart);
115 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
116 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
117 #define  EVENT_TIME()   CpvAccess(projTraceStart)
118 #else
119 #define  START_EVENT()
120 #define  END_EVENT(x)
121 #define  EVENT_TIME()   (0.0)
122 #endif
123
124 #if USE_LRTS_MEMPOOL
125
126 #define oneMB (1024ll*1024)
127 #define oneGB (1024ll*1024*1024)
128
129 static CmiInt8 _mempool_size = 8*oneMB;
130 static CmiInt8 _expand_mem =  4*oneMB;
131 static CmiInt8 _mempool_size_limit = 0;
132
133 static CmiInt8 _totalmem = 0.8*oneGB;
134
135 #if LARGEPAGE
136 static CmiInt8 BIG_MSG  =  16*oneMB;
137 static CmiInt8 ONE_SEG  =  4*oneMB;
138 #else
139 static CmiInt8 BIG_MSG  =  4*oneMB;
140 static CmiInt8 ONE_SEG  =  2*oneMB;
141 #endif
142 #if MULTI_THREAD_SEND
143 static int BIG_MSG_PIPELINE = 1;
144 #else
145 static int BIG_MSG_PIPELINE = 4;
146 #endif
147
148 // dynamic flow control
149 static CmiInt8 buffered_send_msg = 0;
150 static CmiInt8 register_memory_size = 0;
151
152 #if LARGEPAGE
153 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
154 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
155 static CmiInt8  register_count = 0;
156 #else
157 #if CMK_SMP && COMM_THREAD_SEND 
158 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
159 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
160 #else
161 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
162 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
163 #endif
164
165
166 #endif
167
168 #endif     /* end USE_LRTS_MEMPOOL */
169
170 #if MULTI_THREAD_SEND 
171 #define     CMI_GNI_LOCK(x)       CmiLock(x);
172 #define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
173 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
174 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
175 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
176 #else
177 #define     CMI_GNI_LOCK(x)
178 #define     CMI_GNI_TRYLOCK(x)         (0)
179 #define     CMI_GNI_UNLOCK(x)
180 #define     CMI_PCQUEUEPOP_LOCK(Q)   
181 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
182 #endif
183
184 static int _tlbpagesize = 4096;
185
186 //static int _smpd_count  = 0;
187
188 static int   user_set_flag  = 0;
189
190 static int _checkProgress = 1;             /* check deadlock */
191 static int _detected_hang = 0;
192
193 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
194
195 // dynamic SMSG
196 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
197
198 static int avg_smsg_connection = 32;
199 static int                 *smsg_connected_flag= 0;
200 static gni_smsg_attr_t     **smsg_attr_vector_local;
201 static gni_smsg_attr_t     **smsg_attr_vector_remote;
202 static gni_ep_handle_t     ep_hndl_unbound;
203 static gni_smsg_attr_t     send_smsg_attr;
204 static gni_smsg_attr_t     recv_smsg_attr;
205
206 typedef struct _dynamic_smsg_mailbox{
207    void     *mailbox_base;
208    int      size;
209    int      offset;
210    gni_mem_handle_t  mem_hndl;
211    struct      _dynamic_smsg_mailbox  *next;
212 }dynamic_smsg_mailbox_t;
213
214 static dynamic_smsg_mailbox_t  *mailbox_list;
215
216 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
217 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
218
219 #if PRINT_SYH
220 int         lrts_send_msg_id = 0;
221 int         lrts_local_done_msg = 0;
222 int         lrts_send_rdma_success = 0;
223 #endif
224
225 #include "machine.h"
226
227 #include "pcqueue.h"
228
229 #include "mempool.h"
230
231 #if CMK_PERSISTENT_COMM
232 #include "machine-persistent.h"
233 #define  SEND_PERSISTENT    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendPersistentBuf));
234 #define  RECV_PERSISTENT    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(persistent_rx_cqh) );
235 #else  
236 #define  SEND_PERSISTENT   
237 #define  RECV_PERSISTENT
238 #endif
239
240 //#define  USE_ONESIDED 1
241 #ifdef USE_ONESIDED
242 //onesided implementation is wrong, since no place to restore omdh
243 #include "onesided.h"
244 onesided_hnd_t   onesided_hnd;
245 onesided_md_t    omdh;
246 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
247
248 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
249
250 #else
251 uint8_t   onesided_hnd, omdh;
252
253 #if REMOTE_EVENT || CQWRITE 
254 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
255     if(register_memory_size+size>= MAX_REG_MEM) { \
256         status = GNI_RC_ERROR_NOMEM;} \
257     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
258         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
259 #else
260 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
261         if (register_memory_size + size >= MAX_REG_MEM) { \
262             status = GNI_RC_ERROR_NOMEM; \
263         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
264             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
265 #endif
266
267 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
268     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
269              register_memory_size -= size; \
270          else CmiAbort("MEM_DEregister");  \
271     } while (0)
272 #endif
273
274 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
275 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
276 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
277 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
278 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
279 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
280 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
281 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
282 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
283 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
284 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
285 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
286 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
287 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
288
289 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
290 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
291
292 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
293 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
294 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
295 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
296
297 #define ALIGNBUF                64
298
299 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
300 /* If SMSG is not used */
301
302 #define FMA_PER_CORE  1024
303 #define FMA_BUFFER_SIZE 1024
304
305 /* If SMSG is used */
306 static int  SMSG_MAX_MSG = 1024;
307 #define SMSG_MAX_CREDIT    72
308
309 #define MSGQ_MAXSIZE       2048
310
311 /* large message transfer with FMA or BTE */
312 #if ! REMOTE_EVENT
313 #define LRTS_GNI_RDMA_THRESHOLD  1024 
314 #else
315    /* remote events only work with RDMA */
316 #define LRTS_GNI_RDMA_THRESHOLD  0 
317 #endif
318
319 #if CMK_SMP
320 static int  REMOTE_QUEUE_ENTRIES=163840; 
321 static int LOCAL_QUEUE_ENTRIES=163840; 
322 #else
323 static int  REMOTE_QUEUE_ENTRIES=20480;
324 static int LOCAL_QUEUE_ENTRIES=20480; 
325 #endif
326
327 #define BIG_MSG_TAG             0x26
328 #define PUT_DONE_TAG            0x28
329 #define DIRECT_PUT_DONE_TAG     0x29
330 #define ACK_TAG                 0x30
331 /* SMSG is data message */
332 #define SMALL_DATA_TAG          0x31
333 /* SMSG is a control message to initialize a BTE */
334 #define LMSG_INIT_TAG           0x39 
335
336 #define DEBUG
337 #ifdef GNI_RC_CHECK
338 #undef GNI_RC_CHECK
339 #endif
340 #ifdef DEBUG
341 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
342 #else
343 #define GNI_RC_CHECK(msg,rc)
344 #endif
345
346 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
347 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
348 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
349
350 static int useStaticMSGQ = 0;
351 static int useStaticFMA = 0;
352 static int mysize, myrank;
353 static gni_nic_handle_t   nic_hndl;
354
355 typedef struct {
356     gni_mem_handle_t mdh;
357     uint64_t addr;
358 } mdh_addr_t ;
359 // this is related to dynamic SMSG
360
361 typedef struct mdh_addr_list{
362     gni_mem_handle_t mdh;
363    void *addr;
364     struct mdh_addr_list *next;
365 }mdh_addr_list_t;
366
367 static unsigned int         smsg_memlen;
368 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
369 mdh_addr_t          setup_mem;
370 mdh_addr_t          *smsg_connection_vec = 0;
371 gni_mem_handle_t    smsg_connection_memhndl;
372 static int          smsg_expand_slots = 10;
373 static int          smsg_available_slot = 0;
374 static void         *smsg_mailbox_mempool = 0;
375 mdh_addr_list_t     *smsg_dynamic_list = 0;
376
377 static void             *smsg_mailbox_base;
378 gni_msgq_attr_t         msgq_attrs;
379 gni_msgq_handle_t       msgq_handle;
380 gni_msgq_ep_attr_t      msgq_ep_attrs;
381 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
382
383 /* =====Beginning of Declarations of Machine Specific Variables===== */
384 static int cookie;
385 static int modes = 0;
386 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
387 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
388 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
389 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
390 static gni_cq_handle_t       persistent_rx_cqh = NULL;      // mempool - remote event
391 static gni_ep_handle_t       *ep_hndl_array;
392
393 static CmiNodeLock           *ep_lock_array;
394 static CmiNodeLock           default_tx_cq_lock; 
395 static CmiNodeLock           rdma_tx_cq_lock; 
396 static CmiNodeLock           global_gni_lock; 
397 static CmiNodeLock           rx_cq_lock;
398 static CmiNodeLock           smsg_mailbox_lock;
399 static CmiNodeLock           smsg_rx_cq_lock;
400 static CmiNodeLock           *mempool_lock;
401 //#define     CMK_WITH_STATS      1
402 typedef struct msg_list
403 {
404     uint32_t destNode;
405     uint32_t size;
406     void *msg;
407     uint8_t tag;
408 #if !CMK_SMP
409     struct msg_list *next;
410 #endif
411 #if CMK_WITH_STATS
412     double  creation_time;
413 #endif
414 }MSG_LIST;
415
416
417 typedef struct control_msg
418 {
419     uint64_t            source_addr;    /* address from the start of buffer  */
420     uint64_t            dest_addr;      /* address from the start of buffer */
421     int                 total_length;   /* total length */
422     int                 length;         /* length of this packet */
423 #if REMOTE_EVENT
424     int                 ack_index;      /* index from integer to address */
425 #endif
426     uint8_t             seq_id;         //big message   0 meaning single message
427     gni_mem_handle_t    source_mem_hndl;
428     struct control_msg *next;
429 } CONTROL_MSG;
430
431 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
432
433 typedef struct ack_msg
434 {
435     uint64_t            source_addr;    /* address from the start of buffer  */
436 #if ! USE_LRTS_MEMPOOL
437     gni_mem_handle_t    source_mem_hndl;
438     int                 length;          /* total length */
439 #endif
440     struct ack_msg     *next;
441 } ACK_MSG;
442
443 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
444
445 #if CMK_DIRECT
446 typedef struct{
447     uint64_t    handler_addr;
448 }CMK_DIRECT_HEADER;
449
450 typedef struct {
451     char core[CmiMsgHeaderSizeBytes];
452     uint64_t handler;
453 }cmidirectMsg;
454
455 //SYH
456 CpvDeclare(int, CmiHandleDirectIdx);
457 void CmiHandleDirectMsg(cmidirectMsg* msg)
458 {
459
460     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
461    (*(_handle->callbackFnPtr))(_handle->callbackData);
462    CmiFree(msg);
463 }
464
465 void CmiDirectInit()
466 {
467     CpvInitialize(int,  CmiHandleDirectIdx);
468     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
469 }
470
471 #endif
472 typedef struct  rmda_msg
473 {
474     int                   destNode;
475 #if REMOTE_EVENT
476     int                   ack_index;
477 #endif
478     gni_post_descriptor_t *pd;
479 #if !CMK_SMP
480     struct  rmda_msg      *next;
481 #endif
482 }RDMA_REQUEST;
483
484
485 #if CMK_SMP
486 #define SMP_LOCKS                       1
487 #define ONE_SEND_QUEUE                  0
488 PCQueue sendRdmaBuf;
489 PCQueue sendPersistentBuf;
490 typedef struct  msg_list_index
491 {
492     PCQueue     sendSmsgBuf;
493     int         pushed;
494     CmiNodeLock   lock;
495 } MSG_LIST_INDEX;
496 char                *destpe_avail;
497 #if  !ONE_SEND_QUEUE && SMP_LOCKS
498     PCQueue     nonEmptyQueues;
499 #endif
500 #else         /* non-smp */
501
502 static RDMA_REQUEST        *sendRdmaBuf = 0;
503 static RDMA_REQUEST        *sendPersistentBuf = 0;
504 static RDMA_REQUEST        *sendRdmaTail = 0;
505 typedef struct  msg_list_index
506 {
507     int         next;
508     MSG_LIST    *sendSmsgBuf;
509     MSG_LIST    *tail;
510 } MSG_LIST_INDEX;
511
512 #endif
513
514 // buffered send queue
515 #if ! ONE_SEND_QUEUE
516 typedef struct smsg_queue
517 {
518     MSG_LIST_INDEX   *smsg_msglist_index;
519     int               smsg_head_index;
520 } SMSG_QUEUE;
521 #else
522 typedef struct smsg_queue
523 {
524     PCQueue       sendMsgBuf;
525 }  SMSG_QUEUE;
526 #endif
527
528 SMSG_QUEUE                  smsg_queue;
529 #if CMK_USE_OOB
530 SMSG_QUEUE                  smsg_oob_queue;
531 #endif
532
533 #if CMK_SMP
534
535 #define FreeMsgList(d)   free(d);
536 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
537
538 #else
539
540 static MSG_LIST       *msglist_freelist=0;
541
542 #define FreeMsgList(d)  \
543   do { \
544   (d)->next = msglist_freelist;\
545   msglist_freelist = d; \
546   } while (0)
547
548 #define MallocMsgList(d) \
549   do {  \
550   d = msglist_freelist;\
551   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
552              _MEMCHECK(d);\
553   } else msglist_freelist = d->next; \
554   d->next =0;  \
555   } while (0)
556
557 #endif
558
559 #if CMK_SMP
560
561 #define FreeControlMsg(d)      free(d);
562 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
563
564 #else
565
566 static CONTROL_MSG    *control_freelist=0;
567
568 #define FreeControlMsg(d)       \
569   do { \
570   (d)->next = control_freelist;\
571   control_freelist = d; \
572   } while (0);
573
574 #define MallocControlMsg(d) \
575   do {  \
576   d = control_freelist;\
577   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
578              _MEMCHECK(d);\
579   } else control_freelist = d->next;  \
580   } while (0);
581
582 #endif
583
584 #if CMK_SMP
585
586 #define FreeAckMsg(d)      free(d);
587 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
588
589 #else
590
591 static ACK_MSG        *ack_freelist=0;
592
593 #define FreeAckMsg(d)       \
594   do { \
595   (d)->next = ack_freelist;\
596   ack_freelist = d; \
597   } while (0)
598
599 #define MallocAckMsg(d) \
600   do { \
601   d = ack_freelist;\
602   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
603              _MEMCHECK(d);\
604   } else ack_freelist = d->next; \
605   } while (0)
606
607 #endif
608
609
610 # if CMK_SMP
611 #define FreeRdmaRequest(d)       free(d);
612 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
613 #else
614
615 static RDMA_REQUEST         *rdma_freelist = NULL;
616
617 #define FreeRdmaRequest(d)       \
618   do {  \
619   (d)->next = rdma_freelist;\
620   rdma_freelist = d;    \
621   } while (0)
622
623 #define MallocRdmaRequest(d) \
624   do {   \
625   d = rdma_freelist;\
626   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
627              _MEMCHECK(d);\
628   } else rdma_freelist = d->next; \
629   d->next =0;   \
630   } while (0)
631 #endif
632
633 /* reuse gni_post_descriptor_t */
634 static gni_post_descriptor_t *post_freelist=0;
635
636 #if  !CMK_SMP
637 #define FreePostDesc(d)       \
638     (d)->next_descr = post_freelist;\
639     post_freelist = d;
640
641 #define MallocPostDesc(d) \
642   d = post_freelist;\
643   if (d==0) { \
644      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
645      d->next_descr = 0;\
646       _MEMCHECK(d);\
647   } else post_freelist = d->next_descr;
648 #else
649
650 #define FreePostDesc(d)     free(d);
651 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
652
653 #endif
654
655
656 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
657 static int      buffered_smsg_counter = 0;
658
659 /* SmsgSend return success but message sent is not confirmed by remote side */
660 static MSG_LIST *buffered_fma_head = 0;
661 static MSG_LIST *buffered_fma_tail = 0;
662
663 /* functions  */
664 #define IsFree(a,ind)  !( a& (1<<(ind) ))
665 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
666 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
667
668 CpvDeclare(mempool_type*, mempool);
669
670 #if CMK_PERSISTENT_COMM
671 CpvDeclare(mempool_type*, persistent_mempool);
672 #endif
673
674 #if REMOTE_EVENT
675 /* ack pool for remote events */
676
677 static int  SHIFT   =           18;
678 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
679 #define RANK_MASK               ((1<<SHIFT) - 1)
680 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
681
682 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
683 #define GET_RANK(evt)           ((evt) & RANK_MASK)
684 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
685
686 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
687
688 #if CMK_SMP
689 #define INIT_SIZE                4096
690 #else
691 #define INIT_SIZE                1024
692 #endif
693
694 struct IndexStruct {
695 void *addr;
696 int next;
697 int type;     // 1: ACK   2: Persistent
698 };
699
700 typedef struct IndexPool {
701     struct IndexStruct   *indexes;
702     int                   size;
703     int                   freehead;
704     CmiNodeLock           lock;
705 } IndexPool;
706
707 static IndexPool  ackPool;
708 #if CMK_PERSISTENT_COMM
709 static IndexPool  persistPool;
710 #endif
711
712 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
713 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
714
715 static void IndexPool_init(IndexPool *pool)
716 {
717     int i;
718     if ((1<<SHIFT) < mysize) 
719         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
720     pool->size = INIT_SIZE;
721     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
722     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
723     for (i=0; i<pool->size-1; i++) {
724         pool->indexes[i].next = i+1;
725         pool->indexes[i].type = 0;
726     }
727     pool->indexes[i].next = -1;
728     pool->freehead = 0;
729 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
730     pool->lock  = CmiCreateLock();
731 #else
732     pool->lock  = 0;
733 #endif
734 }
735
736 static
737 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
738 {
739     int s, i;
740 #if MULTI_THREAD_SEND  
741     CmiLock(pool->lock);
742 #endif
743     s = pool->freehead;
744     if (s == -1) {
745         int newsize = pool->size * 2;
746         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
747         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
748         struct IndexStruct *old_ackpool = pool->indexes;
749         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
750         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
751         for (i=pool->size; i<newsize-1; i++) {
752             pool->indexes[i].next = i+1;
753             pool->indexes[i].type = 0;
754         }
755         pool->indexes[i].next = -1;
756         pool->indexes[i].type = 0;
757         pool->freehead = pool->size;
758         s = pool->size;
759         pool->size = newsize;
760         free(old_ackpool);
761     }
762     pool->freehead = pool->indexes[s].next;
763     pool->indexes[s].addr = addr;
764     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
765     pool->indexes[s].type = type;
766 #if MULTI_THREAD_SEND
767     CmiUnlock(pool->lock);
768 #endif
769     return s;
770 }
771
772 static
773 inline  void IndexPool_freeslot(IndexPool *pool, int s)
774 {
775     CmiAssert(s>=0 && s<pool->size);
776 #if MULTI_THREAD_SEND
777     CmiLock(pool->lock);
778 #endif
779     pool->indexes[s].next = pool->freehead;
780     pool->indexes[s].type = 0;
781     pool->freehead = s;
782 #if MULTI_THREAD_SEND
783     CmiUnlock(pool->lock);
784 #endif
785 }
786
787
788 #endif
789
790 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
791 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
792 #define CHARM_MAGIC_NUMBER               126
793
794 #if CMK_ERROR_CHECKING
795 extern unsigned char computeCheckSum(unsigned char *data, int len);
796 static int checksum_flag = 0;
797 #define CMI_SET_CHECKSUM(msg, len)      \
798         if (checksum_flag)  {   \
799           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
800           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
801         }
802 #define CMI_CHECK_CHECKSUM(msg, len)    \
803         if (checksum_flag)      \
804           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
805             CmiAbort("Fatal error: checksum doesn't agree!\n");
806 #else
807 #define CMI_SET_CHECKSUM(msg, len)
808 #define CMI_CHECK_CHECKSUM(msg, len)
809 #endif
810 /* =====End of Definitions of Message-Corruption Related Macros=====*/
811
812 static int print_stats = 0;
813 static int stats_off = 0;
814 void CmiTurnOnStats()
815 {
816     stats_off = 0;
817     //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
818 }
819
820 void CmiTurnOffStats()
821 {
822     stats_off = 1;
823 }
824
825 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
826
827 #if CMK_WITH_STATS
828 FILE *counterLog = NULL;
829 typedef struct comm_thread_stats
830 {
831     uint64_t  smsg_data_count;
832     uint64_t  lmsg_init_count;
833     uint64_t  ack_count;
834     uint64_t  big_msg_ack_count;
835     uint64_t  smsg_count;
836     uint64_t  direct_put_done_count;
837     uint64_t  put_done_count;
838     //times of calling SmsgSend
839     uint64_t  try_smsg_data_count;
840     uint64_t  try_lmsg_init_count;
841     uint64_t  try_ack_count;
842     uint64_t  try_big_msg_ack_count;
843     uint64_t  try_direct_put_done_count;
844     uint64_t  try_put_done_count;
845     uint64_t  try_smsg_count;
846     
847     double    max_time_in_send_buffered_smsg;
848     double    all_time_in_send_buffered_smsg;
849
850     uint64_t  rdma_get_count, rdma_put_count;
851     uint64_t  try_rdma_get_count, try_rdma_put_count;
852     double    max_time_from_control_to_rdma_init;
853     double    all_time_from_control_to_rdma_init;
854
855     double    max_time_from_rdma_init_to_rdma_done;
856     double    all_time_from_rdma_init_to_rdma_done;
857
858     int      count_in_PumpNetwork;
859     double   time_in_PumpNetwork;
860     double   max_time_in_PumpNetwork;
861     int      count_in_SendBufferMsg_smsg;
862     double   time_in_SendBufferMsg_smsg;
863     double   max_time_in_SendBufferMsg_smsg;
864     int      count_in_SendRdmaMsg;
865     double   time_in_SendRdmaMsg;
866     double   max_time_in_SendRdmaMsg;
867     int      count_in_PumpRemoteTransactions;
868     double   time_in_PumpRemoteTransactions;
869     double   max_time_in_PumpRemoteTransactions;
870     int      count_in_PumpLocalTransactions_rdma;
871     double   time_in_PumpLocalTransactions_rdma;
872     double   max_time_in_PumpLocalTransactions_rdma;
873     int      count_in_PumpDatagramConnection;
874     double   time_in_PumpDatagramConnection;
875     double   max_time_in_PumpDatagramConnection;
876 } Comm_Thread_Stats;
877
878 static Comm_Thread_Stats   comm_stats;
879
880 static char *counters_dirname = "counters";
881
882 static void init_comm_stats()
883 {
884   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
885   if (print_stats){
886       char ln[200];
887       int code = mkdir(counters_dirname, 00777); 
888       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
889       counterLog=fopen(ln,"w");
890       if (counterLog == NULL) CmiAbort("Counter files open failed");
891   }
892 }
893
894 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
895
896 #define SMSG_SENT_DONE(creation_time, tag)  \
897         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
898             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
899             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
900             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
901             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
902             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
903             comm_stats.smsg_count++; \
904             double inbuff_time = CmiWallTimer() - creation_time;   \
905             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
906             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
907         }
908
909 #define SMSG_TRY_SEND(tag)  \
910         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
911             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
912             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
913             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
914             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
915             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
916             comm_stats.try_smsg_count++; \
917         }
918
919 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
920
921 #define  RDMA_TRANS_DONE(x)      \
922          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
923              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
924              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
925          }
926
927 #define  RDMA_TRANS_INIT(type, x)      \
928          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
929              double rdma_trans_time = CmiWallTimer() - x ; \
930              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
931              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
932          }
933
934 #define STATS_PUMPNETWORK_TIME(x)   \
935         { double t = CmiWallTimer(); \
936           x;        \
937           t = CmiWallTimer() - t;          \
938           comm_stats.count_in_PumpNetwork++;        \
939           comm_stats.time_in_PumpNetwork += t;   \
940           if (t>comm_stats.max_time_in_PumpNetwork)      \
941               comm_stats.max_time_in_PumpNetwork = t;    \
942         }
943
944 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
945         { double t = CmiWallTimer(); \
946           x;        \
947           t = CmiWallTimer() - t;          \
948           comm_stats.count_in_PumpRemoteTransactions ++;        \
949           comm_stats.time_in_PumpRemoteTransactions += t;   \
950           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
951               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
952         }
953
954 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
955         { double t = CmiWallTimer(); \
956           x;        \
957           t = CmiWallTimer() - t;          \
958           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
959           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
960           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
961               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
962         }
963
964 #define STATS_SEND_SMSGS_TIME(x)   \
965         { double t = CmiWallTimer(); \
966           x;        \
967           t = CmiWallTimer() - t;          \
968           comm_stats.count_in_SendBufferMsg_smsg ++;        \
969           comm_stats.time_in_SendBufferMsg_smsg += t;   \
970           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
971               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
972         }
973
974 #define STATS_SENDRDMAMSG_TIME(x)   \
975         { double t = CmiWallTimer(); \
976           x;        \
977           t = CmiWallTimer() - t;          \
978           comm_stats.count_in_SendRdmaMsg ++;        \
979           comm_stats.time_in_SendRdmaMsg += t;   \
980           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
981               comm_stats.max_time_in_SendRdmaMsg = t;    \
982         }
983
984 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
985         { double t = CmiWallTimer(); \
986           x;        \
987           t = CmiWallTimer() - t;          \
988           comm_stats.count_in_PumpDatagramConnection ++;        \
989           comm_stats.time_in_PumpDatagramConnection += t;   \
990           if (t>comm_stats.max_time_in_PumpDatagramConnection)      \
991               comm_stats.max_time_in_PumpDatagramConnection = t;    \
992         }
993
994 static void print_comm_stats()
995 {
996     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
997     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
998             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
999             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
1000     
1001     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
1002             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
1003             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
1004
1005     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
1006     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
1007     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
1008
1009
1010     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
1011     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
1012     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
1013     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
1014     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
1015     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
1016     if (useDynamicSMSG)
1017     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
1018
1019     fclose(counterLog);
1020 }
1021
1022 #else
1023 #define STATS_PUMPNETWORK_TIME(x)                  x
1024 #define STATS_SEND_SMSGS_TIME(x)                   x
1025 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
1026 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
1027 #define STATS_SENDRDMAMSG_TIME(x)                  x
1028 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
1029 #endif
1030
1031 static void
1032 allgather(void *in,void *out, int len)
1033 {
1034     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1035     int i,rc;
1036     int my_rank;
1037     char *tmp_buf,*out_ptr;
1038
1039     if(!already_called) {
1040
1041         rc = PMI_Get_size(&job_size);
1042         CmiAssert(rc == PMI_SUCCESS);
1043         rc = PMI_Get_rank(&my_rank);
1044         CmiAssert(rc == PMI_SUCCESS);
1045
1046         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1047         CmiAssert(ivec_ptr != NULL);
1048
1049         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1050         CmiAssert(rc == PMI_SUCCESS);
1051
1052         already_called = 1;
1053
1054     }
1055
1056     tmp_buf = (char *)malloc(job_size * len);
1057     CmiAssert(tmp_buf);
1058
1059     rc = PMI_Allgather(in,tmp_buf,len);
1060     CmiAssert(rc == PMI_SUCCESS);
1061
1062     out_ptr = out;
1063
1064     for(i=0;i<job_size;i++) {
1065
1066         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1067
1068     }
1069
1070     free(tmp_buf);
1071 }
1072
1073 static void
1074 allgather_2(void *in,void *out, int len)
1075 {
1076     //PMI_Allgather is out of order
1077     int i,rc, extend_len;
1078     int  rank_index;
1079     char *out_ptr, *out_ref;
1080     char *in2;
1081
1082     extend_len = sizeof(int) + len;
1083     in2 = (char*)malloc(extend_len);
1084
1085     memcpy(in2, &myrank, sizeof(int));
1086     memcpy(in2+sizeof(int), in, len);
1087
1088     out_ptr = (char*)malloc(mysize*extend_len);
1089
1090     rc = PMI_Allgather(in2, out_ptr, extend_len);
1091     GNI_RC_CHECK("allgather", rc);
1092
1093     out_ref = out;
1094
1095     for(i=0;i<mysize;i++) {
1096         //rank index 
1097         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1098         //copy to the rank index slot
1099         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1100     }
1101
1102     free(out_ptr);
1103     free(in2);
1104
1105 }
1106
1107 static unsigned int get_gni_nic_address(int device_id)
1108 {
1109     unsigned int address, cpu_id;
1110     gni_return_t status;
1111     int i, alps_dev_id=-1,alps_address=-1;
1112     char *token, *p_ptr;
1113
1114     p_ptr = getenv("PMI_GNI_DEV_ID");
1115     if (!p_ptr) {
1116         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1117        
1118         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1119     } else {
1120         while ((token = strtok(p_ptr,":")) != NULL) {
1121             alps_dev_id = atoi(token);
1122             if (alps_dev_id == device_id) {
1123                 break;
1124             }
1125             p_ptr = NULL;
1126         }
1127         CmiAssert(alps_dev_id != -1);
1128         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1129         CmiAssert(p_ptr != NULL);
1130         i = 0;
1131         while ((token = strtok(p_ptr,":")) != NULL) {
1132             if (i == alps_dev_id) {
1133                 alps_address = atoi(token);
1134                 break;
1135             }
1136             p_ptr = NULL;
1137             ++i;
1138         }
1139         CmiAssert(alps_address != -1);
1140         address = alps_address;
1141     }
1142     return address;
1143 }
1144
1145 static uint8_t get_ptag(void)
1146 {
1147     char *p_ptr, *token;
1148     uint8_t ptag;
1149
1150     p_ptr = getenv("PMI_GNI_PTAG");
1151     CmiAssert(p_ptr != NULL);
1152     token = strtok(p_ptr, ":");
1153     ptag = (uint8_t)atoi(token);
1154     return ptag;
1155         
1156 }
1157
1158 static uint32_t get_cookie(void)
1159 {
1160     uint32_t cookie;
1161     char *p_ptr, *token;
1162
1163     p_ptr = getenv("PMI_GNI_COOKIE");
1164     CmiAssert(p_ptr != NULL);
1165     token = strtok(p_ptr, ":");
1166     cookie = (uint32_t)atoi(token);
1167
1168     return cookie;
1169 }
1170
1171 #if LARGEPAGE
1172
1173 /* directly mmap memory from hugetlbfs for large pages */
1174
1175 #include <sys/stat.h>
1176 #include <fcntl.h>
1177 #include <sys/mman.h>
1178 #include <hugetlbfs.h>
1179
1180 // size must be _tlbpagesize aligned
1181 void *my_get_huge_pages(size_t size)
1182 {
1183     char filename[512];
1184     int fd;
1185     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1186     void *ptr = NULL;
1187
1188     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1189     fd = open(filename, O_RDWR | O_CREAT, mode);
1190     if (fd == -1) {
1191         CmiAbort("my_get_huge_pages: open filed");
1192     }
1193     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1194     if (ptr == MAP_FAILED) ptr = NULL;
1195 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1196     close(fd);
1197     unlink(filename);
1198     return ptr;
1199 }
1200
1201 void my_free_huge_pages(void *ptr, int size)
1202 {
1203 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1204     int ret = munmap(ptr, size);
1205     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1206 }
1207
1208 #endif
1209
1210 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1211 /* TODO: add any that are related */
1212 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1213
1214
1215 #include "machine-lrts.h"
1216 #include "machine-common-core.c"
1217
1218 /* Network progress function is used to poll the network when for
1219    messages. This flushes receive buffers on some  implementations*/
1220 #if CMK_MACHINE_PROGRESS_DEFINED
1221 void CmiMachineProgressImpl() {
1222 }
1223 #endif
1224
1225 static int SendBufferMsg(SMSG_QUEUE *queue);
1226 #if CMK_SMP
1227 static void SendRdmaMsg(PCQueue );
1228 #else
1229 static void SendRdmaMsg();
1230 #endif 
1231 static void PumpNetworkSmsg();
1232 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1233 #if CQWRITE
1234 static void PumpCqWriteTransactions();
1235 #endif
1236 #if REMOTE_EVENT
1237 static void PumpRemoteTransactions(gni_cq_handle_t);
1238 #endif
1239
1240 #if MACHINE_DEBUG_LOG
1241 FILE *debugLog = NULL;
1242 static CmiInt8 buffered_recv_msg = 0;
1243 int         lrts_smsg_success = 0;
1244 int         lrts_received_msg = 0;
1245 #endif
1246
1247 static void sweep_mempool(mempool_type *mptr)
1248 {
1249     int n = 0;
1250     block_header *current = &(mptr->block_head);
1251
1252     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1253     while( current!= NULL) {
1254         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1255         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1256     }
1257     printf("[n %d] sweep_mempool slot END.\n", myrank);
1258 }
1259
1260 inline
1261 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1262 {
1263     block_header *current = *from;
1264
1265     //while(register_memory_size>= MAX_REG_MEM)
1266     //{
1267         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1268             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1269
1270         *from = current;
1271         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1272         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1273         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1274     //}
1275     return GNI_RC_SUCCESS;
1276 }
1277
1278 inline 
1279 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1280 {
1281     gni_return_t status = GNI_RC_SUCCESS;
1282     //int size = GetMempoolsize(msg);
1283     //void *blockaddr = GetMempoolBlockPtr(msg);
1284     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1285    
1286     block_header *current = &(mptr->block_head);
1287     while(register_memory_size>= MAX_REG_MEM)
1288     {
1289         status = deregisterMemory(mptr, &current);
1290         if (status != GNI_RC_SUCCESS) break;
1291     }
1292     if(register_memory_size>= MAX_REG_MEM) return status;
1293
1294     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1295     while(1)
1296     {
1297         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1298         if(status == GNI_RC_SUCCESS)
1299         {
1300             break;
1301         }
1302         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1303         {
1304             GNI_RC_CHECK("registerFromMempool", status);
1305         }
1306         else
1307         {
1308             status = deregisterMemory(mptr, &current);
1309             if (status != GNI_RC_SUCCESS) break;
1310         }
1311     }; 
1312     return status;
1313 }
1314
1315 inline 
1316 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1317 {
1318     static int rank = -1;
1319     int i;
1320     gni_return_t status;
1321     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1322     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1323     mempool_type *mptr;
1324
1325     status = registerFromMempool(mptr1, msg, size, t, cqh);
1326     if (status == GNI_RC_SUCCESS) return status;
1327 #if CMK_SMP 
1328     for (i=0; i<CmiMyNodeSize()+1; i++) {
1329       rank = (rank+1)%(CmiMyNodeSize()+1);
1330       mptr = CpvAccessOther(mempool, rank);
1331       if (mptr == mptr1) continue;
1332       status = registerFromMempool(mptr, msg, size, t, cqh);
1333       if (status == GNI_RC_SUCCESS) return status;
1334     }
1335 #endif
1336     return  GNI_RC_ERROR_RESOURCE;
1337 }
1338
1339 inline
1340 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1341 {
1342     MSG_LIST        *msg_tmp;
1343     MallocMsgList(msg_tmp);
1344     msg_tmp->destNode = destNode;
1345     msg_tmp->size   = size;
1346     msg_tmp->msg    = msg;
1347     msg_tmp->tag    = tag;
1348 #if CMK_WITH_STATS
1349     SMSG_CREATION(msg_tmp)
1350 #endif
1351 #if !CMK_SMP
1352     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1353         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1354         queue->smsg_head_index = destNode;
1355         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1356     }else
1357     {
1358         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1359         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1360     }
1361 #else
1362 #if ONE_SEND_QUEUE
1363     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1364 #else
1365 #if SMP_LOCKS
1366     CmiLock(queue->smsg_msglist_index[destNode].lock);
1367     if(queue->smsg_msglist_index[destNode].pushed == 0)
1368     {
1369         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1370     }
1371     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1372     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1373 #else
1374     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1375 #endif
1376 #endif
1377 #endif
1378 #if PRINT_SYH
1379     buffered_smsg_counter++;
1380 #endif
1381 }
1382
1383 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1384 {
1385     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1386 }
1387
1388 inline
1389 static void setup_smsg_connection(int destNode)
1390 {
1391     mdh_addr_list_t  *new_entry = 0;
1392     gni_post_descriptor_t *pd;
1393     gni_smsg_attr_t      *smsg_attr;
1394     gni_return_t status = GNI_RC_NOT_DONE;
1395     RDMA_REQUEST        *rdma_request_msg;
1396     
1397     if(smsg_available_slot == smsg_expand_slots)
1398     {
1399         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1400         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1401         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1402
1403         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1404             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1405             GNI_MEM_READWRITE,   
1406             -1,
1407             &(new_entry->mdh));
1408         smsg_available_slot = 0; 
1409         new_entry->next = smsg_dynamic_list;
1410         smsg_dynamic_list = new_entry;
1411     }
1412     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1413     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1414     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1415     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1416     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1417     smsg_attr->buff_size = smsg_memlen;
1418     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1419     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1420     smsg_local_attr_vec[destNode] = smsg_attr;
1421     smsg_available_slot++;
1422     MallocPostDesc(pd);
1423     pd->type            = GNI_POST_FMA_PUT;
1424     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1425     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1426     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1427     pd->length          = sizeof(gni_smsg_attr_t);
1428     pd->local_addr      = (uint64_t) smsg_attr;
1429     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1430     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1431 #if MULTI_THREAD_SEND
1432     pd->src_cq_hndl     = rdma_tx_cqh;
1433 #else
1434     pd->src_cq_hndl     = 0;
1435 #endif
1436     pd->rdma_mode       = 0;
1437     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1438     print_smsg_attr(smsg_attr);
1439     if(status == GNI_RC_ERROR_RESOURCE )
1440     {
1441         MallocRdmaRequest(rdma_request_msg);
1442         rdma_request_msg->destNode = destNode;
1443         rdma_request_msg->pd = pd;
1444         /* buffer this request */
1445     }
1446 #if PRINT_SYH
1447     if(status != GNI_RC_SUCCESS)
1448        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1449     else
1450         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1451 #endif
1452 }
1453
1454 /* useDynamicSMSG */
1455 inline 
1456 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1457 {
1458     gni_return_t status = GNI_RC_NOT_DONE;
1459
1460     if(mailbox_list->offset == mailbox_list->size)
1461     {
1462         dynamic_smsg_mailbox_t *new_mailbox_entry;
1463         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1464         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1465         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1466         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1467         new_mailbox_entry->offset = 0;
1468         
1469         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1470             new_mailbox_entry->size, smsg_rx_cqh,
1471             GNI_MEM_READWRITE,   
1472             -1,
1473             &(new_mailbox_entry->mem_hndl));
1474
1475         GNI_RC_CHECK("register", status);
1476         new_mailbox_entry->next = mailbox_list;
1477         mailbox_list = new_mailbox_entry;
1478     }
1479     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1480     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1481     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1482     local_smsg_attr->mbox_offset = mailbox_list->offset;
1483     mailbox_list->offset += smsg_memlen;
1484     local_smsg_attr->buff_size = smsg_memlen;
1485     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1486     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1487 }
1488
1489 /* useDynamicSMSG */
1490 inline 
1491 static int connect_to(int destNode)
1492 {
1493     gni_return_t status = GNI_RC_NOT_DONE;
1494     CmiAssert(smsg_connected_flag[destNode] == 0);
1495     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1496     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1497     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1498     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1499     
1500     CMI_GNI_LOCK(global_gni_lock)
1501     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1502     CMI_GNI_UNLOCK(global_gni_lock)
1503     if (status == GNI_RC_ERROR_RESOURCE) {
1504       /* possibly destNode is making connection at the same time */
1505       free(smsg_attr_vector_local[destNode]);
1506       smsg_attr_vector_local[destNode] = NULL;
1507       free(smsg_attr_vector_remote[destNode]);
1508       smsg_attr_vector_remote[destNode] = NULL;
1509       mailbox_list->offset -= smsg_memlen;
1510 #if PRINT_SYH
1511     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1512 #endif
1513       return 0;
1514     }
1515     GNI_RC_CHECK("GNI_Post", status);
1516     smsg_connected_flag[destNode] = 1;
1517 #if PRINT_SYH
1518     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1519 #endif
1520     return 1;
1521 }
1522
1523 inline 
1524 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1525 {
1526     unsigned int          remote_address;
1527     uint32_t              remote_id;
1528     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1529     gni_smsg_attr_t       *smsg_attr;
1530     gni_post_descriptor_t *pd;
1531     gni_post_state_t      post_state;
1532     char                  *real_data; 
1533
1534     if (useDynamicSMSG) {
1535         switch (smsg_connected_flag[destNode]) {
1536         case 0: 
1537             connect_to(destNode);         /* continue to case 1 */
1538         case 1:                           /* pending connection, do nothing */
1539             status = GNI_RC_NOT_DONE;
1540             if(inbuff ==0)
1541                 buffer_small_msgs(queue, msg, size, destNode, tag);
1542             return status;
1543         }
1544     }
1545 #if CMK_SMP
1546 #if ! ONE_SEND_QUEUE
1547     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1548 #endif
1549     {
1550 #else
1551     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1552     {
1553 #endif
1554         //CMI_GNI_LOCK(smsg_mailbox_lock)
1555         CMI_GNI_LOCK(default_tx_cq_lock)
1556 #if CMK_SMP_TRACE_COMMTHREAD
1557         int oldpe = -1;
1558         int oldeventid = -1;
1559         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1560         { 
1561             START_EVENT();
1562             if ( tag == SMALL_DATA_TAG)
1563                 real_data = (char*)msg; 
1564             else 
1565                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1566             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1567             TRACE_COMM_SET_COMM_MSGID(real_data);
1568         }
1569 #endif
1570 #if REMOTE_EVENT
1571         if (tag == LMSG_INIT_TAG) {
1572             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1573             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1574                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1575         }
1576         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1577 #endif
1578 #if     CMK_WITH_STATS
1579         SMSG_TRY_SEND(tag)
1580 #endif
1581 #if CMK_WITH_STATS
1582     double              creation_time;
1583     if (ptr == NULL)
1584         creation_time = CmiWallTimer();
1585     else
1586         creation_time = ptr->creation_time;
1587 #endif
1588
1589     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1590 #if CMK_SMP_TRACE_COMMTHREAD
1591         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1592 #endif
1593         CMI_GNI_UNLOCK(default_tx_cq_lock)
1594         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1595         if(status == GNI_RC_SUCCESS)
1596         {
1597 #if     CMK_WITH_STATS
1598             SMSG_SENT_DONE(creation_time,tag) 
1599 #endif
1600 #if CMK_SMP_TRACE_COMMTHREAD
1601             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1602             { 
1603                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1604             }
1605 #endif
1606         }else
1607             status = GNI_RC_ERROR_RESOURCE;
1608     }
1609     if(status != GNI_RC_SUCCESS && inbuff ==0)
1610         buffer_small_msgs(queue, msg, size, destNode, tag);
1611     return status;
1612 }
1613
1614 inline 
1615 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1616 {
1617     /* construct a control message and send */
1618     CONTROL_MSG         *control_msg_tmp;
1619     MallocControlMsg(control_msg_tmp);
1620     control_msg_tmp->source_addr = (uint64_t)msg;
1621     control_msg_tmp->seq_id    = seqno;
1622     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1623 #if REMOTE_EVENT
1624     control_msg_tmp->ack_index    =  -1;
1625 #endif
1626 #if     USE_LRTS_MEMPOOL
1627     if(size < BIG_MSG)
1628     {
1629         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1630     }
1631     else
1632     {
1633         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1634         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1635         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1636     }
1637 #else
1638     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1639 #endif
1640     return control_msg_tmp;
1641 }
1642
1643 #define BLOCKING_SEND_CONTROL    0
1644
1645 // Large message, send control to receiver, receiver register memory and do a GET, 
1646 // return 1 - send no success
1647 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1648 {
1649     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1650     uint32_t            vmdh_index  = -1;
1651     int                 size;
1652     int                 offset = 0;
1653     uint64_t            source_addr;
1654     int                 register_size; 
1655     void                *msg;
1656
1657     size    =   control_msg_tmp->total_length;
1658     source_addr = control_msg_tmp->source_addr;
1659     register_size = control_msg_tmp->length;
1660
1661 #if  USE_LRTS_MEMPOOL
1662     if( control_msg_tmp->seq_id == 0 ){
1663 #if BLOCKING_SEND_CONTROL
1664         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1665             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1666                 LrtsAdvanceCommunication(0);
1667         }
1668 #endif
1669         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1670         {
1671             msg = (void*)source_addr;
1672             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1673             {
1674                 if(!inbuff)
1675                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1676                 return GNI_RC_ERROR_NOMEM;
1677             }
1678             //register the corresponding mempool
1679             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1680             if(status == GNI_RC_SUCCESS)
1681             {
1682                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1683             }
1684         }else
1685         {
1686             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1687             status = GNI_RC_SUCCESS;
1688         }
1689         if(NoMsgInSend(source_addr))
1690             register_size = GetMempoolsize((void*)(source_addr));
1691         else
1692             register_size = 0;
1693     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1694     {
1695         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1696         source_addr += offset;
1697         size = control_msg_tmp->length;
1698 #if BLOCKING_SEND_CONTROL
1699         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1700             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1701                 LrtsAdvanceCommunication(0);
1702         }
1703 #endif
1704         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1705             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1706             {
1707                 if(!inbuff)
1708                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1709                 return GNI_RC_ERROR_NOMEM;
1710             }
1711             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1712             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1713         }
1714         else
1715         {
1716             status = GNI_RC_SUCCESS;
1717         }
1718         register_size = 0;  
1719     }
1720
1721 #if CMI_EXERT_SEND_LARGE_CAP
1722     if(SEND_large_pending >= SEND_large_cap)
1723     {
1724         status = GNI_RC_ERROR_NOMEM;
1725     }
1726 #endif
1727  
1728     if(status == GNI_RC_SUCCESS)
1729     {
1730        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1731         if(status == GNI_RC_SUCCESS)
1732         {
1733 #if CMI_EXERT_SEND_LARGE_CAP
1734             SEND_large_pending++;
1735 #endif
1736             buffered_send_msg += register_size;
1737             if(control_msg_tmp->seq_id == 0)
1738             {
1739                 IncreaseMsgInSend(source_addr);
1740             }
1741             FreeControlMsg(control_msg_tmp);
1742             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1743         }else
1744             status = GNI_RC_ERROR_RESOURCE;
1745
1746     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1747     {
1748         CmiAbort("Memory registor for large msg\n");
1749     }else 
1750     {
1751         status = GNI_RC_ERROR_NOMEM; 
1752         if(!inbuff)
1753             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1754     }
1755     return status;
1756 #else
1757     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1758     if(status == GNI_RC_SUCCESS)
1759     {
1760         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1761         if(status == GNI_RC_SUCCESS)
1762         {
1763             FreeControlMsg(control_msg_tmp);
1764         }
1765     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1766     {
1767         CmiAbort("Memory registor for large msg\n");
1768     }else 
1769     {
1770         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1771     }
1772     return status;
1773 #endif
1774 }
1775
1776 inline void LrtsPrepareEnvelope(char *msg, int size)
1777 {
1778     CmiSetMsgSize(msg, size);
1779     CMI_SET_CHECKSUM(msg, size);
1780 }
1781
1782 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1783 {
1784     gni_return_t        status  =   GNI_RC_SUCCESS;
1785     uint8_t tag;
1786     CONTROL_MSG         *control_msg_tmp;
1787     int                 oob = ( mode & OUT_OF_BAND);
1788     SMSG_QUEUE          *queue;
1789
1790     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1791 #if CMK_USE_OOB
1792     queue = oob? &smsg_oob_queue : &smsg_queue;
1793 #else
1794     queue = &smsg_queue;
1795 #endif
1796
1797     LrtsPrepareEnvelope(msg, size);
1798
1799 #if PRINT_SYH
1800     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1801 #endif 
1802 #if CMK_SMP 
1803     if(size <= SMSG_MAX_MSG)
1804         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1805     else if (size < BIG_MSG) {
1806         control_msg_tmp =  construct_control_msg(size, msg, 0);
1807         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1808     }
1809     else {
1810           CmiSetMsgSeq(msg, 0);
1811           control_msg_tmp =  construct_control_msg(size, msg, 1);
1812           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1813     }
1814 #else   //non-smp, smp(worker sending)
1815     if(size <= SMSG_MAX_MSG)
1816     {
1817         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1818             CmiFree(msg);
1819     }
1820     else if (size < BIG_MSG) {
1821         control_msg_tmp =  construct_control_msg(size, msg, 0);
1822         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1823     }
1824     else {
1825 #if     USE_LRTS_MEMPOOL
1826         CmiSetMsgSeq(msg, 0);
1827         control_msg_tmp =  construct_control_msg(size, msg, 1);
1828         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1829 #else
1830         control_msg_tmp =  construct_control_msg(size, msg, 0);
1831         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1832 #endif
1833     }
1834 #endif
1835     return 0;
1836 }
1837
1838 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1839 {
1840   int i;
1841 #if CMK_BROADCAST_USE_CMIREFERENCE
1842   for(i=0;i<npes;i++) {
1843     if (pes[i] == CmiMyPe())
1844       CmiSyncSend(pes[i], len, msg);
1845     else {
1846       CmiReference(msg);
1847       CmiSyncSendAndFree(pes[i], len, msg);
1848     }
1849   }
1850 #else
1851   for(i=0;i<npes;i++) {
1852     CmiSyncSend(pes[i], len, msg);
1853   }
1854 #endif
1855 }
1856
1857 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1858 {
1859   /* A better asynchronous implementation may be wanted, but at least it works */
1860   CmiSyncListSendFn(npes, pes, len, msg);
1861   return (CmiCommHandle) 0;
1862 }
1863
1864 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1865 {
1866   if (npes == 1) {
1867       CmiSyncSendAndFree(pes[0], len, msg);
1868       return;
1869   }
1870 #if CMK_PERSISTENT_COMM
1871   if (CpvAccess(phs) && len > PERSIST_MIN_SIZE) {
1872       int i;
1873       for(i=0;i<npes;i++) {
1874         if (pes[i] == CmiMyPe())
1875           CmiSyncSend(pes[i], len, msg);
1876         else {
1877           CmiReference(msg);
1878           CmiSyncSendAndFree(pes[i], len, msg);
1879         }
1880       }
1881       CmiFree(msg);
1882       return;
1883   }
1884 #endif
1885   
1886 #if CMK_BROADCAST_USE_CMIREFERENCE
1887   CmiSyncListSendFn(npes, pes, len, msg);
1888   CmiFree(msg);
1889 #else
1890   int i;
1891   for(i=0;i<npes-1;i++) {
1892     CmiSyncSend(pes[i], len, msg);
1893   }
1894   if (npes>0)
1895     CmiSyncSendAndFree(pes[npes-1], len, msg);
1896   else 
1897     CmiFree(msg);
1898 #endif
1899 }
1900
1901 static void    PumpDatagramConnection();
1902 static      int         event_SetupConnect = 111;
1903 static      int         event_PumpSmsg = 222 ;
1904 static      int         event_PumpTransaction = 333;
1905 static      int         event_PumpRdmaTransaction = 444;
1906 static      int         event_SendBufferSmsg = 484;
1907 static      int         event_SendFmaRdmaMsg = 555;
1908 static      int         event_AdvanceCommunication = 666;
1909
1910 static void registerUserTraceEvents() {
1911 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1912     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1913     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1914     event_PumpTransaction = traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
1915     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA remote event" , -1);
1916     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1917     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1918     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1919 #endif
1920 }
1921
1922 static void ProcessDeadlock()
1923 {
1924     static CmiUInt8 *ptr = NULL;
1925     static CmiUInt8  last = 0, mysum, sum;
1926     static int count = 0;
1927     gni_return_t status;
1928     int i;
1929
1930 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1931 //sweep_mempool(CpvAccess(mempool));
1932     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1933     mysum = smsg_send_count + smsg_recv_count;
1934     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1935     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1936     GNI_RC_CHECK("PMI_Allgather", status);
1937     sum = 0;
1938     for (i=0; i<mysize; i++)  sum+= ptr[i];
1939     if (last == 0 || sum == last) 
1940         count++;
1941     else
1942         count = 0;
1943     last = sum;
1944     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1945     if (count == 2) { 
1946         /* detected twice, it is a real deadlock */
1947         if (myrank == 0)  {
1948             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1949             CmiAbort("Fatal> Deadlock detected.");
1950         }
1951
1952     }
1953     _detected_hang = 0;
1954 }
1955
1956 static void CheckProgress()
1957 {
1958     if (smsg_send_count == last_smsg_send_count &&
1959         smsg_recv_count == last_smsg_recv_count ) 
1960     {
1961         _detected_hang = 1;
1962 #if !CMK_SMP
1963         if (_detected_hang) ProcessDeadlock();
1964 #endif
1965
1966     }
1967     else {
1968         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1969         last_smsg_send_count = smsg_send_count;
1970         last_smsg_recv_count = smsg_recv_count;
1971         _detected_hang = 0;
1972     }
1973 }
1974
1975 static void set_limit()
1976 {
1977     //if (!user_set_flag && CmiMyRank() == 0) {
1978     if (CmiMyRank() == 0) {
1979         int mynode = CmiPhysicalNodeID(CmiMyPe());
1980         int numpes = CmiNumPesOnPhysicalNode(mynode);
1981         int numprocesses = numpes / CmiMyNodeSize();
1982         MAX_REG_MEM  = _totalmem / numprocesses;
1983         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1984         if (CmiMyPe() == 0)
1985            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1986         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1987         {
1988              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1989              CmiAbort("memory registration\n");
1990         }
1991     }
1992 }
1993
1994 void LrtsPostCommonInit(int everReturn)
1995 {
1996 #if CMK_DIRECT
1997     CmiDirectInit();
1998 #endif
1999 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
2000     CpvInitialize(double, projTraceStart);
2001     /* only PE 0 needs to care about registration (to generate sts file). */
2002     //if (CmiMyPe() == 0) 
2003     {
2004         registerMachineUserEventsFunction(&registerUserTraceEvents);
2005     }
2006 #endif
2007
2008 #if CMK_SMP
2009     CmiIdleState *s=CmiNotifyGetState();
2010     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
2011     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
2012 #else
2013     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
2014     if (useDynamicSMSG)
2015     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
2016 #endif
2017
2018 #if ! LARGEPAGE
2019     if (_checkProgress)
2020 #if CMK_SMP
2021     if (CmiMyRank() == 0)
2022 #endif
2023     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
2024 #endif
2025  
2026 #if !LARGEPAGE
2027     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2028 #endif
2029 }
2030
2031 /* this is called by worker thread */
2032 void LrtsPostNonLocal(){
2033 #if 1
2034 #if CMK_SMP_TRACE_COMMTHREAD
2035     double startT, endT;
2036 #endif
2037 #if MULTI_THREAD_SEND
2038
2039     if(mysize == 1) return;
2040
2041 #if CMK_SMP_TRACE_COMMTHREAD
2042     traceEndIdle();
2043     startT = CmiWallTimer();
2044 #endif
2045
2046 #if CMK_WORKER_SINGLE_TASK
2047     if (CmiMyRank() % 6 == 0)
2048 #endif
2049     PumpNetworkSmsg();
2050
2051 #if CMK_WORKER_SINGLE_TASK
2052     if (CmiMyRank() % 6 == 1)
2053 #endif
2054     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2055
2056 #if CMK_WORKER_SINGLE_TASK
2057     if (CmiMyRank() % 6 == 2)
2058 #endif
2059     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
2060
2061 #if REMOTE_EVENT
2062 #if CMK_WORKER_SINGLE_TASK
2063     if (CmiMyRank() % 6 == 3)
2064 #endif
2065     PumpRemoteTransactions( rdma_rx_cqh);         // rdma_rx_cqh
2066 #endif
2067
2068 #if CMK_WORKER_SINGLE_TASK
2069     if (CmiMyRank() % 6 == 4)
2070 #endif
2071 #if CMK_USE_OOB
2072     if (SendBufferMsg(&smsg_oob_queue) == 1)
2073 #endif
2074     {
2075         SendBufferMsg(&smsg_queue);
2076     }
2077
2078 #if CMK_WORKER_SINGLE_TASK
2079     if (CmiMyRank() % 6 == 5)
2080 #endif
2081 #if CMK_SMP
2082     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
2083 #else
2084     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
2085 #endif
2086
2087 #if CMK_SMP_TRACE_COMMTHREAD
2088     endT = CmiWallTimer();
2089     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2090     traceBeginIdle();
2091 #endif
2092
2093 #endif
2094 #endif
2095 }
2096
2097 /* useDynamicSMSG */
2098 static void    PumpDatagramConnection()
2099 {
2100     uint32_t          remote_address;
2101     uint32_t          remote_id;
2102     gni_return_t status;
2103     gni_post_state_t  post_state;
2104     uint64_t          datagram_id;
2105     int i;
2106
2107    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2108    {
2109        if (datagram_id >= mysize) {           /* bound endpoint */
2110            int pe = datagram_id - mysize;
2111            CMI_GNI_LOCK(global_gni_lock)
2112            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2113            CMI_GNI_UNLOCK(global_gni_lock)
2114            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2115            {
2116                CmiAssert(remote_id == pe);
2117                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2118                GNI_RC_CHECK("Dynamic SMSG Init", status);
2119 #if PRINT_SYH
2120                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2121 #endif
2122                CmiAssert(smsg_connected_flag[pe] == 1);
2123                smsg_connected_flag[pe] = 2;
2124            }
2125        }
2126        else {         /* unbound ep */
2127            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2128            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2129            {
2130                CmiAssert(remote_id<mysize);
2131                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2132                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2133                GNI_RC_CHECK("Dynamic SMSG Init", status);
2134 #if PRINT_SYH
2135                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2136 #endif
2137                smsg_connected_flag[remote_id] = 2;
2138
2139                alloc_smsg_attr(&send_smsg_attr);
2140                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2141                GNI_RC_CHECK("post unbound datagram", status);
2142            }
2143        }
2144    }
2145 }
2146
2147 /* pooling CQ to receive network message */
2148 static void PumpNetworkRdmaMsgs()
2149 {
2150     gni_cq_entry_t      event_data;
2151     gni_return_t        status;
2152
2153 }
2154
2155 #if CMK_SMP
2156 inline 
2157 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2158 {
2159     RDMA_REQUEST        *rdma_request_msg;
2160     MallocRdmaRequest(rdma_request_msg);
2161     rdma_request_msg->destNode = inst_id;
2162     rdma_request_msg->pd = pd;
2163 #if REMOTE_EVENT
2164     rdma_request_msg->ack_index = ack_index;
2165 #endif
2166     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2167 }
2168
2169 #else
2170 inline 
2171 static void bufferRdmaMsg(RDMA_REQUEST *bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2172 {
2173     RDMA_REQUEST        *rdma_request_msg;
2174     MallocRdmaRequest(rdma_request_msg);
2175     rdma_request_msg->destNode = inst_id;
2176     rdma_request_msg->pd = pd;
2177 #if REMOTE_EVENT
2178     rdma_request_msg->ack_index = ack_index;
2179 #endif
2180     if(bufferqueue == 0)
2181     {
2182         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2183     }else{
2184         sendRdmaTail->next = rdma_request_msg;
2185         sendRdmaTail =  rdma_request_msg;
2186     }
2187 }
2188
2189 #endif
2190
2191 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2192
2193 static void PumpNetworkSmsg()
2194 {
2195     uint64_t            inst_id;
2196     gni_cq_entry_t      event_data;
2197     gni_return_t        status;
2198     void                *header;
2199     uint8_t             msg_tag;
2200     int                 msg_nbytes;
2201     void                *msg_data;
2202     gni_mem_handle_t    msg_mem_hndl;
2203     gni_smsg_attr_t     *smsg_attr;
2204     gni_smsg_attr_t     *remote_smsg_attr;
2205     int                 init_flag;
2206     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2207     uint64_t            source_addr;
2208     SMSG_QUEUE         *queue = &smsg_queue;
2209 #if   CMK_DIRECT
2210     cmidirectMsg        *direct_msg;
2211 #endif
2212 #if CMI_PUMPNETWORKSMSG_CAP 
2213     int                  recv_cnt = 0;
2214     while(recv_cnt< PumpNetworkSmsg_cap) {
2215 #else
2216     while(1) {
2217 #endif
2218         CMI_GNI_LOCK(smsg_rx_cq_lock)
2219         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2220         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2221         if(status != GNI_RC_SUCCESS) break;
2222
2223         inst_id = GNI_CQ_GET_INST_ID(event_data);
2224 #if REMOTE_EVENT
2225         inst_id = GET_RANK(inst_id);      /* important */
2226 #endif
2227         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2228 #if PRINT_SYH
2229         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2230 #endif
2231         if (useDynamicSMSG) {
2232             /* subtle: smsg may come before connection is setup */
2233             while (smsg_connected_flag[inst_id] != 2) 
2234                PumpDatagramConnection();
2235         }
2236         msg_tag = GNI_SMSG_ANY_TAG;
2237         while(1) {
2238             CMI_GNI_LOCK(smsg_mailbox_lock)
2239             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2240             if (status != GNI_RC_SUCCESS)
2241             {
2242                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2243                 break;
2244             }
2245 #if         CMI_PUMPNETWORKSMSG_CAP
2246             recv_cnt++; 
2247 #endif
2248 #if PRINT_SYH
2249             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2250 #endif
2251             /* copy msg out and then put into queue (small message) */
2252             switch (msg_tag) {
2253             case SMALL_DATA_TAG:
2254             {
2255                 START_EVENT();
2256                 msg_nbytes = CmiGetMsgSize(header);
2257                 msg_data    = CmiAlloc(msg_nbytes);
2258                 memcpy(msg_data, (char*)header, msg_nbytes);
2259                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2260                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2261                 TRACE_COMM_CREATION(EVENT_TIME(), msg_data);
2262                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2263                 handleOneRecvedMsg(msg_nbytes, msg_data);
2264                 break;
2265             }
2266             case LMSG_INIT_TAG:
2267             {
2268 #if MULTI_THREAD_SEND
2269                 MallocControlMsg(control_msg_tmp);
2270                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2271                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2272                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2273                 getLargeMsgRequest(control_msg_tmp, inst_id);
2274                 FreeControlMsg(control_msg_tmp);
2275 #else
2276                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2277                 getLargeMsgRequest(header, inst_id);
2278                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2279 #endif
2280                 break;
2281             }
2282 #if !REMOTE_EVENT && !CQWRITE
2283             case ACK_TAG:   //msg fit into mempool
2284             {
2285                 /* Get is done, release message . Now put is not used yet*/
2286                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2287                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2288                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2289 #if ! USE_LRTS_MEMPOOL
2290                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2291 #else
2292                 DecreaseMsgInSend(msg);
2293 #endif
2294                 if(NoMsgInSend(msg))
2295                     buffered_send_msg -= GetMempoolsize(msg);
2296                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2297                 CmiFree(msg);
2298 #if CMI_EXERT_SEND_LARGE_CAP
2299                 SEND_large_pending--;
2300 #endif
2301                 break;
2302             }
2303 #endif
2304             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2305             {
2306 #if MULTI_THREAD_SEND
2307                 MallocControlMsg(header_tmp);
2308                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2309                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2310 #else
2311                 header_tmp = (CONTROL_MSG *) header;
2312 #endif
2313                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2314 #if CMI_EXERT_SEND_LARGE_CAP
2315                 SEND_large_pending--;
2316 #endif
2317                 void *msg = (void*)(header_tmp->source_addr);
2318                 int cur_seq = CmiGetMsgSeq(msg);
2319                 int offset = ONE_SEG*(cur_seq+1);
2320                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2321                 buffered_send_msg -= header_tmp->length;
2322                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2323                 if (remain_size < 0) remain_size = 0;
2324                 CmiSetMsgSize(msg, remain_size);
2325                 if(remain_size <= 0) //transaction done
2326                 {
2327                     CmiFree(msg);
2328                 }else if (header_tmp->total_length > offset)
2329                 {
2330                     CmiSetMsgSeq(msg, cur_seq+1);
2331                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2332                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2333                     //send next seg
2334                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2335                          // pipelining
2336                     if (header_tmp->seq_id == 1) {
2337                       int i;
2338                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2339                         int seq = cur_seq+i+2;
2340                         CmiSetMsgSeq(msg, seq-1);
2341                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2342                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2343                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2344                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2345                       }
2346                     }
2347                 }
2348 #if MULTI_THREAD_SEND
2349                 FreeControlMsg(header_tmp);
2350 #else
2351                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2352 #endif
2353                 break;
2354             }
2355 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2356             case PUT_DONE_TAG:  {   //persistent message
2357                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2358                 int size = ((CONTROL_MSG *) header)->length;
2359                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2360                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2361                 CmiReference(msg);
2362                 CMI_CHECK_CHECKSUM(msg, size);
2363                 handleOneRecvedMsg(size, msg); 
2364 #if PRINT_SYH
2365                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2366 #endif
2367                 break;
2368             }
2369 #endif
2370 #if CMK_DIRECT
2371             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2372                 //create a trigger message
2373                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2374                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2375                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2376                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2377                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2378                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2379                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2380                 break;
2381 #endif
2382             default:
2383                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2384                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2385                 printf("weird tag problem\n");
2386                 CmiAbort("Unknown tag\n");
2387             }               // end switch
2388 #if PRINT_SYH
2389             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2390 #endif
2391             smsg_recv_count ++;
2392             msg_tag = GNI_SMSG_ANY_TAG;
2393         } //endwhile GNI_SmsgGetNextWTag
2394     }   //end while GetEvent
2395     if(status == GNI_RC_ERROR_RESOURCE)
2396     {
2397         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2398         GNI_RC_CHECK("Smsg_rx_cq full", status);
2399     }
2400 }
2401
2402 static void printDesc(gni_post_descriptor_t *pd)
2403 {
2404     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2405 }
2406
2407 #if CQWRITE
2408 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2409 {
2410     gni_post_descriptor_t *pd;
2411     gni_return_t        status = GNI_RC_SUCCESS;
2412     
2413     MallocPostDesc(pd);
2414     pd->type = GNI_POST_CQWRITE;
2415     pd->cq_mode = GNI_CQMODE_SILENT;
2416     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2417     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2418     pd->cqwrite_value = data;
2419     pd->remote_mem_hndl = mem_hndl;
2420     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2421     GNI_RC_CHECK("GNI_PostCqWrite", status);
2422 }
2423 #endif
2424
2425 // register memory for a message
2426 // return mem handle
2427 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2428 {
2429     gni_return_t status = GNI_RC_SUCCESS;
2430
2431     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2432
2433 #if CMK_PERSISTENT_COMM
2434       // persistent message is always registered
2435       // BIG_MSG small pieces do not have malloc chunk header
2436     if (IS_PERSISTENT_MEMORY(msg)) {
2437         *memh = GetMemHndl(msg);
2438         return GNI_RC_SUCCESS;
2439     }
2440 #endif
2441     if(seqno == 0 
2442 #if CMK_PERSISTENT_COMM
2443          || seqno == PERSIST_SEQ
2444 #endif
2445       )
2446     {
2447         if(IsMemHndlZero((GetMemHndl(msg))))
2448         {
2449             msg = (void*)(msg);
2450             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2451             if(status == GNI_RC_SUCCESS)
2452                 *memh = GetMemHndl(msg);
2453         }
2454         else {
2455             *memh = GetMemHndl(msg);
2456         }
2457     }
2458     else {
2459         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2460         status = registerMemory(msg, size, memh, NULL); 
2461     }
2462     return status;
2463 }
2464
2465 // for BIG_MSG called on receiver side for receiving control message
2466 // LMSG_INIT_TAG
2467 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2468 {
2469 #if     USE_LRTS_MEMPOOL
2470     CONTROL_MSG         *request_msg;
2471     gni_return_t        status = GNI_RC_SUCCESS;
2472     void                *msg_data;
2473     gni_post_descriptor_t *pd;
2474     gni_mem_handle_t    msg_mem_hndl;
2475     int                 size, transaction_size, offset = 0;
2476     size_t              register_size = 0;
2477
2478     // initial a get to transfer data from the sender side */
2479     request_msg = (CONTROL_MSG *) header;
2480     size = request_msg->total_length;
2481     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2482     MallocPostDesc(pd);
2483 #if CMK_WITH_STATS 
2484     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2485 #endif
2486     if(request_msg->seq_id < 2)   {
2487 #if CMK_SMP_TRACE_COMMTHREAD 
2488         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2489 #endif
2490         msg_data = CmiAlloc(size);
2491         CmiSetMsgSeq(msg_data, 0);
2492         _MEMCHECK(msg_data);
2493     }
2494     else {
2495         offset = ONE_SEG*(request_msg->seq_id-1);
2496         msg_data = (char*)request_msg->dest_addr + offset;
2497     }
2498    
2499     pd->cqwrite_value = request_msg->seq_id;
2500
2501     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2502     SetMemHndlZero(pd->local_mem_hndl);
2503     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2504     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2505         if(NoMsgInRecv( (void*)(msg_data)))
2506             register_size = GetMempoolsize((void*)(msg_data));
2507     }
2508
2509     pd->first_operand = ALIGN64(size);                   //  total length
2510
2511     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2512         pd->type            = GNI_POST_FMA_GET;
2513     else
2514         pd->type            = GNI_POST_RDMA_GET;
2515     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2516     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2517     pd->length          = transaction_size;
2518     pd->local_addr      = (uint64_t) msg_data;
2519     pd->remote_addr     = request_msg->source_addr + offset;
2520     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2521 #if MULTI_THREAD_SEND
2522     pd->src_cq_hndl     = rdma_tx_cqh;
2523 #else
2524     pd->src_cq_hndl     = 0; 
2525 #endif
2526     pd->rdma_mode       = 0;
2527     pd->amo_cmd         = 0;
2528 #if CMI_EXERT_RECV_RDMA_CAP
2529     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2530 #endif
2531     //memory registration success
2532     if(status == GNI_RC_SUCCESS )
2533     {
2534         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2535         CMI_GNI_LOCK(lock)
2536 #if REMOTE_EVENT
2537         if( request_msg->seq_id == 0)
2538         {
2539             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2540             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2541             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2542         }
2543 #endif
2544
2545 #if CMK_WITH_STATS
2546         RDMA_TRY_SEND(pd->type)
2547 #endif
2548         if(pd->type == GNI_POST_RDMA_GET) 
2549         {
2550             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2551         }
2552         else
2553         {
2554             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2555         }
2556         CMI_GNI_UNLOCK(lock)
2557
2558         if(status == GNI_RC_SUCCESS )
2559         {
2560 #if CMI_EXERT_RECV_RDMA_CAP
2561             RDMA_pending++;
2562 #endif
2563             if(pd->cqwrite_value == 0)
2564             {
2565 #if MACHINE_DEBUG_LOG
2566                 buffered_recv_msg += register_size;
2567                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2568 #endif
2569                 IncreaseMsgInRecv(msg_data);
2570 #if CMK_SMP_TRACE_COMMTHREAD 
2571                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2572 #endif
2573             }
2574 #if  CMK_WITH_STATS
2575             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2576             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2577 #endif
2578         }
2579     }else
2580     {
2581         SetMemHndlZero((pd->local_mem_hndl));
2582     }
2583     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2584     {
2585 #if REMOTE_EVENT
2586         bufferRdmaMsg(sendRdmaBuf, inst_id, pd, request_msg->ack_index); 
2587 #else
2588         bufferRdmaMsg(sendRdmaBuf, inst_id, pd, -1); 
2589 #endif
2590     }else if (status != GNI_RC_SUCCESS) {
2591         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2592         GNI_RC_CHECK("GetLargeAFter posting", status);
2593     }
2594 #else
2595     CONTROL_MSG         *request_msg;
2596     gni_return_t        status;
2597     void                *msg_data;
2598     gni_post_descriptor_t *pd;
2599     RDMA_REQUEST        *rdma_request_msg;
2600     gni_mem_handle_t    msg_mem_hndl;
2601     //int source;
2602     // initial a get to transfer data from the sender side */
2603     request_msg = (CONTROL_MSG *) header;
2604     msg_data = CmiAlloc(request_msg->length);
2605     _MEMCHECK(msg_data);
2606
2607     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2608
2609     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2610     {
2611         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2612     }
2613
2614     MallocPostDesc(pd);
2615     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2616         pd->type            = GNI_POST_FMA_GET;
2617     else
2618         pd->type            = GNI_POST_RDMA_GET;
2619     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2620     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2621     pd->length          = ALIGN64(request_msg->length);
2622     pd->local_addr      = (uint64_t) msg_data;
2623     pd->remote_addr     = request_msg->source_addr;
2624     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2625 #if MULTI_THREAD_SEND
2626     pd->src_cq_hndl     = rdma_tx_cqh;
2627 #else
2628     pd->src_cq_hndl     = 0;
2629 #endif
2630     pd->rdma_mode       = 0;
2631     pd->amo_cmd         = 0;
2632
2633     //memory registration successful
2634     if(status == GNI_RC_SUCCESS)
2635     {
2636         pd->local_mem_hndl  = msg_mem_hndl;
2637        
2638         if(pd->type == GNI_POST_RDMA_GET) 
2639         {
2640             CMI_GNI_LOCK(rdma_tx_cq_lock)
2641             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2642             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2643         }
2644         else
2645         {
2646             CMI_GNI_LOCK(default_tx_cq_lock)
2647             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2648             CMI_GNI_UNLOCK(default_tx_cq_lock)
2649         }
2650
2651     }else
2652     {
2653         SetMemHndlZero(pd->local_mem_hndl);
2654     }
2655     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2656     {
2657         MallocRdmaRequest(rdma_request_msg);
2658         rdma_request_msg->next = 0;
2659         rdma_request_msg->destNode = inst_id;
2660         rdma_request_msg->pd = pd;
2661         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2662     }else {
2663         GNI_RC_CHECK("AFter posting", status);
2664     }
2665 #endif
2666 }
2667
2668 #if CQWRITE
2669 static void PumpCqWriteTransactions()
2670 {
2671
2672     gni_cq_entry_t          ev;
2673     gni_return_t            status;
2674     void                    *msg;  
2675     int                     msg_size;
2676     while(1) {
2677         //CMI_GNI_LOCK(my_cq_lock) 
2678         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2679         //CMI_GNI_UNLOCK(my_cq_lock)
2680         if(status != GNI_RC_SUCCESS) break;
2681         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2682 #if CMK_PERSISTENT_COMM
2683 #if PRINT_SYH
2684         printf(" %d CQ write event %p\n", myrank, msg);
2685 #endif
2686         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2687 #if PRINT_SYH
2688             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2689 #endif
2690             CmiReference(msg);
2691             msg_size = CmiGetMsgSize(msg);
2692             CMI_CHECK_CHECKSUM(msg, msg_size);
2693             handleOneRecvedMsg(msg_size, msg); 
2694             continue;
2695         }
2696 #endif
2697 #if ! USE_LRTS_MEMPOOL
2698        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2699 #else
2700         DecreaseMsgInSend(msg);
2701 #endif
2702         if(NoMsgInSend(msg))
2703             buffered_send_msg -= GetMempoolsize(msg);
2704         CmiFree(msg);
2705     };
2706     if(status == GNI_RC_ERROR_RESOURCE)
2707     {
2708         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2709     }
2710 }
2711 #endif
2712
2713 #if REMOTE_EVENT
2714 static void PumpRemoteTransactions( gni_cq_handle_t rx_cqh)
2715 {
2716     gni_cq_entry_t          ev;
2717     gni_return_t            status;
2718     void                    *msg;   
2719     int                     inst_id, index, type, size;
2720
2721     while(1) {
2722         CMI_GNI_LOCK(global_gni_lock)
2723 //        CMI_GNI_LOCK(rdma_tx_cq_lock)
2724         status = GNI_CqGetEvent(rx_cqh, &ev);
2725 //        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2726         CMI_GNI_UNLOCK(global_gni_lock)
2727
2728         if(status != GNI_RC_SUCCESS) break;
2729
2730         inst_id = GNI_CQ_GET_INST_ID(ev);
2731         index = GET_INDEX(inst_id);
2732         type = GET_TYPE(inst_id);
2733         switch (type) {
2734         case 0:    // ACK
2735             CmiAssert(index>=0 && index<ackPool.size);
2736             CMI_GNI_LOCK(ackPool.lock);
2737             CmiAssert(GetIndexType(ackPool, index) == 1);
2738             msg = GetIndexAddress(ackPool, index);
2739             CMI_GNI_UNLOCK(ackPool.lock);
2740 #if PRINT_SYH
2741             printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2742 #endif
2743 #if ! USE_LRTS_MEMPOOL
2744            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2745 #else
2746             DecreaseMsgInSend(msg);
2747 #endif
2748             if(NoMsgInSend(msg))
2749                 buffered_send_msg -= GetMempoolsize(msg);
2750             CmiFree(msg);
2751             IndexPool_freeslot(&ackPool, index);
2752 #if CMI_EXERT_SEND_LARGE_CAP
2753             SEND_large_pending--;
2754 #endif
2755             break;
2756 #if CMK_PERSISTENT_COMM
2757         case 1:  {    // PERSISTENT
2758             CmiLock(persistPool.lock);
2759             CmiAssert(GetIndexType(persistPool, index) == 2);
2760             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2761             CmiUnlock(persistPool.lock);
2762             START_EVENT();
2763             msg = slot->destBuf[0].destAddress;
2764             size = CmiGetMsgSize(msg);
2765             CmiReference(msg);
2766             CMI_CHECK_CHECKSUM(msg, size);
2767             TRACE_COMM_CREATION(EVENT_TIME(), msg);
2768             handleOneRecvedMsg(size, msg); 
2769             break;
2770             }
2771 #endif
2772         default:
2773             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2774             CmiAbort("PumpRemoteTransactions: unknown type");
2775         }
2776     }
2777     if(status == GNI_RC_ERROR_RESOURCE)
2778     {
2779         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2780     }
2781 }
2782 #endif
2783
2784 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2785 {
2786     gni_cq_entry_t          ev;
2787     gni_return_t            status;
2788     uint64_t                type, inst_id;
2789     gni_post_descriptor_t   *tmp_pd;
2790     MSG_LIST                *ptr;
2791     CONTROL_MSG             *ack_msg_tmp;
2792     ACK_MSG                 *ack_msg;
2793     uint8_t                 msg_tag;
2794 #if CMK_DIRECT
2795     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2796 #endif
2797     SMSG_QUEUE         *queue = &smsg_queue;
2798
2799     while(1) {
2800         CMI_GNI_LOCK(my_cq_lock) 
2801         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2802         CMI_GNI_UNLOCK(my_cq_lock)
2803         if(status != GNI_RC_SUCCESS) break;
2804         
2805         type = GNI_CQ_GET_TYPE(ev);
2806         if (type == GNI_CQ_EVENT_TYPE_POST)
2807         {
2808
2809 #if CMI_EXERT_RECV_RDMA_CAP
2810             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2811             RDMA_pending--;
2812 #endif
2813             inst_id     = GNI_CQ_GET_INST_ID(ev);
2814 #if PRINT_SYH
2815             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2816 #endif
2817             CMI_GNI_LOCK(my_cq_lock)
2818             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2819             CMI_GNI_UNLOCK(my_cq_lock)
2820
2821             switch (tmp_pd->type) {
2822 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2823             case GNI_POST_RDMA_PUT:
2824 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2825                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2826 #endif
2827             case GNI_POST_FMA_PUT:
2828                 if(tmp_pd->amo_cmd == 1) {
2829 #if CMK_DIRECT
2830                     //sender ACK to receiver to trigger it is done
2831                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2832                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2833                     msg_tag = DIRECT_PUT_DONE_TAG;
2834 #endif
2835                 }
2836                 else {
2837                     CmiFree((void *)tmp_pd->local_addr);
2838 #if REMOTE_EVENT
2839                     FreePostDesc(tmp_pd);
2840                     continue;
2841 #elif CQWRITE
2842                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2843                     FreePostDesc(tmp_pd);
2844                     continue;
2845 #else
2846                     MallocControlMsg(ack_msg_tmp);
2847                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2848                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2849                     ack_msg_tmp->length  = tmp_pd->length;
2850                     msg_tag = PUT_DONE_TAG;
2851 #endif
2852                 }
2853                 break;
2854 #endif
2855             case GNI_POST_RDMA_GET:
2856             case GNI_POST_FMA_GET:  {
2857 #if  ! USE_LRTS_MEMPOOL
2858                 MallocControlMsg(ack_msg_tmp);
2859                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2860                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2861                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2862                 msg_tag = ACK_TAG;  
2863 #else
2864 #if CMK_WITH_STATS
2865                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2866 #endif
2867                 int seq_id = tmp_pd->cqwrite_value;
2868                 if(seq_id > 0)      // BIG_MSG
2869                 {
2870                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2871                     MallocControlMsg(ack_msg_tmp);
2872                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2873                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2874                     ack_msg_tmp->seq_id = seq_id;
2875                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2876                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2877                     ack_msg_tmp->length = tmp_pd->length;
2878                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2879                     msg_tag = BIG_MSG_TAG; 
2880                 } 
2881                 else
2882                 {
2883                     msg_tag = ACK_TAG; 
2884 #if  !REMOTE_EVENT && !CQWRITE
2885                     MallocAckMsg(ack_msg);
2886                     ack_msg->source_addr = tmp_pd->remote_addr;
2887 #endif
2888                 }
2889 #endif
2890                 break;
2891             }
2892             case  GNI_POST_CQWRITE:
2893                    FreePostDesc(tmp_pd);
2894                    continue;
2895             default:
2896                 CmiPrintf("type=%d\n", tmp_pd->type);
2897                 CmiAbort("PumpLocalTransactions: unknown type!");
2898             }      /* end of switch */
2899
2900 #if CMK_DIRECT
2901             if (tmp_pd->amo_cmd == 1) {
2902                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2903                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2904             }
2905             else
2906 #endif
2907             if (msg_tag == ACK_TAG) {
2908 #if !REMOTE_EVENT
2909 #if   !CQWRITE
2910                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2911                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2912 #else
2913                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2914 #endif
2915 #endif
2916             }
2917             else {
2918                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2919                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2920             }
2921 #if CMK_PERSISTENT_COMM
2922             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2923 #endif
2924             {
2925                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2926 #if PRINT_SYH
2927                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2928 #endif
2929                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2930                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2931
2932                     START_EVENT();
2933                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2934                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2935 #if MACHINE_DEBUG_LOG
2936                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2937                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2938                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2939 #endif
2940                     TRACE_COMM_CREATION(EVENT_TIME(), (void*)tmp_pd->local_addr);
2941                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2942                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2943                 }else if(msg_tag == BIG_MSG_TAG){
2944                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2945                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2946                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2947                         START_EVENT();
2948 #if PRINT_SYH
2949                         printf("Pipeline msg done [%d]\n", myrank);
2950 #endif
2951 #if     CMK_SMP_TRACE_COMMTHREAD
2952                         if( tmp_pd->cqwrite_value == 1)
2953                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2954 #endif
2955                         TRACE_COMM_CREATION(EVENT_TIME(), msg);
2956                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2957                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2958                     }
2959                 }
2960             }
2961             FreePostDesc(tmp_pd);
2962         }
2963     } //end while
2964     if(status == GNI_RC_ERROR_RESOURCE)
2965     {
2966         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2967         GNI_RC_CHECK("Smsg_tx_cq full", status);
2968     }
2969 }
2970
2971 #if CMK_SMP
2972
2973 static void  SendRdmaMsg( PCQueue sendqueue)
2974 {
2975     gni_return_t            status = GNI_RC_SUCCESS;
2976     gni_mem_handle_t        msg_mem_hndl;
2977     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2978     RDMA_REQUEST            *pre = 0;
2979     uint64_t                register_size = 0;
2980     void                    *msg;
2981     int                     i;
2982     int len = PCQueueLength(sendqueue);
2983     for (i=0; i<len; i++)
2984     {
2985 #if CMI_EXERT_RECV_RDMA_CAP
2986         if( RDMA_pending >= RDMA_cap) break;
2987 #endif
2988         CMI_PCQUEUEPOP_LOCK( sendqueue)
2989         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
2990         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
2991         if (ptr == NULL) break;
2992         
2993         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2994         gni_post_descriptor_t *pd = ptr->pd;
2995         
2996         msg = (void*)(pd->local_addr);
2997         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2998         register_size = 0;
2999         if(pd->cqwrite_value == 0) {
3000             if(NoMsgInRecv(msg))
3001                 register_size = GetMempoolsize(msg);
3002         }
3003
3004         if(status == GNI_RC_SUCCESS)        //mem register good
3005         {
3006             int destNode = ptr->destNode;
3007             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
3008             CMI_GNI_LOCK(lock);
3009 #if REMOTE_EVENT
3010             if( pd->cqwrite_value == 0) {
3011                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3012                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
3013                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3014             }
3015 #if CMK_PERSISTENT_COMM
3016             else if (pd->cqwrite_value == PERSIST_SEQ) {
3017                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3018                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
3019                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3020             }
3021 #endif
3022 #endif
3023 #if CMK_WITH_STATS
3024             RDMA_TRY_SEND(pd->type)
3025 #endif
3026 #if CMK_SMP_TRACE_COMMTHREAD
3027             if(IS_PUT(pd->type))
3028             {
3029                  START_EVENT();
3030                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
3031             }
3032 #endif
3033
3034             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
3035             {
3036                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3037             }
3038             else
3039             {
3040                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3041             }
3042             CMI_GNI_UNLOCK(lock);
3043             
3044             if(status == GNI_RC_SUCCESS)    //post good
3045             {
3046 #if CMI_EXERT_RECV_RDMA_CAP
3047                 RDMA_pending ++;
3048 #endif
3049                 if(pd->cqwrite_value == 0)
3050                 {
3051 #if CMK_SMP_TRACE_COMMTHREAD 
3052                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3053 #endif
3054                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3055                 }
3056 #if  CMK_WITH_STATS
3057                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3058                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3059 #endif
3060 #if MACHINE_DEBUG_LOG
3061                 buffered_recv_msg += register_size;
3062                 MACHSTATE(8, "GO request from buffered\n"); 
3063 #endif
3064 #if PRINT_SYH
3065                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
3066 #endif
3067             }else           // cannot post
3068             {
3069                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3070 #if PRINT_SYH
3071                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3072 #endif
3073                 break;
3074             }
3075         } else          //memory registration fails
3076         {
3077             PCQueuePush(sendqueue, (char*)ptr);
3078         }
3079     } //end while
3080 }
3081
3082
3083 #else
3084 static void  SendRdmaMsg()
3085 {
3086     gni_return_t            status = GNI_RC_SUCCESS;
3087     gni_mem_handle_t        msg_mem_hndl;
3088     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
3089     RDMA_REQUEST            *pre = 0;
3090     uint64_t                register_size = 0;
3091     void                    *msg;
3092     int                     i;
3093     ptr = sendRdmaBuf;
3094     while (ptr!=0 )
3095     {
3096 #if CMI_EXERT_RECV_RDMA_CAP
3097          if( RDMA_pending >= RDMA_cap) break;
3098 #endif
3099         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
3100         gni_post_descriptor_t *pd = ptr->pd;
3101         
3102         msg = (void*)(pd->local_addr);
3103         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
3104         register_size = 0;
3105         if(pd->cqwrite_value == 0) {
3106             if(NoMsgInRecv(msg))
3107                 register_size = GetMempoolsize(msg);
3108         }
3109
3110         if(status == GNI_RC_SUCCESS)        //mem register good
3111         {
3112             int destNode = ptr->destNode;
3113             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
3114             CMI_GNI_LOCK(lock);
3115 #if REMOTE_EVENT
3116             if( pd->cqwrite_value == 0) {
3117                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3118                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
3119                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3120             }
3121 #if CMK_PERSISTENT_COMM
3122             else if (pd->cqwrite_value == PERSIST_SEQ) {
3123                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3124                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
3125                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3126             }
3127 #endif
3128 #endif
3129 #if CMK_WITH_STATS
3130             RDMA_TRY_SEND(pd->type)
3131 #endif
3132 #if CMK_SMP_TRACE_COMMTHREAD
3133             if(IS_PUT(pd->type))
3134             {
3135                  START_EVENT();
3136                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
3137             }
3138 #endif
3139
3140             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
3141             {
3142                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3143             }
3144             else
3145             {
3146                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3147             }
3148             CMI_GNI_UNLOCK(lock);
3149             
3150             if(status == GNI_RC_SUCCESS)    //post good
3151             {
3152 #if CMI_EXERT_RECV_RDMA_CAP
3153                 RDMA_pending ++;
3154 #endif
3155                 tmp_ptr = ptr;
3156                 if(pre != 0) {
3157                     pre->next = ptr->next;
3158                 }
3159                 else {
3160                     sendRdmaBuf = ptr->next;
3161                 }
3162                 ptr = ptr->next;
3163                 FreeRdmaRequest(tmp_ptr);
3164                 if(pd->cqwrite_value == 0)
3165                 {
3166 #if CMK_SMP_TRACE_COMMTHREAD 
3167                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3168 #endif
3169                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3170                 }
3171 #if  CMK_WITH_STATS
3172                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3173                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3174 #endif
3175 #if MACHINE_DEBUG_LOG
3176                 buffered_recv_msg += register_size;
3177                 MACHSTATE(8, "GO request from buffered\n"); 
3178 #endif
3179 #if PRINT_SYH
3180                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
3181 #endif
3182             }else           // cannot post
3183             {
3184                 pre = ptr;
3185                 ptr = ptr->next;
3186 #if PRINT_SYH
3187                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3188 #endif
3189                 break;
3190             }
3191         } else          //memory registration fails
3192         {
3193             pre = ptr;
3194             ptr = ptr->next;
3195         }
3196     } //end while
3197     if(ptr == 0)
3198         sendRdmaTail = pre;
3199 }
3200 #endif
3201
3202 static inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
3203 {
3204     CONTROL_MSG         *control_msg_tmp;
3205     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
3206
3207     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3208     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
3209             /* connection not exists yet */
3210 #if CMK_SMP
3211             /* non-smp case, connect is issued in send_smsg_message */
3212         if (smsg_connected_flag[ptr->destNode] == 0)
3213             connect_to(ptr->destNode); 
3214 #endif
3215     }
3216     else
3217     switch(ptr->tag)
3218     {
3219     case SMALL_DATA_TAG:
3220         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3221         if(status == GNI_RC_SUCCESS)
3222         {
3223             CmiFree(ptr->msg);
3224         }
3225         break;
3226     case LMSG_INIT_TAG:
3227         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3228         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr);
3229         break;
3230 #if !REMOTE_EVENT && !CQWRITE
3231     case ACK_TAG:
3232         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3233         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3234         break;
3235 #endif
3236     case BIG_MSG_TAG:
3237         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3238         if(status == GNI_RC_SUCCESS)
3239         {
3240             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3241         }
3242         break;
3243 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3244     case PUT_DONE_TAG:
3245         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3246         if(status == GNI_RC_SUCCESS)
3247         {
3248             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3249         }
3250         break;
3251 #endif
3252 #if CMK_DIRECT
3253     case DIRECT_PUT_DONE_TAG:
3254         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3255         if(status == GNI_RC_SUCCESS)
3256         {
3257             free((CMK_DIRECT_HEADER*)ptr->msg);
3258         }
3259         break;
3260 #endif
3261     default:
3262         printf("Weird tag\n");
3263         CmiAbort("should not happen\n");
3264     }       // end switch
3265     return status;
3266 }
3267
3268 // return 1 if all messages are sent
3269 #if CMK_SMP
3270
3271 #if ONE_SEND_QUEUE
3272
3273 static int SendBufferMsg(SMSG_QUEUE *queue)
3274 {
3275     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3276     CONTROL_MSG         *control_msg_tmp;
3277     gni_return_t        status;
3278     int                 done = 1;
3279     uint64_t            register_size;
3280     void                *register_addr;
3281     int                 index_previous = -1;
3282 #if     CMI_SENDBUFFERSMSG_CAP
3283     int                 sent_length = 0;
3284 #endif
3285     int          index = 0;
3286     memset(destpe_avail, 0, mysize * sizeof(char));
3287     for (index=0; index<1; index++)
3288     {
3289         int i, len = PCQueueLength(queue->sendMsgBuf);
3290         for (i=0; i<len; i++) 
3291         {
3292             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3293             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3294             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3295             if(ptr == NULL) break;
3296             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3297                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3298                 continue;
3299             }
3300             status = _sendOneBufferedSmsg(queue, ptr);
3301 #if CMI_SENDBUFFERSMSG_CAP
3302             sent_length++;
3303 #endif
3304             if(status == GNI_RC_SUCCESS)
3305             {
3306 #if PRINT_SYH
3307                 buffered_smsg_counter--;
3308                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3309 #endif
3310                 FreeMsgList(ptr);
3311             }else {
3312                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3313                 done = 0;
3314                 if(status == GNI_RC_ERROR_RESOURCE)
3315                 {
3316                     destpe_avail[ptr->destNode] = 1;
3317                 }
3318             } 
3319         } //end while
3320     }   // end pooling for all cores
3321     return done;
3322 }
3323
3324 #else   /*  ! ONE_SEND_QUEUE  */
3325
3326 static int SendBufferMsg(SMSG_QUEUE *queue)
3327 {
3328     MSG_LIST            *ptr;
3329     gni_return_t        status;
3330     int                 done = 1;
3331 #if     CMI_SENDBUFFERSMSG_CAP
3332     int                 sent_length = 0;
3333 #endif
3334     static int          index = -1;
3335     int idx;
3336 #if SMP_LOCKS
3337     int nonempty = PCQueueLength(nonEmptyQueues);
3338     for(idx =0; idx<nonempty; idx++) 
3339     {
3340         index++;  if (index >= nonempty) index = 0;
3341 #if CMI_SENDBUFFERSMSG_CAP
3342         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3343 #endif
3344         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
3345         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
3346         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3347         if(current_list == NULL) break; 
3348         PCQueue current_queue= current_list->sendSmsgBuf;
3349         CmiLock(current_list->lock);
3350         int i, len = PCQueueLength(current_queue);
3351         current_list->pushed = 0;
3352         CmiUnlock(current_list->lock);
3353 #else      /* ! SMP_LOCKS */
3354     for(idx =0; idx<mysize; idx++) 
3355     {
3356         index++;  if (index == mysize) index = 0;
3357 #if CMI_SENDBUFFERSMSG_CAP
3358         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3359 #endif
3360         //if (index == myrank) continue;
3361         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3362         int i, len = PCQueueLength(current_queue);
3363 #endif
3364         for (i=0; i<len; i++)  {
3365             CMI_PCQUEUEPOP_LOCK(current_queue)
3366             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3367             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3368             if (ptr == 0) break;
3369
3370             status = _sendOneBufferedSmsg(queue, ptr);
3371 #if CMI_SENDBUFFERSMSG_CAP
3372             sent_length++;
3373 #endif
3374             if(status == GNI_RC_SUCCESS)
3375             {
3376 #if PRINT_SYH
3377                 buffered_smsg_counter--;
3378                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3379 #endif
3380                 FreeMsgList(ptr);
3381             }else {
3382                 PCQueuePush(current_queue, (char*)ptr);
3383                 done = 0;
3384                 if(status == GNI_RC_ERROR_RESOURCE)
3385                 {
3386                     break;
3387                 }
3388             } 
3389         } //end while
3390 #if SMP_LOCKS
3391         CmiLock(current_list->lock);
3392         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3393         {
3394             current_list->pushed = 1;
3395             PCQueuePush(nonEmptyQueues, (char*)current_list);
3396         }
3397         CmiUnlock(current_list->lock); 
3398 #endif
3399     }   // end pooling for all cores
3400     return done;
3401 }
3402
3403 #endif
3404
3405 #else              /* non-smp */
3406
3407 static int SendBufferMsg(SMSG_QUEUE *queue)
3408 {
3409     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3410     CONTROL_MSG         *control_msg_tmp;
3411     gni_return_t        status;
3412     int                 done = 1;
3413     uint64_t            register_size;
3414     void                *register_addr;
3415     int                 index_previous = -1;
3416 #if     CMI_SENDBUFFERSMSG_CAP
3417     int                 sent_length = 0;
3418 #endif
3419     int index = queue->smsg_head_index;
3420     while(index != -1)
3421     {
3422         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3423         pre = 0;
3424         while(ptr != 0)
3425         {
3426             status = _sendOneBufferedSmsg(queue, ptr);
3427 #if CMI_SENDBUFFERSMSG_CAP
3428             sent_length++;
3429 #endif
3430             if(status == GNI_RC_SUCCESS)
3431             {
3432 #if PRINT_SYH
3433                 buffered_smsg_counter--;
3434                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3435 #endif
3436                 tmp_ptr = ptr;
3437                 if(pre)
3438                 {
3439                     ptr = pre ->next = ptr->next;
3440                 }else
3441                 {
3442                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3443                 }
3444                 FreeMsgList(tmp_ptr);
3445             }else {
3446                 pre = ptr;
3447                 ptr=ptr->next;
3448                 done = 0;
3449                 if(status == GNI_RC_ERROR_RESOURCE)
3450                 {
3451                     break;
3452                 }
3453             } 
3454         } //end while
3455         if(ptr == 0)
3456             queue->smsg_msglist_index[index].tail = pre;
3457         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3458         {
3459             if(index_previous != -1)
3460                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3461             else
3462                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3463         }
3464         else {
3465             index_previous = index;
3466         }
3467         index = queue->smsg_msglist_index[index].next;
3468
3469     }   // end pooling for all cores
3470     return done;
3471 }
3472 #endif
3473
3474 static void ProcessDeadlock();
3475 void LrtsAdvanceCommunication(int whileidle)
3476 {
3477     static int count = 0;
3478     /*  Receive Msg first */
3479 #if CMK_SMP_TRACE_COMMTHREAD
3480     double startT, endT;
3481 #endif
3482     if (useDynamicSMSG && whileidle)
3483     {
3484 #if CMK_SMP_TRACE_COMMTHREAD
3485         startT = CmiWallTimer();
3486 #endif
3487         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3488 #if CMK_SMP_TRACE_COMMTHREAD
3489         endT = CmiWallTimer();
3490         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3491 #endif
3492     }
3493
3494     // Receiving small messages and persistent
3495 #if CMK_SMP_TRACE_COMMTHREAD
3496     startT = CmiWallTimer();
3497 #endif
3498     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3499     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3500 #if CMK_SMP_TRACE_COMMTHREAD
3501     endT = CmiWallTimer();
3502     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3503 #endif
3504
3505 #if REMOTE_EVENT
3506     RECV_PERSISTENT
3507 #endif
3508     SEND_PERSISTENT
3509
3510     //sending small messages
3511     /* Send buffered Message */
3512 #if CMK_SMP_TRACE_COMMTHREAD
3513     startT = CmiWallTimer();
3514 #endif
3515 #if CMK_USE_OOB
3516     if (SendBufferMsg(&smsg_oob_queue) == 1)
3517 #endif
3518     {
3519         STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
3520     }
3521 #if CMK_SMP_TRACE_COMMTHREAD
3522     endT = CmiWallTimer();
3523     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3524 #endif
3525
3526 #if REMOTE_EVENT
3527     RECV_PERSISTENT
3528 #endif
3529     SEND_PERSISTENT
3530
3531     //Pump Get messages or PUT messages
3532 #if CMK_SMP_TRACE_COMMTHREAD
3533     startT = CmiWallTimer();
3534 #endif
3535     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3536 #if MULTI_THREAD_SEND
3537     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3538 #endif
3539 #if CMK_SMP_TRACE_COMMTHREAD
3540     endT = CmiWallTimer();
3541     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3542 #endif
3543
3544 #if REMOTE_EVENT
3545     RECV_PERSISTENT
3546 #endif
3547     SEND_PERSISTENT
3548
3549     //Pump Remote event
3550
3551 #if CMK_SMP_TRACE_COMMTHREAD
3552     startT = CmiWallTimer();
3553 #endif
3554
3555 #if CQWRITE
3556     PumpCqWriteTransactions();
3557 #endif
3558
3559 #if REMOTE_EVENT
3560     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions( rdma_rx_cqh));
3561 #endif
3562
3563 #if CMK_SMP_TRACE_COMMTHREAD
3564     endT = CmiWallTimer();
3565     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3566 #endif
3567  
3568     //Send pending RDMA transactions
3569 #if REMOTE_EVENT
3570     RECV_PERSISTENT
3571 #endif
3572     SEND_PERSISTENT
3573
3574 #if CMK_SMP_TRACE_COMMTHREAD
3575     startT = CmiWallTimer();
3576 #endif
3577 #if CMK_SMP
3578     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3579 #else
3580     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3581 #endif
3582     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3583 #if CMK_SMP_TRACE_COMMTHREAD
3584     endT = CmiWallTimer();
3585     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3586 #endif
3587
3588 #if REMOTE_EVENT
3589     RECV_PERSISTENT
3590 #endif
3591     SEND_PERSISTENT
3592 #if CMK_SMP && ! LARGEPAGE
3593     if (_detected_hang)  ProcessDeadlock();
3594 #endif
3595 }
3596
3597 static void set_smsg_max()
3598 {
3599     char *env;
3600
3601     if(mysize <=512)
3602     {
3603         SMSG_MAX_MSG = 1024;
3604     }else if (mysize <= 4096)
3605     {
3606         SMSG_MAX_MSG = 1024;
3607     }else if (mysize <= 16384)
3608     {
3609         SMSG_MAX_MSG = 512;
3610     }else {
3611         SMSG_MAX_MSG = 256;
3612     }
3613
3614     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3615     if (env) SMSG_MAX_MSG = atoi(env);
3616     CmiAssert(SMSG_MAX_MSG > 0);
3617 }    
3618
3619 /* useDynamicSMSG */
3620 static void _init_dynamic_smsg()
3621 {
3622     gni_return_t status;
3623     uint32_t     vmdh_index = -1;
3624     int i;
3625
3626     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3627     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3628     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3629     for(i=0; i<mysize; i++) {
3630         smsg_connected_flag[i] = 0;
3631         smsg_attr_vector_local[i] = NULL;
3632         smsg_attr_vector_remote[i] = NULL;
3633     }
3634
3635     set_smsg_max();
3636
3637     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3638     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3639     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3640     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3641     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3642
3643     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3644     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3645     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3646     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3647     mailbox_list->offset = 0;
3648     mailbox_list->next = 0;
3649     
3650     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3651         mailbox_list->size, smsg_rx_cqh,
3652         GNI_MEM_READWRITE,   
3653         vmdh_index,
3654         &(mailbox_list->mem_hndl));
3655     GNI_RC_CHECK("MEMORY registration for smsg", status);
3656
3657     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3658     GNI_RC_CHECK("Unbound EP", status);
3659     
3660     alloc_smsg_attr(&send_smsg_attr);
3661
3662     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3663     GNI_RC_CHECK("post unbound datagram", status);
3664
3665       /* always pre-connect to proc 0 */
3666     //if (myrank != 0) connect_to(0);
3667
3668     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3669     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3670 }
3671
3672 static void _init_static_smsg()
3673 {
3674     gni_smsg_attr_t      *smsg_attr;
3675     gni_smsg_attr_t      remote_smsg_attr;
3676     gni_smsg_attr_t      *smsg_attr_vec;
3677     gni_mem_handle_t     my_smsg_mdh_mailbox;
3678     int      ret, i;
3679     gni_return_t status;
3680     uint32_t              vmdh_index = -1;
3681     mdh_addr_t            base_infor;
3682     mdh_addr_t            *base_addr_vec;
3683
3684     set_smsg_max();
3685     
3686     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3687     
3688     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3689     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3690     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3691     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3692     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3693     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3694     CmiAssert(ret == 0);
3695     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3696     
3697     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3698             smsg_memlen*(mysize), smsg_rx_cqh,
3699             GNI_MEM_READWRITE,   
3700             vmdh_index,
3701             &my_smsg_mdh_mailbox);
3702     register_memory_size += smsg_memlen*(mysize);
3703     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3704
3705     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3706     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3707
3708     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3709     base_infor.mdh =  my_smsg_mdh_mailbox;
3710     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3711
3712     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3713  
3714     for(i=0; i<mysize; i++)
3715     {
3716         if(i==myrank)
3717             continue;
3718         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3719         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3720         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3721         smsg_attr[i].mbox_offset = i*smsg_memlen;
3722         smsg_attr[i].buff_size = smsg_memlen;
3723         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3724         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3725     }
3726
3727     for(i=0; i<mysize; i++)
3728     {
3729         if (myrank == i) continue;
3730
3731         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3732         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3733         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3734         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3735         remote_smsg_attr.buff_size = smsg_memlen;
3736         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3737         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3738
3739         /* initialize the smsg channel */
3740         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3741         GNI_RC_CHECK("SMSG Init", status);
3742     } //end initialization
3743
3744     free(base_addr_vec);
3745     free(smsg_attr);
3746
3747     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3748     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3749
3750
3751 inline
3752 static void _init_send_queue(SMSG_QUEUE *queue)
3753 {
3754      int i;
3755 #if ONE_SEND_QUEUE
3756      queue->sendMsgBuf = PCQueueCreate();
3757      destpe_avail = (char*)malloc(mysize * sizeof(char));
3758 #else
3759      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3760 #if CMK_SMP && SMP_LOCKS
3761      nonEmptyQueues = PCQueueCreate();
3762 #endif
3763      for(i =0; i<mysize; i++)
3764      {
3765 #if CMK_SMP
3766          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3767 #if SMP_LOCKS
3768          queue->smsg_msglist_index[i].pushed = 0;
3769          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3770 #endif
3771 #else
3772          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3773          queue->smsg_msglist_index[i].next = -1;
3774          queue->smsg_head_index = -1;
3775 #endif
3776         
3777      }
3778 #endif
3779 }
3780
3781 inline
3782 static void _init_smsg()
3783 {
3784     if(mysize > 1) {
3785         if (useDynamicSMSG)
3786             _init_dynamic_smsg();
3787         else
3788             _init_static_smsg();
3789     }
3790
3791     _init_send_queue(&smsg_queue);
3792 #if CMK_USE_OOB
3793     _init_send_queue(&smsg_oob_queue);
3794 #endif
3795 }
3796
3797 static void _init_static_msgq()
3798 {
3799     gni_return_t status;
3800     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3801     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3802     msgq_attrs.smsg_q_sz = 1;
3803     msgq_attrs.rcv_pool_sz = 1;
3804     msgq_attrs.num_msgq_eps = 2;
3805     msgq_attrs.nloc_insts = 8;
3806     msgq_attrs.modes = 0;
3807     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3808
3809     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3810     GNI_RC_CHECK("MSGQ Init", status);
3811
3812
3813 }
3814
3815
3816 static CmiUInt8 total_mempool_size = 0;
3817 static CmiUInt8 total_mempool_calls = 0;
3818
3819 #if USE_LRTS_MEMPOOL
3820
3821 #if CMK_PERSISTENT_COMM
3822 void *alloc_persistent_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3823 {
3824     void *pool;
3825     int ret;
3826     gni_return_t status = GNI_RC_SUCCESS;
3827
3828     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3829     if (*size < default_size) *size = default_size;
3830 #if LARGEPAGE
3831     // round up to be multiple of _tlbpagesize
3832     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3833     *size = ALIGNHUGEPAGE(*size);
3834 #endif
3835     total_mempool_size += *size;
3836     total_mempool_calls += 1;
3837 #if   !LARGEPAGE
3838     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3839     {
3840         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3841         CmiAbort("alloc_mempool_block");
3842     }
3843 #endif
3844 #if LARGEPAGE
3845     pool = my_get_huge_pages(*size);
3846     ret = pool==NULL;
3847 #else
3848     ret = posix_memalign(&pool, ALIGNBUF, *size);
3849 #endif
3850     if (ret != 0) {
3851       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3852       if (ret == ENOMEM)
3853         CmiAbort("alloc_mempool_block: out of memory.");
3854       else
3855         CmiAbort("alloc_mempool_block: posix_memalign failed");
3856     }
3857 #if LARGEPAGE
3858     CmiMemLock();
3859     register_count++;
3860     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, persistent_rx_cqh, status);
3861     CmiMemUnlock();
3862     if(status != GNI_RC_SUCCESS) {
3863         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3864 sweep_mempool(CpvAccess(mempool));
3865     }
3866     GNI_RC_CHECK("MEMORY_REGISTER", status);
3867 #else
3868     SetMemHndlZero((*mem_hndl));
3869 #endif
3870     return pool;
3871 }
3872 #endif
3873
3874 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3875 {
3876     void *pool;
3877     int ret;
3878     gni_return_t status = GNI_RC_SUCCESS;
3879
3880     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3881     if (*size < default_size) *size = default_size;
3882 #if LARGEPAGE
3883     // round up to be multiple of _tlbpagesize
3884     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3885     *size = ALIGNHUGEPAGE(*size);
3886 #endif
3887     total_mempool_size += *size;
3888     total_mempool_calls += 1;
3889 #if   !LARGEPAGE
3890     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3891     {
3892         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3893         CmiAbort("alloc_mempool_block");
3894     }
3895 #endif
3896 #if LARGEPAGE
3897     pool = my_get_huge_pages(*size);
3898     ret = pool==NULL;
3899 #else
3900     ret = posix_memalign(&pool, ALIGNBUF, *size);
3901 #endif
3902     if (ret != 0) {
3903       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3904       if (ret == ENOMEM)
3905         CmiAbort("alloc_mempool_block: out of memory.");
3906       else
3907         CmiAbort("alloc_mempool_block: posix_memalign failed");
3908     }
3909 #if LARGEPAGE
3910     CmiMemLock();
3911     register_count++;
3912     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3913     CmiMemUnlock();
3914     if(status != GNI_RC_SUCCESS) {
3915         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3916 sweep_mempool(CpvAccess(mempool));
3917     }
3918     GNI_RC_CHECK("MEMORY_REGISTER", status);
3919 #else
3920     SetMemHndlZero((*mem_hndl));
3921 #endif
3922     return pool;
3923 }
3924
3925 // ptr is a block head pointer
3926 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3927 {
3928     if(!(IsMemHndlZero(mem_hndl)))
3929     {
3930         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3931     }
3932 #if LARGEPAGE
3933     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3934 #else
3935     free(ptr);
3936 #endif
3937 }
3938 #endif
3939
3940 void LrtsPreCommonInit(int everReturn){
3941 #if USE_LRTS_MEMPOOL
3942     CpvInitialize(mempool_type*, mempool);
3943     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3944 #if CMK_PERSISTENT_COMM
3945     CpvInitialize(mempool_type*, persistent_mempool);
3946     CpvAccess(persistent_mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3947 #endif
3948     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3949 #endif
3950 }
3951
3952 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3953 {
3954     register int            i;
3955     int                     rc;
3956     int                     device_id = 0;
3957     unsigned int            remote_addr;
3958     gni_cdm_handle_t        cdm_hndl;
3959     gni_return_t            status = GNI_RC_SUCCESS;
3960     uint32_t                vmdh_index = -1;
3961     uint8_t                 ptag;
3962     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3963     int                     first_spawned;
3964     int                     physicalID;
3965     char                   *env;
3966
3967     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3968     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3969     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3970    
3971     status = PMI_Init(&first_spawned);
3972     GNI_RC_CHECK("PMI_Init", status);
3973
3974     status = PMI_Get_size(&mysize);
3975     GNI_RC_CHECK("PMI_Getsize", status);
3976
3977     status = PMI_Get_rank(&myrank);
3978     GNI_RC_CHECK("PMI_getrank", status);
3979
3980     //physicalID = CmiPhysicalNodeID(myrank);
3981     
3982     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3983
3984     *myNodeID = myrank;
3985     *numNodes = mysize;
3986   
3987 #if MULTI_THREAD_SEND
3988     /* Currently, we only consider the case that comm. thread will only recv msgs */
3989     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3990 #endif
3991
3992 #if CMI_EXERT_SEND_LARGE_CAP
3993     CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
3994 #endif
3995
3996 #if CMI_SENDBUFFERSMSG_CAP
3997     CmiGetArgInt(*argv,"+useSendBufferCap", &SendBufferMsg_cap);
3998 #endif
3999
4000 #if CMI_PUMPNETWORKSMSG_CAP 
4001     CmiGetArgInt(*argv,"+usePumpSmsgCap", &PumpNetworkSmsg_cap);
4002 #endif
4003
4004     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
4005     
4006     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
4007     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
4008     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
4009
4010     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
4011     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
4012     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
4013
4014     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
4015     if (env) useDynamicSMSG = 1;
4016     if (!useDynamicSMSG)
4017       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
4018     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
4019     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
4020     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
4021     
4022     if(myrank == 0)
4023     {
4024         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
4025         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
4026     }
4027 #ifdef USE_ONESIDED
4028     onesided_init(NULL, &onesided_hnd);
4029
4030     // this is a GNI test, so use the libonesided bypass functionality
4031     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
4032     local_addr = gniGetNicAddress();
4033 #else
4034     ptag = get_ptag();
4035     cookie = get_cookie();
4036 #if 0
4037     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
4038 #endif
4039     //Create and attach to the communication  domain */
4040     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
4041     GNI_RC_CHECK("GNI_CdmCreate", status);
4042     //* device id The device id is the minor number for the device
4043     //that is assigned to the device by the system when the device is created.
4044     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
4045     //where X is the device number 0 default 
4046     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
4047     GNI_RC_CHECK("GNI_CdmAttach", status);
4048     local_addr = get_gni_nic_address(0);
4049 #endif
4050     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
4051     _MEMCHECK(MPID_UGNI_AllAddr);
4052     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
4053     /* create the local completion queue */
4054     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
4055     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
4056     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
4057 #if MULTI_THREAD_SEND
4058     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
4059     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
4060 #endif
4061     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
4062
4063     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
4064     GNI_RC_CHECK("Create CQ (rx)", status);
4065     
4066     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
4067     GNI_RC_CHECK("Create Post CQ (rx)", status);
4068    
4069 #if CMK_PERSISTENT_COMM
4070     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &persistent_rx_cqh);
4071     GNI_RC_CHECK("Create Post CQ (rx)", status);
4072 #endif
4073     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
4074     //GNI_RC_CHECK("Create BTE CQ", status);
4075
4076     /* create the endpoints. they