08928d3384d0b2b7d3ef9d7bcacc6e91c4267191
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #define     LARGEPAGE              0
51
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           1
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     0
59 #endif
60
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
64 #define CMI_EXERT_SEND_CAP      0
65 #define CMI_EXERT_RECV_CAP      0
66 #define CMI_EXERT_RDMA_CAP      0
67 #if CMI_EXERT_SEND_CAP
68 int SEND_large_cap = 100;
69 int SEND_large_pending = 0;
70 #endif
71
72 #if CMI_EXERT_RECV_CAP
73 #define RECV_CAP  4                  /* cap <= 2 sometimes hang */
74 #endif
75
76 #if CMI_EXERT_RDMA_CAP
77 int   RDMA_cap =   100;
78 int   RDMA_pending = 0;
79 #endif
80
81 #define USE_LRTS_MEMPOOL                  1
82
83 #define PRINT_SYH                         0
84
85 // Trace communication thread
86 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
87 #define TRACE_THRESHOLD     0.00005
88 #define CMI_MPI_TRACE_MOREDETAILED 0
89 #undef CMI_MPI_TRACE_USEREVENTS
90 #define CMI_MPI_TRACE_USEREVENTS 1
91 #else
92 #undef CMK_SMP_TRACE_COMMTHREAD
93 #define CMK_SMP_TRACE_COMMTHREAD 0
94 #endif
95
96 #define CMK_TRACE_COMMOVERHEAD 0
97 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
98 #undef CMI_MPI_TRACE_USEREVENTS
99 #define CMI_MPI_TRACE_USEREVENTS 1
100 #else
101 #undef CMK_TRACE_COMMOVERHEAD
102 #define CMK_TRACE_COMMOVERHEAD 0
103 #endif
104
105 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
106 CpvStaticDeclare(double, projTraceStart);
107 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
108 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
109 #else
110 #define  START_EVENT()
111 #define  END_EVENT(x)
112 #endif
113
114 #if USE_LRTS_MEMPOOL
115
116 #define oneMB (1024ll*1024)
117 #define oneGB (1024ll*1024*1024)
118
119 static CmiInt8 _mempool_size = 8*oneMB;
120 static CmiInt8 _expand_mem =  4*oneMB;
121 static CmiInt8 _mempool_size_limit = 0;
122
123 static CmiInt8 _totalmem = 0.8*oneGB;
124
125 #if LARGEPAGE
126 static CmiInt8 BIG_MSG  =  16*oneMB;
127 static CmiInt8 ONE_SEG  =  4*oneMB;
128 #else
129 static CmiInt8 BIG_MSG  =  4*oneMB;
130 static CmiInt8 ONE_SEG  =  2*oneMB;
131 #endif
132 #if MULTI_THREAD_SEND
133 static int BIG_MSG_PIPELINE = 1;
134 #else
135 static int BIG_MSG_PIPELINE = 4;
136 #endif
137
138 // dynamic flow control
139 static CmiInt8 buffered_send_msg = 0;
140 static CmiInt8 register_memory_size = 0;
141
142 #if LARGEPAGE
143 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
144 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
145 static CmiInt8  register_count = 0;
146 #else
147 #if CMK_SMP && COMM_THREAD_SEND 
148 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
149 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
150 #else
151 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
152 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
153 #endif
154
155
156 #endif
157
158 #endif     /* end USE_LRTS_MEMPOOL */
159
160 #if MULTI_THREAD_SEND 
161 #define     CMI_GNI_LOCK(x)       CmiLock(x);
162 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
163 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
164 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
165 #else
166 #define     CMI_GNI_LOCK(x)
167 #define     CMI_GNI_UNLOCK(x)
168 #define     CMI_PCQUEUEPOP_LOCK(Q)   
169 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
170 #endif
171
172 static int _tlbpagesize = 4096;
173
174 //static int _smpd_count  = 0;
175
176 static int   user_set_flag  = 0;
177
178 static int _checkProgress = 1;             /* check deadlock */
179 static int _detected_hang = 0;
180
181 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
182
183 // dynamic SMSG
184 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
185
186 static int avg_smsg_connection = 32;
187 static int                 *smsg_connected_flag= 0;
188 static gni_smsg_attr_t     **smsg_attr_vector_local;
189 static gni_smsg_attr_t     **smsg_attr_vector_remote;
190 static gni_ep_handle_t     ep_hndl_unbound;
191 static gni_smsg_attr_t     send_smsg_attr;
192 static gni_smsg_attr_t     recv_smsg_attr;
193
194 typedef struct _dynamic_smsg_mailbox{
195    void     *mailbox_base;
196    int      size;
197    int      offset;
198    gni_mem_handle_t  mem_hndl;
199    struct      _dynamic_smsg_mailbox  *next;
200 }dynamic_smsg_mailbox_t;
201
202 static dynamic_smsg_mailbox_t  *mailbox_list;
203
204 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
205 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
206
207 #if PRINT_SYH
208 int         lrts_send_msg_id = 0;
209 int         lrts_local_done_msg = 0;
210 int         lrts_send_rdma_success = 0;
211 #endif
212
213 #include "machine.h"
214
215 #include "pcqueue.h"
216
217 #include "mempool.h"
218
219 #if CMK_PERSISTENT_COMM
220 #include "machine-persistent.h"
221 #endif
222
223 //#define  USE_ONESIDED 1
224 #ifdef USE_ONESIDED
225 //onesided implementation is wrong, since no place to restore omdh
226 #include "onesided.h"
227 onesided_hnd_t   onesided_hnd;
228 onesided_md_t    omdh;
229 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
230
231 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
232
233 #else
234 uint8_t   onesided_hnd, omdh;
235
236 #if REMOTE_EVENT || CQWRITE 
237 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
238     if(register_memory_size+size>= MAX_REG_MEM) { \
239         status = GNI_RC_ERROR_NOMEM;} \
240     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
241         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
242 #else
243 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
244         if (register_memory_size + size >= MAX_REG_MEM) { \
245             status = GNI_RC_ERROR_NOMEM; \
246         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
247             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
248 #endif
249
250 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
251     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
252              register_memory_size -= size; \
253          else CmiAbort("MEM_DEregister");  \
254     } while (0)
255 #endif
256
257 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
258 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
259 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
260 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
261 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
262 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
263 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
264 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
265 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
266 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
267 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
268 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
269 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
270 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
271
272 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
273 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
274
275 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
276 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
277 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
278 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
279
280 #define ALIGNBUF                64
281
282 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
283 /* If SMSG is not used */
284
285 #define FMA_PER_CORE  1024
286 #define FMA_BUFFER_SIZE 1024
287
288 /* If SMSG is used */
289 static int  SMSG_MAX_MSG = 1024;
290 #define SMSG_MAX_CREDIT    72
291
292 #define MSGQ_MAXSIZE       2048
293
294 /* large message transfer with FMA or BTE */
295 #if ! REMOTE_EVENT
296 #define LRTS_GNI_RDMA_THRESHOLD  1024 
297 #else
298    /* remote events only work with RDMA */
299 #define LRTS_GNI_RDMA_THRESHOLD  0 
300 #endif
301
302 #if CMK_SMP
303 static int  REMOTE_QUEUE_ENTRIES=163840; 
304 static int LOCAL_QUEUE_ENTRIES=163840; 
305 #else
306 static int  REMOTE_QUEUE_ENTRIES=20480;
307 static int LOCAL_QUEUE_ENTRIES=20480; 
308 #endif
309
310 #define BIG_MSG_TAG             0x26
311 #define PUT_DONE_TAG            0x28
312 #define DIRECT_PUT_DONE_TAG     0x29
313 #define ACK_TAG                 0x30
314 /* SMSG is data message */
315 #define SMALL_DATA_TAG          0x31
316 /* SMSG is a control message to initialize a BTE */
317 #define LMSG_INIT_TAG           0x39 
318
319 #define DEBUG
320 #ifdef GNI_RC_CHECK
321 #undef GNI_RC_CHECK
322 #endif
323 #ifdef DEBUG
324 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
325 #else
326 #define GNI_RC_CHECK(msg,rc)
327 #endif
328
329 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
330 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
331 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
332
333 static int useStaticMSGQ = 0;
334 static int useStaticFMA = 0;
335 static int mysize, myrank;
336 static gni_nic_handle_t   nic_hndl;
337
338 typedef struct {
339     gni_mem_handle_t mdh;
340     uint64_t addr;
341 } mdh_addr_t ;
342 // this is related to dynamic SMSG
343
344 typedef struct mdh_addr_list{
345     gni_mem_handle_t mdh;
346    void *addr;
347     struct mdh_addr_list *next;
348 }mdh_addr_list_t;
349
350 static unsigned int         smsg_memlen;
351 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
352 mdh_addr_t          setup_mem;
353 mdh_addr_t          *smsg_connection_vec = 0;
354 gni_mem_handle_t    smsg_connection_memhndl;
355 static int          smsg_expand_slots = 10;
356 static int          smsg_available_slot = 0;
357 static void         *smsg_mailbox_mempool = 0;
358 mdh_addr_list_t     *smsg_dynamic_list = 0;
359
360 static void             *smsg_mailbox_base;
361 gni_msgq_attr_t         msgq_attrs;
362 gni_msgq_handle_t       msgq_handle;
363 gni_msgq_ep_attr_t      msgq_ep_attrs;
364 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
365
366 /* =====Beginning of Declarations of Machine Specific Variables===== */
367 static int cookie;
368 static int modes = 0;
369 static gni_cq_handle_t       smsg_rx_cqh = NULL;
370 static gni_cq_handle_t       default_tx_cqh = NULL;
371 static gni_cq_handle_t       rdma_tx_cqh = NULL;
372 static gni_cq_handle_t       rdma_rx_cqh = NULL;
373 static gni_cq_handle_t       post_tx_cqh = NULL;
374 static gni_ep_handle_t       *ep_hndl_array;
375
376 static CmiNodeLock           *ep_lock_array;
377 static CmiNodeLock           default_tx_cq_lock; 
378 static CmiNodeLock           rdma_tx_cq_lock; 
379 static CmiNodeLock           global_gni_lock; 
380 static CmiNodeLock           rx_cq_lock;
381 static CmiNodeLock           smsg_mailbox_lock;
382 static CmiNodeLock           smsg_rx_cq_lock;
383 static CmiNodeLock           *mempool_lock;
384 //#define     CMK_WITH_STATS      1
385 typedef struct msg_list
386 {
387     uint32_t destNode;
388     uint32_t size;
389     void *msg;
390     uint8_t tag;
391 #if !CMK_SMP
392     struct msg_list *next;
393 #endif
394 #if CMK_WITH_STATS
395     double  creation_time;
396 #endif
397 }MSG_LIST;
398
399
400 typedef struct control_msg
401 {
402     uint64_t            source_addr;    /* address from the start of buffer  */
403     uint64_t            dest_addr;      /* address from the start of buffer */
404     int                 total_length;   /* total length */
405     int                 length;         /* length of this packet */
406 #if REMOTE_EVENT
407     int                 ack_index;      /* index from integer to address */
408 #endif
409     uint8_t             seq_id;         //big message   0 meaning single message
410     gni_mem_handle_t    source_mem_hndl;
411     struct control_msg *next;
412 } CONTROL_MSG;
413
414 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
415
416 typedef struct ack_msg
417 {
418     uint64_t            source_addr;    /* address from the start of buffer  */
419 #if ! USE_LRTS_MEMPOOL
420     gni_mem_handle_t    source_mem_hndl;
421     int                 length;          /* total length */
422 #endif
423     struct ack_msg     *next;
424 } ACK_MSG;
425
426 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
427
428 #if CMK_DIRECT
429 typedef struct{
430     uint64_t    handler_addr;
431 }CMK_DIRECT_HEADER;
432
433 typedef struct {
434     char core[CmiMsgHeaderSizeBytes];
435     uint64_t handler;
436 }cmidirectMsg;
437
438 //SYH
439 CpvDeclare(int, CmiHandleDirectIdx);
440 void CmiHandleDirectMsg(cmidirectMsg* msg)
441 {
442
443     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
444    (*(_handle->callbackFnPtr))(_handle->callbackData);
445    CmiFree(msg);
446 }
447
448 void CmiDirectInit()
449 {
450     CpvInitialize(int,  CmiHandleDirectIdx);
451     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
452 }
453
454 #endif
455 typedef struct  rmda_msg
456 {
457     int                   destNode;
458 #if REMOTE_EVENT
459     int                   ack_index;
460 #endif
461     gni_post_descriptor_t *pd;
462 #if !CMK_SMP
463     struct  rmda_msg      *next;
464 #endif
465 }RDMA_REQUEST;
466
467
468 #if CMK_SMP
469 #define SMP_LOCKS               0
470 #define ONE_SEND_QUEUE                  0
471 PCQueue sendRdmaBuf;
472 typedef struct  msg_list_index
473 {
474     PCQueue     sendSmsgBuf;
475     int         pushed;
476     CmiNodeLock   lock;
477 } MSG_LIST_INDEX;
478 char                *destpe_avail;
479 #if  !ONE_SEND_QUEUE && SMP_LOCKS
480     PCQueue     nonEmptyQueues;
481 #endif
482 #else         /* non-smp */
483
484 static RDMA_REQUEST        *sendRdmaBuf = 0;
485 static RDMA_REQUEST        *sendRdmaTail = 0;
486 typedef struct  msg_list_index
487 {
488     int         next;
489     MSG_LIST    *sendSmsgBuf;
490     MSG_LIST    *tail;
491 } MSG_LIST_INDEX;
492
493 #endif
494
495 // buffered send queue
496 #if ! ONE_SEND_QUEUE
497 typedef struct smsg_queue
498 {
499     MSG_LIST_INDEX   *smsg_msglist_index;
500     int               smsg_head_index;
501 } SMSG_QUEUE;
502 #else
503 typedef struct smsg_queue
504 {
505     PCQueue       sendMsgBuf;
506 }  SMSG_QUEUE;
507 #endif
508
509 SMSG_QUEUE                  smsg_queue;
510 #if CMK_USE_OOB
511 SMSG_QUEUE                  smsg_oob_queue;
512 #endif
513
514 #if CMK_SMP
515
516 #define FreeMsgList(d)   free(d);
517 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
518
519 #else
520
521 static MSG_LIST       *msglist_freelist=0;
522
523 #define FreeMsgList(d)  \
524   do { \
525   (d)->next = msglist_freelist;\
526   msglist_freelist = d; \
527   } while (0)
528
529 #define MallocMsgList(d) \
530   do {  \
531   d = msglist_freelist;\
532   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
533              _MEMCHECK(d);\
534   } else msglist_freelist = d->next; \
535   d->next =0;  \
536   } while (0)
537
538 #endif
539
540 #if CMK_SMP
541
542 #define FreeControlMsg(d)      free(d);
543 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
544
545 #else
546
547 static CONTROL_MSG    *control_freelist=0;
548
549 #define FreeControlMsg(d)       \
550   do { \
551   (d)->next = control_freelist;\
552   control_freelist = d; \
553   } while (0);
554
555 #define MallocControlMsg(d) \
556   do {  \
557   d = control_freelist;\
558   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
559              _MEMCHECK(d);\
560   } else control_freelist = d->next;  \
561   } while (0);
562
563 #endif
564
565 #if CMK_SMP
566
567 #define FreeAckMsg(d)      free(d);
568 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
569
570 #else
571
572 static ACK_MSG        *ack_freelist=0;
573
574 #define FreeAckMsg(d)       \
575   do { \
576   (d)->next = ack_freelist;\
577   ack_freelist = d; \
578   } while (0)
579
580 #define MallocAckMsg(d) \
581   do { \
582   d = ack_freelist;\
583   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
584              _MEMCHECK(d);\
585   } else ack_freelist = d->next; \
586   } while (0)
587
588 #endif
589
590
591 # if CMK_SMP
592 #define FreeRdmaRequest(d)       free(d);
593 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
594 #else
595
596 static RDMA_REQUEST         *rdma_freelist = NULL;
597
598 #define FreeRdmaRequest(d)       \
599   do {  \
600   (d)->next = rdma_freelist;\
601   rdma_freelist = d;    \
602   } while (0)
603
604 #define MallocRdmaRequest(d) \
605   do {   \
606   d = rdma_freelist;\
607   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
608              _MEMCHECK(d);\
609   } else rdma_freelist = d->next; \
610   d->next =0;   \
611   } while (0)
612 #endif
613
614 /* reuse gni_post_descriptor_t */
615 static gni_post_descriptor_t *post_freelist=0;
616
617 #if  !CMK_SMP
618 #define FreePostDesc(d)       \
619     (d)->next_descr = post_freelist;\
620     post_freelist = d;
621
622 #define MallocPostDesc(d) \
623   d = post_freelist;\
624   if (d==0) { \
625      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
626      d->next_descr = 0;\
627       _MEMCHECK(d);\
628   } else post_freelist = d->next_descr;
629 #else
630
631 #define FreePostDesc(d)     free(d);
632 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
633
634 #endif
635
636
637 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
638 static int      buffered_smsg_counter = 0;
639
640 /* SmsgSend return success but message sent is not confirmed by remote side */
641 static MSG_LIST *buffered_fma_head = 0;
642 static MSG_LIST *buffered_fma_tail = 0;
643
644 /* functions  */
645 #define IsFree(a,ind)  !( a& (1<<(ind) ))
646 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
647 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
648
649 CpvDeclare(mempool_type*, mempool);
650
651 #if REMOTE_EVENT
652 /* ack pool for remote events */
653
654 static int  SHIFT   =           18;
655 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
656 #define RANK_MASK               ((1<<SHIFT) - 1)
657 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
658
659 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
660 #define GET_RANK(evt)           ((evt) & RANK_MASK)
661 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
662
663 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
664
665 #if CMK_SMP
666 #define INIT_SIZE                4096
667 #else
668 #define INIT_SIZE                1024
669 #endif
670
671 struct IndexStruct {
672 void *addr;
673 int next;
674 int type;     // 1: ACK   2: Persistent
675 };
676
677 typedef struct IndexPool {
678     struct IndexStruct   *indexes;
679     int                   size;
680     int                   freehead;
681     CmiNodeLock           lock;
682 } IndexPool;
683
684 static IndexPool  ackPool;
685 #if CMK_PERSISTENT_COMM
686 static IndexPool  persistPool;
687 #endif
688
689 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
690 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
691
692 static void IndexPool_init(IndexPool *pool)
693 {
694     int i;
695     if ((1<<SHIFT) < mysize) 
696         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
697     pool->size = INIT_SIZE;
698     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
699     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
700     for (i=0; i<pool->size-1; i++) {
701         pool->indexes[i].next = i+1;
702         pool->indexes[i].type = 0;
703     }
704     pool->indexes[i].next = -1;
705     pool->freehead = 0;
706 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
707     pool->lock  = CmiCreateLock();
708 #else
709     pool->lock  = 0;
710 #endif
711 }
712
713 static
714 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
715 {
716     int i;
717     int s;
718 #if MULTI_THREAD_SEND  
719     CmiLock(pool->lock);
720 #endif
721     s = pool->freehead;
722     if (s == -1) {
723         int newsize = pool->size * 2;
724         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
725         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
726         struct IndexStruct *old_ackpool = pool->indexes;
727         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
728         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
729         for (i=pool->size; i<newsize-1; i++) {
730             pool->indexes[i].next = i+1;
731             pool->indexes[i].type = 0;
732         }
733         pool->indexes[i].next = -1;
734         pool->indexes[i].type = 0;
735         pool->freehead = pool->size;
736         s = pool->size;
737         pool->size = newsize;
738         free(old_ackpool);
739     }
740     pool->freehead = pool->indexes[s].next;
741     pool->indexes[s].addr = addr;
742     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
743     pool->indexes[s].type = type;
744 #if MULTI_THREAD_SEND
745     CmiUnlock(pool->lock);
746 #endif
747     return s;
748 }
749
750 static
751 inline  void IndexPool_freeslot(IndexPool *pool, int s)
752 {
753     CmiAssert(s>=0 && s<pool->size);
754 #if MULTI_THREAD_SEND
755     CmiLock(pool->lock);
756 #endif
757     pool->indexes[s].next = pool->freehead;
758     pool->indexes[s].type = 0;
759     pool->freehead = s;
760 #if MULTI_THREAD_SEND
761     CmiUnlock(pool->lock);
762 #endif
763 }
764
765
766 #endif
767
768 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
769 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
770 #define CHARM_MAGIC_NUMBER               126
771
772 #if CMK_ERROR_CHECKING
773 extern unsigned char computeCheckSum(unsigned char *data, int len);
774 static int checksum_flag = 0;
775 #define CMI_SET_CHECKSUM(msg, len)      \
776         if (checksum_flag)  {   \
777           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
778           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
779         }
780 #define CMI_CHECK_CHECKSUM(msg, len)    \
781         if (checksum_flag)      \
782           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
783             CmiAbort("Fatal error: checksum doesn't agree!\n");
784 #else
785 #define CMI_SET_CHECKSUM(msg, len)
786 #define CMI_CHECK_CHECKSUM(msg, len)
787 #endif
788 /* =====End of Definitions of Message-Corruption Related Macros=====*/
789
790 static int print_stats = 0;
791 static int stats_off = 0;
792 void CmiTurnOnStats()
793 {
794     stats_off = 0;
795     //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
796 }
797
798 void CmiTurnOffStats()
799 {
800     stats_off = 1;
801 }
802
803 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
804
805 #if CMK_WITH_STATS
806 FILE *counterLog = NULL;
807 typedef struct comm_thread_stats
808 {
809     uint64_t  smsg_data_count;
810     uint64_t  lmsg_init_count;
811     uint64_t  ack_count;
812     uint64_t  big_msg_ack_count;
813     uint64_t  smsg_count;
814     uint64_t  direct_put_done_count;
815     uint64_t  put_done_count;
816     //times of calling SmsgSend
817     uint64_t  try_smsg_data_count;
818     uint64_t  try_lmsg_init_count;
819     uint64_t  try_ack_count;
820     uint64_t  try_big_msg_ack_count;
821     uint64_t  try_direct_put_done_count;
822     uint64_t  try_put_done_count;
823     uint64_t  try_smsg_count;
824     
825     double    max_time_in_send_buffered_smsg;
826     double    all_time_in_send_buffered_smsg;
827
828     uint64_t  rdma_get_count, rdma_put_count;
829     uint64_t  try_rdma_get_count, try_rdma_put_count;
830     double    max_time_from_control_to_rdma_init;
831     double    all_time_from_control_to_rdma_init;
832
833     double    max_time_from_rdma_init_to_rdma_done;
834     double    all_time_from_rdma_init_to_rdma_done;
835
836     int      count_in_PumpNetwork;
837     double   time_in_PumpNetwork;
838     double   max_time_in_PumpNetwork;
839     int      count_in_SendBufferMsg_smsg;
840     double   time_in_SendBufferMsg_smsg;
841     double   max_time_in_SendBufferMsg_smsg;
842     int      count_in_SendRdmaMsg;
843     double   time_in_SendRdmaMsg;
844     double   max_time_in_SendRdmaMsg;
845     int      count_in_PumpRemoteTransactions;
846     double   time_in_PumpRemoteTransactions;
847     double   max_time_in_PumpRemoteTransactions;
848     int      count_in_PumpLocalTransactions_rdma;
849     double   time_in_PumpLocalTransactions_rdma;
850     double   max_time_in_PumpLocalTransactions_rdma;
851     int      count_in_PumpDatagramConnection;
852     double   time_in_PumpDatagramConnection;
853     double   max_time_in_PumpDatagramConnection;
854 } Comm_Thread_Stats;
855
856 static Comm_Thread_Stats   comm_stats;
857
858 static char *counters_dirname = "counters";
859
860 static void init_comm_stats()
861 {
862   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
863   if (print_stats){
864       char ln[200];
865       int code = mkdir(counters_dirname, 00777); 
866       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
867       counterLog=fopen(ln,"w");
868       if (counterLog == NULL) CmiAbort("Counter files open failed");
869   }
870 }
871
872 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
873
874 #define SMSG_SENT_DONE(creation_time, tag)  \
875         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
876             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
877             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
878             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
879             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
880             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
881             comm_stats.smsg_count++; \
882             double inbuff_time = CmiWallTimer() - creation_time;   \
883             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
884             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
885         }
886
887 #define SMSG_TRY_SEND(tag)  \
888         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
889             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
890             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
891             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
892             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
893             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
894             comm_stats.try_smsg_count++; \
895         }
896
897 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
898
899 #define  RDMA_TRANS_DONE(x)      \
900          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
901              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
902              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
903          }
904
905 #define  RDMA_TRANS_INIT(type, x)      \
906          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
907              double rdma_trans_time = CmiWallTimer() - x ; \
908              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
909              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
910          }
911
912 #define STATS_PUMPNETWORK_TIME(x)   \
913         { double t = CmiWallTimer(); \
914           x;        \
915           t = CmiWallTimer() - t;          \
916           comm_stats.count_in_PumpNetwork++;        \
917           comm_stats.time_in_PumpNetwork += t;   \
918           if (t>comm_stats.max_time_in_PumpNetwork)      \
919               comm_stats.max_time_in_PumpNetwork = t;    \
920         }
921
922 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
923         { double t = CmiWallTimer(); \
924           x;        \
925           t = CmiWallTimer() - t;          \
926           comm_stats.count_in_PumpRemoteTransactions ++;        \
927           comm_stats.time_in_PumpRemoteTransactions += t;   \
928           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
929               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
930         }
931
932 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
933         { double t = CmiWallTimer(); \
934           x;        \
935           t = CmiWallTimer() - t;          \
936           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
937           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
938           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
939               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
940         }
941
942 #define STATS_SEND_SMSGS_TIME(x)   \
943         { double t = CmiWallTimer(); \
944           x;        \
945           t = CmiWallTimer() - t;          \
946           comm_stats.count_in_SendBufferMsg_smsg ++;        \
947           comm_stats.time_in_SendBufferMsg_smsg += t;   \
948           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
949               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
950         }
951
952 #define STATS_SENDRDMAMSG_TIME(x)   \
953         { double t = CmiWallTimer(); \
954           x;        \
955           t = CmiWallTimer() - t;          \
956           comm_stats.count_in_SendRdmaMsg ++;        \
957           comm_stats.time_in_SendRdmaMsg += t;   \
958           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
959               comm_stats.max_time_in_SendRdmaMsg = t;    \
960         }
961
962 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
963         { double t = CmiWallTimer(); \
964           x;        \
965           t = CmiWallTimer() - t;          \
966           comm_stats.count_in_PumpDatagramConnection ++;        \
967           comm_stats.time_in_PumpDatagramConnection += t;   \
968           if (t>comm_stats.max_time_in_PumpDatagramConnection)      \
969               comm_stats.max_time_in_PumpDatagramConnection = t;    \
970         }
971
972 static void print_comm_stats()
973 {
974     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
975     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
976             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
977             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
978     
979     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
980             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
981             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
982
983     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
984     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
985     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
986
987
988     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
989     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
990     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
991     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
992     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
993     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
994     if (useDynamicSMSG)
995     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
996
997     fclose(counterLog);
998 }
999
1000 #else
1001 #define STATS_PUMPNETWORK_TIME(x)                  x
1002 #define STATS_SEND_SMSGS_TIME(x)                   x
1003 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
1004 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
1005 #define STATS_SENDRDMAMSG_TIME(x)                  x
1006 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
1007 #endif
1008
1009 static void
1010 allgather(void *in,void *out, int len)
1011 {
1012     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1013     int i,rc;
1014     int my_rank;
1015     char *tmp_buf,*out_ptr;
1016
1017     if(!already_called) {
1018
1019         rc = PMI_Get_size(&job_size);
1020         CmiAssert(rc == PMI_SUCCESS);
1021         rc = PMI_Get_rank(&my_rank);
1022         CmiAssert(rc == PMI_SUCCESS);
1023
1024         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1025         CmiAssert(ivec_ptr != NULL);
1026
1027         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1028         CmiAssert(rc == PMI_SUCCESS);
1029
1030         already_called = 1;
1031
1032     }
1033
1034     tmp_buf = (char *)malloc(job_size * len);
1035     CmiAssert(tmp_buf);
1036
1037     rc = PMI_Allgather(in,tmp_buf,len);
1038     CmiAssert(rc == PMI_SUCCESS);
1039
1040     out_ptr = out;
1041
1042     for(i=0;i<job_size;i++) {
1043
1044         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1045
1046     }
1047
1048     free(tmp_buf);
1049 }
1050
1051 static void
1052 allgather_2(void *in,void *out, int len)
1053 {
1054     //PMI_Allgather is out of order
1055     int i,rc, extend_len;
1056     int  rank_index;
1057     char *out_ptr, *out_ref;
1058     char *in2;
1059
1060     extend_len = sizeof(int) + len;
1061     in2 = (char*)malloc(extend_len);
1062
1063     memcpy(in2, &myrank, sizeof(int));
1064     memcpy(in2+sizeof(int), in, len);
1065
1066     out_ptr = (char*)malloc(mysize*extend_len);
1067
1068     rc = PMI_Allgather(in2, out_ptr, extend_len);
1069     GNI_RC_CHECK("allgather", rc);
1070
1071     out_ref = out;
1072
1073     for(i=0;i<mysize;i++) {
1074         //rank index 
1075         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1076         //copy to the rank index slot
1077         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1078     }
1079
1080     free(out_ptr);
1081     free(in2);
1082
1083 }
1084
1085 static unsigned int get_gni_nic_address(int device_id)
1086 {
1087     unsigned int address, cpu_id;
1088     gni_return_t status;
1089     int i, alps_dev_id=-1,alps_address=-1;
1090     char *token, *p_ptr;
1091
1092     p_ptr = getenv("PMI_GNI_DEV_ID");
1093     if (!p_ptr) {
1094         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1095        
1096         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1097     } else {
1098         while ((token = strtok(p_ptr,":")) != NULL) {
1099             alps_dev_id = atoi(token);
1100             if (alps_dev_id == device_id) {
1101                 break;
1102             }
1103             p_ptr = NULL;
1104         }
1105         CmiAssert(alps_dev_id != -1);
1106         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1107         CmiAssert(p_ptr != NULL);
1108         i = 0;
1109         while ((token = strtok(p_ptr,":")) != NULL) {
1110             if (i == alps_dev_id) {
1111                 alps_address = atoi(token);
1112                 break;
1113             }
1114             p_ptr = NULL;
1115             ++i;
1116         }
1117         CmiAssert(alps_address != -1);
1118         address = alps_address;
1119     }
1120     return address;
1121 }
1122
1123 static uint8_t get_ptag(void)
1124 {
1125     char *p_ptr, *token;
1126     uint8_t ptag;
1127
1128     p_ptr = getenv("PMI_GNI_PTAG");
1129     CmiAssert(p_ptr != NULL);
1130     token = strtok(p_ptr, ":");
1131     ptag = (uint8_t)atoi(token);
1132     return ptag;
1133         
1134 }
1135
1136 static uint32_t get_cookie(void)
1137 {
1138     uint32_t cookie;
1139     char *p_ptr, *token;
1140
1141     p_ptr = getenv("PMI_GNI_COOKIE");
1142     CmiAssert(p_ptr != NULL);
1143     token = strtok(p_ptr, ":");
1144     cookie = (uint32_t)atoi(token);
1145
1146     return cookie;
1147 }
1148
1149 #if LARGEPAGE
1150
1151 /* directly mmap memory from hugetlbfs for large pages */
1152
1153 #include <sys/stat.h>
1154 #include <fcntl.h>
1155 #include <sys/mman.h>
1156 #include <hugetlbfs.h>
1157
1158 // size must be _tlbpagesize aligned
1159 void *my_get_huge_pages(size_t size)
1160 {
1161     char filename[512];
1162     int fd;
1163     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1164     void *ptr = NULL;
1165
1166     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1167     fd = open(filename, O_RDWR | O_CREAT, mode);
1168     if (fd == -1) {
1169         CmiAbort("my_get_huge_pages: open filed");
1170     }
1171     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1172     if (ptr == MAP_FAILED) ptr = NULL;
1173 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1174     close(fd);
1175     unlink(filename);
1176     return ptr;
1177 }
1178
1179 void my_free_huge_pages(void *ptr, int size)
1180 {
1181 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1182     int ret = munmap(ptr, size);
1183     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1184 }
1185
1186 #endif
1187
1188 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1189 /* TODO: add any that are related */
1190 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1191
1192
1193 #include "machine-lrts.h"
1194 #include "machine-common-core.c"
1195
1196 /* Network progress function is used to poll the network when for
1197    messages. This flushes receive buffers on some  implementations*/
1198 #if CMK_MACHINE_PROGRESS_DEFINED
1199 void CmiMachineProgressImpl() {
1200 }
1201 #endif
1202
1203 static int SendBufferMsg(SMSG_QUEUE *queue);
1204 static void SendRdmaMsg();
1205 static void PumpNetworkSmsg();
1206 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1207 #if CQWRITE
1208 static void PumpCqWriteTransactions();
1209 #endif
1210 #if REMOTE_EVENT
1211 static void PumpRemoteTransactions();
1212 #endif
1213
1214 #if MACHINE_DEBUG_LOG
1215 FILE *debugLog = NULL;
1216 static CmiInt8 buffered_recv_msg = 0;
1217 int         lrts_smsg_success = 0;
1218 int         lrts_received_msg = 0;
1219 #endif
1220
1221 static void sweep_mempool(mempool_type *mptr)
1222 {
1223     int n = 0;
1224     block_header *current = &(mptr->block_head);
1225
1226     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1227     while( current!= NULL) {
1228         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1229         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1230     }
1231     printf("[n %d] sweep_mempool slot END.\n", myrank);
1232 }
1233
1234 inline
1235 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1236 {
1237     block_header *current = *from;
1238
1239     //while(register_memory_size>= MAX_REG_MEM)
1240     //{
1241         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1242             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1243
1244         *from = current;
1245         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1246         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1247         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1248     //}
1249     return GNI_RC_SUCCESS;
1250 }
1251
1252 inline 
1253 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1254 {
1255     gni_return_t status = GNI_RC_SUCCESS;
1256     //int size = GetMempoolsize(msg);
1257     //void *blockaddr = GetMempoolBlockPtr(msg);
1258     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1259    
1260     block_header *current = &(mptr->block_head);
1261     while(register_memory_size>= MAX_REG_MEM)
1262     {
1263         status = deregisterMemory(mptr, &current);
1264         if (status != GNI_RC_SUCCESS) break;
1265     }
1266     if(register_memory_size>= MAX_REG_MEM) return status;
1267
1268     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1269     while(1)
1270     {
1271         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1272         if(status == GNI_RC_SUCCESS)
1273         {
1274             break;
1275         }
1276         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1277         {
1278             GNI_RC_CHECK("registerFromMempool", status);
1279         }
1280         else
1281         {
1282             status = deregisterMemory(mptr, &current);
1283             if (status != GNI_RC_SUCCESS) break;
1284         }
1285     }; 
1286     return status;
1287 }
1288
1289 inline 
1290 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1291 {
1292     static int rank = -1;
1293     int i;
1294     gni_return_t status;
1295     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1296     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1297     mempool_type *mptr;
1298
1299     status = registerFromMempool(mptr1, msg, size, t, cqh);
1300     if (status == GNI_RC_SUCCESS) return status;
1301 #if CMK_SMP 
1302     for (i=0; i<CmiMyNodeSize()+1; i++) {
1303       rank = (rank+1)%(CmiMyNodeSize()+1);
1304       mptr = CpvAccessOther(mempool, rank);
1305       if (mptr == mptr1) continue;
1306       status = registerFromMempool(mptr, msg, size, t, cqh);
1307       if (status == GNI_RC_SUCCESS) return status;
1308     }
1309 #endif
1310     return  GNI_RC_ERROR_RESOURCE;
1311 }
1312
1313 inline
1314 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1315 {
1316     MSG_LIST        *msg_tmp;
1317     MallocMsgList(msg_tmp);
1318     msg_tmp->destNode = destNode;
1319     msg_tmp->size   = size;
1320     msg_tmp->msg    = msg;
1321     msg_tmp->tag    = tag;
1322 #if CMK_WITH_STATS
1323     SMSG_CREATION(msg_tmp)
1324 #endif
1325 #if !CMK_SMP
1326     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1327         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1328         queue->smsg_head_index = destNode;
1329         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1330     }else
1331     {
1332         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1333         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1334     }
1335 #else
1336 #if ONE_SEND_QUEUE
1337     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1338 #else
1339 #if SMP_LOCKS
1340     CmiLock(queue->smsg_msglist_index[destNode].lock);
1341     if(queue->smsg_msglist_index[destNode].pushed == 0)
1342     {
1343         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1344     }
1345     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1346     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1347 #else
1348     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1349 #endif
1350 #endif
1351 #endif
1352 #if PRINT_SYH
1353     buffered_smsg_counter++;
1354 #endif
1355 }
1356
1357 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1358 {
1359     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1360 }
1361
1362 inline
1363 static void setup_smsg_connection(int destNode)
1364 {
1365     mdh_addr_list_t  *new_entry = 0;
1366     gni_post_descriptor_t *pd;
1367     gni_smsg_attr_t      *smsg_attr;
1368     gni_return_t status = GNI_RC_NOT_DONE;
1369     RDMA_REQUEST        *rdma_request_msg;
1370     
1371     if(smsg_available_slot == smsg_expand_slots)
1372     {
1373         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1374         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1375         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1376
1377         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1378             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1379             GNI_MEM_READWRITE,   
1380             -1,
1381             &(new_entry->mdh));
1382         smsg_available_slot = 0; 
1383         new_entry->next = smsg_dynamic_list;
1384         smsg_dynamic_list = new_entry;
1385     }
1386     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1387     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1388     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1389     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1390     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1391     smsg_attr->buff_size = smsg_memlen;
1392     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1393     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1394     smsg_local_attr_vec[destNode] = smsg_attr;
1395     smsg_available_slot++;
1396     MallocPostDesc(pd);
1397     pd->type            = GNI_POST_FMA_PUT;
1398     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1399     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1400     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1401     pd->length          = sizeof(gni_smsg_attr_t);
1402     pd->local_addr      = (uint64_t) smsg_attr;
1403     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1404     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1405     pd->src_cq_hndl     = rdma_tx_cqh;
1406     pd->rdma_mode       = 0;
1407     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1408     print_smsg_attr(smsg_attr);
1409     if(status == GNI_RC_ERROR_RESOURCE )
1410     {
1411         MallocRdmaRequest(rdma_request_msg);
1412         rdma_request_msg->destNode = destNode;
1413         rdma_request_msg->pd = pd;
1414         /* buffer this request */
1415     }
1416 #if PRINT_SYH
1417     if(status != GNI_RC_SUCCESS)
1418        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1419     else
1420         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1421 #endif
1422 }
1423
1424 /* useDynamicSMSG */
1425 inline 
1426 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1427 {
1428     gni_return_t status = GNI_RC_NOT_DONE;
1429
1430     if(mailbox_list->offset == mailbox_list->size)
1431     {
1432         dynamic_smsg_mailbox_t *new_mailbox_entry;
1433         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1434         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1435         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1436         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1437         new_mailbox_entry->offset = 0;
1438         
1439         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1440             new_mailbox_entry->size, smsg_rx_cqh,
1441             GNI_MEM_READWRITE,   
1442             -1,
1443             &(new_mailbox_entry->mem_hndl));
1444
1445         GNI_RC_CHECK("register", status);
1446         new_mailbox_entry->next = mailbox_list;
1447         mailbox_list = new_mailbox_entry;
1448     }
1449     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1450     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1451     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1452     local_smsg_attr->mbox_offset = mailbox_list->offset;
1453     mailbox_list->offset += smsg_memlen;
1454     local_smsg_attr->buff_size = smsg_memlen;
1455     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1456     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1457 }
1458
1459 /* useDynamicSMSG */
1460 inline 
1461 static int connect_to(int destNode)
1462 {
1463     gni_return_t status = GNI_RC_NOT_DONE;
1464     CmiAssert(smsg_connected_flag[destNode] == 0);
1465     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1466     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1467     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1468     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1469     
1470     CMI_GNI_LOCK(global_gni_lock)
1471     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1472     CMI_GNI_UNLOCK(global_gni_lock)
1473     if (status == GNI_RC_ERROR_RESOURCE) {
1474       /* possibly destNode is making connection at the same time */
1475       free(smsg_attr_vector_local[destNode]);
1476       smsg_attr_vector_local[destNode] = NULL;
1477       free(smsg_attr_vector_remote[destNode]);
1478       smsg_attr_vector_remote[destNode] = NULL;
1479       mailbox_list->offset -= smsg_memlen;
1480 #if PRINT_SYH
1481     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1482 #endif
1483       return 0;
1484     }
1485     GNI_RC_CHECK("GNI_Post", status);
1486     smsg_connected_flag[destNode] = 1;
1487 #if PRINT_SYH
1488     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1489 #endif
1490     return 1;
1491 }
1492
1493 inline 
1494 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1495 {
1496     unsigned int          remote_address;
1497     uint32_t              remote_id;
1498     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1499     gni_smsg_attr_t       *smsg_attr;
1500     gni_post_descriptor_t *pd;
1501     gni_post_state_t      post_state;
1502     char                  *real_data; 
1503
1504     if (useDynamicSMSG) {
1505         switch (smsg_connected_flag[destNode]) {
1506         case 0: 
1507             connect_to(destNode);         /* continue to case 1 */
1508         case 1:                           /* pending connection, do nothing */
1509             status = GNI_RC_NOT_DONE;
1510             if(inbuff ==0)
1511                 buffer_small_msgs(queue, msg, size, destNode, tag);
1512             return status;
1513         }
1514     }
1515 #if CMK_SMP
1516 #if ! ONE_SEND_QUEUE
1517     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1518 #endif
1519     {
1520 #else
1521     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1522     {
1523 #endif
1524         //CMI_GNI_LOCK(smsg_mailbox_lock)
1525         CMI_GNI_LOCK(default_tx_cq_lock)
1526 #if CMK_SMP_TRACE_COMMTHREAD
1527         int oldpe = -1;
1528         int oldeventid = -1;
1529         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1530         { 
1531             START_EVENT();
1532             if ( tag == SMALL_DATA_TAG)
1533                 real_data = (char*)msg; 
1534             else 
1535                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1536             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1537             TRACE_COMM_SET_COMM_MSGID(real_data);
1538         }
1539 #endif
1540 #if REMOTE_EVENT
1541         if (tag == LMSG_INIT_TAG) {
1542             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1543             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1544                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1545         }
1546         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1547 #endif
1548 #if     CMK_WITH_STATS
1549         SMSG_TRY_SEND(tag)
1550 #endif
1551 #if CMK_WITH_STATS
1552     double              creation_time;
1553     if (ptr == NULL)
1554         creation_time = CmiWallTimer();
1555     else
1556         creation_time = ptr->creation_time;
1557 #endif
1558
1559     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1560 #if CMK_SMP_TRACE_COMMTHREAD
1561         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1562 #endif
1563         CMI_GNI_UNLOCK(default_tx_cq_lock)
1564         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1565         if(status == GNI_RC_SUCCESS)
1566         {
1567 #if     CMK_WITH_STATS
1568             SMSG_SENT_DONE(creation_time,tag) 
1569 #endif
1570 #if CMK_SMP_TRACE_COMMTHREAD
1571             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1572             { 
1573                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1574             }
1575 #endif
1576         }else
1577             status = GNI_RC_ERROR_RESOURCE;
1578     }
1579     if(status != GNI_RC_SUCCESS && inbuff ==0)
1580         buffer_small_msgs(queue, msg, size, destNode, tag);
1581     return status;
1582 }
1583
1584 inline 
1585 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1586 {
1587     /* construct a control message and send */
1588     CONTROL_MSG         *control_msg_tmp;
1589     MallocControlMsg(control_msg_tmp);
1590     control_msg_tmp->source_addr = (uint64_t)msg;
1591     control_msg_tmp->seq_id    = seqno;
1592     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1593 #if REMOTE_EVENT
1594     control_msg_tmp->ack_index    =  -1;
1595 #endif
1596 #if     USE_LRTS_MEMPOOL
1597     if(size < BIG_MSG)
1598     {
1599         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1600     }
1601     else
1602     {
1603         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1604         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1605         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1606     }
1607 #else
1608     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1609 #endif
1610     return control_msg_tmp;
1611 }
1612
1613 #define BLOCKING_SEND_CONTROL    0
1614
1615 // Large message, send control to receiver, receiver register memory and do a GET, 
1616 // return 1 - send no success
1617 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1618 {
1619     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1620     uint32_t            vmdh_index  = -1;
1621     int                 size;
1622     int                 offset = 0;
1623     uint64_t            source_addr;
1624     int                 register_size; 
1625     void                *msg;
1626
1627     size    =   control_msg_tmp->total_length;
1628     source_addr = control_msg_tmp->source_addr;
1629     register_size = control_msg_tmp->length;
1630
1631 #if  USE_LRTS_MEMPOOL
1632     if( control_msg_tmp->seq_id == 0 ){
1633 #if BLOCKING_SEND_CONTROL
1634         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1635             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1636                 LrtsAdvanceCommunication(0);
1637         }
1638 #endif
1639         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1640         {
1641             msg = (void*)source_addr;
1642             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1643             {
1644                 if(!inbuff)
1645                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1646                 return GNI_RC_ERROR_NOMEM;
1647             }
1648             //register the corresponding mempool
1649             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1650             if(status == GNI_RC_SUCCESS)
1651             {
1652                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1653             }
1654         }else
1655         {
1656             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1657             status = GNI_RC_SUCCESS;
1658         }
1659         if(NoMsgInSend(source_addr))
1660             register_size = GetMempoolsize((void*)(source_addr));
1661         else
1662             register_size = 0;
1663     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1664     {
1665         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1666         source_addr += offset;
1667         size = control_msg_tmp->length;
1668 #if BLOCKING_SEND_CONTROL
1669         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1670             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1671                 LrtsAdvanceCommunication(0);
1672         }
1673 #endif
1674         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1675             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1676             {
1677                 if(!inbuff)
1678                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1679                 return GNI_RC_ERROR_NOMEM;
1680             }
1681             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1682             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1683         }
1684         else
1685         {
1686             status = GNI_RC_SUCCESS;
1687         }
1688         register_size = 0;  
1689     }
1690
1691 #if CMI_EXERT_SEND_CAP
1692     if(SEND_large_pending >= SEND_large_cap)
1693     {
1694         status = GNI_RC_ERROR_NOMEM;
1695     }
1696 #endif
1697  
1698     if(status == GNI_RC_SUCCESS)
1699     {
1700        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1701         if(status == GNI_RC_SUCCESS)
1702         {
1703 #if CMI_EXERT_SEND_CAP
1704             SEND_large_pending++;
1705 #endif
1706             buffered_send_msg += register_size;
1707             if(control_msg_tmp->seq_id == 0)
1708             {
1709                 IncreaseMsgInSend(source_addr);
1710             }
1711             FreeControlMsg(control_msg_tmp);
1712             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1713         }else
1714             status = GNI_RC_ERROR_RESOURCE;
1715
1716     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1717     {
1718         CmiAbort("Memory registor for large msg\n");
1719     }else 
1720     {
1721         status = GNI_RC_ERROR_NOMEM; 
1722         if(!inbuff)
1723             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1724     }
1725     return status;
1726 #else
1727     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1728     if(status == GNI_RC_SUCCESS)
1729     {
1730         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1731         if(status == GNI_RC_SUCCESS)
1732         {
1733             FreeControlMsg(control_msg_tmp);
1734         }
1735     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1736     {
1737         CmiAbort("Memory registor for large msg\n");
1738     }else 
1739     {
1740         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1741     }
1742     return status;
1743 #endif
1744 }
1745
1746 inline void LrtsPrepareEnvelope(char *msg, int size)
1747 {
1748     CmiSetMsgSize(msg, size);
1749     CMI_SET_CHECKSUM(msg, size);
1750 }
1751
1752 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1753 {
1754     gni_return_t        status  =   GNI_RC_SUCCESS;
1755     uint8_t tag;
1756     CONTROL_MSG         *control_msg_tmp;
1757     int                 oob = ( mode & OUT_OF_BAND);
1758     SMSG_QUEUE          *queue;
1759
1760     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1761 #if CMK_USE_OOB
1762     queue = oob? &smsg_oob_queue : &smsg_queue;
1763 #else
1764     queue = &smsg_queue;
1765 #endif
1766
1767     LrtsPrepareEnvelope(msg, size);
1768
1769 #if PRINT_SYH
1770     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1771 #endif 
1772 #if CMK_SMP 
1773     if(size <= SMSG_MAX_MSG)
1774         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1775     else if (size < BIG_MSG) {
1776         control_msg_tmp =  construct_control_msg(size, msg, 0);
1777         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1778     }
1779     else {
1780           CmiSetMsgSeq(msg, 0);
1781           control_msg_tmp =  construct_control_msg(size, msg, 1);
1782           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1783     }
1784 #else   //non-smp, smp(worker sending)
1785     if(size <= SMSG_MAX_MSG)
1786     {
1787         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1788             CmiFree(msg);
1789     }
1790     else if (size < BIG_MSG) {
1791         control_msg_tmp =  construct_control_msg(size, msg, 0);
1792         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1793     }
1794     else {
1795 #if     USE_LRTS_MEMPOOL
1796         CmiSetMsgSeq(msg, 0);
1797         control_msg_tmp =  construct_control_msg(size, msg, 1);
1798         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1799 #else
1800         control_msg_tmp =  construct_control_msg(size, msg, 0);
1801         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1802 #endif
1803     }
1804 #endif
1805     return 0;
1806 }
1807
1808 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1809 {
1810   int i;
1811 #if CMK_BROADCAST_USE_CMIREFERENCE
1812   for(i=0;i<npes;i++) {
1813     if (pes[i] == CmiMyPe())
1814       CmiSyncSend(pes[i], len, msg);
1815     else {
1816       CmiReference(msg);
1817       CmiSyncSendAndFree(pes[i], len, msg);
1818     }
1819   }
1820 #else
1821   for(i=0;i<npes;i++) {
1822     CmiSyncSend(pes[i], len, msg);
1823   }
1824 #endif
1825 }
1826
1827 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1828 {
1829   /* A better asynchronous implementation may be wanted, but at least it works */
1830   CmiSyncListSendFn(npes, pes, len, msg);
1831   return (CmiCommHandle) 0;
1832 }
1833
1834 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1835 {
1836   if (npes == 1) {
1837       CmiSyncSendAndFree(pes[0], len, msg);
1838       return;
1839   }
1840 #if CMK_PERSISTENT_COMM
1841   if (CpvAccess(phs) && len > 1024) {
1842       int i;
1843       for(i=0;i<npes;i++) {
1844         if (pes[i] == CmiMyPe())
1845           CmiSyncSend(pes[i], len, msg);
1846         else {
1847           CmiReference(msg);
1848           CmiSyncSendAndFree(pes[i], len, msg);
1849         }
1850       }
1851       CmiFree(msg);
1852       return;
1853   }
1854 #endif
1855   
1856 #if CMK_BROADCAST_USE_CMIREFERENCE
1857   CmiSyncListSendFn(npes, pes, len, msg);
1858   CmiFree(msg);
1859 #else
1860   int i;
1861   for(i=0;i<npes-1;i++) {
1862     CmiSyncSend(pes[i], len, msg);
1863   }
1864   if (npes>0)
1865     CmiSyncSendAndFree(pes[npes-1], len, msg);
1866   else 
1867     CmiFree(msg);
1868 #endif
1869 }
1870
1871 static void    PumpDatagramConnection();
1872 static      int         event_SetupConnect = 111;
1873 static      int         event_PumpSmsg = 222 ;
1874 static      int         event_PumpTransaction = 333;
1875 static      int         event_PumpRdmaTransaction = 444;
1876 static      int         event_SendBufferSmsg = 444;
1877 static      int         event_SendFmaRdmaMsg = 555;
1878 static      int         event_AdvanceCommunication = 666;
1879
1880 static void registerUserTraceEvents() {
1881 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1882     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1883     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1884     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1885     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1886     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1887     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1888     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1889 #endif
1890 }
1891
1892 static void ProcessDeadlock()
1893 {
1894     static CmiUInt8 *ptr = NULL;
1895     static CmiUInt8  last = 0, mysum, sum;
1896     static int count = 0;
1897     gni_return_t status;
1898     int i;
1899
1900 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1901 //sweep_mempool(CpvAccess(mempool));
1902     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1903     mysum = smsg_send_count + smsg_recv_count;
1904     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1905     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1906     GNI_RC_CHECK("PMI_Allgather", status);
1907     sum = 0;
1908     for (i=0; i<mysize; i++)  sum+= ptr[i];
1909     if (last == 0 || sum == last) 
1910         count++;
1911     else
1912         count = 0;
1913     last = sum;
1914     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1915     if (count == 2) { 
1916         /* detected twice, it is a real deadlock */
1917         if (myrank == 0)  {
1918             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1919             CmiAbort("Fatal> Deadlock detected.");
1920         }
1921
1922     }
1923     _detected_hang = 0;
1924 }
1925
1926 static void CheckProgress()
1927 {
1928     if (smsg_send_count == last_smsg_send_count &&
1929         smsg_recv_count == last_smsg_recv_count ) 
1930     {
1931         _detected_hang = 1;
1932 #if !CMK_SMP
1933         if (_detected_hang) ProcessDeadlock();
1934 #endif
1935
1936     }
1937     else {
1938         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1939         last_smsg_send_count = smsg_send_count;
1940         last_smsg_recv_count = smsg_recv_count;
1941         _detected_hang = 0;
1942     }
1943 }
1944
1945 static void set_limit()
1946 {
1947     //if (!user_set_flag && CmiMyRank() == 0) {
1948     if (CmiMyRank() == 0) {
1949         int mynode = CmiPhysicalNodeID(CmiMyPe());
1950         int numpes = CmiNumPesOnPhysicalNode(mynode);
1951         int numprocesses = numpes / CmiMyNodeSize();
1952         MAX_REG_MEM  = _totalmem / numprocesses;
1953         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1954         if (CmiMyPe() == 0)
1955            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1956         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1957         {
1958              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1959              CmiAbort("memory registration\n");
1960         }
1961     }
1962 }
1963
1964 void LrtsPostCommonInit(int everReturn)
1965 {
1966 #if CMK_DIRECT
1967     CmiDirectInit();
1968 #endif
1969 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1970     CpvInitialize(double, projTraceStart);
1971     /* only PE 0 needs to care about registration (to generate sts file). */
1972     //if (CmiMyPe() == 0) 
1973     {
1974         registerMachineUserEventsFunction(&registerUserTraceEvents);
1975     }
1976 #endif
1977
1978 #if CMK_SMP
1979     CmiIdleState *s=CmiNotifyGetState();
1980     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1981     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1982 #else
1983     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1984     if (useDynamicSMSG)
1985     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1986 #endif
1987
1988 #if ! LARGEPAGE
1989     if (_checkProgress)
1990 #if CMK_SMP
1991     if (CmiMyRank() == 0)
1992 #endif
1993     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1994 #endif
1995  
1996 #if !LARGEPAGE
1997     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1998 #endif
1999 }
2000
2001 /* this is called by worker thread */
2002 void LrtsPostNonLocal(){
2003 #if CMK_SMP_TRACE_COMMTHREAD
2004     double startT, endT;
2005 #endif
2006 #if MULTI_THREAD_SEND
2007     if(mysize == 1) return;
2008 #if CMK_SMP_TRACE_COMMTHREAD
2009     traceEndIdle();
2010 #endif
2011
2012 #if CMK_SMP_TRACE_COMMTHREAD
2013     startT = CmiWallTimer();
2014 #endif
2015
2016 #if CMK_WORKER_SINGLE_TASK
2017     if (CmiMyRank() % 6 == 0)
2018 #endif
2019     PumpNetworkSmsg();
2020
2021 #if CMK_WORKER_SINGLE_TASK
2022     if (CmiMyRank() % 6 == 1)
2023 #endif
2024     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2025
2026 #if CMK_WORKER_SINGLE_TASK
2027     if (CmiMyRank() % 6 == 2)
2028 #endif
2029     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
2030
2031 #if REMOTE_EVENT
2032 #if CMK_WORKER_SINGLE_TASK
2033     if (CmiMyRank() % 6 == 3)
2034 #endif
2035     PumpRemoteTransactions();
2036 #endif
2037
2038 #if CMK_USE_OOB
2039     if (SendBufferMsg(&smsg_oob_queue) == 1)
2040 #endif
2041     {
2042 #if CMK_WORKER_SINGLE_TASK
2043     if (CmiMyRank() % 6 == 4)
2044 #endif
2045         SendBufferMsg(&smsg_queue);
2046     }
2047
2048 #if CMK_WORKER_SINGLE_TASK
2049     if (CmiMyRank() % 6 == 5)
2050 #endif
2051     SendRdmaMsg();
2052
2053 #if CMK_SMP_TRACE_COMMTHREAD
2054     endT = CmiWallTimer();
2055     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2056 #endif
2057 #if CMK_SMP_TRACE_COMMTHREAD
2058     traceBeginIdle();
2059 #endif
2060 #endif
2061 }
2062
2063 /* useDynamicSMSG */
2064 static void    PumpDatagramConnection()
2065 {
2066     uint32_t          remote_address;
2067     uint32_t          remote_id;
2068     gni_return_t status;
2069     gni_post_state_t  post_state;
2070     uint64_t          datagram_id;
2071     int i;
2072
2073    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2074    {
2075        if (datagram_id >= mysize) {           /* bound endpoint */
2076            int pe = datagram_id - mysize;
2077            CMI_GNI_LOCK(global_gni_lock)
2078            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2079            CMI_GNI_UNLOCK(global_gni_lock)
2080            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2081            {
2082                CmiAssert(remote_id == pe);
2083                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2084                GNI_RC_CHECK("Dynamic SMSG Init", status);
2085 #if PRINT_SYH
2086                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2087 #endif
2088                CmiAssert(smsg_connected_flag[pe] == 1);
2089                smsg_connected_flag[pe] = 2;
2090            }
2091        }
2092        else {         /* unbound ep */
2093            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2094            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2095            {
2096                CmiAssert(remote_id<mysize);
2097                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2098                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2099                GNI_RC_CHECK("Dynamic SMSG Init", status);
2100 #if PRINT_SYH
2101                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2102 #endif
2103                smsg_connected_flag[remote_id] = 2;
2104
2105                alloc_smsg_attr(&send_smsg_attr);
2106                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2107                GNI_RC_CHECK("post unbound datagram", status);
2108            }
2109        }
2110    }
2111 }
2112
2113 /* pooling CQ to receive network message */
2114 static void PumpNetworkRdmaMsgs()
2115 {
2116     gni_cq_entry_t      event_data;
2117     gni_return_t        status;
2118
2119 }
2120
2121 inline 
2122 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
2123 {
2124     RDMA_REQUEST        *rdma_request_msg;
2125     MallocRdmaRequest(rdma_request_msg);
2126     rdma_request_msg->destNode = inst_id;
2127     rdma_request_msg->pd = pd;
2128 #if REMOTE_EVENT
2129     rdma_request_msg->ack_index = ack_index;
2130 #endif
2131 #if CMK_SMP
2132     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2133 #else
2134     if(sendRdmaBuf == 0)
2135     {
2136         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2137     }else{
2138         sendRdmaTail->next = rdma_request_msg;
2139         sendRdmaTail =  rdma_request_msg;
2140     }
2141 #endif
2142
2143 }
2144
2145 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2146
2147 static void PumpNetworkSmsg()
2148 {
2149     uint64_t            inst_id;
2150     int                 ret;
2151     gni_cq_entry_t      event_data;
2152     gni_return_t        status, status2;
2153     void                *header;
2154     uint8_t             msg_tag;
2155     int                 msg_nbytes;
2156     void                *msg_data;
2157     gni_mem_handle_t    msg_mem_hndl;
2158     gni_smsg_attr_t     *smsg_attr;
2159     gni_smsg_attr_t     *remote_smsg_attr;
2160     int                 init_flag;
2161     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2162     uint64_t            source_addr;
2163     SMSG_QUEUE         *queue = &smsg_queue;
2164 #if   CMK_DIRECT
2165     cmidirectMsg        *direct_msg;
2166 #endif
2167 #if CMI_EXERT_RECV_CAP
2168     int                  recv_cnt = 0;
2169 #endif
2170     while(1)
2171     {
2172         CMI_GNI_LOCK(smsg_rx_cq_lock)
2173         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2174         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2175         if(status != GNI_RC_SUCCESS)
2176             break;
2177         inst_id = GNI_CQ_GET_INST_ID(event_data);
2178 #if REMOTE_EVENT
2179         inst_id = GET_RANK(inst_id);      /* important */
2180 #endif
2181         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2182 #if PRINT_SYH
2183         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2184 #endif
2185         if (useDynamicSMSG) {
2186             /* subtle: smsg may come before connection is setup */
2187             while (smsg_connected_flag[inst_id] != 2) 
2188                PumpDatagramConnection();
2189         }
2190         msg_tag = GNI_SMSG_ANY_TAG;
2191         while(1) {
2192             CMI_GNI_LOCK(smsg_mailbox_lock)
2193             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2194             if (status != GNI_RC_SUCCESS)
2195             {
2196                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2197                 break;
2198             }
2199 #if PRINT_SYH
2200             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2201 #endif
2202             /* copy msg out and then put into queue (small message) */
2203             switch (msg_tag) {
2204             case SMALL_DATA_TAG:
2205             {
2206                 START_EVENT();
2207                 msg_nbytes = CmiGetMsgSize(header);
2208                 msg_data    = CmiAlloc(msg_nbytes);
2209                 memcpy(msg_data, (char*)header, msg_nbytes);
2210                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2211                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2212                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
2213                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2214                 handleOneRecvedMsg(msg_nbytes, msg_data);
2215                 break;
2216             }
2217             case LMSG_INIT_TAG:
2218             {
2219 #if MULTI_THREAD_SEND
2220                 MallocControlMsg(control_msg_tmp);
2221                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2222                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2223                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2224                 getLargeMsgRequest(control_msg_tmp, inst_id);
2225                 FreeControlMsg(control_msg_tmp);
2226 #else
2227                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2228                 getLargeMsgRequest(header, inst_id);
2229                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2230 #endif
2231                 break;
2232             }
2233 #if !REMOTE_EVENT && !CQWRITE
2234             case ACK_TAG:   //msg fit into mempool
2235             {
2236                 /* Get is done, release message . Now put is not used yet*/
2237                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2238                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2239                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2240 #if ! USE_LRTS_MEMPOOL
2241                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2242 #else
2243                 DecreaseMsgInSend(msg);
2244 #endif
2245                 if(NoMsgInSend(msg))
2246                     buffered_send_msg -= GetMempoolsize(msg);
2247                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2248                 CmiFree(msg);
2249 #if CMI_EXERT_SEND_CAP
2250                 SEND_large_pending--;
2251 #endif
2252                 break;
2253             }
2254 #endif
2255             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2256             {
2257 #if MULTI_THREAD_SEND
2258                 MallocControlMsg(header_tmp);
2259                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2260                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2261 #else
2262                 header_tmp = (CONTROL_MSG *) header;
2263 #endif
2264                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2265 #if CMI_EXERT_SEND_CAP
2266                     SEND_large_pending--;
2267 #endif
2268                 void *msg = (void*)(header_tmp->source_addr);
2269                 int cur_seq = CmiGetMsgSeq(msg);
2270                 int offset = ONE_SEG*(cur_seq+1);
2271                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2272                 buffered_send_msg -= header_tmp->length;
2273                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2274                 if (remain_size < 0) remain_size = 0;
2275                 CmiSetMsgSize(msg, remain_size);
2276                 if(remain_size <= 0) //transaction done
2277                 {
2278                     CmiFree(msg);
2279                 }else if (header_tmp->total_length > offset)
2280                 {
2281                     CmiSetMsgSeq(msg, cur_seq+1);
2282                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2283                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2284                     //send next seg
2285                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2286                          // pipelining
2287                     if (header_tmp->seq_id == 1) {
2288                       int i;
2289                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2290                         int seq = cur_seq+i+2;
2291                         CmiSetMsgSeq(msg, seq-1);
2292                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2293                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2294                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2295                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2296                       }
2297                     }
2298                 }
2299 #if MULTI_THREAD_SEND
2300                 FreeControlMsg(header_tmp);
2301 #else
2302                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2303 #endif
2304                 break;
2305             }
2306 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2307             case PUT_DONE_TAG:  {   //persistent message
2308                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2309                 int size = ((CONTROL_MSG *) header)->length;
2310                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2311                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2312                 CmiReference(msg);
2313                 CMI_CHECK_CHECKSUM(msg, size);
2314                 handleOneRecvedMsg(size, msg); 
2315 #if PRINT_SYH
2316                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2317 #endif
2318                 break;
2319             }
2320 #endif
2321 #if CMK_DIRECT
2322             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2323                 //create a trigger message
2324                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2325                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2326                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2327                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2328                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2329                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2330                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2331                 break;
2332 #endif
2333             default:
2334                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2335                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2336                 printf("weird tag problem\n");
2337                 CmiAbort("Unknown tag\n");
2338             }               // end switch
2339 #if PRINT_SYH
2340             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2341 #endif
2342             smsg_recv_count ++;
2343             msg_tag = GNI_SMSG_ANY_TAG;
2344 #if CMI_EXERT_RECV_CAP
2345             if (status == GNI_RC_SUCCESS && ++recv_cnt == RECV_CAP) return;
2346 #endif
2347         } //endwhile GNI_SmsgGetNextWTag
2348     }   //end while GetEvent
2349     if(status == GNI_RC_ERROR_RESOURCE)
2350     {
2351         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2352         GNI_RC_CHECK("Smsg_rx_cq full", status);
2353     }
2354 }
2355
2356 static void printDesc(gni_post_descriptor_t *pd)
2357 {
2358     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2359 }
2360
2361 #if CQWRITE
2362 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2363 {
2364     gni_post_descriptor_t *pd;
2365     gni_return_t        status = GNI_RC_SUCCESS;
2366     
2367     MallocPostDesc(pd);
2368     pd->type = GNI_POST_CQWRITE;
2369     pd->cq_mode = GNI_CQMODE_SILENT;
2370     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2371     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2372     pd->cqwrite_value = data;
2373     pd->remote_mem_hndl = mem_hndl;
2374     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2375     GNI_RC_CHECK("GNI_PostCqWrite", status);
2376 }
2377 #endif
2378
2379 // register memory for a message
2380 // return mem handle
2381 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2382 {
2383     gni_return_t status = GNI_RC_SUCCESS;
2384
2385     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2386
2387 #if CMK_PERSISTENT_COMM
2388       // persistent message is always registered
2389       // BIG_MSG small pieces do not have malloc chunk header
2390     if ((seqno <= 1 || seqno == PERSIST_SEQ) && !IsMemHndlZero(MEMHFIELD(msg))) {
2391         *memh = MEMHFIELD(msg);
2392         return GNI_RC_SUCCESS;
2393     }
2394 #endif
2395     if(seqno == 0 
2396 #if CMK_PERSISTENT_COMM
2397          || seqno == PERSIST_SEQ
2398 #endif
2399       )
2400     {
2401         if(IsMemHndlZero((GetMemHndl(msg))))
2402         {
2403             msg = (void*)(msg);
2404             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2405             if(status == GNI_RC_SUCCESS)
2406                 *memh = GetMemHndl(msg);
2407         }
2408         else {
2409             *memh = GetMemHndl(msg);
2410         }
2411     }
2412     else {
2413         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2414         status = registerMemory(msg, size, memh, NULL); 
2415     }
2416     return status;
2417 }
2418
2419 // for BIG_MSG called on receiver side for receiving control message
2420 // LMSG_INIT_TAG
2421 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2422 {
2423 #if     USE_LRTS_MEMPOOL
2424     CONTROL_MSG         *request_msg;
2425     gni_return_t        status = GNI_RC_SUCCESS;
2426     void                *msg_data;
2427     gni_post_descriptor_t *pd;
2428     gni_mem_handle_t    msg_mem_hndl;
2429     int                 size, transaction_size, offset = 0;
2430     size_t              register_size = 0;
2431
2432     // initial a get to transfer data from the sender side */
2433     request_msg = (CONTROL_MSG *) header;
2434     size = request_msg->total_length;
2435     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2436     MallocPostDesc(pd);
2437 #if CMK_WITH_STATS 
2438     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2439 #endif
2440     if(request_msg->seq_id < 2)   {
2441 #if CMK_SMP_TRACE_COMMTHREAD 
2442         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2443 #endif
2444         msg_data = CmiAlloc(size);
2445         CmiSetMsgSeq(msg_data, 0);
2446         _MEMCHECK(msg_data);
2447     }
2448     else {
2449         offset = ONE_SEG*(request_msg->seq_id-1);
2450         msg_data = (char*)request_msg->dest_addr + offset;
2451     }
2452    
2453     pd->cqwrite_value = request_msg->seq_id;
2454
2455     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2456     SetMemHndlZero(pd->local_mem_hndl);
2457     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2458     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2459         if(NoMsgInRecv( (void*)(msg_data)))
2460             register_size = GetMempoolsize((void*)(msg_data));
2461     }
2462
2463     pd->first_operand = ALIGN64(size);                   //  total length
2464
2465     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2466         pd->type            = GNI_POST_FMA_GET;
2467     else
2468         pd->type            = GNI_POST_RDMA_GET;
2469     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2470     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2471     pd->length          = transaction_size;
2472     pd->local_addr      = (uint64_t) msg_data;
2473     pd->remote_addr     = request_msg->source_addr + offset;
2474     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2475     pd->src_cq_hndl     = rdma_tx_cqh;
2476     pd->rdma_mode       = 0;
2477     pd->amo_cmd         = 0;
2478 #if CMI_EXERT_RDMA_CAP
2479     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2480 #endif
2481     //memory registration success
2482     if(status == GNI_RC_SUCCESS )
2483     {
2484         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2485         CMI_GNI_LOCK(lock)
2486 #if REMOTE_EVENT
2487         if( request_msg->seq_id == 0)
2488         {
2489             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2490             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2491             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2492         }
2493 #endif
2494
2495 #if CMK_WITH_STATS
2496         RDMA_TRY_SEND(pd->type)
2497 #endif
2498         if(pd->type == GNI_POST_RDMA_GET) 
2499         {
2500             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2501         }
2502         else
2503         {
2504             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2505         }
2506         CMI_GNI_UNLOCK(lock)
2507
2508         if(status == GNI_RC_SUCCESS )
2509         {
2510 #if CMI_EXERT_RDMA_CAP
2511             RDMA_pending++;
2512 #endif
2513             if(pd->cqwrite_value == 0)
2514             {
2515 #if MACHINE_DEBUG_LOG
2516                 buffered_recv_msg += register_size;
2517                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2518 #endif
2519                 IncreaseMsgInRecv(msg_data);
2520 #if CMK_SMP_TRACE_COMMTHREAD 
2521                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2522 #endif
2523             }
2524 #if  CMK_WITH_STATS
2525             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2526             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2527 #endif
2528         }
2529     }else
2530     {
2531         SetMemHndlZero((pd->local_mem_hndl));
2532     }
2533     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2534     {
2535 #if REMOTE_EVENT
2536         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2537 #else
2538         bufferRdmaMsg(inst_id, pd, -1); 
2539 #endif
2540     }else if (status != GNI_RC_SUCCESS) {
2541         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2542         GNI_RC_CHECK("GetLargeAFter posting", status);
2543     }
2544 #else
2545     CONTROL_MSG         *request_msg;
2546     gni_return_t        status;
2547     void                *msg_data;
2548     gni_post_descriptor_t *pd;
2549     RDMA_REQUEST        *rdma_request_msg;
2550     gni_mem_handle_t    msg_mem_hndl;
2551     //int source;
2552     // initial a get to transfer data from the sender side */
2553     request_msg = (CONTROL_MSG *) header;
2554     msg_data = CmiAlloc(request_msg->length);
2555     _MEMCHECK(msg_data);
2556
2557     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2558
2559     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2560     {
2561         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2562     }
2563
2564     MallocPostDesc(pd);
2565     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2566         pd->type            = GNI_POST_FMA_GET;
2567     else
2568         pd->type            = GNI_POST_RDMA_GET;
2569     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2570     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2571     pd->length          = ALIGN64(request_msg->length);
2572     pd->local_addr      = (uint64_t) msg_data;
2573     pd->remote_addr     = request_msg->source_addr;
2574     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2575     pd->src_cq_hndl     = rdma_tx_cqh;
2576     pd->rdma_mode       = 0;
2577     pd->amo_cmd         = 0;
2578
2579     //memory registration successful
2580     if(status == GNI_RC_SUCCESS)
2581     {
2582         pd->local_mem_hndl  = msg_mem_hndl;
2583        
2584         if(pd->type == GNI_POST_RDMA_GET) 
2585         {
2586             CMI_GNI_LOCK(rdma_tx_cq_lock)
2587             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2588             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2589         }
2590         else
2591         {
2592             CMI_GNI_LOCK(default_tx_cq_lock)
2593             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2594             CMI_GNI_UNLOCK(default_tx_cq_lock)
2595         }
2596
2597     }else
2598     {
2599         SetMemHndlZero(pd->local_mem_hndl);
2600     }
2601     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2602     {
2603         MallocRdmaRequest(rdma_request_msg);
2604         rdma_request_msg->next = 0;
2605         rdma_request_msg->destNode = inst_id;
2606         rdma_request_msg->pd = pd;
2607         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2608     }else {
2609         GNI_RC_CHECK("AFter posting", status);
2610     }
2611 #endif
2612 }
2613
2614 #if CQWRITE
2615 static void PumpCqWriteTransactions()
2616 {
2617
2618     gni_cq_entry_t          ev;
2619     gni_return_t            status;
2620     void                    *msg;  
2621     int                     msg_size;
2622     while(1) {
2623         //CMI_GNI_LOCK(my_cq_lock) 
2624         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2625         //CMI_GNI_UNLOCK(my_cq_lock)
2626         if(status != GNI_RC_SUCCESS) break;
2627         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2628 #if CMK_PERSISTENT_COMM
2629 #if PRINT_SYH
2630         printf(" %d CQ write event %p\n", myrank, msg);
2631 #endif
2632         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2633 #if PRINT_SYH
2634             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2635 #endif
2636             CmiReference(msg);
2637             msg_size = CmiGetMsgSize(msg);
2638             CMI_CHECK_CHECKSUM(msg, msg_size);
2639             handleOneRecvedMsg(msg_size, msg); 
2640             continue;
2641         }
2642 #endif
2643 #if ! USE_LRTS_MEMPOOL
2644        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2645 #else
2646         DecreaseMsgInSend(msg);
2647 #endif
2648         if(NoMsgInSend(msg))
2649             buffered_send_msg -= GetMempoolsize(msg);
2650         CmiFree(msg);
2651     };
2652     if(status == GNI_RC_ERROR_RESOURCE)
2653     {
2654         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2655     }
2656 }
2657 #endif
2658
2659 #if REMOTE_EVENT
2660 static void PumpRemoteTransactions()
2661 {
2662     gni_cq_entry_t          ev;
2663     gni_return_t            status;
2664     void                    *msg;   
2665     int                     inst_id, index, type, size;
2666
2667     while(1) {
2668         CMI_GNI_LOCK(global_gni_lock)
2669         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2670         CMI_GNI_UNLOCK(global_gni_lock)
2671
2672         if(status != GNI_RC_SUCCESS) break;
2673
2674         inst_id = GNI_CQ_GET_INST_ID(ev);
2675         index = GET_INDEX(inst_id);
2676         type = GET_TYPE(inst_id);
2677         switch (type) {
2678         case 0:    // ACK
2679             CmiAssert(index>=0 && index<ackPool.size);
2680             CmiAssert(GetIndexType(ackPool, index) == 1);
2681             msg = GetIndexAddress(ackPool, index);
2682 #if PRINT_SYH
2683         printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2684 #endif
2685 #if ! USE_LRTS_MEMPOOL
2686            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2687 #else
2688             DecreaseMsgInSend(msg);
2689 #endif
2690             if(NoMsgInSend(msg))
2691                 buffered_send_msg -= GetMempoolsize(msg);
2692             CmiFree(msg);
2693             IndexPool_freeslot(&ackPool, index);
2694 #if CMI_EXERT_SEND_CAP
2695             SEND_large_pending--;
2696 #endif
2697             break;
2698 #if CMK_PERSISTENT_COMM
2699         case 1:  {    // PERSISTENT
2700             CmiLock(persistPool.lock);
2701             CmiAssert(GetIndexType(persistPool, index) == 2);
2702             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2703             CmiUnlock(persistPool.lock);
2704             START_EVENT();
2705             msg = slot->destBuf[0].destAddress;
2706             size = CmiGetMsgSize(msg);
2707             CmiReference(msg);
2708             CMI_CHECK_CHECKSUM(msg, size);
2709             TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2710             handleOneRecvedMsg(size, msg); 
2711             break;
2712             }
2713 #endif
2714         default:
2715             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2716             CmiAbort("PumpRemoteTransactions: unknown type");
2717         }
2718     }
2719     if(status == GNI_RC_ERROR_RESOURCE)
2720     {
2721         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2722     }
2723 }
2724 #endif
2725
2726 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2727 {
2728     gni_cq_entry_t          ev;
2729     gni_return_t            status;
2730     uint64_t                type, inst_id;
2731     gni_post_descriptor_t   *tmp_pd;
2732     MSG_LIST                *ptr;
2733     CONTROL_MSG             *ack_msg_tmp;
2734     ACK_MSG                 *ack_msg;
2735     uint8_t                 msg_tag;
2736 #if CMK_DIRECT
2737     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2738 #endif
2739     SMSG_QUEUE         *queue = &smsg_queue;
2740
2741     while(1) {
2742         CMI_GNI_LOCK(my_cq_lock) 
2743         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2744         CMI_GNI_UNLOCK(my_cq_lock)
2745         if(status != GNI_RC_SUCCESS) break;
2746         
2747         type = GNI_CQ_GET_TYPE(ev);
2748         if (type == GNI_CQ_EVENT_TYPE_POST)
2749         {
2750
2751 #if CMI_EXERT_RDMA_CAP
2752             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2753             RDMA_pending--;
2754 #endif
2755             inst_id     = GNI_CQ_GET_INST_ID(ev);
2756 #if PRINT_SYH
2757             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2758 #endif
2759             CMI_GNI_LOCK(my_cq_lock)
2760             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2761             CMI_GNI_UNLOCK(my_cq_lock)
2762
2763             switch (tmp_pd->type) {
2764 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2765             case GNI_POST_RDMA_PUT:
2766 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2767                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2768 #endif
2769             case GNI_POST_FMA_PUT:
2770                 if(tmp_pd->amo_cmd == 1) {
2771 #if CMK_DIRECT
2772                     //sender ACK to receiver to trigger it is done
2773                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2774                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2775                     msg_tag = DIRECT_PUT_DONE_TAG;
2776 #endif
2777                 }
2778                 else {
2779                     CmiFree((void *)tmp_pd->local_addr);
2780 #if REMOTE_EVENT
2781                     FreePostDesc(tmp_pd);
2782                     continue;
2783 #elif CQWRITE
2784                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2785                     FreePostDesc(tmp_pd);
2786                     continue;
2787 #else
2788                     MallocControlMsg(ack_msg_tmp);
2789                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2790                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2791                     ack_msg_tmp->length  = tmp_pd->length;
2792                     msg_tag = PUT_DONE_TAG;
2793 #endif
2794                 }
2795                 break;
2796 #endif
2797             case GNI_POST_RDMA_GET:
2798             case GNI_POST_FMA_GET:  {
2799 #if  ! USE_LRTS_MEMPOOL
2800                 MallocControlMsg(ack_msg_tmp);
2801                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2802                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2803                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2804                 msg_tag = ACK_TAG;  
2805 #else
2806 #if CMK_WITH_STATS
2807                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2808 #endif
2809                 int seq_id = tmp_pd->cqwrite_value;
2810                 if(seq_id > 0)      // BIG_MSG
2811                 {
2812                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2813                     MallocControlMsg(ack_msg_tmp);
2814                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2815                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2816                     ack_msg_tmp->seq_id = seq_id;
2817                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2818                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2819                     ack_msg_tmp->length = tmp_pd->length;
2820                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2821                     msg_tag = BIG_MSG_TAG; 
2822                 } 
2823                 else
2824                 {
2825                     msg_tag = ACK_TAG; 
2826 #if  !REMOTE_EVENT && !CQWRITE
2827                     MallocAckMsg(ack_msg);
2828                     ack_msg->source_addr = tmp_pd->remote_addr;
2829 #endif
2830                 }
2831 #endif
2832                 break;
2833             }
2834             case  GNI_POST_CQWRITE:
2835                    FreePostDesc(tmp_pd);
2836                    continue;
2837             default:
2838                 CmiPrintf("type=%d\n", tmp_pd->type);
2839                 CmiAbort("PumpLocalTransactions: unknown type!");
2840             }      /* end of switch */
2841
2842 #if CMK_DIRECT
2843             if (tmp_pd->amo_cmd == 1) {
2844                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2845                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2846             }
2847             else
2848 #endif
2849             if (msg_tag == ACK_TAG) {
2850 #if !REMOTE_EVENT
2851 #if   !CQWRITE
2852                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2853                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2854 #else
2855                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2856 #endif
2857 #endif
2858             }
2859             else {
2860                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2861                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2862             }
2863 #if CMK_PERSISTENT_COMM
2864             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2865 #endif
2866             {
2867                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2868 #if PRINT_SYH
2869                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2870 #endif
2871                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2872                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2873
2874                     START_EVENT();
2875                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2876                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2877 #if MACHINE_DEBUG_LOG
2878                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2879                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2880                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2881 #endif
2882                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2883                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2884                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2885                 }else if(msg_tag == BIG_MSG_TAG){
2886                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2887                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2888                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2889                         START_EVENT();
2890 #if PRINT_SYH
2891                         printf("Pipeline msg done [%d]\n", myrank);
2892 #endif
2893 #if                 CMK_SMP_TRACE_COMMTHREAD
2894                         if( tmp_pd->cqwrite_value == 1)
2895                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2896 #endif
2897                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2898                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2899                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2900                     }
2901                 }
2902             }
2903             FreePostDesc(tmp_pd);
2904         }
2905     } //end while
2906     if(status == GNI_RC_ERROR_RESOURCE)
2907     {
2908         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2909         GNI_RC_CHECK("Smsg_tx_cq full", status);
2910     }
2911 }
2912
2913 static void  SendRdmaMsg()
2914 {
2915     gni_return_t            status = GNI_RC_SUCCESS;
2916     gni_mem_handle_t        msg_mem_hndl;
2917     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2918     RDMA_REQUEST            *pre = 0;
2919     uint64_t                register_size = 0;
2920     void                    *msg;
2921     int                     i;
2922 #if CMK_SMP
2923     int len = PCQueueLength(sendRdmaBuf);
2924     for (i=0; i<len; i++)
2925     {
2926 #if CMI_EXERT_RDMA_CAP
2927          if( RDMA_pending >= RDMA_cap) break;
2928 #endif
2929         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2930         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2931         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2932         if (ptr == NULL) break;
2933 #else
2934     ptr = sendRdmaBuf;
2935     while (ptr!=0 )
2936     {
2937 #if CMI_EXERT_RDMA_CAP
2938          if( RDMA_pending >= RDMA_cap) break;
2939 #endif
2940 #endif 
2941         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2942         gni_post_descriptor_t *pd = ptr->pd;
2943         
2944         msg = (void*)(pd->local_addr);
2945         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2946         register_size = 0;
2947         if(pd->cqwrite_value == 0) {
2948             if(NoMsgInRecv(msg))
2949                 register_size = GetMempoolsize(msg);
2950         }
2951
2952         if(status == GNI_RC_SUCCESS)        //mem register good
2953         {
2954             int destNode = ptr->destNode;
2955             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2956             CMI_GNI_LOCK(lock);
2957 #if REMOTE_EVENT
2958             if( pd->cqwrite_value == 0) {
2959                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2960                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
2961                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2962             }
2963 #if CMK_PERSISTENT_COMM
2964             else if (pd->cqwrite_value == PERSIST_SEQ) {
2965                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2966                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
2967                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2968             }
2969 #endif
2970 #endif
2971 #if CMK_WITH_STATS
2972             RDMA_TRY_SEND(pd->type)
2973 #endif
2974 #if CMK_SMP_TRACE_COMMTHREAD
2975 //            int oldpe = -1;
2976 //            int oldeventid = -1;
2977 //            if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
2978 //            { 
2979 //                TRACE_COMM_GET_MSGID((void*)pd->local_addr, &oldpe, &oldeventid);
2980 //                TRACE_COMM_SET_COMM_MSGID((void*)pd->local_addr);
2981 //            }
2982               if(IS_PUT(pd->type) )
2983               { 
2984                   START_EVENT();
2985                   TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
2986               }
2987 #endif
2988
2989             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2990             {
2991                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
2992             }
2993             else
2994             {
2995                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
2996             }
2997             CMI_GNI_UNLOCK(lock);
2998             
2999 #if CMK_SMP_TRACE_COMMTHREAD
3000 //            if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
3001 //            { 
3002 //                if (oldpe != -1)  TRACE_COMM_SET_MSGID((void*)pd->local_addr, oldpe, oldeventid);
3003 //            }
3004 #endif
3005             if(status == GNI_RC_SUCCESS)    //post good
3006             {
3007 #if CMI_EXERT_RDMA_CAP
3008                 RDMA_pending ++;
3009 #endif
3010 #if !CMK_SMP
3011                 tmp_ptr = ptr;
3012                 if(pre != 0) {
3013                     pre->next = ptr->next;
3014                 }
3015                 else {
3016                     sendRdmaBuf = ptr->next;
3017                 }
3018                 ptr = ptr->next;
3019                 FreeRdmaRequest(tmp_ptr);
3020 #endif
3021                 if(pd->cqwrite_value == 0)
3022                 {
3023 #if CMK_SMP_TRACE_COMMTHREAD 
3024                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3025 #endif
3026                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3027                 }
3028 #if  CMK_WITH_STATS
3029                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3030                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3031 #endif
3032 #if MACHINE_DEBUG_LOG
3033                 buffered_recv_msg += register_size;
3034                 MACHSTATE(8, "GO request from buffered\n"); 
3035 #endif
3036 #if PRINT_SYH
3037                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
3038 #endif
3039             }else           // cannot post
3040             {
3041 #if CMK_SMP
3042                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3043 #else
3044                 pre = ptr;
3045                 ptr = ptr->next;
3046 #endif
3047 #if PRINT_SYH
3048                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3049 #endif
3050                 break;
3051             }
3052         } else          //memory registration fails
3053         {
3054 #if CMK_SMP
3055             PCQueuePush(sendRdmaBuf, (char*)ptr);
3056 #else
3057             pre = ptr;
3058             ptr = ptr->next;
3059 #endif
3060         }
3061     } //end while
3062 #if ! CMK_SMP
3063     if(ptr == 0)
3064         sendRdmaTail = pre;
3065 #endif
3066 }
3067
3068 // return 1 if all messages are sent
3069 static int SendBufferMsg(SMSG_QUEUE *queue)
3070 {
3071     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3072     CONTROL_MSG         *control_msg_tmp;
3073     gni_return_t        status;
3074     int                 done = 1;
3075     uint64_t            register_size;
3076     void                *register_addr;
3077     int                 index_previous = -1;
3078
3079 #if CMK_SMP
3080     int          index = 0;
3081 #if ONE_SEND_QUEUE
3082     memset(destpe_avail, 0, mysize * sizeof(char));
3083     for (index=0; index<1; index++)
3084     {
3085         int i, len = PCQueueLength(queue->sendMsgBuf);
3086         for (i=0; i<len; i++) 
3087         {
3088             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3089             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3090             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3091             if(ptr == NULL) break;
3092             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3093                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3094                 continue;
3095             }
3096 #else
3097 #if SMP_LOCKS
3098     int nonempty = PCQueueLength(nonEmptyQueues);
3099     for(index =0; index<nonempty; index++)
3100     {
3101         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
3102         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
3103         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3104         if(current_list == NULL) break; 
3105         PCQueue current_queue= current_list-> sendSmsgBuf;
3106         CmiLock(current_list->lock);
3107         int i, len = PCQueueLength(current_queue);
3108         current_list->pushed = 0;
3109         CmiUnlock(current_list->lock);
3110 #else
3111     for(index =0; index<mysize; index++)
3112     {
3113         //if (index == myrank) continue;
3114         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3115         int i, len = PCQueueLength(current_queue);
3116 #endif
3117         for (i=0; i<len; i++) 
3118         {
3119             CMI_PCQUEUEPOP_LOCK(current_queue)
3120             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3121             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3122             if (ptr == 0) break;
3123 #endif
3124 #else
3125     int index = queue->smsg_head_index;
3126     while(index != -1)
3127     {
3128         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3129         pre = 0;
3130         while(ptr != 0)
3131         {
3132 #endif
3133             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3134             status = GNI_RC_ERROR_RESOURCE;
3135             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
3136                 /* connection not exists yet */
3137 #if CMK_SMP
3138                   /* non-smp case, connect is issued in send_smsg_message */
3139                 if (smsg_connected_flag[index] == 0)
3140                     connect_to(ptr->destNode); 
3141 #endif
3142             }
3143             else
3144             switch(ptr->tag)
3145             {
3146             case SMALL_DATA_TAG:
3147                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3148                 if(status == GNI_RC_SUCCESS)
3149                 {
3150                     CmiFree(ptr->msg);
3151                 }
3152                 break;
3153             case LMSG_INIT_TAG:
3154                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3155                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
3156                 break;
3157 #if !REMOTE_EVENT && !CQWRITE
3158             case ACK_TAG:
3159                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3160                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3161                 break;
3162 #endif
3163             case BIG_MSG_TAG:
3164                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3165                 if(status == GNI_RC_SUCCESS)
3166                 {
3167                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3168                 }
3169                 break;
3170 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3171             case PUT_DONE_TAG:
3172                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3173                 if(status == GNI_RC_SUCCESS)
3174                 {
3175                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3176                 }
3177                 break;
3178 #endif
3179 #if CMK_DIRECT
3180             case DIRECT_PUT_DONE_TAG:
3181                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3182                 if(status == GNI_RC_SUCCESS)
3183                 {
3184                     free((CMK_DIRECT_HEADER*)ptr->msg);
3185                 }
3186                 break;
3187
3188 #endif
3189             default:
3190                 printf("Weird tag\n");
3191                 CmiAbort("should not happen\n");
3192             }       // end switch
3193             if(status == GNI_RC_SUCCESS)
3194             {
3195 #if PRINT_SYH
3196                 buffered_smsg_counter--;
3197                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3198 #endif
3199 #if !CMK_SMP
3200                 tmp_ptr = ptr;
3201                 if(pre)
3202                 {
3203                     ptr = pre ->next = ptr->next;
3204                 }else
3205                 {
3206                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3207                 }
3208                 FreeMsgList(tmp_ptr);
3209 #else
3210                 FreeMsgList(ptr);
3211 #endif
3212             }else {
3213 #if CMK_SMP
3214 #if ONE_SEND_QUEUE
3215                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3216 #else
3217                 PCQueuePush(current_queue, (char*)ptr);
3218 #endif
3219 #else
3220                 pre = ptr;
3221                 ptr=ptr->next;
3222 #endif
3223                 done = 0;
3224                 if(status == GNI_RC_ERROR_RESOURCE)
3225                 {
3226 #if CMK_SMP && ONE_SEND_QUEUE 
3227                     destpe_avail[ptr->destNode] = 1;
3228 #else
3229                     break;
3230 #endif
3231                 }
3232             } 
3233         } //end while
3234 #if !CMK_SMP
3235         if(ptr == 0)
3236             queue->smsg_msglist_index[index].tail = pre;
3237         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3238         {
3239             if(index_previous != -1)
3240                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3241             else
3242                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3243         }
3244         else {
3245             index_previous = index;
3246         }
3247         index = queue->smsg_msglist_index[index].next;
3248 #else
3249 #if !ONE_SEND_QUEUE && SMP_LOCKS
3250         CmiLock(current_list->lock);
3251         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3252         {
3253             current_list->pushed = 1;
3254             PCQueuePush(nonEmptyQueues, current_list);
3255         }
3256         CmiUnlock(current_list->lock); 
3257 #endif
3258 #endif
3259
3260     }   // end pooling for all cores
3261     return done;
3262 }
3263
3264 static void ProcessDeadlock();
3265 void LrtsAdvanceCommunication(int whileidle)
3266 {
3267     static int count = 0;
3268     /*  Receive Msg first */
3269 #if CMK_SMP_TRACE_COMMTHREAD
3270     double startT, endT;
3271 #endif
3272     if (useDynamicSMSG && whileidle)
3273     {
3274 #if CMK_SMP_TRACE_COMMTHREAD
3275         startT = CmiWallTimer();
3276 #endif
3277         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3278 #if CMK_SMP_TRACE_COMMTHREAD
3279         endT = CmiWallTimer();
3280         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3281 #endif
3282     }
3283
3284 #if CMK_SMP_TRACE_COMMTHREAD
3285     startT = CmiWallTimer();
3286 #endif
3287     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3288     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3289 #if CMK_SMP_TRACE_COMMTHREAD
3290     endT = CmiWallTimer();
3291     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3292 #endif
3293
3294 #if CMK_SMP_TRACE_COMMTHREAD
3295     startT = CmiWallTimer();
3296 #endif
3297     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3298     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3299 #if CMK_SMP_TRACE_COMMTHREAD
3300     endT = CmiWallTimer();
3301     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3302 #endif
3303
3304 #if CMK_SMP_TRACE_COMMTHREAD
3305     startT = CmiWallTimer();
3306 #endif
3307     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3308
3309 #if CQWRITE
3310     PumpCqWriteTransactions();
3311 #endif
3312
3313 #if REMOTE_EVENT
3314     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
3315 #endif
3316
3317     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3318 #if CMK_SMP_TRACE_COMMTHREAD
3319     endT = CmiWallTimer();
3320     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3321 #endif
3322  
3323 #if CMK_SMP_TRACE_COMMTHREAD
3324     startT = CmiWallTimer();
3325 #endif
3326     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3327     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3328 #if CMK_SMP_TRACE_COMMTHREAD
3329     endT = CmiWallTimer();
3330     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3331 #endif
3332
3333     /* Send buffered Message */
3334 #if CMK_SMP_TRACE_COMMTHREAD
3335     startT = CmiWallTimer();
3336 #endif
3337 #if CMK_USE_OOB
3338     if (SendBufferMsg(&smsg_oob_queue) == 1)
3339 #endif
3340     {
3341         STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
3342     }
3343     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3344 #if CMK_SMP_TRACE_COMMTHREAD
3345     endT = CmiWallTimer();
3346     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3347 #endif
3348
3349 #if CMK_SMP && ! LARGEPAGE
3350     if (_detected_hang)  ProcessDeadlock();
3351 #endif
3352 }
3353
3354 static void set_smsg_max()
3355 {
3356     char *env;
3357
3358     if(mysize <=512)
3359     {
3360         SMSG_MAX_MSG = 1024;
3361     }else if (mysize <= 4096)
3362     {
3363         SMSG_MAX_MSG = 1024;
3364     }else if (mysize <= 16384)
3365     {
3366         SMSG_MAX_MSG = 512;
3367     }else {
3368         SMSG_MAX_MSG = 256;
3369     }
3370
3371     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3372     if (env) SMSG_MAX_MSG = atoi(env);
3373     CmiAssert(SMSG_MAX_MSG > 0);
3374 }    
3375
3376 /* useDynamicSMSG */
3377 static void _init_dynamic_smsg()
3378 {
3379     gni_return_t status;
3380     uint32_t     vmdh_index = -1;
3381     int i;
3382
3383     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3384     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3385     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3386     for(i=0; i<mysize; i++) {
3387         smsg_connected_flag[i] = 0;
3388         smsg_attr_vector_local[i] = NULL;
3389         smsg_attr_vector_remote[i] = NULL;
3390     }
3391
3392     set_smsg_max();
3393
3394     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3395     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3396     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3397     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3398     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3399
3400     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3401     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3402     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3403     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3404     mailbox_list->offset = 0;
3405     mailbox_list->next = 0;
3406     
3407     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3408         mailbox_list->size, smsg_rx_cqh,
3409         GNI_MEM_READWRITE,   
3410         vmdh_index,
3411         &(mailbox_list->mem_hndl));
3412     GNI_RC_CHECK("MEMORY registration for smsg", status);
3413
3414     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3415     GNI_RC_CHECK("Unbound EP", status);
3416     
3417     alloc_smsg_attr(&send_smsg_attr);
3418
3419     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3420     GNI_RC_CHECK("post unbound datagram", status);
3421
3422       /* always pre-connect to proc 0 */
3423     //if (myrank != 0) connect_to(0);
3424
3425     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3426     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3427 }
3428
3429 static void _init_static_smsg()
3430 {
3431     gni_smsg_attr_t      *smsg_attr;
3432     gni_smsg_attr_t      remote_smsg_attr;
3433     gni_smsg_attr_t      *smsg_attr_vec;
3434     gni_mem_handle_t     my_smsg_mdh_mailbox;
3435     int      ret, i;
3436     gni_return_t status;
3437     uint32_t              vmdh_index = -1;
3438     mdh_addr_t            base_infor;
3439     mdh_addr_t            *base_addr_vec;
3440
3441     set_smsg_max();
3442     
3443     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3444     
3445     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3446     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3447     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3448     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3449     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3450     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3451     CmiAssert(ret == 0);
3452     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3453     
3454     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3455             smsg_memlen*(mysize), smsg_rx_cqh,
3456             GNI_MEM_READWRITE,   
3457             vmdh_index,
3458             &my_smsg_mdh_mailbox);
3459     register_memory_size += smsg_memlen*(mysize);
3460     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3461
3462     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3463     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3464
3465     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3466     base_infor.mdh =  my_smsg_mdh_mailbox;
3467     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3468
3469     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3470  
3471     for(i=0; i<mysize; i++)
3472     {
3473         if(i==myrank)
3474             continue;
3475         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3476         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3477         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3478         smsg_attr[i].mbox_offset = i*smsg_memlen;
3479         smsg_attr[i].buff_size = smsg_memlen;
3480         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3481         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3482     }
3483
3484     for(i=0; i<mysize; i++)
3485     {
3486         if (myrank == i) continue;
3487
3488         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3489         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3490         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3491         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3492         remote_smsg_attr.buff_size = smsg_memlen;
3493         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3494         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3495
3496         /* initialize the smsg channel */
3497         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3498         GNI_RC_CHECK("SMSG Init", status);
3499     } //end initialization
3500
3501     free(base_addr_vec);
3502     free(smsg_attr);
3503
3504     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3505     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3506
3507
3508 inline
3509 static void _init_send_queue(SMSG_QUEUE *queue)
3510 {
3511      int i;
3512 #if ONE_SEND_QUEUE
3513      queue->sendMsgBuf = PCQueueCreate();
3514      destpe_avail = (char*)malloc(mysize * sizeof(char));
3515 #else
3516      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3517 #if CMK_SMP && SMP_LOCKS
3518      nonEmptyQueues = PCQueueCreate();
3519 #endif
3520      for(i =0; i<mysize; i++)
3521      {
3522 #if CMK_SMP
3523          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3524 #if SMP_LOCKS
3525          queue->smsg_msglist_index[i].pushed = 0;
3526          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3527 #endif
3528 #else
3529          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3530          queue->smsg_msglist_index[i].next = -1;
3531          queue->smsg_head_index = -1;
3532 #endif
3533         
3534      }
3535 #endif
3536 }
3537
3538 inline
3539 static void _init_smsg()
3540 {
3541     if(mysize > 1) {
3542         if (useDynamicSMSG)
3543             _init_dynamic_smsg();
3544         else
3545             _init_static_smsg();
3546     }
3547
3548     _init_send_queue(&smsg_queue);
3549 #if CMK_USE_OOB
3550     _init_send_queue(&smsg_oob_queue);
3551 #endif
3552 }
3553
3554 static void _init_static_msgq()
3555 {
3556     gni_return_t status;
3557     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3558     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3559     msgq_attrs.smsg_q_sz = 1;
3560     msgq_attrs.rcv_pool_sz = 1;
3561     msgq_attrs.num_msgq_eps = 2;
3562     msgq_attrs.nloc_insts = 8;
3563     msgq_attrs.modes = 0;
3564     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3565
3566     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3567     GNI_RC_CHECK("MSGQ Init", status);
3568
3569
3570 }
3571
3572
3573 static CmiUInt8 total_mempool_size = 0;
3574 static CmiUInt8 total_mempool_calls = 0;
3575
3576 #if USE_LRTS_MEMPOOL
3577 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3578 {
3579     void *pool;
3580     int ret;
3581     gni_return_t status = GNI_RC_SUCCESS;
3582
3583     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3584     if (*size < default_size) *size = default_size;
3585 #if LARGEPAGE
3586     // round up to be multiple of _tlbpagesize
3587     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3588     *size = ALIGNHUGEPAGE(*size);
3589 #endif
3590     total_mempool_size += *size;
3591     total_mempool_calls += 1;
3592 #if   !LARGEPAGE
3593     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3594     {
3595         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3596         CmiAbort("alloc_mempool_block");
3597     }
3598 #endif
3599 #if LARGEPAGE
3600     pool = my_get_huge_pages(*size);
3601     ret = pool==NULL;
3602 #else
3603     ret = posix_memalign(&pool, ALIGNBUF, *size);
3604 #endif
3605     if (ret != 0) {
3606 #if CMK_SMP && STEAL_MEMPOOL
3607       pool = steal_mempool_block(size, mem_hndl);
3608       if (pool != NULL) return pool;
3609 #endif
3610       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3611       if (ret == ENOMEM)
3612         CmiAbort("alloc_mempool_block: out of memory.");
3613       else
3614         CmiAbort("alloc_mempool_block: posix_memalign failed");
3615     }
3616 #if LARGEPAGE
3617     CmiMemLock();
3618     register_count++;
3619     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3620     CmiMemUnlock();
3621     if(status != GNI_RC_SUCCESS) {
3622         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3623 sweep_mempool(CpvAccess(mempool));
3624     }
3625     GNI_RC_CHECK("MEMORY_REGISTER", status);
3626 #else
3627     SetMemHndlZero((*mem_hndl));
3628 #endif
3629     return pool;
3630 }
3631
3632 // ptr is a block head pointer
3633 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3634 {
3635     if(!(IsMemHndlZero(mem_hndl)))
3636     {
3637         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3638     }
3639 #if LARGEPAGE
3640     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3641 #else
3642     free(ptr);
3643 #endif
3644 }
3645 #endif
3646
3647 void LrtsPreCommonInit(int everReturn){
3648 #if USE_LRTS_MEMPOOL
3649     CpvInitialize(mempool_type*, mempool);
3650     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3651     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3652 #endif
3653 }
3654
3655 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3656 {
3657     register int            i;
3658     int                     rc;
3659     int                     device_id = 0;
3660     unsigned int            remote_addr;
3661     gni_cdm_handle_t        cdm_hndl;
3662     gni_return_t            status = GNI_RC_SUCCESS;
3663     uint32_t                vmdh_index = -1;
3664     uint8_t                 ptag;
3665     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3666     int                     first_spawned;
3667     int                     physicalID;
3668     char                   *env;
3669
3670     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3671     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3672     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3673    
3674     status = PMI_Init(&first_spawned);
3675     GNI_RC_CHECK("PMI_Init", status);
3676
3677     status = PMI_Get_size(&mysize);
3678     GNI_RC_CHECK("PMI_Getsize", status);
3679
3680     status = PMI_Get_rank(&myrank);
3681     GNI_RC_CHECK("PMI_getrank", status);
3682
3683     //physicalID = CmiPhysicalNodeID(myrank);
3684     
3685     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3686
3687     *myNodeID = myrank;
3688     *numNodes = mysize;
3689   
3690 #if MULTI_THREAD_SEND
3691     /* Currently, we only consider the case that comm. thread will only recv msgs */
3692     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3693 #endif
3694
3695 #if CMI_EXERT_SEND_CAP
3696     CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
3697 #endif
3698
3699     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3700     
3701     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3702     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3703     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3704
3705     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3706     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3707     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3708
3709     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3710     if (env) useDynamicSMSG = 1;
3711     if (!useDynamicSMSG)
3712       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3713     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3714     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3715     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3716     
3717     if(myrank == 0)
3718     {
3719         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3720         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3721     }
3722 #ifdef USE_ONESIDED
3723     onesided_init(NULL, &onesided_hnd);
3724
3725     // this is a GNI test, so use the libonesided bypass functionality
3726     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3727     local_addr = gniGetNicAddress();
3728 #else
3729     ptag = get_ptag();
3730     cookie = get_cookie();
3731 #if 0
3732     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3733 #endif
3734     //Create and attach to the communication  domain */
3735     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3736     GNI_RC_CHECK("GNI_CdmCreate", status);
3737     //* device id The device id is the minor number for the device
3738     //that is assigned to the device by the system when the device is created.
3739     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3740     //where X is the device number 0 default 
3741     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3742     GNI_RC_CHECK("GNI_CdmAttach", status);
3743     local_addr = get_gni_nic_address(0);
3744 #endif
3745     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3746     _MEMCHECK(MPID_UGNI_AllAddr);
3747     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3748     /* create the local completion queue */
3749     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3750     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3751     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3752     
3753     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3754     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3755     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3756
3757     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3758     GNI_RC_CHECK("Create CQ (rx)", status);
3759     
3760     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3761     GNI_RC_CHECK("Create Post CQ (rx)", status);
3762     
3763     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3764     //GNI_RC_CHECK("Create BTE CQ", status);
3765
3766     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3767     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3768     _MEMCHECK(ep_hndl_array);
3769 #if MULTI_THREAD_SEND 
3770     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3771     //default_tx_cq_lock = CmiCreateLock();
3772     rdma_tx_cq_lock = CmiCreateLock();
3773     smsg_rx_cq_lock = CmiCreateLock();
3774     //global_gni_lock  = CmiCreateLock();
3775     //rx_cq_lock  = CmiCreateLock();
3776 #endif
3777     for (i=0; i<mysize; i++) {
3778         if(i == myrank) continue;
3779         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3780         GNI_RC_CHECK("GNI_EpCreate ", status);   
3781         remote_addr = MPID_UGNI_AllAddr[i];
3782         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3783         GNI_RC_CHECK("GNI_EpBind ", status);   
3784     }
3785
3786     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3787     _init_smsg();
3788     PMI_Barrier();
3789
3790 #if     USE_LRTS_MEMPOOL
3791     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3792     if (env) {
3793         _totalmem = CmiReadSize(env);
3794         if (myrank == 0)
3795             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3796     }
3797
3798     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3799     if (env) _mempool_size = CmiReadSize(env);
3800     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3801         _mempool_size = CmiReadSize(env);
3802
3803
3804     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3805     if (env) {
3806         MAX_REG_MEM = CmiReadSize(env);
3807         user_set_flag = 1;
3808     }
3809     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3810         MAX_REG_MEM = CmiReadSize(env);
3811         user_set_flag = 1;
3812     }
3813
3814     env = getenv("CHARM_UGNI_SEND_MAX");
3815     if (env) {
3816         MAX_BUFF_SEND = CmiReadSize(env);
3817         user_set_flag = 1;
3818     }
3819     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3820         MAX_BUFF_SEND = CmiReadSize(env);
3821         user_set_flag = 1;
3822     }
3823
3824     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3825     if (env) {
3826         _mempool_size_limit = CmiReadSize(env);
3827     }
3828
3829     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3830     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3831
3832     if (myrank==0) {
3833         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3834         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3835         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3836             /* memblock can expand to BIG_MSG * 2 size */
3837             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3838             CmiAbort("mempool maximum size is too small. \n");
3839         }
3840 #if MULTI_THREAD_SEND
3841         printf("Charm++> worker thread sending messages\n");
3842 #elif COMM_THREAD_SEND
3843         printf("Charm++> only comm thread send/recv messages\n");
3844 #endif
3845     }
3846
3847 #endif     /* end of USE_LRTS_MEMPOOL */
3848
3849     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3850     if (env) {
3851         BIG_MSG = CmiReadSize(env);
3852         if (BIG_MSG < ONE_SEG)
3853           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3854     }
3855     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3856     if (env) {
3857         BIG_MSG_PIPELINE = atoi(env);
3858     }
3859
3860     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3861     if (env) _checkProgress = 0;
3862     if (mysize == 1) _checkProgress = 0;
3863
3864 #if CMI_EXERT_RDMA_CAP
3865     env = getenv("CHARM_UGNI_RDMA_MAX");
3866     if (env)  {
3867         RDMA_pending = atoi(env);
3868         if (myrank == 0)
3869             printf("Charm++> Max pending RDMA set to: %d\n", RDMA_pending);
3870     }
3871 #endif
3872     
3873     /*
3874     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3875     if (env) 
3876         _tlbpagesize = CmiReadSize(env);
3877     */
3878     /* real gethugepagesize() is only available when hugetlb module linked */
3879     _tlbpagesize = gethugepagesize();
3880     if (myrank == 0) {
3881         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3882     }
3883
3884 #if LARGEPAGE
3885     if (_tlbpagesize == 4096) {
3886         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3887     }
3888 #endif
3889
3890       /* stats related arguments */
3891 #if CMK_WITH_STATS
3892     CmiGetArgStringDesc(*argv,"+gni_stats_root",&counters_dirname,"counter directory name, default counters");
3893
3894     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3895     
3896     stats_off = CmiGetArgFlag(*argv, "+stats_off");
3897
3898     init_comm_stats();
3899 #endif
3900
3901     /* init DMA buffer for medium message */
3902
3903     //_init_DMA_buffer();
3904     
3905     free(MPID_UGNI_AllAddr);
3906
3907 #if CMK_SMP
3908     sendRdmaBuf = PCQueueCreate();
3909 #else
3910     sendRdmaBuf = 0;
3911 #endif
3912
3913 #if MACHINE_DEBUG_LOG
3914     char ln[200];
3915     sprintf(ln,"debugLog.%d",myrank);
3916     debugLog=fopen(ln,"w");
3917 #endif
3918
3919 //    NTK_Init();
3920 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3921
3922 #if  REMOTE_EVENT
3923     SHIFT = 1;
3924     while (1<<SHIFT < mysize) SHIFT++;
3925     CmiAssert(SHIFT < 31);
3926     IndexPool_init(&ackPool);
3927 #if CMK_PERSISTENT_COMM
3928     IndexPool_init(&persistPool);
3929 #endif
3930 #endif
3931 }
3932
3933 void* LrtsAlloc(int n_bytes, int header)
3934 {
3935     void *ptr = NULL;
3936 #if 0
3937     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3938 #endif
3939     if(n_bytes <= SMSG_MAX_MSG)
3940     {
3941         int totalsize = n_bytes+header;
3942         ptr = malloc(totalsize);
3943     }
3944     else {
3945         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3946 #if     USE_LRTS_MEMPOOL
3947         n_bytes = ALIGN64(n_bytes);
3948         if(n_bytes < BIG_MSG)
3949         {
3950             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3951             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3952         }else 
3953         {
3954 #if LARGEPAGE
3955             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3956             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3957             char *res = my_get_huge_pages(n_bytes);
3958 #else
3959             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3960 #endif
3961             if (res) ptr = res + ALIGNBUF - header;
3962         }
3963 #else
3964         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3965         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3966         ptr = res + ALIGNBUF - header;
3967 #endif
3968     }
3969 #if CMK_PERSISTENT_COMM
3970     if (ptr) SetMemHndlZero(MEMHFIELD((char*)ptr+header));
3971 #endif
3972     return ptr;
3973 }
3974
3975 void  LrtsFree(void *msg)
3976 {
3977     CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3978 #if CMK_PERSISTENT_COMM
3979     if (!IsMemHndlZero(MEMHFIELD((char*)msg+sizeof(CmiChunkHeader)))) return;
3980 #endif
3981     if (size <= SMSG_MAX_MSG)
3982         free(msg);
3983     else {
3984         size = ALIGN64(size);
3985         if(size>=BIG_MSG)
3986         {
3987 #if LARGEPAGE
3988             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3989             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3990 #else
3991             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3992 #endif
3993         }
3994         else {
3995 #if    USE_LRTS_MEMPOOL
3996 #if CMK_SMP
3997             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3998 #else
3999             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
4000 #endif
4001 #else
4002             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
4003 #endif
4004         }
4005     }
4006 }
4007
4008 void LrtsExit()
4009 {
4010 #if CMK_WITH_STATS
4011 #if CMK_SMP
4012     if(CmiMyRank() == CmiMyNodeSize())
4013 #endif
4014     if (print_stats) print_comm_stats();
4015 #endif
4016     /* free memory ? */
4017 #if USE_LRTS_MEMPOOL
4018     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
4019     mempool_destroy(CpvAccess(mempool));
4020 #endif
4021     PMI_Finalize();
4022     exit(0);
4023 }
4024
4025 void LrtsDrainResources()
4026 {
4027     if(mysize == 1) return;
4028     while (
4029 #if CMK_USE_OOB
4030            !SendBufferMsg(&smsg_oob_queue) ||
4031 #endif
4032            !SendBufferMsg(&smsg_queue) 
4033           )
4034     {
4035         if (useDynamicSMSG)
4036             PumpDatagramConnection();
4037         PumpNetworkSmsg();
4038         PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
4039         PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
4040 #if REMOTE_EVENT
4041         PumpRemoteTransactions();
4042 #endif
4043         SendRdmaMsg();
4044     }
4045     PMI_Barrier();
4046 }
4047
4048 void LrtsAbort(const char *message) {
4049     fprintf(stderr, "[%d] CmiAbort: %s\n", myrank, message);
4050     CmiPrintStackTrace(0);
4051     PMI_Abort(-1, message);
4052 }
4053
4054 /**************************  TIMER FUNCTIONS **************************/
4055 #if CMK_TIMER_USE_SPECIAL
4056 /* MPI calls are not threadsafe, even the timer on some machines */
4057 static CmiNodeLock  timerLock = 0;
4058 static int _absoluteTime = 0;
4059 static int _is_global = 0;
4060 static struct timespec start_ts;
4061
4062 inline int CmiTimerIsSynchronized() {
4063     return 0;
4064 }
4065
4066 inline int CmiTimerAbsolute() {
4067     return _absoluteTime;
4068 }
4069
4070 double CmiStartTimer() {
4071     return 0.0;
4072 }
4073
4074 double CmiInitTime() {
4075     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
4076 }
4077
4078 void CmiTimerInit(char **argv) {
4079     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
4080     if (_absoluteTime && CmiMyPe() == 0)
4081         printf("Ch