7bd4ff41eb6708d14aed3ffc9a75b2ac5c7983c8
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #define     LARGEPAGE              0
51
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           1
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     0
59 #endif
60
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
64 #define CMI_EXERT_SEND_CAP      0
65 #define CMI_EXERT_RECV_CAP      0
66 #define CMI_EXERT_RDMA_CAP      0
67 #if CMI_EXERT_SEND_CAP
68 int SEND_large_cap = 100;
69 int SEND_large_pending = 0;
70 #endif
71
72 #if CMI_EXERT_RECV_CAP
73 #define RECV_CAP  4                  /* cap <= 2 sometimes hang */
74 #endif
75
76 #if CMI_EXERT_RDMA_CAP
77 int   RDMA_cap =   100;
78 int   RDMA_pending = 0;
79 #endif
80
81 #define USE_LRTS_MEMPOOL                  1
82
83 #define PRINT_SYH                         0
84
85 // Trace communication thread
86 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
87 #define TRACE_THRESHOLD     0.00005
88 #define CMI_MPI_TRACE_MOREDETAILED 0
89 #undef CMI_MPI_TRACE_USEREVENTS
90 #define CMI_MPI_TRACE_USEREVENTS 1
91 #else
92 #undef CMK_SMP_TRACE_COMMTHREAD
93 #define CMK_SMP_TRACE_COMMTHREAD 0
94 #endif
95
96 #define CMK_TRACE_COMMOVERHEAD 0
97 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
98 #undef CMI_MPI_TRACE_USEREVENTS
99 #define CMI_MPI_TRACE_USEREVENTS 1
100 #else
101 #undef CMK_TRACE_COMMOVERHEAD
102 #define CMK_TRACE_COMMOVERHEAD 0
103 #endif
104
105 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
106 CpvStaticDeclare(double, projTraceStart);
107 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
108 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
109 #else
110 #define  START_EVENT()
111 #define  END_EVENT(x)
112 #endif
113
114 #if USE_LRTS_MEMPOOL
115
116 #define oneMB (1024ll*1024)
117 #define oneGB (1024ll*1024*1024)
118
119 static CmiInt8 _mempool_size = 8*oneMB;
120 static CmiInt8 _expand_mem =  4*oneMB;
121 static CmiInt8 _mempool_size_limit = 0;
122
123 static CmiInt8 _totalmem = 0.8*oneGB;
124
125 #if LARGEPAGE
126 static CmiInt8 BIG_MSG  =  16*oneMB;
127 static CmiInt8 ONE_SEG  =  4*oneMB;
128 #else
129 static CmiInt8 BIG_MSG  =  4*oneMB;
130 static CmiInt8 ONE_SEG  =  2*oneMB;
131 #endif
132 #if MULTI_THREAD_SEND
133 static int BIG_MSG_PIPELINE = 1;
134 #else
135 static int BIG_MSG_PIPELINE = 4;
136 #endif
137
138 // dynamic flow control
139 static CmiInt8 buffered_send_msg = 0;
140 static CmiInt8 register_memory_size = 0;
141
142 #if LARGEPAGE
143 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
144 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
145 static CmiInt8  register_count = 0;
146 #else
147 #if CMK_SMP && COMM_THREAD_SEND 
148 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
149 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
150 #else
151 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
152 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
153 #endif
154
155
156 #endif
157
158 #endif     /* end USE_LRTS_MEMPOOL */
159
160 #if MULTI_THREAD_SEND 
161 #define     CMI_GNI_LOCK(x)       CmiLock(x);
162 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
163 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
164 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
165 #else
166 #define     CMI_GNI_LOCK(x)
167 #define     CMI_GNI_UNLOCK(x)
168 #define     CMI_PCQUEUEPOP_LOCK(Q)   
169 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
170 #endif
171
172 static int _tlbpagesize = 4096;
173
174 //static int _smpd_count  = 0;
175
176 static int   user_set_flag  = 0;
177
178 static int _checkProgress = 1;             /* check deadlock */
179 static int _detected_hang = 0;
180
181 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
182
183 // dynamic SMSG
184 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
185
186 static int avg_smsg_connection = 32;
187 static int                 *smsg_connected_flag= 0;
188 static gni_smsg_attr_t     **smsg_attr_vector_local;
189 static gni_smsg_attr_t     **smsg_attr_vector_remote;
190 static gni_ep_handle_t     ep_hndl_unbound;
191 static gni_smsg_attr_t     send_smsg_attr;
192 static gni_smsg_attr_t     recv_smsg_attr;
193
194 typedef struct _dynamic_smsg_mailbox{
195    void     *mailbox_base;
196    int      size;
197    int      offset;
198    gni_mem_handle_t  mem_hndl;
199    struct      _dynamic_smsg_mailbox  *next;
200 }dynamic_smsg_mailbox_t;
201
202 static dynamic_smsg_mailbox_t  *mailbox_list;
203
204 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
205 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
206
207 #if PRINT_SYH
208 int         lrts_send_msg_id = 0;
209 int         lrts_local_done_msg = 0;
210 int         lrts_send_rdma_success = 0;
211 #endif
212
213 #include "machine.h"
214
215 #include "pcqueue.h"
216
217 #include "mempool.h"
218
219 #if CMK_PERSISTENT_COMM
220 #include "machine-persistent.h"
221 #endif
222
223 //#define  USE_ONESIDED 1
224 #ifdef USE_ONESIDED
225 //onesided implementation is wrong, since no place to restore omdh
226 #include "onesided.h"
227 onesided_hnd_t   onesided_hnd;
228 onesided_md_t    omdh;
229 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
230
231 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
232
233 #else
234 uint8_t   onesided_hnd, omdh;
235
236 #if REMOTE_EVENT || CQWRITE 
237 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
238     if(register_memory_size+size>= MAX_REG_MEM) { \
239         status = GNI_RC_ERROR_NOMEM;} \
240     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
241         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
242 #else
243 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
244         if (register_memory_size + size >= MAX_REG_MEM) { \
245             status = GNI_RC_ERROR_NOMEM; \
246         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
247             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
248 #endif
249
250 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
251     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
252              register_memory_size -= size; \
253          else CmiAbort("MEM_DEregister");  \
254     } while (0)
255 #endif
256
257 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
258 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
259 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
260 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
261 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
262 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
263 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
264 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
265 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
266 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
267 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
268 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
269 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
270 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
271
272 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
273 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
274
275 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
276 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
277 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
278 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
279
280 #define ALIGNBUF                64
281
282 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
283 /* If SMSG is not used */
284
285 #define FMA_PER_CORE  1024
286 #define FMA_BUFFER_SIZE 1024
287
288 /* If SMSG is used */
289 static int  SMSG_MAX_MSG = 1024;
290 #define SMSG_MAX_CREDIT    72
291
292 #define MSGQ_MAXSIZE       2048
293
294 /* large message transfer with FMA or BTE */
295 #if ! REMOTE_EVENT
296 #define LRTS_GNI_RDMA_THRESHOLD  1024 
297 #else
298    /* remote events only work with RDMA */
299 #define LRTS_GNI_RDMA_THRESHOLD  0 
300 #endif
301
302 #if CMK_SMP
303 static int  REMOTE_QUEUE_ENTRIES=163840; 
304 static int LOCAL_QUEUE_ENTRIES=163840; 
305 #else
306 static int  REMOTE_QUEUE_ENTRIES=20480;
307 static int LOCAL_QUEUE_ENTRIES=20480; 
308 #endif
309
310 #define BIG_MSG_TAG             0x26
311 #define PUT_DONE_TAG            0x28
312 #define DIRECT_PUT_DONE_TAG     0x29
313 #define ACK_TAG                 0x30
314 /* SMSG is data message */
315 #define SMALL_DATA_TAG          0x31
316 /* SMSG is a control message to initialize a BTE */
317 #define LMSG_INIT_TAG           0x39 
318
319 #define DEBUG
320 #ifdef GNI_RC_CHECK
321 #undef GNI_RC_CHECK
322 #endif
323 #ifdef DEBUG
324 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
325 #else
326 #define GNI_RC_CHECK(msg,rc)
327 #endif
328
329 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
330 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
331 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
332
333 static int useStaticMSGQ = 0;
334 static int useStaticFMA = 0;
335 static int mysize, myrank;
336 static gni_nic_handle_t   nic_hndl;
337
338 typedef struct {
339     gni_mem_handle_t mdh;
340     uint64_t addr;
341 } mdh_addr_t ;
342 // this is related to dynamic SMSG
343
344 typedef struct mdh_addr_list{
345     gni_mem_handle_t mdh;
346    void *addr;
347     struct mdh_addr_list *next;
348 }mdh_addr_list_t;
349
350 static unsigned int         smsg_memlen;
351 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
352 mdh_addr_t          setup_mem;
353 mdh_addr_t          *smsg_connection_vec = 0;
354 gni_mem_handle_t    smsg_connection_memhndl;
355 static int          smsg_expand_slots = 10;
356 static int          smsg_available_slot = 0;
357 static void         *smsg_mailbox_mempool = 0;
358 mdh_addr_list_t     *smsg_dynamic_list = 0;
359
360 static void             *smsg_mailbox_base;
361 gni_msgq_attr_t         msgq_attrs;
362 gni_msgq_handle_t       msgq_handle;
363 gni_msgq_ep_attr_t      msgq_ep_attrs;
364 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
365
366 /* =====Beginning of Declarations of Machine Specific Variables===== */
367 static int cookie;
368 static int modes = 0;
369 static gni_cq_handle_t       smsg_rx_cqh = NULL;
370 static gni_cq_handle_t       default_tx_cqh = NULL;
371 static gni_cq_handle_t       rdma_tx_cqh = NULL;
372 static gni_cq_handle_t       rdma_rx_cqh = NULL;
373 static gni_cq_handle_t       post_tx_cqh = NULL;
374 static gni_ep_handle_t       *ep_hndl_array;
375
376 static CmiNodeLock           *ep_lock_array;
377 static CmiNodeLock           default_tx_cq_lock; 
378 static CmiNodeLock           rdma_tx_cq_lock; 
379 static CmiNodeLock           global_gni_lock; 
380 static CmiNodeLock           rx_cq_lock;
381 static CmiNodeLock           smsg_mailbox_lock;
382 static CmiNodeLock           smsg_rx_cq_lock;
383 static CmiNodeLock           *mempool_lock;
384 //#define     CMK_WITH_STATS      1
385 typedef struct msg_list
386 {
387     uint32_t destNode;
388     uint32_t size;
389     void *msg;
390     uint8_t tag;
391 #if !CMK_SMP
392     struct msg_list *next;
393 #endif
394 #if CMK_WITH_STATS
395     double  creation_time;
396 #endif
397 }MSG_LIST;
398
399
400 typedef struct control_msg
401 {
402     uint64_t            source_addr;    /* address from the start of buffer  */
403     uint64_t            dest_addr;      /* address from the start of buffer */
404     int                 total_length;   /* total length */
405     int                 length;         /* length of this packet */
406 #if REMOTE_EVENT
407     int                 ack_index;      /* index from integer to address */
408 #endif
409     uint8_t             seq_id;         //big message   0 meaning single message
410     gni_mem_handle_t    source_mem_hndl;
411     struct control_msg *next;
412 } CONTROL_MSG;
413
414 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
415
416 typedef struct ack_msg
417 {
418     uint64_t            source_addr;    /* address from the start of buffer  */
419 #if ! USE_LRTS_MEMPOOL
420     gni_mem_handle_t    source_mem_hndl;
421     int                 length;          /* total length */
422 #endif
423     struct ack_msg     *next;
424 } ACK_MSG;
425
426 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
427
428 #if CMK_DIRECT
429 typedef struct{
430     uint64_t    handler_addr;
431 }CMK_DIRECT_HEADER;
432
433 typedef struct {
434     char core[CmiMsgHeaderSizeBytes];
435     uint64_t handler;
436 }cmidirectMsg;
437
438 //SYH
439 CpvDeclare(int, CmiHandleDirectIdx);
440 void CmiHandleDirectMsg(cmidirectMsg* msg)
441 {
442
443     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
444    (*(_handle->callbackFnPtr))(_handle->callbackData);
445    CmiFree(msg);
446 }
447
448 void CmiDirectInit()
449 {
450     CpvInitialize(int,  CmiHandleDirectIdx);
451     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
452 }
453
454 #endif
455 typedef struct  rmda_msg
456 {
457     int                   destNode;
458 #if REMOTE_EVENT
459     int                   ack_index;
460 #endif
461     gni_post_descriptor_t *pd;
462 #if !CMK_SMP
463     struct  rmda_msg      *next;
464 #endif
465 }RDMA_REQUEST;
466
467
468 #if CMK_SMP
469 #define SMP_LOCKS               0
470 #define ONE_SEND_QUEUE                  0
471 PCQueue sendRdmaBuf;
472 typedef struct  msg_list_index
473 {
474     PCQueue     sendSmsgBuf;
475     int         pushed;
476     CmiNodeLock   lock;
477 } MSG_LIST_INDEX;
478 char                *destpe_avail;
479 #if  !ONE_SEND_QUEUE && SMP_LOCKS
480     PCQueue     nonEmptyQueues;
481 #endif
482 #else         /* non-smp */
483
484 static RDMA_REQUEST        *sendRdmaBuf = 0;
485 static RDMA_REQUEST        *sendRdmaTail = 0;
486 typedef struct  msg_list_index
487 {
488     int         next;
489     MSG_LIST    *sendSmsgBuf;
490     MSG_LIST    *tail;
491 } MSG_LIST_INDEX;
492
493 #endif
494
495 // buffered send queue
496 #if ! ONE_SEND_QUEUE
497 typedef struct smsg_queue
498 {
499     MSG_LIST_INDEX   *smsg_msglist_index;
500     int               smsg_head_index;
501 } SMSG_QUEUE;
502 #else
503 typedef struct smsg_queue
504 {
505     PCQueue       sendMsgBuf;
506 }  SMSG_QUEUE;
507 #endif
508
509 SMSG_QUEUE                  smsg_queue;
510 #if CMK_USE_OOB
511 SMSG_QUEUE                  smsg_oob_queue;
512 #endif
513
514 #if CMK_SMP
515
516 #define FreeMsgList(d)   free(d);
517 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
518
519 #else
520
521 static MSG_LIST       *msglist_freelist=0;
522
523 #define FreeMsgList(d)  \
524   do { \
525   (d)->next = msglist_freelist;\
526   msglist_freelist = d; \
527   } while (0)
528
529 #define MallocMsgList(d) \
530   do {  \
531   d = msglist_freelist;\
532   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
533              _MEMCHECK(d);\
534   } else msglist_freelist = d->next; \
535   d->next =0;  \
536   } while (0)
537
538 #endif
539
540 #if CMK_SMP
541
542 #define FreeControlMsg(d)      free(d);
543 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
544
545 #else
546
547 static CONTROL_MSG    *control_freelist=0;
548
549 #define FreeControlMsg(d)       \
550   do { \
551   (d)->next = control_freelist;\
552   control_freelist = d; \
553   } while (0);
554
555 #define MallocControlMsg(d) \
556   do {  \
557   d = control_freelist;\
558   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
559              _MEMCHECK(d);\
560   } else control_freelist = d->next;  \
561   } while (0);
562
563 #endif
564
565 #if CMK_SMP
566
567 #define FreeAckMsg(d)      free(d);
568 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
569
570 #else
571
572 static ACK_MSG        *ack_freelist=0;
573
574 #define FreeAckMsg(d)       \
575   do { \
576   (d)->next = ack_freelist;\
577   ack_freelist = d; \
578   } while (0)
579
580 #define MallocAckMsg(d) \
581   do { \
582   d = ack_freelist;\
583   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
584              _MEMCHECK(d);\
585   } else ack_freelist = d->next; \
586   } while (0)
587
588 #endif
589
590
591 # if CMK_SMP
592 #define FreeRdmaRequest(d)       free(d);
593 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
594 #else
595
596 static RDMA_REQUEST         *rdma_freelist = NULL;
597
598 #define FreeRdmaRequest(d)       \
599   do {  \
600   (d)->next = rdma_freelist;\
601   rdma_freelist = d;    \
602   } while (0)
603
604 #define MallocRdmaRequest(d) \
605   do {   \
606   d = rdma_freelist;\
607   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
608              _MEMCHECK(d);\
609   } else rdma_freelist = d->next; \
610   d->next =0;   \
611   } while (0)
612 #endif
613
614 /* reuse gni_post_descriptor_t */
615 static gni_post_descriptor_t *post_freelist=0;
616
617 #if  !CMK_SMP
618 #define FreePostDesc(d)       \
619     (d)->next_descr = post_freelist;\
620     post_freelist = d;
621
622 #define MallocPostDesc(d) \
623   d = post_freelist;\
624   if (d==0) { \
625      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
626      d->next_descr = 0;\
627       _MEMCHECK(d);\
628   } else post_freelist = d->next_descr;
629 #else
630
631 #define FreePostDesc(d)     free(d);
632 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
633
634 #endif
635
636
637 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
638 static int      buffered_smsg_counter = 0;
639
640 /* SmsgSend return success but message sent is not confirmed by remote side */
641 static MSG_LIST *buffered_fma_head = 0;
642 static MSG_LIST *buffered_fma_tail = 0;
643
644 /* functions  */
645 #define IsFree(a,ind)  !( a& (1<<(ind) ))
646 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
647 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
648
649 CpvDeclare(mempool_type*, mempool);
650
651 #if REMOTE_EVENT
652 /* ack pool for remote events */
653
654 static int  SHIFT   =           18;
655 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
656 #define RANK_MASK               ((1<<SHIFT) - 1)
657 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
658
659 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
660 #define GET_RANK(evt)           ((evt) & RANK_MASK)
661 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
662
663 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
664
665 #if CMK_SMP
666 #define INIT_SIZE                4096
667 #else
668 #define INIT_SIZE                1024
669 #endif
670
671 struct IndexStruct {
672 void *addr;
673 int next;
674 int type;     // 1: ACK   2: Persistent
675 };
676
677 typedef struct IndexPool {
678     struct IndexStruct   *indexes;
679     int                   size;
680     int                   freehead;
681     CmiNodeLock           lock;
682 } IndexPool;
683
684 static IndexPool  ackPool;
685 #if CMK_PERSISTENT_COMM
686 static IndexPool  persistPool;
687 #endif
688
689 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
690 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
691
692 static void IndexPool_init(IndexPool *pool)
693 {
694     int i;
695     if ((1<<SHIFT) < mysize) 
696         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
697     pool->size = INIT_SIZE;
698     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
699     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
700     for (i=0; i<pool->size-1; i++) {
701         pool->indexes[i].next = i+1;
702         pool->indexes[i].type = 0;
703     }
704     pool->indexes[i].next = -1;
705     pool->freehead = 0;
706 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
707     pool->lock  = CmiCreateLock();
708 #else
709     pool->lock  = 0;
710 #endif
711 }
712
713 static
714 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
715 {
716     int i;
717     int s;
718 #if MULTI_THREAD_SEND  
719     CmiLock(pool->lock);
720 #endif
721     s = pool->freehead;
722     if (s == -1) {
723         int newsize = pool->size * 2;
724         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
725         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
726         struct IndexStruct *old_ackpool = pool->indexes;
727         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
728         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
729         for (i=pool->size; i<newsize-1; i++) {
730             pool->indexes[i].next = i+1;
731             pool->indexes[i].type = 0;
732         }
733         pool->indexes[i].next = -1;
734         pool->indexes[i].type = 0;
735         pool->freehead = pool->size;
736         s = pool->size;
737         pool->size = newsize;
738         free(old_ackpool);
739     }
740     pool->freehead = pool->indexes[s].next;
741     pool->indexes[s].addr = addr;
742     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
743     pool->indexes[s].type = type;
744 #if MULTI_THREAD_SEND
745     CmiUnlock(pool->lock);
746 #endif
747     return s;
748 }
749
750 static
751 inline  void IndexPool_freeslot(IndexPool *pool, int s)
752 {
753     CmiAssert(s>=0 && s<pool->size);
754 #if MULTI_THREAD_SEND
755     CmiLock(pool->lock);
756 #endif
757     pool->indexes[s].next = pool->freehead;
758     pool->indexes[s].type = 0;
759     pool->freehead = s;
760 #if MULTI_THREAD_SEND
761     CmiUnlock(pool->lock);
762 #endif
763 }
764
765
766 #endif
767
768 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
769 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
770 #define CHARM_MAGIC_NUMBER               126
771
772 #if CMK_ERROR_CHECKING
773 extern unsigned char computeCheckSum(unsigned char *data, int len);
774 static int checksum_flag = 0;
775 #define CMI_SET_CHECKSUM(msg, len)      \
776         if (checksum_flag)  {   \
777           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
778           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
779         }
780 #define CMI_CHECK_CHECKSUM(msg, len)    \
781         if (checksum_flag)      \
782           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
783             CmiAbort("Fatal error: checksum doesn't agree!\n");
784 #else
785 #define CMI_SET_CHECKSUM(msg, len)
786 #define CMI_CHECK_CHECKSUM(msg, len)
787 #endif
788 /* =====End of Definitions of Message-Corruption Related Macros=====*/
789
790 static int print_stats = 0;
791 static int stats_off = 0;
792 void CmiTurnOnStats()
793 {
794     stats_off = 0;
795     //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
796 }
797
798 void CmiTurnOffStats()
799 {
800     stats_off = 1;
801 }
802
803 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
804
805 #if CMK_WITH_STATS
806 FILE *counterLog = NULL;
807 typedef struct comm_thread_stats
808 {
809     uint64_t  smsg_data_count;
810     uint64_t  lmsg_init_count;
811     uint64_t  ack_count;
812     uint64_t  big_msg_ack_count;
813     uint64_t  smsg_count;
814     uint64_t  direct_put_done_count;
815     uint64_t  put_done_count;
816     //times of calling SmsgSend
817     uint64_t  try_smsg_data_count;
818     uint64_t  try_lmsg_init_count;
819     uint64_t  try_ack_count;
820     uint64_t  try_big_msg_ack_count;
821     uint64_t  try_direct_put_done_count;
822     uint64_t  try_put_done_count;
823     uint64_t  try_smsg_count;
824     
825     double    max_time_in_send_buffered_smsg;
826     double    all_time_in_send_buffered_smsg;
827
828     uint64_t  rdma_get_count, rdma_put_count;
829     uint64_t  try_rdma_get_count, try_rdma_put_count;
830     double    max_time_from_control_to_rdma_init;
831     double    all_time_from_control_to_rdma_init;
832
833     double    max_time_from_rdma_init_to_rdma_done;
834     double    all_time_from_rdma_init_to_rdma_done;
835
836     int      count_in_PumpNetwork;
837     double   time_in_PumpNetwork;
838     double   max_time_in_PumpNetwork;
839     int      count_in_SendBufferMsg_smsg;
840     double   time_in_SendBufferMsg_smsg;
841     double   max_time_in_SendBufferMsg_smsg;
842     int      count_in_SendRdmaMsg;
843     double   time_in_SendRdmaMsg;
844     double   max_time_in_SendRdmaMsg;
845     int      count_in_PumpRemoteTransactions;
846     double   time_in_PumpRemoteTransactions;
847     double   max_time_in_PumpRemoteTransactions;
848     int      count_in_PumpLocalTransactions_rdma;
849     double   time_in_PumpLocalTransactions_rdma;
850     double   max_time_in_PumpLocalTransactions_rdma;
851     int      count_in_PumpDatagramConnection;
852     double   time_in_PumpDatagramConnection;
853     double   max_time_in_PumpDatagramConnection;
854 } Comm_Thread_Stats;
855
856 static Comm_Thread_Stats   comm_stats;
857
858 static char *counters_dirname = "counters";
859
860 static void init_comm_stats()
861 {
862   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
863   if (print_stats){
864       char ln[200];
865       int code = mkdir(counters_dirname, 00777); 
866       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
867       counterLog=fopen(ln,"w");
868       if (counterLog == NULL) CmiAbort("Counter files open failed");
869   }
870 }
871
872 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
873
874 #define SMSG_SENT_DONE(creation_time, tag)  \
875         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
876             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
877             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
878             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
879             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
880             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
881             comm_stats.smsg_count++; \
882             double inbuff_time = CmiWallTimer() - creation_time;   \
883             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
884             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
885         }
886
887 #define SMSG_TRY_SEND(tag)  \
888         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
889             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
890             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
891             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
892             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
893             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
894             comm_stats.try_smsg_count++; \
895         }
896
897 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
898
899 #define  RDMA_TRANS_DONE(x)      \
900          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
901              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
902              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
903          }
904
905 #define  RDMA_TRANS_INIT(type, x)      \
906          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
907              double rdma_trans_time = CmiWallTimer() - x ; \
908              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
909              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
910          }
911
912 #define STATS_PUMPNETWORK_TIME(x)   \
913         { double t = CmiWallTimer(); \
914           x;        \
915           t = CmiWallTimer() - t;          \
916           comm_stats.count_in_PumpNetwork++;        \
917           comm_stats.time_in_PumpNetwork += t;   \
918           if (t>comm_stats.max_time_in_PumpNetwork)      \
919               comm_stats.max_time_in_PumpNetwork = t;    \
920         }
921
922 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
923         { double t = CmiWallTimer(); \
924           x;        \
925           t = CmiWallTimer() - t;          \
926           comm_stats.count_in_PumpRemoteTransactions ++;        \
927           comm_stats.time_in_PumpRemoteTransactions += t;   \
928           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
929               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
930         }
931
932 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
933         { double t = CmiWallTimer(); \
934           x;        \
935           t = CmiWallTimer() - t;          \
936           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
937           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
938           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
939               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
940         }
941
942 #define STATS_SEND_SMSGS_TIME(x)   \
943         { double t = CmiWallTimer(); \
944           x;        \
945           t = CmiWallTimer() - t;          \
946           comm_stats.count_in_SendBufferMsg_smsg ++;        \
947           comm_stats.time_in_SendBufferMsg_smsg += t;   \
948           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
949               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
950         }
951
952 #define STATS_SENDRDMAMSG_TIME(x)   \
953         { double t = CmiWallTimer(); \
954           x;        \
955           t = CmiWallTimer() - t;          \
956           comm_stats.count_in_SendRdmaMsg ++;        \
957           comm_stats.time_in_SendRdmaMsg += t;   \
958           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
959               comm_stats.max_time_in_SendRdmaMsg = t;    \
960         }
961
962 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
963         { double t = CmiWallTimer(); \
964           x;        \
965           t = CmiWallTimer() - t;          \
966           comm_stats.count_in_PumpDatagramConnection ++;        \
967           comm_stats.time_in_PumpDatagramConnection += t;   \
968           if (t>comm_stats.max_time_in_PumpDatagramConnection)      \
969               comm_stats.max_time_in_PumpDatagramConnection = t;    \
970         }
971
972 static void print_comm_stats()
973 {
974     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
975     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
976             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
977             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
978     
979     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
980             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
981             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
982
983     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
984     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
985     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
986
987
988     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
989     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
990     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
991     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
992     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
993     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
994     if (useDynamicSMSG)
995     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
996
997     fclose(counterLog);
998 }
999
1000 #else
1001 #define STATS_PUMPNETWORK_TIME(x)                  x
1002 #define STATS_SEND_SMSGS_TIME(x)                   x
1003 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
1004 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
1005 #define STATS_SENDRDMAMSG_TIME(x)                  x
1006 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
1007 #endif
1008
1009 static void
1010 allgather(void *in,void *out, int len)
1011 {
1012     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1013     int i,rc;
1014     int my_rank;
1015     char *tmp_buf,*out_ptr;
1016
1017     if(!already_called) {
1018
1019         rc = PMI_Get_size(&job_size);
1020         CmiAssert(rc == PMI_SUCCESS);
1021         rc = PMI_Get_rank(&my_rank);
1022         CmiAssert(rc == PMI_SUCCESS);
1023
1024         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1025         CmiAssert(ivec_ptr != NULL);
1026
1027         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1028         CmiAssert(rc == PMI_SUCCESS);
1029
1030         already_called = 1;
1031
1032     }
1033
1034     tmp_buf = (char *)malloc(job_size * len);
1035     CmiAssert(tmp_buf);
1036
1037     rc = PMI_Allgather(in,tmp_buf,len);
1038     CmiAssert(rc == PMI_SUCCESS);
1039
1040     out_ptr = out;
1041
1042     for(i=0;i<job_size;i++) {
1043
1044         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1045
1046     }
1047
1048     free(tmp_buf);
1049 }
1050
1051 static void
1052 allgather_2(void *in,void *out, int len)
1053 {
1054     //PMI_Allgather is out of order
1055     int i,rc, extend_len;
1056     int  rank_index;
1057     char *out_ptr, *out_ref;
1058     char *in2;
1059
1060     extend_len = sizeof(int) + len;
1061     in2 = (char*)malloc(extend_len);
1062
1063     memcpy(in2, &myrank, sizeof(int));
1064     memcpy(in2+sizeof(int), in, len);
1065
1066     out_ptr = (char*)malloc(mysize*extend_len);
1067
1068     rc = PMI_Allgather(in2, out_ptr, extend_len);
1069     GNI_RC_CHECK("allgather", rc);
1070
1071     out_ref = out;
1072
1073     for(i=0;i<mysize;i++) {
1074         //rank index 
1075         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1076         //copy to the rank index slot
1077         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1078     }
1079
1080     free(out_ptr);
1081     free(in2);
1082
1083 }
1084
1085 static unsigned int get_gni_nic_address(int device_id)
1086 {
1087     unsigned int address, cpu_id;
1088     gni_return_t status;
1089     int i, alps_dev_id=-1,alps_address=-1;
1090     char *token, *p_ptr;
1091
1092     p_ptr = getenv("PMI_GNI_DEV_ID");
1093     if (!p_ptr) {
1094         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1095        
1096         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1097     } else {
1098         while ((token = strtok(p_ptr,":")) != NULL) {
1099             alps_dev_id = atoi(token);
1100             if (alps_dev_id == device_id) {
1101                 break;
1102             }
1103             p_ptr = NULL;
1104         }
1105         CmiAssert(alps_dev_id != -1);
1106         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1107         CmiAssert(p_ptr != NULL);
1108         i = 0;
1109         while ((token = strtok(p_ptr,":")) != NULL) {
1110             if (i == alps_dev_id) {
1111                 alps_address = atoi(token);
1112                 break;
1113             }
1114             p_ptr = NULL;
1115             ++i;
1116         }
1117         CmiAssert(alps_address != -1);
1118         address = alps_address;
1119     }
1120     return address;
1121 }
1122
1123 static uint8_t get_ptag(void)
1124 {
1125     char *p_ptr, *token;
1126     uint8_t ptag;
1127
1128     p_ptr = getenv("PMI_GNI_PTAG");
1129     CmiAssert(p_ptr != NULL);
1130     token = strtok(p_ptr, ":");
1131     ptag = (uint8_t)atoi(token);
1132     return ptag;
1133         
1134 }
1135
1136 static uint32_t get_cookie(void)
1137 {
1138     uint32_t cookie;
1139     char *p_ptr, *token;
1140
1141     p_ptr = getenv("PMI_GNI_COOKIE");
1142     CmiAssert(p_ptr != NULL);
1143     token = strtok(p_ptr, ":");
1144     cookie = (uint32_t)atoi(token);
1145
1146     return cookie;
1147 }
1148
1149 #if LARGEPAGE
1150
1151 /* directly mmap memory from hugetlbfs for large pages */
1152
1153 #include <sys/stat.h>
1154 #include <fcntl.h>
1155 #include <sys/mman.h>
1156 #include <hugetlbfs.h>
1157
1158 // size must be _tlbpagesize aligned
1159 void *my_get_huge_pages(size_t size)
1160 {
1161     char filename[512];
1162     int fd;
1163     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1164     void *ptr = NULL;
1165
1166     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1167     fd = open(filename, O_RDWR | O_CREAT, mode);
1168     if (fd == -1) {
1169         CmiAbort("my_get_huge_pages: open filed");
1170     }
1171     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1172     if (ptr == MAP_FAILED) ptr = NULL;
1173 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1174     close(fd);
1175     unlink(filename);
1176     return ptr;
1177 }
1178
1179 void my_free_huge_pages(void *ptr, int size)
1180 {
1181 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1182     int ret = munmap(ptr, size);
1183     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1184 }
1185
1186 #endif
1187
1188 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1189 /* TODO: add any that are related */
1190 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1191
1192
1193 #include "machine-lrts.h"
1194 #include "machine-common-core.c"
1195
1196 /* Network progress function is used to poll the network when for
1197    messages. This flushes receive buffers on some  implementations*/
1198 #if CMK_MACHINE_PROGRESS_DEFINED
1199 void CmiMachineProgressImpl() {
1200 }
1201 #endif
1202
1203 static int SendBufferMsg(SMSG_QUEUE *queue);
1204 static void SendRdmaMsg();
1205 static void PumpNetworkSmsg();
1206 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1207 #if CQWRITE
1208 static void PumpCqWriteTransactions();
1209 #endif
1210 #if REMOTE_EVENT
1211 static void PumpRemoteTransactions();
1212 #endif
1213
1214 #if MACHINE_DEBUG_LOG
1215 FILE *debugLog = NULL;
1216 static CmiInt8 buffered_recv_msg = 0;
1217 int         lrts_smsg_success = 0;
1218 int         lrts_received_msg = 0;
1219 #endif
1220
1221 static void sweep_mempool(mempool_type *mptr)
1222 {
1223     int n = 0;
1224     block_header *current = &(mptr->block_head);
1225
1226     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1227     while( current!= NULL) {
1228         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1229         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1230     }
1231     printf("[n %d] sweep_mempool slot END.\n", myrank);
1232 }
1233
1234 inline
1235 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1236 {
1237     block_header *current = *from;
1238
1239     //while(register_memory_size>= MAX_REG_MEM)
1240     //{
1241         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1242             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1243
1244         *from = current;
1245         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1246         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1247         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1248     //}
1249     return GNI_RC_SUCCESS;
1250 }
1251
1252 inline 
1253 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1254 {
1255     gni_return_t status = GNI_RC_SUCCESS;
1256     //int size = GetMempoolsize(msg);
1257     //void *blockaddr = GetMempoolBlockPtr(msg);
1258     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1259    
1260     block_header *current = &(mptr->block_head);
1261     while(register_memory_size>= MAX_REG_MEM)
1262     {
1263         status = deregisterMemory(mptr, &current);
1264         if (status != GNI_RC_SUCCESS) break;
1265     }
1266     if(register_memory_size>= MAX_REG_MEM) return status;
1267
1268     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1269     while(1)
1270     {
1271         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1272         if(status == GNI_RC_SUCCESS)
1273         {
1274             break;
1275         }
1276         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1277         {
1278             CmiAbort("Memory registor for mempool fails\n");
1279         }
1280         else
1281         {
1282             status = deregisterMemory(mptr, &current);
1283             if (status != GNI_RC_SUCCESS) break;
1284         }
1285     }; 
1286     return status;
1287 }
1288
1289 inline 
1290 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1291 {
1292     static int rank = -1;
1293     int i;
1294     gni_return_t status;
1295     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1296     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1297     mempool_type *mptr;
1298
1299     status = registerFromMempool(mptr1, msg, size, t, cqh);
1300     if (status == GNI_RC_SUCCESS) return status;
1301 #if CMK_SMP 
1302     for (i=0; i<CmiMyNodeSize()+1; i++) {
1303       rank = (rank+1)%(CmiMyNodeSize()+1);
1304       mptr = CpvAccessOther(mempool, rank);
1305       if (mptr == mptr1) continue;
1306       status = registerFromMempool(mptr, msg, size, t, cqh);
1307       if (status == GNI_RC_SUCCESS) return status;
1308     }
1309 #endif
1310     return  GNI_RC_ERROR_RESOURCE;
1311 }
1312
1313 inline
1314 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1315 {
1316     MSG_LIST        *msg_tmp;
1317     MallocMsgList(msg_tmp);
1318     msg_tmp->destNode = destNode;
1319     msg_tmp->size   = size;
1320     msg_tmp->msg    = msg;
1321     msg_tmp->tag    = tag;
1322 #if CMK_WITH_STATS
1323     SMSG_CREATION(msg_tmp)
1324 #endif
1325 #if !CMK_SMP
1326     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1327         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1328         queue->smsg_head_index = destNode;
1329         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1330     }else
1331     {
1332         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1333         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1334     }
1335 #else
1336 #if ONE_SEND_QUEUE
1337     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1338 #else
1339 #if SMP_LOCKS
1340     CmiLock(queue->smsg_msglist_index[destNode].lock);
1341     if(queue->smsg_msglist_index[destNode].pushed == 0)
1342     {
1343         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1344     }
1345     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1346     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1347 #else
1348     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1349 #endif
1350 #endif
1351 #endif
1352 #if PRINT_SYH
1353     buffered_smsg_counter++;
1354 #endif
1355 }
1356
1357 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1358 {
1359     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1360 }
1361
1362 inline
1363 static void setup_smsg_connection(int destNode)
1364 {
1365     mdh_addr_list_t  *new_entry = 0;
1366     gni_post_descriptor_t *pd;
1367     gni_smsg_attr_t      *smsg_attr;
1368     gni_return_t status = GNI_RC_NOT_DONE;
1369     RDMA_REQUEST        *rdma_request_msg;
1370     
1371     if(smsg_available_slot == smsg_expand_slots)
1372     {
1373         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1374         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1375         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1376
1377         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1378             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1379             GNI_MEM_READWRITE,   
1380             -1,
1381             &(new_entry->mdh));
1382         smsg_available_slot = 0; 
1383         new_entry->next = smsg_dynamic_list;
1384         smsg_dynamic_list = new_entry;
1385     }
1386     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1387     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1388     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1389     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1390     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1391     smsg_attr->buff_size = smsg_memlen;
1392     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1393     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1394     smsg_local_attr_vec[destNode] = smsg_attr;
1395     smsg_available_slot++;
1396     MallocPostDesc(pd);
1397     pd->type            = GNI_POST_FMA_PUT;
1398     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1399     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1400     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1401     pd->length          = sizeof(gni_smsg_attr_t);
1402     pd->local_addr      = (uint64_t) smsg_attr;
1403     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1404     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1405     pd->src_cq_hndl     = rdma_tx_cqh;
1406     pd->rdma_mode       = 0;
1407     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1408     print_smsg_attr(smsg_attr);
1409     if(status == GNI_RC_ERROR_RESOURCE )
1410     {
1411         MallocRdmaRequest(rdma_request_msg);
1412         rdma_request_msg->destNode = destNode;
1413         rdma_request_msg->pd = pd;
1414         /* buffer this request */
1415     }
1416 #if PRINT_SYH
1417     if(status != GNI_RC_SUCCESS)
1418        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1419     else
1420         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1421 #endif
1422 }
1423
1424 /* useDynamicSMSG */
1425 inline 
1426 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1427 {
1428     gni_return_t status = GNI_RC_NOT_DONE;
1429
1430     if(mailbox_list->offset == mailbox_list->size)
1431     {
1432         dynamic_smsg_mailbox_t *new_mailbox_entry;
1433         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1434         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1435         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1436         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1437         new_mailbox_entry->offset = 0;
1438         
1439         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1440             new_mailbox_entry->size, smsg_rx_cqh,
1441             GNI_MEM_READWRITE,   
1442             -1,
1443             &(new_mailbox_entry->mem_hndl));
1444
1445         GNI_RC_CHECK("register", status);
1446         new_mailbox_entry->next = mailbox_list;
1447         mailbox_list = new_mailbox_entry;
1448     }
1449     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1450     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1451     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1452     local_smsg_attr->mbox_offset = mailbox_list->offset;
1453     mailbox_list->offset += smsg_memlen;
1454     local_smsg_attr->buff_size = smsg_memlen;
1455     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1456     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1457 }
1458
1459 /* useDynamicSMSG */
1460 inline 
1461 static int connect_to(int destNode)
1462 {
1463     gni_return_t status = GNI_RC_NOT_DONE;
1464     CmiAssert(smsg_connected_flag[destNode] == 0);
1465     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1466     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1467     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1468     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1469     
1470     CMI_GNI_LOCK(global_gni_lock)
1471     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1472     CMI_GNI_UNLOCK(global_gni_lock)
1473     if (status == GNI_RC_ERROR_RESOURCE) {
1474       /* possibly destNode is making connection at the same time */
1475       free(smsg_attr_vector_local[destNode]);
1476       smsg_attr_vector_local[destNode] = NULL;
1477       free(smsg_attr_vector_remote[destNode]);
1478       smsg_attr_vector_remote[destNode] = NULL;
1479       mailbox_list->offset -= smsg_memlen;
1480 #if PRINT_SYH
1481     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1482 #endif
1483       return 0;
1484     }
1485     GNI_RC_CHECK("GNI_Post", status);
1486     smsg_connected_flag[destNode] = 1;
1487 #if PRINT_SYH
1488     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1489 #endif
1490     return 1;
1491 }
1492
1493 inline 
1494 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1495 {
1496     unsigned int          remote_address;
1497     uint32_t              remote_id;
1498     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1499     gni_smsg_attr_t       *smsg_attr;
1500     gni_post_descriptor_t *pd;
1501     gni_post_state_t      post_state;
1502     char                  *real_data; 
1503
1504     if (useDynamicSMSG) {
1505         switch (smsg_connected_flag[destNode]) {
1506         case 0: 
1507             connect_to(destNode);         /* continue to case 1 */
1508         case 1:                           /* pending connection, do nothing */
1509             status = GNI_RC_NOT_DONE;
1510             if(inbuff ==0)
1511                 buffer_small_msgs(queue, msg, size, destNode, tag);
1512             return status;
1513         }
1514     }
1515 #if CMK_SMP
1516 #if ! ONE_SEND_QUEUE
1517     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1518 #endif
1519     {
1520 #else
1521     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1522     {
1523 #endif
1524         //CMI_GNI_LOCK(smsg_mailbox_lock)
1525         CMI_GNI_LOCK(default_tx_cq_lock)
1526 #if CMK_SMP_TRACE_COMMTHREAD
1527         int oldpe = -1;
1528         int oldeventid = -1;
1529         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1530         { 
1531             START_EVENT();
1532             if ( tag == SMALL_DATA_TAG)
1533                 real_data = (char*)msg; 
1534             else 
1535                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1536             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1537             TRACE_COMM_SET_COMM_MSGID(real_data);
1538         }
1539 #endif
1540 #if REMOTE_EVENT
1541         if (tag == LMSG_INIT_TAG) {
1542             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1543             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1544                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1545         }
1546         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1547 #endif
1548 #if     CMK_WITH_STATS
1549         SMSG_TRY_SEND(tag)
1550 #endif
1551 #if CMK_WITH_STATS
1552     double              creation_time;
1553     if (ptr == NULL)
1554         creation_time = CmiWallTimer();
1555     else
1556         creation_time = ptr->creation_time;
1557 #endif
1558
1559     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1560 #if CMK_SMP_TRACE_COMMTHREAD
1561         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1562 #endif
1563         CMI_GNI_UNLOCK(default_tx_cq_lock)
1564         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1565         if(status == GNI_RC_SUCCESS)
1566         {
1567 #if     CMK_WITH_STATS
1568             SMSG_SENT_DONE(creation_time,tag) 
1569 #endif
1570 #if CMK_SMP_TRACE_COMMTHREAD
1571             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1572             { 
1573                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1574             }
1575 #endif
1576         }else
1577             status = GNI_RC_ERROR_RESOURCE;
1578     }
1579     if(status != GNI_RC_SUCCESS && inbuff ==0)
1580         buffer_small_msgs(queue, msg, size, destNode, tag);
1581     return status;
1582 }
1583
1584 inline 
1585 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1586 {
1587     /* construct a control message and send */
1588     CONTROL_MSG         *control_msg_tmp;
1589     MallocControlMsg(control_msg_tmp);
1590     control_msg_tmp->source_addr = (uint64_t)msg;
1591     control_msg_tmp->seq_id    = seqno;
1592     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1593 #if REMOTE_EVENT
1594     control_msg_tmp->ack_index    =  -1;
1595 #endif
1596 #if     USE_LRTS_MEMPOOL
1597     if(size < BIG_MSG)
1598     {
1599         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1600     }
1601     else
1602     {
1603         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1604         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1605         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1606     }
1607 #else
1608     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1609 #endif
1610     return control_msg_tmp;
1611 }
1612
1613 #define BLOCKING_SEND_CONTROL    0
1614
1615 // Large message, send control to receiver, receiver register memory and do a GET, 
1616 // return 1 - send no success
1617 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1618 {
1619     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1620     uint32_t            vmdh_index  = -1;
1621     int                 size;
1622     int                 offset = 0;
1623     uint64_t            source_addr;
1624     int                 register_size; 
1625     void                *msg;
1626
1627     size    =   control_msg_tmp->total_length;
1628     source_addr = control_msg_tmp->source_addr;
1629     register_size = control_msg_tmp->length;
1630
1631 #if  USE_LRTS_MEMPOOL
1632     if( control_msg_tmp->seq_id == 0 ){
1633 #if BLOCKING_SEND_CONTROL
1634         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1635             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1636                 LrtsAdvanceCommunication(0);
1637         }
1638 #endif
1639         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1640         {
1641             msg = (void*)source_addr;
1642             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1643             {
1644                 if(!inbuff)
1645                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1646                 return GNI_RC_ERROR_NOMEM;
1647             }
1648             //register the corresponding mempool
1649             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1650             if(status == GNI_RC_SUCCESS)
1651             {
1652                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1653             }
1654         }else
1655         {
1656             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1657             status = GNI_RC_SUCCESS;
1658         }
1659         if(NoMsgInSend(source_addr))
1660             register_size = GetMempoolsize((void*)(source_addr));
1661         else
1662             register_size = 0;
1663     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1664     {
1665         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1666         source_addr += offset;
1667         size = control_msg_tmp->length;
1668 #if BLOCKING_SEND_CONTROL
1669         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1670             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1671                 LrtsAdvanceCommunication(0);
1672         }
1673 #endif
1674         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1675             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1676             {
1677                 if(!inbuff)
1678                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1679                 return GNI_RC_ERROR_NOMEM;
1680             }
1681             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1682             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1683         }
1684         else
1685         {
1686             status = GNI_RC_SUCCESS;
1687         }
1688         register_size = 0;  
1689     }
1690
1691 #if CMI_EXERT_SEND_CAP
1692     if(SEND_large_pending >= SEND_large_cap)
1693     {
1694         status = GNI_RC_ERROR_NOMEM;
1695     }
1696 #endif
1697  
1698     if(status == GNI_RC_SUCCESS)
1699     {
1700        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1701         if(status == GNI_RC_SUCCESS)
1702         {
1703 #if CMI_EXERT_SEND_CAP
1704             SEND_large_pending++;
1705 #endif
1706             buffered_send_msg += register_size;
1707             if(control_msg_tmp->seq_id == 0)
1708             {
1709                 IncreaseMsgInSend(source_addr);
1710             }
1711             FreeControlMsg(control_msg_tmp);
1712             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1713         }else
1714             status = GNI_RC_ERROR_RESOURCE;
1715
1716     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1717     {
1718         CmiAbort("Memory registor for large msg\n");
1719     }else 
1720     {
1721         status = GNI_RC_ERROR_NOMEM; 
1722         if(!inbuff)
1723             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1724     }
1725     return status;
1726 #else
1727     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1728     if(status == GNI_RC_SUCCESS)
1729     {
1730         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1731         if(status == GNI_RC_SUCCESS)
1732         {
1733             FreeControlMsg(control_msg_tmp);
1734         }
1735     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1736     {
1737         CmiAbort("Memory registor for large msg\n");
1738     }else 
1739     {
1740         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1741     }
1742     return status;
1743 #endif
1744 }
1745
1746 inline void LrtsPrepareEnvelope(char *msg, int size)
1747 {
1748     CmiSetMsgSize(msg, size);
1749     CMI_SET_CHECKSUM(msg, size);
1750 }
1751
1752 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1753 {
1754     gni_return_t        status  =   GNI_RC_SUCCESS;
1755     uint8_t tag;
1756     CONTROL_MSG         *control_msg_tmp;
1757     int                 oob = ( mode & OUT_OF_BAND);
1758     SMSG_QUEUE          *queue;
1759
1760     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1761 #if CMK_USE_OOB
1762     queue = oob? &smsg_oob_queue : &smsg_queue;
1763 #else
1764     queue = &smsg_queue;
1765 #endif
1766
1767     LrtsPrepareEnvelope(msg, size);
1768
1769 #if PRINT_SYH
1770     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1771 #endif 
1772 #if CMK_SMP 
1773     if(size <= SMSG_MAX_MSG)
1774         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1775     else if (size < BIG_MSG) {
1776         control_msg_tmp =  construct_control_msg(size, msg, 0);
1777         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1778     }
1779     else {
1780           CmiSetMsgSeq(msg, 0);
1781           control_msg_tmp =  construct_control_msg(size, msg, 1);
1782           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1783     }
1784 #else   //non-smp, smp(worker sending)
1785     if(size <= SMSG_MAX_MSG)
1786     {
1787         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1788             CmiFree(msg);
1789     }
1790     else if (size < BIG_MSG) {
1791         control_msg_tmp =  construct_control_msg(size, msg, 0);
1792         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1793     }
1794     else {
1795 #if     USE_LRTS_MEMPOOL
1796         CmiSetMsgSeq(msg, 0);
1797         control_msg_tmp =  construct_control_msg(size, msg, 1);
1798         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1799 #else
1800         control_msg_tmp =  construct_control_msg(size, msg, 0);
1801         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1802 #endif
1803     }
1804 #endif
1805     return 0;
1806 }
1807
1808 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1809 {
1810   int i;
1811 #if CMK_BROADCAST_USE_CMIREFERENCE
1812   for(i=0;i<npes;i++) {
1813     if (pes[i] == CmiMyPe())
1814       CmiSyncSend(pes[i], len, msg);
1815     else {
1816       CmiReference(msg);
1817       CmiSyncSendAndFree(pes[i], len, msg);
1818     }
1819   }
1820 #else
1821   for(i=0;i<npes;i++) {
1822     CmiSyncSend(pes[i], len, msg);
1823   }
1824 #endif
1825 }
1826
1827 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1828 {
1829   /* A better asynchronous implementation may be wanted, but at least it works */
1830   CmiSyncListSendFn(npes, pes, len, msg);
1831   return (CmiCommHandle) 0;
1832 }
1833
1834 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1835 {
1836   if (npes == 1) {
1837       CmiSyncSendAndFree(pes[0], len, msg);
1838       return;
1839   }
1840 #if CMK_PERSISTENT_COMM
1841   if (CpvAccess(phs) && len > 1024) {
1842       int i;
1843       for(i=0;i<npes;i++) {
1844         if (pes[i] == CmiMyPe())
1845           CmiSyncSend(pes[i], len, msg);
1846         else {
1847           CmiReference(msg);
1848           CmiSyncSendAndFree(pes[i], len, msg);
1849         }
1850       }
1851       CmiFree(msg);
1852       return;
1853   }
1854 #endif
1855   
1856 #if CMK_BROADCAST_USE_CMIREFERENCE
1857   if (npes == 1) {
1858     CmiSyncSendAndFree(pes[0], len, msg);
1859     return;
1860   }
1861   CmiSyncListSendFn(npes, pes, len, msg);
1862   CmiFree(msg);
1863 #else
1864   int i;
1865   for(i=0;i<npes-1;i++) {
1866     CmiSyncSend(pes[i], len, msg);
1867   }
1868   if (npes>0)
1869     CmiSyncSendAndFree(pes[npes-1], len, msg);
1870   else 
1871     CmiFree(msg);
1872 #endif
1873 }
1874
1875 static void    PumpDatagramConnection();
1876 static      int         event_SetupConnect = 111;
1877 static      int         event_PumpSmsg = 222 ;
1878 static      int         event_PumpTransaction = 333;
1879 static      int         event_PumpRdmaTransaction = 444;
1880 static      int         event_SendBufferSmsg = 444;
1881 static      int         event_SendFmaRdmaMsg = 555;
1882 static      int         event_AdvanceCommunication = 666;
1883
1884 static void registerUserTraceEvents() {
1885 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1886     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1887     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1888     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1889     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1890     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1891     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1892     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1893 #endif
1894 }
1895
1896 static void ProcessDeadlock()
1897 {
1898     static CmiUInt8 *ptr = NULL;
1899     static CmiUInt8  last = 0, mysum, sum;
1900     static int count = 0;
1901     gni_return_t status;
1902     int i;
1903
1904 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1905 //sweep_mempool(CpvAccess(mempool));
1906     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1907     mysum = smsg_send_count + smsg_recv_count;
1908     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1909     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1910     GNI_RC_CHECK("PMI_Allgather", status);
1911     sum = 0;
1912     for (i=0; i<mysize; i++)  sum+= ptr[i];
1913     if (last == 0 || sum == last) 
1914         count++;
1915     else
1916         count = 0;
1917     last = sum;
1918     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1919     if (count == 2) { 
1920         /* detected twice, it is a real deadlock */
1921         if (myrank == 0)  {
1922             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1923             CmiAbort("Fatal> Deadlock detected.");
1924         }
1925
1926     }
1927     _detected_hang = 0;
1928 }
1929
1930 static void CheckProgress()
1931 {
1932     if (smsg_send_count == last_smsg_send_count &&
1933         smsg_recv_count == last_smsg_recv_count ) 
1934     {
1935         _detected_hang = 1;
1936 #if !CMK_SMP
1937         if (_detected_hang) ProcessDeadlock();
1938 #endif
1939
1940     }
1941     else {
1942         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1943         last_smsg_send_count = smsg_send_count;
1944         last_smsg_recv_count = smsg_recv_count;
1945         _detected_hang = 0;
1946     }
1947 }
1948
1949 static void set_limit()
1950 {
1951     //if (!user_set_flag && CmiMyRank() == 0) {
1952     if (CmiMyRank() == 0) {
1953         int mynode = CmiPhysicalNodeID(CmiMyPe());
1954         int numpes = CmiNumPesOnPhysicalNode(mynode);
1955         int numprocesses = numpes / CmiMyNodeSize();
1956         MAX_REG_MEM  = _totalmem / numprocesses;
1957         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1958         if (CmiMyPe() == 0)
1959            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1960         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1961         {
1962              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1963              CmiAbort("memory registration\n");
1964         }
1965     }
1966 }
1967
1968 void LrtsPostCommonInit(int everReturn)
1969 {
1970 #if CMK_DIRECT
1971     CmiDirectInit();
1972 #endif
1973 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1974     CpvInitialize(double, projTraceStart);
1975     /* only PE 0 needs to care about registration (to generate sts file). */
1976     //if (CmiMyPe() == 0) 
1977     {
1978         registerMachineUserEventsFunction(&registerUserTraceEvents);
1979     }
1980 #endif
1981
1982 #if CMK_SMP
1983     CmiIdleState *s=CmiNotifyGetState();
1984     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1985     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1986 #else
1987     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1988     if (useDynamicSMSG)
1989     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1990 #endif
1991
1992 #if ! LARGEPAGE
1993     if (_checkProgress)
1994 #if CMK_SMP
1995     if (CmiMyRank() == 0)
1996 #endif
1997     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1998 #endif
1999  
2000 #if !LARGEPAGE
2001     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2002 #endif
2003 }
2004
2005 /* this is called by worker thread */
2006 void LrtsPostNonLocal(){
2007 #if CMK_SMP_TRACE_COMMTHREAD
2008     double startT, endT;
2009 #endif
2010 #if MULTI_THREAD_SEND
2011     if(mysize == 1) return;
2012 #if CMK_SMP_TRACE_COMMTHREAD
2013     traceEndIdle();
2014 #endif
2015
2016 #if CMK_SMP_TRACE_COMMTHREAD
2017     startT = CmiWallTimer();
2018 #endif
2019
2020 #if CMK_WORKER_SINGLE_TASK
2021     if (CmiMyRank() % 6 == 0)
2022 #endif
2023     PumpNetworkSmsg();
2024
2025 #if CMK_WORKER_SINGLE_TASK
2026     if (CmiMyRank() % 6 == 1)
2027 #endif
2028     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2029
2030 #if CMK_WORKER_SINGLE_TASK
2031     if (CmiMyRank() % 6 == 2)
2032 #endif
2033     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
2034
2035 #if REMOTE_EVENT
2036 #if CMK_WORKER_SINGLE_TASK
2037     if (CmiMyRank() % 6 == 3)
2038 #endif
2039     PumpRemoteTransactions();
2040 #endif
2041
2042 #if CMK_USE_OOB
2043     if (SendBufferMsg(&smsg_oob_queue) == 1)
2044 #endif
2045     {
2046 #if CMK_WORKER_SINGLE_TASK
2047     if (CmiMyRank() % 6 == 4)
2048 #endif
2049         SendBufferMsg(&smsg_queue);
2050     }
2051
2052 #if CMK_WORKER_SINGLE_TASK
2053     if (CmiMyRank() % 6 == 5)
2054 #endif
2055     SendRdmaMsg();
2056
2057 #if CMK_SMP_TRACE_COMMTHREAD
2058     endT = CmiWallTimer();
2059     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2060 #endif
2061 #if CMK_SMP_TRACE_COMMTHREAD
2062     traceBeginIdle();
2063 #endif
2064 #endif
2065 }
2066
2067 /* useDynamicSMSG */
2068 static void    PumpDatagramConnection()
2069 {
2070     uint32_t          remote_address;
2071     uint32_t          remote_id;
2072     gni_return_t status;
2073     gni_post_state_t  post_state;
2074     uint64_t          datagram_id;
2075     int i;
2076
2077    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2078    {
2079        if (datagram_id >= mysize) {           /* bound endpoint */
2080            int pe = datagram_id - mysize;
2081            CMI_GNI_LOCK(global_gni_lock)
2082            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2083            CMI_GNI_UNLOCK(global_gni_lock)
2084            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2085            {
2086                CmiAssert(remote_id == pe);
2087                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2088                GNI_RC_CHECK("Dynamic SMSG Init", status);
2089 #if PRINT_SYH
2090                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
2091 #endif
2092                CmiAssert(smsg_connected_flag[pe] == 1);
2093                smsg_connected_flag[pe] = 2;
2094            }
2095        }
2096        else {         /* unbound ep */
2097            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2098            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2099            {
2100                CmiAssert(remote_id<mysize);
2101                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2102                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2103                GNI_RC_CHECK("Dynamic SMSG Init", status);
2104 #if PRINT_SYH
2105                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
2106 #endif
2107                smsg_connected_flag[remote_id] = 2;
2108
2109                alloc_smsg_attr(&send_smsg_attr);
2110                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2111                GNI_RC_CHECK("post unbound datagram", status);
2112            }
2113        }
2114    }
2115 }
2116
2117 /* pooling CQ to receive network message */
2118 static void PumpNetworkRdmaMsgs()
2119 {
2120     gni_cq_entry_t      event_data;
2121     gni_return_t        status;
2122
2123 }
2124
2125 inline 
2126 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
2127 {
2128     RDMA_REQUEST        *rdma_request_msg;
2129     MallocRdmaRequest(rdma_request_msg);
2130     rdma_request_msg->destNode = inst_id;
2131     rdma_request_msg->pd = pd;
2132 #if REMOTE_EVENT
2133     rdma_request_msg->ack_index = ack_index;
2134 #endif
2135 #if CMK_SMP
2136     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2137 #else
2138     if(sendRdmaBuf == 0)
2139     {
2140         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2141     }else{
2142         sendRdmaTail->next = rdma_request_msg;
2143         sendRdmaTail =  rdma_request_msg;
2144     }
2145 #endif
2146
2147 }
2148
2149 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2150
2151 static void PumpNetworkSmsg()
2152 {
2153     uint64_t            inst_id;
2154     int                 ret;
2155     gni_cq_entry_t      event_data;
2156     gni_return_t        status, status2;
2157     void                *header;
2158     uint8_t             msg_tag;
2159     int                 msg_nbytes;
2160     void                *msg_data;
2161     gni_mem_handle_t    msg_mem_hndl;
2162     gni_smsg_attr_t     *smsg_attr;
2163     gni_smsg_attr_t     *remote_smsg_attr;
2164     int                 init_flag;
2165     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2166     uint64_t            source_addr;
2167     SMSG_QUEUE         *queue = &smsg_queue;
2168 #if   CMK_DIRECT
2169     cmidirectMsg        *direct_msg;
2170 #endif
2171 #if CMI_EXERT_RECV_CAP
2172     int                  recv_cnt = 0;
2173 #endif
2174     while(1)
2175     {
2176         CMI_GNI_LOCK(smsg_rx_cq_lock)
2177         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2178         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2179         if(status != GNI_RC_SUCCESS)
2180             break;
2181         inst_id = GNI_CQ_GET_INST_ID(event_data);
2182 #if REMOTE_EVENT
2183         inst_id = GET_RANK(inst_id);      /* important */
2184 #endif
2185         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2186 #if PRINT_SYH
2187         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2188 #endif
2189         if (useDynamicSMSG) {
2190             /* subtle: smsg may come before connection is setup */
2191             while (smsg_connected_flag[inst_id] != 2) 
2192                PumpDatagramConnection();
2193         }
2194         msg_tag = GNI_SMSG_ANY_TAG;
2195         while(1) {
2196             CMI_GNI_LOCK(smsg_mailbox_lock)
2197             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2198             if (status != GNI_RC_SUCCESS)
2199             {
2200                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2201                 break;
2202             }
2203 #if PRINT_SYH
2204             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2205 #endif
2206             /* copy msg out and then put into queue (small message) */
2207             switch (msg_tag) {
2208             case SMALL_DATA_TAG:
2209             {
2210                 START_EVENT();
2211                 msg_nbytes = CmiGetMsgSize(header);
2212                 msg_data    = CmiAlloc(msg_nbytes);
2213                 memcpy(msg_data, (char*)header, msg_nbytes);
2214                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2215                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2216                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
2217                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2218                 handleOneRecvedMsg(msg_nbytes, msg_data);
2219                 break;
2220             }
2221             case LMSG_INIT_TAG:
2222             {
2223 #if MULTI_THREAD_SEND
2224                 MallocControlMsg(control_msg_tmp);
2225                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2226                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2227                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2228                 getLargeMsgRequest(control_msg_tmp, inst_id);
2229                 FreeControlMsg(control_msg_tmp);
2230 #else
2231                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2232                 getLargeMsgRequest(header, inst_id);
2233                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2234 #endif
2235                 break;
2236             }
2237 #if !REMOTE_EVENT && !CQWRITE
2238             case ACK_TAG:   //msg fit into mempool
2239             {
2240                 /* Get is done, release message . Now put is not used yet*/
2241                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2242                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2243                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2244 #if ! USE_LRTS_MEMPOOL
2245                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2246 #else
2247                 DecreaseMsgInSend(msg);
2248 #endif
2249                 if(NoMsgInSend(msg))
2250                     buffered_send_msg -= GetMempoolsize(msg);
2251                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2252                 CmiFree(msg);
2253 #if CMI_EXERT_SEND_CAP
2254                 SEND_large_pending--;
2255 #endif
2256                 break;
2257             }
2258 #endif
2259             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2260             {
2261 #if MULTI_THREAD_SEND
2262                 MallocControlMsg(header_tmp);
2263                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2264                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2265 #else
2266                 header_tmp = (CONTROL_MSG *) header;
2267 #endif
2268                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2269 #if CMI_EXERT_SEND_CAP
2270                     SEND_large_pending--;
2271 #endif
2272                 void *msg = (void*)(header_tmp->source_addr);
2273                 int cur_seq = CmiGetMsgSeq(msg);
2274                 int offset = ONE_SEG*(cur_seq+1);
2275                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2276                 buffered_send_msg -= header_tmp->length;
2277                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2278                 if (remain_size < 0) remain_size = 0;
2279                 CmiSetMsgSize(msg, remain_size);
2280                 if(remain_size <= 0) //transaction done
2281                 {
2282                     CmiFree(msg);
2283                 }else if (header_tmp->total_length > offset)
2284                 {
2285                     CmiSetMsgSeq(msg, cur_seq+1);
2286                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2287                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2288                     //send next seg
2289                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2290                          // pipelining
2291                     if (header_tmp->seq_id == 1) {
2292                       int i;
2293                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2294                         int seq = cur_seq+i+2;
2295                         CmiSetMsgSeq(msg, seq-1);
2296                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2297                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2298                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2299                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2300                       }
2301                     }
2302                 }
2303 #if MULTI_THREAD_SEND
2304                 FreeControlMsg(header_tmp);
2305 #else
2306                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2307 #endif
2308                 break;
2309             }
2310 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2311             case PUT_DONE_TAG:  {   //persistent message
2312                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2313                 int size = ((CONTROL_MSG *) header)->length;
2314                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2315                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2316                 CmiReference(msg);
2317                 CMI_CHECK_CHECKSUM(msg, size);
2318                 handleOneRecvedMsg(size, msg); 
2319 #if PRINT_SYH
2320                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2321 #endif
2322                 break;
2323             }
2324 #endif
2325 #if CMK_DIRECT
2326             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2327                 //create a trigger message
2328                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2329                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2330                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2331                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2332                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2333                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2334                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2335                 break;
2336 #endif
2337             default:
2338                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2339                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2340                 printf("weird tag problem\n");
2341                 CmiAbort("Unknown tag\n");
2342             }               // end switch
2343 #if PRINT_SYH
2344             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2345 #endif
2346             smsg_recv_count ++;
2347             msg_tag = GNI_SMSG_ANY_TAG;
2348 #if CMI_EXERT_RECV_CAP
2349             if (status == GNI_RC_SUCCESS && ++recv_cnt == RECV_CAP) return;
2350 #endif
2351         } //endwhile GNI_SmsgGetNextWTag
2352     }   //end while GetEvent
2353     if(status == GNI_RC_ERROR_RESOURCE)
2354     {
2355         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2356         GNI_RC_CHECK("Smsg_rx_cq full", status);
2357     }
2358 }
2359
2360 static void printDesc(gni_post_descriptor_t *pd)
2361 {
2362     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2363 }
2364
2365 #if CQWRITE
2366 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2367 {
2368     gni_post_descriptor_t *pd;
2369     gni_return_t        status = GNI_RC_SUCCESS;
2370     
2371     MallocPostDesc(pd);
2372     pd->type = GNI_POST_CQWRITE;
2373     pd->cq_mode = GNI_CQMODE_SILENT;
2374     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2375     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2376     pd->cqwrite_value = data;
2377     pd->remote_mem_hndl = mem_hndl;
2378     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2379     GNI_RC_CHECK("GNI_PostCqWrite", status);
2380 }
2381 #endif
2382
2383 // register memory for a message
2384 // return mem handle
2385 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2386 {
2387     gni_return_t status = GNI_RC_SUCCESS;
2388
2389     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2390
2391 #if CMK_PERSISTENT_COMM
2392       // persistent message is always registered
2393       // BIG_MSG small pieces do not have malloc chunk header
2394     if ((seqno <= 1 || seqno == PERSIST_SEQ) && !IsMemHndlZero(MEMHFIELD(msg))) {
2395         *memh = MEMHFIELD(msg);
2396         return GNI_RC_SUCCESS;
2397     }
2398 #endif
2399     if(seqno == 0 
2400 #if CMK_PERSISTENT_COMM
2401          || seqno == PERSIST_SEQ
2402 #endif
2403       )
2404     {
2405         if(IsMemHndlZero((GetMemHndl(msg))))
2406         {
2407             msg = (void*)(msg);
2408             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2409             if(status == GNI_RC_SUCCESS)
2410                 *memh = GetMemHndl(msg);
2411         }
2412         else {
2413             *memh = GetMemHndl(msg);
2414         }
2415     }
2416     else {
2417         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2418         status = registerMemory(msg, size, memh, NULL); 
2419     }
2420     return status;
2421 }
2422
2423 // for BIG_MSG called on receiver side for receiving control message
2424 // LMSG_INIT_TAG
2425 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2426 {
2427 #if     USE_LRTS_MEMPOOL
2428     CONTROL_MSG         *request_msg;
2429     gni_return_t        status = GNI_RC_SUCCESS;
2430     void                *msg_data;
2431     gni_post_descriptor_t *pd;
2432     gni_mem_handle_t    msg_mem_hndl;
2433     int                 size, transaction_size, offset = 0;
2434     size_t              register_size = 0;
2435
2436     // initial a get to transfer data from the sender side */
2437     request_msg = (CONTROL_MSG *) header;
2438     size = request_msg->total_length;
2439     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2440     MallocPostDesc(pd);
2441 #if CMK_WITH_STATS 
2442     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2443 #endif
2444     if(request_msg->seq_id < 2)   {
2445 #if CMK_SMP_TRACE_COMMTHREAD 
2446         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2447 #endif
2448         msg_data = CmiAlloc(size);
2449         CmiSetMsgSeq(msg_data, 0);
2450         _MEMCHECK(msg_data);
2451     }
2452     else {
2453         offset = ONE_SEG*(request_msg->seq_id-1);
2454         msg_data = (char*)request_msg->dest_addr + offset;
2455     }
2456    
2457     pd->cqwrite_value = request_msg->seq_id;
2458
2459     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2460     SetMemHndlZero(pd->local_mem_hndl);
2461     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2462     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2463         if(NoMsgInRecv( (void*)(msg_data)))
2464             register_size = GetMempoolsize((void*)(msg_data));
2465     }
2466
2467     pd->first_operand = ALIGN64(size);                   //  total length
2468
2469     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2470         pd->type            = GNI_POST_FMA_GET;
2471     else
2472         pd->type            = GNI_POST_RDMA_GET;
2473     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2474     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2475     pd->length          = transaction_size;
2476     pd->local_addr      = (uint64_t) msg_data;
2477     pd->remote_addr     = request_msg->source_addr + offset;
2478     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2479     pd->src_cq_hndl     = rdma_tx_cqh;
2480     pd->rdma_mode       = 0;
2481     pd->amo_cmd         = 0;
2482 #if CMI_EXERT_RDMA_CAP
2483     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2484 #endif
2485     //memory registration success
2486     if(status == GNI_RC_SUCCESS )
2487     {
2488         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2489         CMI_GNI_LOCK(lock)
2490 #if REMOTE_EVENT
2491         if( request_msg->seq_id == 0)
2492         {
2493             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2494             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2495             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2496         }
2497 #endif
2498
2499 #if CMK_WITH_STATS
2500         RDMA_TRY_SEND(pd->type)
2501 #endif
2502         if(pd->type == GNI_POST_RDMA_GET) 
2503         {
2504             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2505         }
2506         else
2507         {
2508             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2509         }
2510         CMI_GNI_UNLOCK(lock)
2511
2512         if(status == GNI_RC_SUCCESS )
2513         {
2514 #if CMI_EXERT_RDMA_CAP
2515             RDMA_pending++;
2516 #endif
2517             if(pd->cqwrite_value == 0)
2518             {
2519 #if MACHINE_DEBUG_LOG
2520                 buffered_recv_msg += register_size;
2521                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2522 #endif
2523                 IncreaseMsgInRecv(msg_data);
2524 #if CMK_SMP_TRACE_COMMTHREAD 
2525                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2526 #endif
2527             }
2528 #if  CMK_WITH_STATS
2529             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2530             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2531 #endif
2532         }
2533     }else
2534     {
2535         SetMemHndlZero((pd->local_mem_hndl));
2536     }
2537     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2538     {
2539 #if REMOTE_EVENT
2540         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2541 #else
2542         bufferRdmaMsg(inst_id, pd, -1); 
2543 #endif
2544     }else if (status != GNI_RC_SUCCESS) {
2545         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2546         GNI_RC_CHECK("GetLargeAFter posting", status);
2547     }
2548 #else
2549     CONTROL_MSG         *request_msg;
2550     gni_return_t        status;
2551     void                *msg_data;
2552     gni_post_descriptor_t *pd;
2553     RDMA_REQUEST        *rdma_request_msg;
2554     gni_mem_handle_t    msg_mem_hndl;
2555     //int source;
2556     // initial a get to transfer data from the sender side */
2557     request_msg = (CONTROL_MSG *) header;
2558     msg_data = CmiAlloc(request_msg->length);
2559     _MEMCHECK(msg_data);
2560
2561     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2562
2563     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2564     {
2565         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2566     }
2567
2568     MallocPostDesc(pd);
2569     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2570         pd->type            = GNI_POST_FMA_GET;
2571     else
2572         pd->type            = GNI_POST_RDMA_GET;
2573     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2574     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2575     pd->length          = ALIGN64(request_msg->length);
2576     pd->local_addr      = (uint64_t) msg_data;
2577     pd->remote_addr     = request_msg->source_addr;
2578     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2579     pd->src_cq_hndl     = rdma_tx_cqh;
2580     pd->rdma_mode       = 0;
2581     pd->amo_cmd         = 0;
2582
2583     //memory registration successful
2584     if(status == GNI_RC_SUCCESS)
2585     {
2586         pd->local_mem_hndl  = msg_mem_hndl;
2587        
2588         if(pd->type == GNI_POST_RDMA_GET) 
2589         {
2590             CMI_GNI_LOCK(rdma_tx_cq_lock)
2591             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2592             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2593         }
2594         else
2595         {
2596             CMI_GNI_LOCK(default_tx_cq_lock)
2597             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2598             CMI_GNI_UNLOCK(default_tx_cq_lock)
2599         }
2600
2601     }else
2602     {
2603         SetMemHndlZero(pd->local_mem_hndl);
2604     }
2605     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2606     {
2607         MallocRdmaRequest(rdma_request_msg);
2608         rdma_request_msg->next = 0;
2609         rdma_request_msg->destNode = inst_id;
2610         rdma_request_msg->pd = pd;
2611         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2612     }else {
2613         GNI_RC_CHECK("AFter posting", status);
2614     }
2615 #endif
2616 }
2617
2618 #if CQWRITE
2619 static void PumpCqWriteTransactions()
2620 {
2621
2622     gni_cq_entry_t          ev;
2623     gni_return_t            status;
2624     void                    *msg;  
2625     int                     msg_size;
2626     while(1) {
2627         //CMI_GNI_LOCK(my_cq_lock) 
2628         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2629         //CMI_GNI_UNLOCK(my_cq_lock)
2630         if(status != GNI_RC_SUCCESS) break;
2631         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2632 #if CMK_PERSISTENT_COMM
2633 #if PRINT_SYH
2634         printf(" %d CQ write event %p\n", myrank, msg);
2635 #endif
2636         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2637 #if PRINT_SYH
2638             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2639 #endif
2640             CmiReference(msg);
2641             msg_size = CmiGetMsgSize(msg);
2642             CMI_CHECK_CHECKSUM(msg, msg_size);
2643             handleOneRecvedMsg(msg_size, msg); 
2644             continue;
2645         }
2646 #endif
2647 #if ! USE_LRTS_MEMPOOL
2648        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2649 #else
2650         DecreaseMsgInSend(msg);
2651 #endif
2652         if(NoMsgInSend(msg))
2653             buffered_send_msg -= GetMempoolsize(msg);
2654         CmiFree(msg);
2655     };
2656     if(status == GNI_RC_ERROR_RESOURCE)
2657     {
2658         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2659     }
2660 }
2661 #endif
2662
2663 #if REMOTE_EVENT
2664 static void PumpRemoteTransactions()
2665 {
2666     gni_cq_entry_t          ev;
2667     gni_return_t            status;
2668     void                    *msg;   
2669     int                     inst_id, index, type, size;
2670
2671     while(1) {
2672         CMI_GNI_LOCK(global_gni_lock)
2673         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2674         CMI_GNI_UNLOCK(global_gni_lock)
2675
2676         if(status != GNI_RC_SUCCESS) break;
2677
2678         inst_id = GNI_CQ_GET_INST_ID(ev);
2679         index = GET_INDEX(inst_id);
2680         type = GET_TYPE(inst_id);
2681         switch (type) {
2682         case 0:    // ACK
2683             CmiAssert(index>=0 && index<ackPool.size);
2684             CmiAssert(GetIndexType(ackPool, index) == 1);
2685             msg = GetIndexAddress(ackPool, index);
2686 #if PRINT_SYH
2687         printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2688 #endif
2689 #if ! USE_LRTS_MEMPOOL
2690            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2691 #else
2692             DecreaseMsgInSend(msg);
2693 #endif
2694             if(NoMsgInSend(msg))
2695                 buffered_send_msg -= GetMempoolsize(msg);
2696             CmiFree(msg);
2697             IndexPool_freeslot(&ackPool, index);
2698 #if CMI_EXERT_SEND_CAP
2699             SEND_large_pending--;
2700 #endif
2701             break;
2702 #if CMK_PERSISTENT_COMM
2703         case 1:  {    // PERSISTENT
2704             CmiLock(persistPool.lock);
2705             CmiAssert(GetIndexType(persistPool, index) == 2);
2706             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2707             CmiUnlock(persistPool.lock);
2708             START_EVENT();
2709             msg = slot->destBuf[0].destAddress;
2710             size = CmiGetMsgSize(msg);
2711             CmiReference(msg);
2712             CMI_CHECK_CHECKSUM(msg, size);
2713             TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2714             handleOneRecvedMsg(size, msg); 
2715             break;
2716             }
2717 #endif
2718         default:
2719             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2720             CmiAbort("PumpRemoteTransactions: unknown type");
2721         }
2722     }
2723     if(status == GNI_RC_ERROR_RESOURCE)
2724     {
2725         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2726     }
2727 }
2728 #endif
2729
2730 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2731 {
2732     gni_cq_entry_t          ev;
2733     gni_return_t            status;
2734     uint64_t                type, inst_id;
2735     gni_post_descriptor_t   *tmp_pd;
2736     MSG_LIST                *ptr;
2737     CONTROL_MSG             *ack_msg_tmp;
2738     ACK_MSG                 *ack_msg;
2739     uint8_t                 msg_tag;
2740 #if CMK_DIRECT
2741     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2742 #endif
2743     SMSG_QUEUE         *queue = &smsg_queue;
2744
2745     while(1) {
2746         CMI_GNI_LOCK(my_cq_lock) 
2747         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2748         CMI_GNI_UNLOCK(my_cq_lock)
2749         if(status != GNI_RC_SUCCESS) break;
2750         
2751         type = GNI_CQ_GET_TYPE(ev);
2752         if (type == GNI_CQ_EVENT_TYPE_POST)
2753         {
2754
2755 #if CMI_EXERT_RDMA_CAP
2756             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2757             RDMA_pending--;
2758 #endif
2759             inst_id     = GNI_CQ_GET_INST_ID(ev);
2760 #if PRINT_SYH
2761             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2762 #endif
2763             CMI_GNI_LOCK(my_cq_lock)
2764             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2765             CMI_GNI_UNLOCK(my_cq_lock)
2766
2767             switch (tmp_pd->type) {
2768 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2769             case GNI_POST_RDMA_PUT:
2770 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2771                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2772 #endif
2773             case GNI_POST_FMA_PUT:
2774                 if(tmp_pd->amo_cmd == 1) {
2775 #if CMK_DIRECT
2776                     //sender ACK to receiver to trigger it is done
2777                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2778                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2779                     msg_tag = DIRECT_PUT_DONE_TAG;
2780 #endif
2781                 }
2782                 else {
2783                     CmiFree((void *)tmp_pd->local_addr);
2784 #if REMOTE_EVENT
2785                     FreePostDesc(tmp_pd);
2786                     continue;
2787 #elif CQWRITE
2788                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2789                     FreePostDesc(tmp_pd);
2790                     continue;
2791 #else
2792                     MallocControlMsg(ack_msg_tmp);
2793                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2794                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2795                     ack_msg_tmp->length  = tmp_pd->length;
2796                     msg_tag = PUT_DONE_TAG;
2797 #endif
2798                 }
2799                 break;
2800 #endif
2801             case GNI_POST_RDMA_GET:
2802             case GNI_POST_FMA_GET:  {
2803 #if  ! USE_LRTS_MEMPOOL
2804                 MallocControlMsg(ack_msg_tmp);
2805                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2806                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2807                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2808                 msg_tag = ACK_TAG;  
2809 #else
2810 #if CMK_WITH_STATS
2811                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2812 #endif
2813                 int seq_id = tmp_pd->cqwrite_value;
2814                 if(seq_id > 0)      // BIG_MSG
2815                 {
2816                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2817                     MallocControlMsg(ack_msg_tmp);
2818                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2819                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2820                     ack_msg_tmp->seq_id = seq_id;
2821                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2822                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2823                     ack_msg_tmp->length = tmp_pd->length;
2824                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2825                     msg_tag = BIG_MSG_TAG; 
2826                 } 
2827                 else
2828                 {
2829                     msg_tag = ACK_TAG; 
2830 #if  !REMOTE_EVENT && !CQWRITE
2831                     MallocAckMsg(ack_msg);
2832                     ack_msg->source_addr = tmp_pd->remote_addr;
2833 #endif
2834                 }
2835 #endif
2836                 break;
2837             }
2838             case  GNI_POST_CQWRITE:
2839                    FreePostDesc(tmp_pd);
2840                    continue;
2841             default:
2842                 CmiPrintf("type=%d\n", tmp_pd->type);
2843                 CmiAbort("PumpLocalTransactions: unknown type!");
2844             }      /* end of switch */
2845
2846 #if CMK_DIRECT
2847             if (tmp_pd->amo_cmd == 1) {
2848                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2849                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2850             }
2851             else
2852 #endif
2853             if (msg_tag == ACK_TAG) {
2854 #if !REMOTE_EVENT
2855 #if   !CQWRITE
2856                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2857                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2858 #else
2859                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2860 #endif
2861 #endif
2862             }
2863             else {
2864                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2865                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2866             }
2867 #if CMK_PERSISTENT_COMM
2868             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2869 #endif
2870             {
2871                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2872 #if PRINT_SYH
2873                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2874 #endif
2875                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2876                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2877
2878                     START_EVENT();
2879                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2880                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2881 #if MACHINE_DEBUG_LOG
2882                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2883                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2884                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2885 #endif
2886                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2887                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2888                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2889                 }else if(msg_tag == BIG_MSG_TAG){
2890                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2891                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2892                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2893                         START_EVENT();
2894 #if PRINT_SYH
2895                         printf("Pipeline msg done [%d]\n", myrank);
2896 #endif
2897 #if                 CMK_SMP_TRACE_COMMTHREAD
2898                         if( tmp_pd->cqwrite_value == 1)
2899                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2900 #endif
2901                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2902                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2903                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2904                     }
2905                 }
2906             }
2907             FreePostDesc(tmp_pd);
2908         }
2909     } //end while
2910     if(status == GNI_RC_ERROR_RESOURCE)
2911     {
2912         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2913         GNI_RC_CHECK("Smsg_tx_cq full", status);
2914     }
2915 }
2916
2917 static void  SendRdmaMsg()
2918 {
2919     gni_return_t            status = GNI_RC_SUCCESS;
2920     gni_mem_handle_t        msg_mem_hndl;
2921     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2922     RDMA_REQUEST            *pre = 0;
2923     uint64_t                register_size = 0;
2924     void                    *msg;
2925     int                     i;
2926 #if CMK_SMP
2927     int len = PCQueueLength(sendRdmaBuf);
2928     for (i=0; i<len; i++)
2929     {
2930 #if CMI_EXERT_RDMA_CAP
2931          if( RDMA_pending >= RDMA_cap) break;
2932 #endif
2933         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2934         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2935         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2936         if (ptr == NULL) break;
2937 #else
2938     ptr = sendRdmaBuf;
2939     while (ptr!=0 )
2940     {
2941 #if CMI_EXERT_RDMA_CAP
2942          if( RDMA_pending >= RDMA_cap) break;
2943 #endif
2944 #endif 
2945         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2946         gni_post_descriptor_t *pd = ptr->pd;
2947         
2948         msg = (void*)(pd->local_addr);
2949         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2950         register_size = 0;
2951         if(pd->cqwrite_value == 0) {
2952             if(NoMsgInRecv(msg))
2953                 register_size = GetMempoolsize(msg);
2954         }
2955
2956         if(status == GNI_RC_SUCCESS)        //mem register good
2957         {
2958             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2959             CMI_GNI_LOCK(lock);
2960 #if REMOTE_EVENT
2961             if( pd->cqwrite_value == 0) {
2962                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2963                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2964                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2965             }
2966 #if CMK_PERSISTENT_COMM
2967             else if (pd->cqwrite_value == PERSIST_SEQ) {
2968                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2969                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, PERSIST_EVENT(ptr->ack_index));
2970                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2971             }
2972 #endif
2973 #endif
2974 #if CMK_WITH_STATS
2975             RDMA_TRY_SEND(pd->type)
2976 #endif
2977 #if CMK_SMP_TRACE_COMMTHREAD
2978             int oldpe = -1;
2979             int oldeventid = -1;
2980             START_EVENT();
2981             if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
2982             { 
2983                 TRACE_COMM_GET_MSGID((void*)pd->local_addr, &oldpe, &oldeventid);
2984                 TRACE_COMM_SET_COMM_MSGID((void*)pd->local_addr);
2985             }
2986 #endif
2987
2988             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2989             {
2990                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2991             }
2992             else
2993             {
2994                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2995             }
2996             CMI_GNI_UNLOCK(lock);
2997             
2998 #if CMK_SMP_TRACE_COMMTHREAD
2999             if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
3000             { 
3001                 if (oldpe != -1)  TRACE_COMM_SET_MSGID((void*)pd->local_addr, oldpe, oldeventid);
3002             }
3003 #endif
3004             if(status == GNI_RC_SUCCESS)    //post good
3005             {
3006 #if CMI_EXERT_RDMA_CAP
3007                 RDMA_pending ++;
3008 #endif
3009 #if !CMK_SMP
3010                 tmp_ptr = ptr;
3011                 if(pre != 0) {
3012                     pre->next = ptr->next;
3013                 }
3014                 else {
3015                     sendRdmaBuf = ptr->next;
3016                 }
3017                 ptr = ptr->next;
3018                 FreeRdmaRequest(tmp_ptr);
3019 #endif
3020                 if(pd->cqwrite_value == 0)
3021                 {
3022 #if CMK_SMP_TRACE_COMMTHREAD 
3023                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3024                     if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
3025                     { 
3026                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
3027                     }
3028 #endif
3029                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3030                 }
3031 #if  CMK_WITH_STATS
3032                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3033                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3034 #endif
3035 #if MACHINE_DEBUG_LOG
3036                 buffered_recv_msg += register_size;
3037                 MACHSTATE(8, "GO request from buffered\n"); 
3038 #endif
3039 #if PRINT_SYH
3040                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
3041 #endif
3042             }else           // cannot post
3043             {
3044 #if CMK_SMP
3045                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3046 #else
3047                 pre = ptr;
3048                 ptr = ptr->next;
3049 #endif
3050 #if PRINT_SYH
3051                 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank, pd->cqwrite_value);
3052 #endif
3053                 break;
3054             }
3055         } else          //memory registration fails
3056         {
3057 #if CMK_SMP
3058             PCQueuePush(sendRdmaBuf, (char*)ptr);
3059 #else
3060             pre = ptr;
3061             ptr = ptr->next;
3062 #endif
3063         }
3064     } //end while
3065 #if ! CMK_SMP
3066     if(ptr == 0)
3067         sendRdmaTail = pre;
3068 #endif
3069 }
3070
3071 // return 1 if all messages are sent
3072 static int SendBufferMsg(SMSG_QUEUE *queue)
3073 {
3074     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3075     CONTROL_MSG         *control_msg_tmp;
3076     gni_return_t        status;
3077     int                 done = 1;
3078     uint64_t            register_size;
3079     void                *register_addr;
3080     int                 index_previous = -1;
3081
3082 #if CMK_SMP
3083     int          index = 0;
3084 #if ONE_SEND_QUEUE
3085     memset(destpe_avail, 0, mysize * sizeof(char));
3086     for (index=0; index<1; index++)
3087     {
3088         int i, len = PCQueueLength(queue->sendMsgBuf);
3089         for (i=0; i<len; i++) 
3090         {
3091             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3092             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3093             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3094             if(ptr == NULL) break;
3095             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3096                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3097                 continue;
3098             }
3099 #else
3100 #if SMP_LOCKS
3101     int nonempty = PCQueueLength(nonEmptyQueues);
3102     for(index =0; index<nonempty; index++)
3103     {
3104         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
3105         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
3106         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3107         if(current_list == NULL) break; 
3108         PCQueue current_queue= current_list-> sendSmsgBuf;
3109         CmiLock(current_list->lock);
3110         int i, len = PCQueueLength(current_queue);
3111         current_list->pushed = 0;
3112         CmiUnlock(current_list->lock);
3113 #else
3114     for(index =0; index<mysize; index++)
3115     {
3116         //if (index == myrank) continue;
3117         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3118         int i, len = PCQueueLength(current_queue);
3119 #endif
3120         for (i=0; i<len; i++) 
3121         {
3122             CMI_PCQUEUEPOP_LOCK(current_queue)
3123             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3124             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3125             if (ptr == 0) break;
3126 #endif
3127 #else
3128     int index = queue->smsg_head_index;
3129     while(index != -1)
3130     {
3131         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3132         pre = 0;
3133         while(ptr != 0)
3134         {
3135 #endif
3136             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3137             status = GNI_RC_ERROR_RESOURCE;
3138             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
3139                 /* connection not exists yet */
3140 #if CMK_SMP
3141                   /* non-smp case, connect is issued in send_smsg_message */
3142                 if (smsg_connected_flag[index] == 0)
3143                     connect_to(ptr->destNode); 
3144 #endif
3145             }
3146             else
3147             switch(ptr->tag)
3148             {
3149             case SMALL_DATA_TAG:
3150                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3151                 if(status == GNI_RC_SUCCESS)
3152                 {
3153                     CmiFree(ptr->msg);
3154                 }
3155                 break;
3156             case LMSG_INIT_TAG:
3157                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3158                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
3159                 break;
3160 #if !REMOTE_EVENT && !CQWRITE
3161             case ACK_TAG:
3162                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3163                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3164                 break;
3165 #endif
3166             case BIG_MSG_TAG:
3167                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3168                 if(status == GNI_RC_SUCCESS)
3169                 {
3170                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3171                 }
3172                 break;
3173 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3174             case PUT_DONE_TAG:
3175                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3176                 if(status == GNI_RC_SUCCESS)
3177                 {
3178                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3179                 }
3180                 break;
3181 #endif
3182 #if CMK_DIRECT
3183             case DIRECT_PUT_DONE_TAG:
3184                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3185                 if(status == GNI_RC_SUCCESS)
3186                 {
3187                     free((CMK_DIRECT_HEADER*)ptr->msg);
3188                 }
3189                 break;
3190
3191 #endif
3192             default:
3193                 printf("Weird tag\n");
3194                 CmiAbort("should not happen\n");
3195             }       // end switch
3196             if(status == GNI_RC_SUCCESS)
3197             {
3198 #if PRINT_SYH
3199                 buffered_smsg_counter--;
3200                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3201 #endif
3202 #if !CMK_SMP
3203                 tmp_ptr = ptr;
3204                 if(pre)
3205                 {
3206                     ptr = pre ->next = ptr->next;
3207                 }else
3208                 {
3209                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3210                 }
3211                 FreeMsgList(tmp_ptr);
3212 #else
3213                 FreeMsgList(ptr);
3214 #endif
3215             }else {
3216 #if CMK_SMP
3217 #if ONE_SEND_QUEUE
3218                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3219 #else
3220                 PCQueuePush(current_queue, (char*)ptr);
3221 #endif
3222 #else
3223                 pre = ptr;
3224                 ptr=ptr->next;
3225 #endif
3226                 done = 0;
3227                 if(status == GNI_RC_ERROR_RESOURCE)
3228                 {
3229 #if CMK_SMP && ONE_SEND_QUEUE 
3230                     destpe_avail[ptr->destNode] = 1;
3231 #else
3232                     break;
3233 #endif
3234                 }
3235             } 
3236         } //end while
3237 #if !CMK_SMP
3238         if(ptr == 0)
3239             queue->smsg_msglist_index[index].tail = pre;
3240         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3241         {
3242             if(index_previous != -1)
3243                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3244             else
3245                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3246         }
3247         else {
3248             index_previous = index;
3249         }
3250         index = queue->smsg_msglist_index[index].next;
3251 #else
3252 #if !ONE_SEND_QUEUE && SMP_LOCKS
3253         CmiLock(current_list->lock);
3254         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3255         {
3256             current_list->pushed = 1;
3257             PCQueuePush(nonEmptyQueues, current_list);
3258         }
3259         CmiUnlock(current_list->lock); 
3260 #endif
3261 #endif
3262
3263     }   // end pooling for all cores
3264     return done;
3265 }
3266
3267 static void ProcessDeadlock();
3268 void LrtsAdvanceCommunication(int whileidle)
3269 {
3270     static int count = 0;
3271     /*  Receive Msg first */
3272 #if CMK_SMP_TRACE_COMMTHREAD
3273     double startT, endT;
3274 #endif
3275     if (useDynamicSMSG && whileidle)
3276     {
3277 #if CMK_SMP_TRACE_COMMTHREAD
3278         startT = CmiWallTimer();
3279 #endif
3280         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3281 #if CMK_SMP_TRACE_COMMTHREAD
3282         endT = CmiWallTimer();
3283         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3284 #endif
3285     }
3286
3287 #if CMK_SMP_TRACE_COMMTHREAD
3288     startT = CmiWallTimer();
3289 #endif
3290     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3291     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3292 #if CMK_SMP_TRACE_COMMTHREAD
3293     endT = CmiWallTimer();
3294     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3295 #endif
3296
3297 #if CMK_SMP_TRACE_COMMTHREAD
3298     startT = CmiWallTimer();
3299 #endif
3300     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3301     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3302 #if CMK_SMP_TRACE_COMMTHREAD
3303     endT = CmiWallTimer();
3304     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3305 #endif
3306
3307 #if CMK_SMP_TRACE_COMMTHREAD
3308     startT = CmiWallTimer();
3309 #endif
3310     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3311
3312 #if CQWRITE
3313     PumpCqWriteTransactions();
3314 #endif
3315
3316 #if REMOTE_EVENT
3317     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
3318 #endif
3319
3320     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3321 #if CMK_SMP_TRACE_COMMTHREAD
3322     endT = CmiWallTimer();
3323     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3324 #endif
3325  
3326 #if CMK_SMP_TRACE_COMMTHREAD
3327     startT = CmiWallTimer();
3328 #endif
3329     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3330     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3331 #if CMK_SMP_TRACE_COMMTHREAD
3332     endT = CmiWallTimer();
3333     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3334 #endif
3335
3336     /* Send buffered Message */
3337 #if CMK_SMP_TRACE_COMMTHREAD
3338     startT = CmiWallTimer();
3339 #endif
3340 #if CMK_USE_OOB
3341     if (SendBufferMsg(&smsg_oob_queue) == 1)
3342 #endif
3343     {
3344         STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
3345     }
3346     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3347 #if CMK_SMP_TRACE_COMMTHREAD
3348     endT = CmiWallTimer();
3349     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3350 #endif
3351
3352 #if CMK_SMP && ! LARGEPAGE
3353     if (_detected_hang)  ProcessDeadlock();
3354 #endif
3355 }
3356
3357 /* useDynamicSMSG */
3358 static void _init_dynamic_smsg()
3359 {
3360     gni_return_t status;
3361     uint32_t     vmdh_index = -1;
3362     int i;
3363
3364     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3365     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3366     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3367     for(i=0; i<mysize; i++) {
3368         smsg_connected_flag[i] = 0;
3369         smsg_attr_vector_local[i] = NULL;
3370         smsg_attr_vector_remote[i] = NULL;
3371     }
3372     if(mysize <=512)
3373     {
3374         SMSG_MAX_MSG = 4096;
3375     }else if (mysize <= 4096)
3376     {
3377         SMSG_MAX_MSG = 4096/mysize * 1024;
3378     }else if (mysize <= 16384)
3379     {
3380         SMSG_MAX_MSG = 512;
3381     }else {
3382         SMSG_MAX_MSG = 256;
3383     }
3384
3385     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3386     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3387     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3388     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3389     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3390
3391     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3392     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3393     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3394     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3395     mailbox_list->offset = 0;
3396     mailbox_list->next = 0;
3397     
3398     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3399         mailbox_list->size, smsg_rx_cqh,
3400         GNI_MEM_READWRITE,   
3401         vmdh_index,
3402         &(mailbox_list->mem_hndl));
3403     GNI_RC_CHECK("MEMORY registration for smsg", status);
3404
3405     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3406     GNI_RC_CHECK("Unbound EP", status);
3407     
3408     alloc_smsg_attr(&send_smsg_attr);
3409
3410     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3411     GNI_RC_CHECK("post unbound datagram", status);
3412
3413       /* always pre-connect to proc 0 */
3414     //if (myrank != 0) connect_to(0);
3415 }
3416
3417 static void _init_static_smsg()
3418 {
3419     gni_smsg_attr_t      *smsg_attr;
3420     gni_smsg_attr_t      remote_smsg_attr;
3421     gni_smsg_attr_t      *smsg_attr_vec;
3422     gni_mem_handle_t     my_smsg_mdh_mailbox;
3423     int      ret, i;
3424     gni_return_t status;
3425     uint32_t              vmdh_index = -1;
3426     mdh_addr_t            base_infor;
3427     mdh_addr_t            *base_addr_vec;
3428     char *env;
3429
3430     if(mysize <=512)
3431     {
3432         SMSG_MAX_MSG = 1024;
3433     }else if (mysize <= 4096)
3434     {
3435         SMSG_MAX_MSG = 1024;
3436     }else if (mysize <= 16384)
3437     {
3438         SMSG_MAX_MSG = 512;
3439     }else {
3440         SMSG_MAX_MSG = 256;
3441     }
3442     
3443     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3444     if (env) SMSG_MAX_MSG = atoi(env);
3445     CmiAssert(SMSG_MAX_MSG > 0);
3446
3447     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3448     
3449     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3450     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3451     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3452     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3453     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3454     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3455     CmiAssert(ret == 0);
3456     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3457     
3458     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3459             smsg_memlen*(mysize), smsg_rx_cqh,
3460             GNI_MEM_READWRITE,   
3461             vmdh_index,
3462             &my_smsg_mdh_mailbox);
3463     register_memory_size += smsg_memlen*(mysize);
3464     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3465
3466     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3467     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3468
3469     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3470     base_infor.mdh =  my_smsg_mdh_mailbox;
3471     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3472
3473     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3474  
3475     for(i=0; i<mysize; i++)
3476     {
3477         if(i==myrank)
3478             continue;
3479         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3480         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3481         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3482         smsg_attr[i].mbox_offset = i*smsg_memlen;
3483         smsg_attr[i].buff_size = smsg_memlen;
3484         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3485         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3486     }
3487
3488     for(i=0; i<mysize; i++)
3489     {
3490         if (myrank == i) continue;
3491
3492         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3493         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3494         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3495         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3496         remote_smsg_attr.buff_size = smsg_memlen;
3497         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3498         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3499
3500         /* initialize the smsg channel */
3501         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3502         GNI_RC_CHECK("SMSG Init", status);
3503     } //end initialization
3504
3505     free(base_addr_vec);
3506     free(smsg_attr);
3507
3508     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3509     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3510
3511
3512 inline
3513 static void _init_send_queue(SMSG_QUEUE *queue)
3514 {
3515      int i;
3516 #if ONE_SEND_QUEUE
3517      queue->sendMsgBuf = PCQueueCreate();
3518      destpe_avail = (char*)malloc(mysize * sizeof(char));
3519 #else
3520      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3521 #if CMK_SMP && SMP_LOCKS
3522      nonEmptyQueues = PCQueueCreate();
3523 #endif
3524      for(i =0; i<mysize; i++)
3525      {
3526 #if CMK_SMP
3527          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3528 #if SMP_LOCKS
3529          queue->smsg_msglist_index[i].pushed = 0;
3530          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3531 #endif
3532 #else
3533          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3534          queue->smsg_msglist_index[i].next = -1;
3535          queue->smsg_head_index = -1;
3536 #endif
3537         
3538      }
3539 #endif
3540 }
3541
3542 inline
3543 static void _init_smsg()
3544 {
3545     if(mysize > 1) {
3546         if (useDynamicSMSG)
3547             _init_dynamic_smsg();
3548         else
3549             _init_static_smsg();
3550     }
3551
3552     _init_send_queue(&smsg_queue);
3553 #if CMK_USE_OOB
3554     _init_send_queue(&smsg_oob_queue);
3555 #endif
3556 }
3557
3558 static void _init_static_msgq()
3559 {
3560     gni_return_t status;
3561     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3562     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3563     msgq_attrs.smsg_q_sz = 1;
3564     msgq_attrs.rcv_pool_sz = 1;
3565     msgq_attrs.num_msgq_eps = 2;
3566     msgq_attrs.nloc_insts = 8;
3567     msgq_attrs.modes = 0;
3568     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3569
3570     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3571     GNI_RC_CHECK("MSGQ Init", status);
3572
3573
3574 }
3575
3576
3577 static CmiUInt8 total_mempool_size = 0;
3578 static CmiUInt8 total_mempool_calls = 0;
3579
3580 #if USE_LRTS_MEMPOOL
3581 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3582 {
3583     void *pool;
3584     int ret;
3585     gni_return_t status = GNI_RC_SUCCESS;
3586
3587     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3588     if (*size < default_size) *size = default_size;
3589 #if LARGEPAGE
3590     // round up to be multiple of _tlbpagesize
3591     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3592     *size = ALIGNHUGEPAGE(*size);
3593 #endif
3594     total_mempool_size += *size;
3595     total_mempool_calls += 1;
3596 #if   !LARGEPAGE
3597     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3598     {
3599         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3600         CmiAbort("alloc_mempool_block");
3601     }
3602 #endif
3603 #if LARGEPAGE
3604     pool = my_get_huge_pages(*size);
3605     ret = pool==NULL;
3606 #else
3607     ret = posix_memalign(&pool, ALIGNBUF, *size);
3608 #endif
3609     if (ret != 0) {
3610 #if CMK_SMP && STEAL_MEMPOOL
3611       pool = steal_mempool_block(size, mem_hndl);
3612       if (pool != NULL) return pool;
3613 #endif
3614       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3615       if (ret == ENOMEM)
3616         CmiAbort("alloc_mempool_block: out of memory.");
3617       else
3618         CmiAbort("alloc_mempool_block: posix_memalign failed");
3619     }
3620 #if LARGEPAGE
3621     CmiMemLock();
3622     register_count++;
3623     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3624     CmiMemUnlock();
3625     if(status != GNI_RC_SUCCESS) {
3626         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3627 sweep_mempool(CpvAccess(mempool));
3628     }
3629     GNI_RC_CHECK("MEMORY_REGISTER", status);
3630 #else
3631     SetMemHndlZero((*mem_hndl));
3632 #endif
3633     return pool;
3634 }
3635
3636 // ptr is a block head pointer
3637 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3638 {
3639     if(!(IsMemHndlZero(mem_hndl)))
3640     {
3641         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3642     }
3643 #if LARGEPAGE
3644     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3645 #else
3646     free(ptr);
3647 #endif
3648 }
3649 #endif
3650
3651 void LrtsPreCommonInit(int everReturn){
3652 #if USE_LRTS_MEMPOOL
3653     CpvInitialize(mempool_type*, mempool);
3654     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3655     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3656 #endif
3657 }
3658
3659 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3660 {
3661     register int            i;
3662     int                     rc;
3663     int                     device_id = 0;
3664     unsigned int            remote_addr;
3665     gni_cdm_handle_t        cdm_hndl;
3666     gni_return_t            status = GNI_RC_SUCCESS;
3667     uint32_t                vmdh_index = -1;
3668     uint8_t                 ptag;
3669     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3670     int                     first_spawned;
3671     int                     physicalID;
3672     char                   *env;
3673
3674     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3675     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3676     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3677    
3678     status = PMI_Init(&first_spawned);
3679     GNI_RC_CHECK("PMI_Init", status);
3680
3681     status = PMI_Get_size(&mysize);
3682     GNI_RC_CHECK("PMI_Getsize", status);
3683
3684     status = PMI_Get_rank(&myrank);
3685     GNI_RC_CHECK("PMI_getrank", status);
3686
3687     //physicalID = CmiPhysicalNodeID(myrank);
3688     
3689     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3690
3691     *myNodeID = myrank;
3692     *numNodes = mysize;
3693   
3694 #if MULTI_THREAD_SEND
3695     /* Currently, we only consider the case that comm. thread will only recv msgs */
3696     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3697 #endif
3698
3699 #if CMI_EXERT_SEND_CAP
3700     CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
3701 #endif
3702
3703     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3704     
3705     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3706     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3707     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3708
3709     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3710     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3711     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3712
3713     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3714     if (env) useDynamicSMSG = 1;
3715     if (!useDynamicSMSG)
3716       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3717     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3718     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3719     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3720     
3721     if(myrank == 0)
3722     {
3723         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3724         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3725     }
3726 #ifdef USE_ONESIDED
3727     onesided_init(NULL, &onesided_hnd);
3728
3729     // this is a GNI test, so use the libonesided bypass functionality
3730     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3731     local_addr = gniGetNicAddress();
3732 #else
3733     ptag = get_ptag();
3734     cookie = get_cookie();
3735 #if 0
3736     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3737 #endif
3738     //Create and attach to the communication  domain */
3739     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3740     GNI_RC_CHECK("GNI_CdmCreate", status);
3741     //* device id The device id is the minor number for the device
3742     //that is assigned to the device by the system when the device is created.
3743     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3744     //where X is the device number 0 default 
3745     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3746     GNI_RC_CHECK("GNI_CdmAttach", status);
3747     local_addr = get_gni_nic_address(0);
3748 #endif
3749     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3750     _MEMCHECK(MPID_UGNI_AllAddr);
3751     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3752     /* create the local completion queue */
3753     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3754     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3755     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3756     
3757     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3758     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3759     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3760
3761     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3762     GNI_RC_CHECK("Create CQ (rx)", status);
3763     
3764     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3765     GNI_RC_CHECK("Create Post CQ (rx)", status);
3766     
3767     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3768     //GNI_RC_CHECK("Create BTE CQ", status);
3769
3770     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3771     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3772     _MEMCHECK(ep_hndl_array);
3773 #if MULTI_THREAD_SEND 
3774     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3775     //default_tx_cq_lock = CmiCreateLock();
3776     rdma_tx_cq_lock = CmiCreateLock();
3777     smsg_rx_cq_lock = CmiCreateLock();
3778     //global_gni_lock  = CmiCreateLock();
3779     //rx_cq_lock  = CmiCreateLock();
3780 #endif
3781     for (i=0; i<mysize; i++) {
3782         if(i == myrank) continue;
3783         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3784         GNI_RC_CHECK("GNI_EpCreate ", status);   
3785         remote_addr = MPID_UGNI_AllAddr[i];
3786         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3787         GNI_RC_CHECK("GNI_EpBind ", status);   
3788     }
3789
3790     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3791     _init_smsg();
3792     PMI_Barrier();
3793
3794 #if     USE_LRTS_MEMPOOL
3795     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3796     if (env) {
3797         _totalmem = CmiReadSize(env);
3798         if (myrank == 0)
3799             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3800     }
3801
3802     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3803     if (env) _mempool_size = CmiReadSize(env);
3804     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3805         _mempool_size = CmiReadSize(env);
3806
3807
3808     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3809     if (env) {
3810         MAX_REG_MEM = CmiReadSize(env);
3811         user_set_flag = 1;
3812     }
3813     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3814         MAX_REG_MEM = CmiReadSize(env);
3815         user_set_flag = 1;
3816     }
3817
3818     env = getenv("CHARM_UGNI_SEND_MAX");
3819     if (env) {
3820         MAX_BUFF_SEND = CmiReadSize(env);
3821         user_set_flag = 1;
3822     }
3823     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3824         MAX_BUFF_SEND = CmiReadSize(env);
3825         user_set_flag = 1;
3826     }
3827
3828     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3829     if (env) {
3830         _mempool_size_limit = CmiReadSize(env);
3831     }
3832
3833     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3834     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3835
3836     if (myrank==0) {
3837         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3838         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3839         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3840             /* memblock can expand to BIG_MSG * 2 size */
3841             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3842             CmiAbort("mempool maximum size is too small. \n");
3843         }
3844 #if MULTI_THREAD_SEND
3845         printf("Charm++> worker thread sending messages\n");
3846 #elif COMM_THREAD_SEND
3847         printf("Charm++> only comm thread send/recv messages\n");
3848 #endif
3849     }
3850
3851 #endif     /* end of USE_LRTS_MEMPOOL */
3852
3853     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3854     if (env) {
3855         BIG_MSG = CmiReadSize(env);
3856         if (BIG_MSG < ONE_SEG)
3857           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3858     }
3859     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3860     if (env) {
3861         BIG_MSG_PIPELINE = atoi(env);
3862     }
3863
3864     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3865     if (env) _checkProgress = 0;
3866     if (mysize == 1) _checkProgress = 0;
3867
3868 #if CMI_EXERT_RDMA_CAP
3869     env = getenv("CHARM_UGNI_RDMA_MAX");
3870     if (env)  {
3871         RDMA_pending = atoi(env);
3872         if (myrank == 0)
3873             printf("Charm++> Max pending RDMA set to: %d\n", RDMA_pending);
3874     }
3875 #endif
3876     
3877     /*
3878     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3879     if (env) 
3880         _tlbpagesize = CmiReadSize(env);
3881     */
3882     /* real gethugepagesize() is only available when hugetlb module linked */
3883     _tlbpagesize = gethugepagesize();
3884     if (myrank == 0) {
3885         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3886     }
3887
3888 #if LARGEPAGE
3889     if (_tlbpagesize == 4096) {
3890         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3891     }
3892 #endif
3893
3894       /* stats related arguments */
3895 #if CMK_WITH_STATS
3896     CmiGetArgStringDesc(*argv,"+gni_stats_root",&counters_dirname,"counter directory name, default counters");
3897
3898     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3899     
3900     stats_off = CmiGetArgFlag(*argv, "+stats_off");
3901
3902     init_comm_stats();
3903 #endif
3904
3905     /* init DMA buffer for medium message */
3906
3907     //_init_DMA_buffer();
3908     
3909     free(MPID_UGNI_AllAddr);
3910
3911 #if CMK_SMP
3912     sendRdmaBuf = PCQueueCreate();
3913 #else
3914     sendRdmaBuf = 0;
3915 #endif
3916
3917 #if MACHINE_DEBUG_LOG
3918     char ln[200];
3919     sprintf(ln,"debugLog.%d",myrank);
3920     debugLog=fopen(ln,"w");
3921 #endif
3922
3923 //    NTK_Init();
3924 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3925
3926 #if  REMOTE_EVENT
3927     SHIFT = 1;
3928     while (1<<SHIFT < mysize) SHIFT++;
3929     CmiAssert(SHIFT < 31);
3930     IndexPool_init(&ackPool);
3931 #if CMK_PERSISTENT_COMM
3932     IndexPool_init(&persistPool);
3933 #endif
3934 #endif
3935 }
3936
3937 void* LrtsAlloc(int n_bytes, int header)
3938 {
3939     void *ptr = NULL;
3940 #if 0
3941     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3942 #endif
3943     if(n_bytes <= SMSG_MAX_MSG)
3944     {
3945         int totalsize = n_bytes+header;
3946         ptr = malloc(totalsize);
3947     }
3948     else {
3949         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3950 #if     USE_LRTS_MEMPOOL
3951         n_bytes = ALIGN64(n_bytes);
3952         if(n_bytes < BIG_MSG)
3953         {
3954             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3955             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3956         }else 
3957         {
3958 #if LARGEPAGE
3959             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3960             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3961             char *res = my_get_huge_pages(n_bytes);
3962 #else
3963             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3964 #endif
3965             if (res) ptr = res + ALIGNBUF - header;
3966         }
3967 #else
3968         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3969         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3970         ptr = res + ALIGNBUF - header;
3971 #endif
3972     }
3973 #if CMK_PERSISTENT_COMM
3974     if (ptr) SetMemHndlZero(MEMHFIELD((char*)ptr+header));
3975 #endif
3976     return ptr;
3977 }
3978
3979 void  LrtsFree(void *msg)
3980 {
3981     CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3982 #if CMK_PERSISTENT_COMM
3983     if (!IsMemHndlZero(MEMHFIELD((char*)msg+sizeof(CmiChunkHeader)))) return;
3984 #endif
3985     if (size <= SMSG_MAX_MSG)
3986         free(msg);
3987     else {
3988         size = ALIGN64(size);
3989         if(size>=BIG_MSG)
3990         {
3991 #if LARGEPAGE
3992             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3993             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3994 #else
3995             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3996 #endif
3997         }
3998         else {
3999 #if    USE_LRTS_MEMPOOL
4000 #if CMK_SMP
4001             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
4002 #else
4003             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
4004 #endif
4005 #else
4006             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
4007 #endif
4008         }
4009     }
4010 }
4011
4012 void LrtsExit()
4013 {
4014 #if CMK_WITH_STATS
4015 #if CMK_SMP
4016     if(CmiMyRank() == CmiMyNodeSize())
4017 #endif
4018     if (print_stats) print_comm_stats();
4019 #endif
4020     /* free memory ? */
4021 #if USE_LRTS_MEMPOOL
4022     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
4023     mempool_destroy(CpvAccess(mempool));
4024 #endif
4025     PMI_Finalize();
4026     exit(0);
4027 }
4028
4029 void LrtsDrainResources()
4030 {
4031     if(mysize == 1) return;
4032     while (
4033 #if CMK_USE_OOB
4034            !SendBufferMsg(&smsg_oob_queue) ||
4035 #endif
4036            !SendBufferMsg(&smsg_queue) 
4037           )
4038     {
4039         if (useDynamicSMSG)
4040             PumpDatagramConnection();
4041         PumpNetworkSmsg();
4042         PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
4043         PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
4044 #if REMOTE_EVENT
4045         PumpRemoteTransactions();
4046 #endif
4047         SendRdmaMsg();
4048     }
4049     PMI_Barrier();
4050 }
4051
4052 void LrtsAbort(const char *message) {
4053     fprintf(stderr, "[%d] CmiAbort: %s\n", myrank, message);
4054     CmiPrintStackTrace(0);
4055     PMI_Abort(-1, message);
4056 }
4057
4058 /**************************  TIMER FUNCTIONS **************************/
4059 #if CMK_TIMER_USE_SPECIAL
4060 /* MPI calls are not threadsafe, even the timer on some machines */
4061 static CmiNodeLock  timerLock = 0;
4062 static int _absoluteTime = 0;
4063 static int _is_global = 0;
4064 static struct timespec start_ts;
4065
4066 inline int CmiTimerIsSynchronized() {
4067     return 0;
4068 }
4069
4070 inline int CmiTimerAbsolute() {
4071     return _absoluteTime;
4072 }
4073
4074 double CmiStartTimer() {
4075     return 0.0;
4076 }
4077
4078 double CmiInitTime() {
4079     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
4080 }
4081