added CQWrite to replace ACK msg
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37
38 #include <gni_pub.h>
39 #include <pmi.h>
40 //#include <numatoolkit.h>
41
42 #include "converse.h"
43
44 #define     LARGEPAGE              0
45
46 #if LARGEPAGE
47 #include <hugetlbfs.h>
48 #endif
49
50 #if CMK_DIRECT
51 #include "cmidirect.h"
52 #endif
53
54 #if CMK_SMP
55 #define MULTI_THREAD_SEND          0
56 #define COMM_THREAD_SEND           1
57 #endif
58
59 #if CMK_SMP && COMM_THREAD_SEND
60 #define PIGGYBACK_ACK              0
61 #endif
62
63 #define CMI_EXERT_SEND_CAP      0
64 #define CMI_EXERT_RECV_CAP      0
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP 16
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP 2
72 #endif
73
74 #define USE_LRTS_MEMPOOL                  1
75
76 #define CQWRITE                     1
77 #define REMOTE_EVENT                      1
78
79 #define PRINT_SYH  0
80
81 // Trace communication thread
82 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
83 #define TRACE_THRESHOLD     0.00005
84 #define CMI_MPI_TRACE_MOREDETAILED 0
85 #undef CMI_MPI_TRACE_USEREVENTS
86 #define CMI_MPI_TRACE_USEREVENTS 1
87 #else
88 #undef CMK_SMP_TRACE_COMMTHREAD
89 #define CMK_SMP_TRACE_COMMTHREAD 0
90 #endif
91
92 #define CMK_TRACE_COMMOVERHEAD 0
93 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
94 #undef CMI_MPI_TRACE_USEREVENTS
95 #define CMI_MPI_TRACE_USEREVENTS 1
96 #else
97 #undef CMK_TRACE_COMMOVERHEAD
98 #define CMK_TRACE_COMMOVERHEAD 0
99 #endif
100
101 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
102 CpvStaticDeclare(double, projTraceStart);
103 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
104 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
105 #else
106 #define  START_EVENT()
107 #define  END_EVENT(x)
108 #endif
109
110 #if USE_LRTS_MEMPOOL
111
112 #define oneMB (1024ll*1024)
113 #define oneGB (1024ll*1024*1024)
114
115 static CmiInt8 _mempool_size = 8*oneMB;
116 static CmiInt8 _expand_mem =  4*oneMB;
117 static CmiInt8 _mempool_size_limit = 0;
118
119 static CmiInt8 _totalmem = 0.8*oneGB;
120
121 #if LARGEPAGE
122 static int BIG_MSG  =  16*oneMB;
123 static int ONE_SEG  =  4*oneMB;
124 #else
125 static int BIG_MSG  =  4*oneMB;
126 static int ONE_SEG  =  2*oneMB;
127 #endif
128 #if MULTI_THREAD_SEND
129 static int BIG_MSG_PIPELINE = 1;
130 #else
131 static int BIG_MSG_PIPELINE = 4;
132 #endif
133
134 // dynamic flow control
135 static CmiInt8 buffered_send_msg = 0;
136 static CmiInt8 register_memory_size = 0;
137
138 #if LARGEPAGE
139 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
140 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
141 static CmiInt8  register_count = 0;
142 #else
143 #if CMK_SMP && COMM_THREAD_SEND 
144 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
145 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
146 #else
147 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
148 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
149 #endif
150
151
152 #endif
153
154 #endif     /* end USE_LRTS_MEMPOOL */
155
156 #if MULTI_THREAD_SEND 
157 #define     CMI_GNI_LOCK(x)       CmiLock(x);
158 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
159 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
160 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
161 #else
162 #define     CMI_GNI_LOCK(x)
163 #define     CMI_GNI_UNLOCK(x)
164 #define     CMI_PCQUEUEPOP_LOCK(Q)   
165 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
166 #endif
167
168 static int _tlbpagesize = 4096;
169
170 //static int _smpd_count  = 0;
171
172 static int   user_set_flag  = 0;
173
174 static int _checkProgress = 1;             /* check deadlock */
175 static int _detected_hang = 0;
176
177 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
178
179 // dynamic SMSG
180 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
181
182 static int avg_smsg_connection = 32;
183 static int                 *smsg_connected_flag= 0;
184 static gni_smsg_attr_t     **smsg_attr_vector_local;
185 static gni_smsg_attr_t     **smsg_attr_vector_remote;
186 static gni_ep_handle_t     ep_hndl_unbound;
187 static gni_smsg_attr_t     send_smsg_attr;
188 static gni_smsg_attr_t     recv_smsg_attr;
189
190 typedef struct _dynamic_smsg_mailbox{
191    void     *mailbox_base;
192    int      size;
193    int      offset;
194    gni_mem_handle_t  mem_hndl;
195    struct      _dynamic_smsg_mailbox  *next;
196 }dynamic_smsg_mailbox_t;
197
198 static dynamic_smsg_mailbox_t  *mailbox_list;
199
200 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
201 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
202
203 #if PRINT_SYH
204 int         lrts_send_msg_id = 0;
205 int         lrts_local_done_msg = 0;
206 int         lrts_send_rdma_success = 0;
207 #endif
208
209 #include "machine.h"
210
211 #include "pcqueue.h"
212
213 #include "mempool.h"
214
215 #if CMK_PERSISTENT_COMM
216 #include "machine-persistent.h"
217 #endif
218
219 //#define  USE_ONESIDED 1
220 #ifdef USE_ONESIDED
221 //onesided implementation is wrong, since no place to restore omdh
222 #include "onesided.h"
223 onesided_hnd_t   onesided_hnd;
224 onesided_md_t    omdh;
225 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
226
227 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
228
229 #else
230 uint8_t   onesided_hnd, omdh;
231 #if REMOTE_EVENT
232 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status) \
233     if(register_memory_size+size>= MAX_REG_MEM) { \
234         status = GNI_RC_ERROR_NOMEM;} \
235     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, rdma_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
236         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
237 #else
238 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
239         if (register_memory_size + size >= MAX_REG_MEM) { \
240             status = GNI_RC_ERROR_NOMEM; \
241         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
242             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
243 #endif
244 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
245     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
246              register_memory_size -= size; \
247          else CmiAbort("MEM_DEregister");  \
248     } while (0)
249 #endif
250
251 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
252 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
253 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
254 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
255 #define   IncreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)++
256 #define   DecreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)--
257 #define   IncreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)++
258 #define   DecreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)--
259 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
260 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
261 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
262 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
263 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
264 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
265
266 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
267 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
268
269 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
270 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
271 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
272 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
273
274 #define ALIGNBUF                64
275
276 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
277 /* If SMSG is not used */
278
279 #define FMA_PER_CORE  1024
280 #define FMA_BUFFER_SIZE 1024
281
282 /* If SMSG is used */
283 static int  SMSG_MAX_MSG = 1024;
284 #define SMSG_MAX_CREDIT 72 
285
286 #define MSGQ_MAXSIZE       2048
287 /* large message transfer with FMA or BTE */
288 #define LRTS_GNI_RDMA_THRESHOLD  1024 
289
290 #if CMK_SMP
291 static int  REMOTE_QUEUE_ENTRIES=163840; 
292 static int LOCAL_QUEUE_ENTRIES=163840; 
293 #else
294 static int  REMOTE_QUEUE_ENTRIES=20480;
295 static int LOCAL_QUEUE_ENTRIES=20480; 
296 #endif
297
298 #define BIG_MSG_TAG             0x26
299 #define PUT_DONE_TAG            0x28
300 #define DIRECT_PUT_DONE_TAG     0x29
301 #define ACK_TAG                 0x30
302 /* SMSG is data message */
303 #define SMALL_DATA_TAG          0x31
304 #define SMALL_DATA_ACK_TAG      0x32
305 /* SMSG is a control message to initialize a BTE */
306 #define LMSG_INIT_TAG           0x39 
307 #define LMSG_INIT_ACK_TAG       0x3a 
308
309 #define DEBUG
310 #ifdef GNI_RC_CHECK
311 #undef GNI_RC_CHECK
312 #endif
313 #ifdef DEBUG
314 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
315 #else
316 #define GNI_RC_CHECK(msg,rc)
317 #endif
318
319 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
320 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
321 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
322
323 static int useStaticMSGQ = 0;
324 static int useStaticFMA = 0;
325 static int mysize, myrank;
326 static gni_nic_handle_t   nic_hndl;
327
328 typedef struct {
329     gni_mem_handle_t mdh;
330     uint64_t addr;
331 } mdh_addr_t ;
332 // this is related to dynamic SMSG
333
334 typedef struct mdh_addr_list{
335     gni_mem_handle_t mdh;
336    void *addr;
337     struct mdh_addr_list *next;
338 }mdh_addr_list_t;
339
340 static unsigned int         smsg_memlen;
341 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
342 mdh_addr_t          setup_mem;
343 mdh_addr_t          *smsg_connection_vec = 0;
344 gni_mem_handle_t    smsg_connection_memhndl;
345 static int          smsg_expand_slots = 10;
346 static int          smsg_available_slot = 0;
347 static void         *smsg_mailbox_mempool = 0;
348 mdh_addr_list_t     *smsg_dynamic_list = 0;
349
350 static void             *smsg_mailbox_base;
351 gni_msgq_attr_t         msgq_attrs;
352 gni_msgq_handle_t       msgq_handle;
353 gni_msgq_ep_attr_t      msgq_ep_attrs;
354 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
355
356 /* =====Beginning of Declarations of Machine Specific Variables===== */
357 static int cookie;
358 static int modes = 0;
359 static gni_cq_handle_t       smsg_rx_cqh = NULL;
360 static gni_cq_handle_t       default_tx_cqh = NULL;
361 static gni_cq_handle_t       rdma_tx_cqh = NULL;
362 static gni_cq_handle_t       rdma_rx_cqh = NULL;
363 static gni_cq_handle_t       post_tx_cqh = NULL;
364 static gni_ep_handle_t       *ep_hndl_array;
365
366 static CmiNodeLock           *ep_lock_array;
367 static CmiNodeLock           default_tx_cq_lock; 
368 static CmiNodeLock           rdma_tx_cq_lock; 
369 static CmiNodeLock           global_gni_lock; 
370 static CmiNodeLock           rx_cq_lock;
371 static CmiNodeLock           smsg_mailbox_lock;
372 static CmiNodeLock           smsg_rx_cq_lock;
373 static CmiNodeLock           *mempool_lock;
374
375 typedef struct msg_list
376 {
377     uint32_t destNode;
378     uint32_t size;
379     void *msg;
380     uint8_t tag;
381 #if !CMK_SMP
382     struct msg_list *next;
383 #endif
384 }MSG_LIST;
385
386
387 typedef struct control_msg
388 {
389     uint64_t            source_addr;    /* address from the start of buffer  */
390     uint64_t            dest_addr;      /* address from the start of buffer */
391     int                 total_length;   /* total length */
392     int                 length;         /* length of this packet */
393     uint8_t             seq_id;         //big message   0 meaning single message
394     gni_mem_handle_t    source_mem_hndl;
395     struct control_msg *next;
396 } CONTROL_MSG;
397
398 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
399
400 typedef struct ack_msg
401 {
402     uint64_t            source_addr;    /* address from the start of buffer  */
403 #if ! USE_LRTS_MEMPOOL
404     gni_mem_handle_t    source_mem_hndl;
405     int                 length;          /* total length */
406 #endif
407     struct ack_msg     *next;
408 } ACK_MSG;
409
410 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
411
412 #if CMK_DIRECT
413 typedef struct{
414     uint64_t    handler_addr;
415 }CMK_DIRECT_HEADER;
416
417 typedef struct {
418     char core[CmiMsgHeaderSizeBytes];
419     uint64_t handler;
420 }cmidirectMsg;
421
422 //SYH
423 CpvDeclare(int, CmiHandleDirectIdx);
424 void CmiHandleDirectMsg(cmidirectMsg* msg)
425 {
426
427     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
428    (*(_handle->callbackFnPtr))(_handle->callbackData);
429    CmiFree(msg);
430 }
431
432 void CmiDirectInit()
433 {
434     CpvInitialize(int,  CmiHandleDirectIdx);
435     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
436 }
437
438 #endif
439 typedef struct  rmda_msg
440 {
441     int                   destNode;
442     gni_post_descriptor_t *pd;
443 #if !CMK_SMP
444     struct  rmda_msg      *next;
445 #endif
446 }RDMA_REQUEST;
447
448
449 #if CMK_SMP
450 #define SMP_LOCKS               0
451 #define ONE_SEND_QUEUE                  0
452 PCQueue sendRdmaBuf;
453 typedef struct  msg_list_index
454 {
455     PCQueue     sendSmsgBuf;
456     int         pushed;
457     CmiNodeLock   lock;
458 } MSG_LIST_INDEX;
459 char                *destpe_avail;
460 #if  !ONE_SEND_QUEUE && SMP_LOCKS
461     PCQueue     nonEmptyQueues;
462 #endif
463 #else         /* non-smp */
464
465 static RDMA_REQUEST        *sendRdmaBuf = 0;
466 static RDMA_REQUEST        *sendRdmaTail = 0;
467 typedef struct  msg_list_index
468 {
469     int         next;
470     MSG_LIST    *sendSmsgBuf;
471     MSG_LIST    *tail;
472 } MSG_LIST_INDEX;
473
474 #endif
475
476 // buffered send queue
477 #if ! ONE_SEND_QUEUE
478 typedef struct smsg_queue
479 {
480     MSG_LIST_INDEX   *smsg_msglist_index;
481     int               smsg_head_index;
482 } SMSG_QUEUE;
483 #else
484 typedef struct smsg_queue
485 {
486     PCQueue       sendMsgBuf;
487 }  SMSG_QUEUE;
488 #endif
489
490 SMSG_QUEUE                  smsg_queue;
491 #if PIGGYBACK_ACK
492 SMSG_QUEUE                  smsg_ack_queue;
493 #endif
494 #if CMK_USE_OOB
495 SMSG_QUEUE                  smsg_oob_queue;
496 #endif
497
498 #if CMK_SMP
499
500 #define FreeMsgList(d)   free(d);
501 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
502
503 #else
504
505 static MSG_LIST       *msglist_freelist=0;
506
507 #define FreeMsgList(d)  \
508   do { \
509   (d)->next = msglist_freelist;\
510   msglist_freelist = d; \
511   } while (0)
512
513 #define MallocMsgList(d) \
514   do {  \
515   d = msglist_freelist;\
516   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
517              _MEMCHECK(d);\
518   } else msglist_freelist = d->next; \
519   d->next =0;  \
520   } while (0)
521
522 #endif
523
524 #if CMK_SMP
525
526 #define FreeControlMsg(d)      free(d);
527 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
528
529 #else
530
531 static CONTROL_MSG    *control_freelist=0;
532
533 #define FreeControlMsg(d)       \
534   do { \
535   (d)->next = control_freelist;\
536   control_freelist = d; \
537   } while (0);
538
539 #define MallocControlMsg(d) \
540   do {  \
541   d = control_freelist;\
542   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
543              _MEMCHECK(d);\
544   } else control_freelist = d->next;  \
545   } while (0);
546
547 #endif
548
549 #if CMK_SMP
550
551 #define FreeAckMsg(d)      free(d);
552 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
553
554 #else
555
556 static ACK_MSG        *ack_freelist=0;
557
558 #define FreeAckMsg(d)       \
559   do { \
560   (d)->next = ack_freelist;\
561   ack_freelist = d; \
562   } while (0)
563
564 #define MallocAckMsg(d) \
565   do { \
566   d = ack_freelist;\
567   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
568              _MEMCHECK(d);\
569   } else ack_freelist = d->next; \
570   } while (0)
571
572 #endif
573
574
575 # if CMK_SMP
576 #define FreeRdmaRequest(d)       free(d);
577 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
578 #else
579
580 static RDMA_REQUEST         *rdma_freelist = NULL;
581
582 #define FreeRdmaRequest(d)       \
583   do {  \
584   (d)->next = rdma_freelist;\
585   rdma_freelist = d;    \
586   } while (0)
587
588 #define MallocRdmaRequest(d) \
589   do {   \
590   d = rdma_freelist;\
591   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
592              _MEMCHECK(d);\
593   } else rdma_freelist = d->next; \
594   d->next =0;   \
595   } while (0)
596 #endif
597
598 /* reuse gni_post_descriptor_t */
599 static gni_post_descriptor_t *post_freelist=0;
600
601 #if  !CMK_SMP
602 #define FreePostDesc(d)       \
603     (d)->next_descr = post_freelist;\
604     post_freelist = d;
605
606 #define MallocPostDesc(d) \
607   d = post_freelist;\
608   if (d==0) { \
609      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
610      d->next_descr = 0;\
611       _MEMCHECK(d);\
612   } else post_freelist = d->next_descr;
613 #else
614
615 #define FreePostDesc(d)     free(d);
616 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
617
618 #endif
619
620
621 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
622 static int      buffered_smsg_counter = 0;
623
624 /* SmsgSend return success but message sent is not confirmed by remote side */
625 static MSG_LIST *buffered_fma_head = 0;
626 static MSG_LIST *buffered_fma_tail = 0;
627
628 /* functions  */
629 #define IsFree(a,ind)  !( a& (1<<(ind) ))
630 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
631 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
632
633 CpvDeclare(mempool_type*, mempool);
634
635 #if CMK_WITH_STATS
636 typedef struct comm_thread_stats
637 {
638 int      count_in_send_buffered_ack;
639 double   time_in_send_buffered_ack;
640 double   max_time_in_send_buffered_ack;
641 int      count_in_send_buffered_smsg;
642 double   time_in_send_buffered_smsg;
643 double   max_time_in_send_buffered_smsg;
644 } Comm_Thread_Stats;
645
646 static Comm_Thread_Stats   comm_stats;
647
648 static void init_comm_stats()
649 {
650   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
651 }
652
653 #define STATS_ACK_TIME(x)   \
654         { double t = CmiWallTimer(); \
655           x;        \
656           t = CmiWallTimer() - t;          \
657           comm_stats.count_in_send_buffered_ack ++;        \
658           comm_stats.time_in_send_buffered_ack += t;   \
659           if (t>comm_stats.max_time_in_send_buffered_ack)      \
660               comm_stats.max_time_in_send_buffered_ack = t;    \
661         }
662
663 static void print_comm_stats()
664 {
665     printf("PE[%d]  count/time in send buffered ack:   %d %f\n",  myrank, comm_stats.count_in_send_buffered_ack, comm_stats.time_in_send_buffered_ack);
666     printf("PE[%d]  max time in send buffered ack:     %f\n",  myrank, comm_stats.max_time_in_send_buffered_ack);
667 }
668 #else
669 #define STATS_ACK_TIME(x)            x
670 #endif
671
672 static int print_stats = 0;
673
674 static void
675 allgather(void *in,void *out, int len)
676 {
677     static int *ivec_ptr=NULL,already_called=0,job_size=0;
678     int i,rc;
679     int my_rank;
680     char *tmp_buf,*out_ptr;
681
682     if(!already_called) {
683
684         rc = PMI_Get_size(&job_size);
685         CmiAssert(rc == PMI_SUCCESS);
686         rc = PMI_Get_rank(&my_rank);
687         CmiAssert(rc == PMI_SUCCESS);
688
689         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
690         CmiAssert(ivec_ptr != NULL);
691
692         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
693         CmiAssert(rc == PMI_SUCCESS);
694
695         already_called = 1;
696
697     }
698
699     tmp_buf = (char *)malloc(job_size * len);
700     CmiAssert(tmp_buf);
701
702     rc = PMI_Allgather(in,tmp_buf,len);
703     CmiAssert(rc == PMI_SUCCESS);
704
705     out_ptr = out;
706
707     for(i=0;i<job_size;i++) {
708
709         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
710
711     }
712
713     free(tmp_buf);
714 }
715
716 static void
717 allgather_2(void *in,void *out, int len)
718 {
719     //PMI_Allgather is out of order
720     int i,rc, extend_len;
721     int  rank_index;
722     char *out_ptr, *out_ref;
723     char *in2;
724
725     extend_len = sizeof(int) + len;
726     in2 = (char*)malloc(extend_len);
727
728     memcpy(in2, &myrank, sizeof(int));
729     memcpy(in2+sizeof(int), in, len);
730
731     out_ptr = (char*)malloc(mysize*extend_len);
732
733     rc = PMI_Allgather(in2, out_ptr, extend_len);
734     GNI_RC_CHECK("allgather", rc);
735
736     out_ref = out;
737
738     for(i=0;i<mysize;i++) {
739         //rank index 
740         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
741         //copy to the rank index slot
742         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
743     }
744
745     free(out_ptr);
746     free(in2);
747
748 }
749
750 static unsigned int get_gni_nic_address(int device_id)
751 {
752     unsigned int address, cpu_id;
753     gni_return_t status;
754     int i, alps_dev_id=-1,alps_address=-1;
755     char *token, *p_ptr;
756
757     p_ptr = getenv("PMI_GNI_DEV_ID");
758     if (!p_ptr) {
759         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
760        
761         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
762     } else {
763         while ((token = strtok(p_ptr,":")) != NULL) {
764             alps_dev_id = atoi(token);
765             if (alps_dev_id == device_id) {
766                 break;
767             }
768             p_ptr = NULL;
769         }
770         CmiAssert(alps_dev_id != -1);
771         p_ptr = getenv("PMI_GNI_LOC_ADDR");
772         CmiAssert(p_ptr != NULL);
773         i = 0;
774         while ((token = strtok(p_ptr,":")) != NULL) {
775             if (i == alps_dev_id) {
776                 alps_address = atoi(token);
777                 break;
778             }
779             p_ptr = NULL;
780             ++i;
781         }
782         CmiAssert(alps_address != -1);
783         address = alps_address;
784     }
785     return address;
786 }
787
788 static uint8_t get_ptag(void)
789 {
790     char *p_ptr, *token;
791     uint8_t ptag;
792
793     p_ptr = getenv("PMI_GNI_PTAG");
794     CmiAssert(p_ptr != NULL);
795     token = strtok(p_ptr, ":");
796     ptag = (uint8_t)atoi(token);
797     return ptag;
798         
799 }
800
801 static uint32_t get_cookie(void)
802 {
803     uint32_t cookie;
804     char *p_ptr, *token;
805
806     p_ptr = getenv("PMI_GNI_COOKIE");
807     CmiAssert(p_ptr != NULL);
808     token = strtok(p_ptr, ":");
809     cookie = (uint32_t)atoi(token);
810
811     return cookie;
812 }
813
814 #if LARGEPAGE
815
816 /* directly mmap memory from hugetlbfs for large pages */
817
818 #include <sys/stat.h>
819 #include <fcntl.h>
820 #include <sys/mman.h>
821
822 // size must be _tlbpagesize aligned
823 void *my_get_huge_pages(size_t size)
824 {
825     char filename[512];
826     int fd;
827     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
828     void *ptr = NULL;
829
830     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
831     fd = open(filename, O_RDWR | O_CREAT, mode);
832     if (fd == -1) {
833         CmiAbort("my_get_huge_pages: open filed");
834     }
835     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
836     if (ptr == MAP_FAILED) ptr = NULL;
837 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
838     close(fd);
839     unlink(filename);
840     return ptr;
841 }
842
843 void my_free_huge_pages(void *ptr, int size)
844 {
845 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
846     int ret = munmap(ptr, size);
847     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
848 }
849
850 #endif
851
852 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
853 /* TODO: add any that are related */
854 /* =====End of Definitions of Message-Corruption Related Macros=====*/
855
856
857 #include "machine-lrts.h"
858 #include "machine-common-core.c"
859
860 /* Network progress function is used to poll the network when for
861    messages. This flushes receive buffers on some  implementations*/
862 #if CMK_MACHINE_PROGRESS_DEFINED
863 void CmiMachineProgressImpl() {
864 }
865 #endif
866
867 static void SendRdmaMsg();
868 static void PumpNetworkSmsg();
869 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
870 static void PumpCqWriteTransactions();
871 static int SendBufferMsg(SMSG_QUEUE *queue);
872
873 #if MACHINE_DEBUG_LOG
874 FILE *debugLog = NULL;
875 static CmiInt8 buffered_recv_msg = 0;
876 int         lrts_smsg_success = 0;
877 int         lrts_received_msg = 0;
878 #endif
879
880 static void sweep_mempool(mempool_type *mptr)
881 {
882     int n = 0;
883     block_header *current = &(mptr->block_head);
884
885     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
886     while( current!= NULL) {
887         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
888         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
889     }
890     printf("[n %d] sweep_mempool slot END.\n", myrank);
891 }
892
893 inline
894 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
895 {
896     block_header *current = *from;
897
898     //while(register_memory_size>= MAX_REG_MEM)
899     //{
900         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
901             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
902
903         *from = current;
904         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
905         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
906         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
907     //}
908     return GNI_RC_SUCCESS;
909 }
910
911 inline 
912 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl)
913 {
914     gni_return_t status = GNI_RC_SUCCESS;
915     //int size = GetMempoolsize(msg);
916     //void *blockaddr = GetMempoolBlockPtr(msg);
917     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
918    
919     block_header *current = &(mptr->block_head);
920     while(register_memory_size>= MAX_REG_MEM)
921     {
922         status = deregisterMemory(mptr, &current);
923         if (status != GNI_RC_SUCCESS) break;
924     }
925     if(register_memory_size>= MAX_REG_MEM) return status;
926
927     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
928     while(1)
929     {
930         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
931         if(status == GNI_RC_SUCCESS)
932         {
933             break;
934         }
935         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
936         {
937             CmiAbort("Memory registor for mempool fails\n");
938         }
939         else
940         {
941             status = deregisterMemory(mptr, &current);
942             if (status != GNI_RC_SUCCESS) break;
943         }
944     }; 
945     return status;
946 }
947
948 inline 
949 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t)
950 {
951     static int rank = -1;
952     int i;
953     gni_return_t status;
954     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
955     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
956     mempool_type *mptr;
957
958     status = registerFromMempool(mptr1, msg, size, t);
959     if (status == GNI_RC_SUCCESS) return status;
960 #if CMK_SMP 
961     for (i=0; i<CmiMyNodeSize()+1; i++) {
962       rank = (rank+1)%(CmiMyNodeSize()+1);
963       mptr = CpvAccessOther(mempool, rank);
964       if (mptr == mptr1) continue;
965       status = registerFromMempool(mptr, msg, size, t);
966       if (status == GNI_RC_SUCCESS) return status;
967     }
968 #endif
969     return  GNI_RC_ERROR_RESOURCE;
970 }
971
972 inline
973 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
974 {
975     MSG_LIST        *msg_tmp;
976     MallocMsgList(msg_tmp);
977     msg_tmp->destNode = destNode;
978     msg_tmp->size   = size;
979     msg_tmp->msg    = msg;
980     msg_tmp->tag    = tag;
981
982 #if !CMK_SMP
983     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
984         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
985         queue->smsg_head_index = destNode;
986         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
987     }else
988     {
989         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
990         queue->smsg_msglist_index[destNode].tail = msg_tmp;
991     }
992 #else
993 #if ONE_SEND_QUEUE
994     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
995 #else
996 #if SMP_LOCKS
997     CmiLock(queue->smsg_msglist_index[destNode].lock);
998     if(queue->smsg_msglist_index[destNode].pushed == 0)
999     {
1000         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1001     }
1002     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1003     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1004 #else
1005     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1006 #endif
1007 #endif
1008 #endif
1009 #if PRINT_SYH
1010     buffered_smsg_counter++;
1011 #endif
1012 }
1013
1014 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1015 {
1016     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1017 }
1018
1019 inline
1020 static void setup_smsg_connection(int destNode)
1021 {
1022     mdh_addr_list_t  *new_entry = 0;
1023     gni_post_descriptor_t *pd;
1024     gni_smsg_attr_t      *smsg_attr;
1025     gni_return_t status = GNI_RC_NOT_DONE;
1026     RDMA_REQUEST        *rdma_request_msg;
1027     
1028     if(smsg_available_slot == smsg_expand_slots)
1029     {
1030         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1031         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1032         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1033
1034         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1035             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1036             GNI_MEM_READWRITE,   
1037             -1,
1038             &(new_entry->mdh));
1039         smsg_available_slot = 0; 
1040         new_entry->next = smsg_dynamic_list;
1041         smsg_dynamic_list = new_entry;
1042     }
1043     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1044     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1045     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1046     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1047     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1048     smsg_attr->buff_size = smsg_memlen;
1049     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1050     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1051     smsg_local_attr_vec[destNode] = smsg_attr;
1052     smsg_available_slot++;
1053     MallocPostDesc(pd);
1054     pd->type            = GNI_POST_FMA_PUT;
1055     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1056     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1057     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1058     pd->length          = sizeof(gni_smsg_attr_t);
1059     pd->local_addr      = (uint64_t) smsg_attr;
1060     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1061     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1062     pd->src_cq_hndl     = rdma_tx_cqh;
1063     pd->rdma_mode       = 0;
1064     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1065     print_smsg_attr(smsg_attr);
1066     if(status == GNI_RC_ERROR_RESOURCE )
1067     {
1068         MallocRdmaRequest(rdma_request_msg);
1069         rdma_request_msg->destNode = destNode;
1070         rdma_request_msg->pd = pd;
1071         /* buffer this request */
1072     }
1073 #if PRINT_SYH
1074     if(status != GNI_RC_SUCCESS)
1075        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1076     else
1077         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1078 #endif
1079 }
1080
1081 /* useDynamicSMSG */
1082 inline 
1083 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1084 {
1085     gni_return_t status = GNI_RC_NOT_DONE;
1086
1087     if(mailbox_list->offset == mailbox_list->size)
1088     {
1089         dynamic_smsg_mailbox_t *new_mailbox_entry;
1090         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1091         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1092         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1093         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1094         new_mailbox_entry->offset = 0;
1095         
1096         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1097             new_mailbox_entry->size, smsg_rx_cqh,
1098             GNI_MEM_READWRITE,   
1099             -1,
1100             &(new_mailbox_entry->mem_hndl));
1101
1102         GNI_RC_CHECK("register", status);
1103         new_mailbox_entry->next = mailbox_list;
1104         mailbox_list = new_mailbox_entry;
1105     }
1106     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1107     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1108     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1109     local_smsg_attr->mbox_offset = mailbox_list->offset;
1110     mailbox_list->offset += smsg_memlen;
1111     local_smsg_attr->buff_size = smsg_memlen;
1112     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1113     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1114 }
1115
1116 /* useDynamicSMSG */
1117 inline 
1118 static int connect_to(int destNode)
1119 {
1120     gni_return_t status = GNI_RC_NOT_DONE;
1121     CmiAssert(smsg_connected_flag[destNode] == 0);
1122     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1123     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1124     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1125     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1126     
1127     CMI_GNI_LOCK(global_gni_lock)
1128     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1129     CMI_GNI_UNLOCK(global_gni_lock)
1130     if (status == GNI_RC_ERROR_RESOURCE) {
1131       /* possibly destNode is making connection at the same time */
1132       free(smsg_attr_vector_local[destNode]);
1133       smsg_attr_vector_local[destNode] = NULL;
1134       free(smsg_attr_vector_remote[destNode]);
1135       smsg_attr_vector_remote[destNode] = NULL;
1136       mailbox_list->offset -= smsg_memlen;
1137       return 0;
1138     }
1139     GNI_RC_CHECK("GNI_Post", status);
1140     smsg_connected_flag[destNode] = 1;
1141     return 1;
1142 }
1143
1144 #if PIGGYBACK_ACK
1145 static void * piggyback_ack(int destNode, int msgsize, int *count)
1146 {
1147     int i;
1148     if (PCQueueEmpty(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)) return NULL;
1149     int len = PCQueueLength(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
1150     int piggycount = (SMSG_MAX_MSG - msgsize)/sizeof(uint64_t);
1151     if (piggycount > len+1) piggycount = len + 1;
1152     if (piggycount <= 5) return NULL;
1153     uint64_t * buf = (uint64_t*)CmiTmpAlloc(piggycount * sizeof(uint64_t));
1154     CmiAssert(buf != NULL);
1155 //printf("[%d] piggyback_ack: %d\n", myrank, piggycount);
1156     for (i=0; i<piggycount-1; i++) {
1157         CMI_PCQUEUEPOP_LOCK(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)
1158         MSG_LIST *ptr = (MSG_LIST*)PCQueuePop(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
1159         CMI_PCQUEUEPOP_UNLOCK(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)
1160         ACK_MSG *msg = ptr->msg;
1161         buf[i+1] = msg->source_addr;
1162         FreeAckMsg(msg);
1163         FreeMsgList(ptr);
1164     }
1165     buf[0] = i;
1166     *count = i + 1;
1167     return buf;
1168 }
1169
1170
1171 static void piggyback_ack_done(int destNode, uint64_t *buf, int done)
1172 {
1173     if (!done)
1174     {
1175         int i;
1176         for (i=0; i<buf[0]; i++) {
1177             MSG_LIST *msg_tmp;
1178             MallocMsgList(msg_tmp);
1179             ACK_MSG  *ack_msg;
1180             MallocAckMsg(ack_msg);
1181             ack_msg->source_addr = buf[i+1];
1182             msg_tmp->size = ACK_MSG_SIZE;
1183             msg_tmp->msg = ack_msg;
1184             msg_tmp->tag = ACK_TAG;
1185             msg_tmp->destNode = destNode;
1186             PCQueuePush(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1187         }
1188     }
1189     CmiTmpFree(buf);
1190 }
1191 #endif
1192
1193 inline 
1194 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
1195 {
1196     unsigned int          remote_address;
1197     uint32_t              remote_id;
1198     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1199     gni_smsg_attr_t       *smsg_attr;
1200     gni_post_descriptor_t *pd;
1201     gni_post_state_t      post_state;
1202     char                  *real_data; 
1203
1204     if (useDynamicSMSG) {
1205         switch (smsg_connected_flag[destNode]) {
1206         case 0: 
1207             connect_to(destNode);         /* continue to case 1 */
1208         case 1:                           /* pending connection, do nothing */
1209             status = GNI_RC_NOT_DONE;
1210             if(inbuff ==0)
1211                 buffer_small_msgs(queue, msg, size, destNode, tag);
1212             return status;
1213         }
1214     }
1215 #if CMK_SMP
1216 #if ! ONE_SEND_QUEUE
1217     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1218 #endif
1219     {
1220 #else
1221     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1222     {
1223 #endif
1224         uint64_t *buf = NULL;
1225         int bufsize = 0;
1226 #if PIGGYBACK_ACK
1227         if (tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG) {
1228             int nack = 0;
1229             buf = piggyback_ack(destNode, size, &nack);
1230             if (buf) {
1231                 tag = (tag == SMALL_DATA_TAG) ? SMALL_DATA_ACK_TAG : LMSG_INIT_ACK_TAG;
1232                 bufsize = nack * sizeof(uint64_t);
1233             }
1234         }
1235 #endif
1236         //CMI_GNI_LOCK(smsg_mailbox_lock)
1237         CMI_GNI_LOCK(default_tx_cq_lock)
1238 #if CMK_SMP_TRACE_COMMTHREAD
1239         int oldpe = -1;
1240         int oldeventid = -1;
1241         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1242         { 
1243             START_EVENT();
1244             if ( tag == SMALL_DATA_TAG)
1245                 real_data = (char*)msg; 
1246             else 
1247                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1248             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1249             TRACE_COMM_SET_COMM_MSGID(real_data);
1250         }
1251 #endif
1252         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
1253 #if CMK_SMP_TRACE_COMMTHREAD
1254         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1255 #endif
1256         CMI_GNI_UNLOCK(default_tx_cq_lock)
1257         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1258         if(status == GNI_RC_SUCCESS)
1259         {
1260 #if CMK_SMP_TRACE_COMMTHREAD
1261             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == SMALL_DATA_ACK_TAG || tag == LMSG_INIT_ACK_TAG)
1262             { 
1263                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1264             }
1265 #endif
1266             smsg_send_count ++;
1267         }else
1268             status = GNI_RC_ERROR_RESOURCE;
1269 #if PIGGYBACK_ACK
1270         if (buf) {
1271             piggyback_ack_done(destNode, buf, status==GNI_RC_SUCCESS);
1272             tag = (tag == SMALL_DATA_ACK_TAG) ? SMALL_DATA_TAG : LMSG_INIT_TAG;
1273         }
1274 #endif
1275     }
1276     if(status != GNI_RC_SUCCESS && inbuff ==0)
1277         buffer_small_msgs(queue, msg, size, destNode, tag);
1278     return status;
1279 }
1280
1281 inline 
1282 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1283 {
1284     /* construct a control message and send */
1285     CONTROL_MSG         *control_msg_tmp;
1286     MallocControlMsg(control_msg_tmp);
1287     control_msg_tmp->source_addr = (uint64_t)msg;
1288     control_msg_tmp->seq_id    = seqno;
1289     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1290 #if     USE_LRTS_MEMPOOL
1291     if(size < BIG_MSG)
1292     {
1293         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1294     }
1295     else
1296     {
1297         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1298         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1299         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1300     }
1301 #else
1302     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1303 #endif
1304     return control_msg_tmp;
1305 }
1306
1307 #define BLOCKING_SEND_CONTROL    0
1308
1309 // Large message, send control to receiver, receiver register memory and do a GET, 
1310 // return 1 - send no success
1311 inline
1312 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1313 {
1314     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1315     uint32_t            vmdh_index  = -1;
1316     int                 size;
1317     int                 offset = 0;
1318     uint64_t            source_addr;
1319     int                 register_size; 
1320     void                *msg;
1321
1322     size    =   control_msg_tmp->total_length;
1323     source_addr = control_msg_tmp->source_addr;
1324     register_size = control_msg_tmp->length;
1325
1326 #if  USE_LRTS_MEMPOOL
1327     if( control_msg_tmp->seq_id == 0 ){
1328 #if BLOCKING_SEND_CONTROL
1329         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1330             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1331                 LrtsAdvanceCommunication(0);
1332         }
1333 #endif
1334         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1335         {
1336             msg = (void*)source_addr;
1337             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1338             {
1339                 if(!inbuff)
1340                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1341                 return GNI_RC_ERROR_NOMEM;
1342             }
1343             //register the corresponding mempool
1344             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1345             if(status == GNI_RC_SUCCESS)
1346             {
1347                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1348             }
1349         }else
1350         {
1351             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1352             status = GNI_RC_SUCCESS;
1353         }
1354         if(NoMsgInSend( control_msg_tmp->source_addr))
1355             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1356         else
1357             register_size = 0;
1358     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1359     {
1360         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1361         source_addr += offset;
1362         size = control_msg_tmp->length;
1363 #if BLOCKING_SEND_CONTROL
1364         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1365             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1366                 LrtsAdvanceCommunication(0);
1367         }
1368 #endif
1369         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1370             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1371             {
1372                 if(!inbuff)
1373                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1374                 return GNI_RC_ERROR_NOMEM;
1375             }
1376             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1377             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1378         }
1379         else
1380         {
1381             status = GNI_RC_SUCCESS;
1382         }
1383         register_size = 0;  
1384     }
1385
1386     if(status == GNI_RC_SUCCESS)
1387     {
1388         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff);  
1389         if(status == GNI_RC_SUCCESS)
1390         {
1391             buffered_send_msg += register_size;
1392             if(control_msg_tmp->seq_id == 0)
1393             {
1394                 IncreaseMsgInSend(source_addr);
1395             }
1396             FreeControlMsg(control_msg_tmp);
1397             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1398         }else
1399             status = GNI_RC_ERROR_RESOURCE;
1400
1401     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1402     {
1403         CmiAbort("Memory registor for large msg\n");
1404     }else 
1405     {
1406         status = GNI_RC_ERROR_NOMEM; 
1407         if(!inbuff)
1408             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1409     }
1410     return status;
1411 #else
1412     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1413     if(status == GNI_RC_SUCCESS)
1414     {
1415         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0);  
1416         if(status == GNI_RC_SUCCESS)
1417         {
1418             FreeControlMsg(control_msg_tmp);
1419         }
1420     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1421     {
1422         CmiAbort("Memory registor for large msg\n");
1423     }else 
1424     {
1425         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1426     }
1427     return status;
1428 #endif
1429 }
1430
1431 inline void LrtsPrepareEnvelope(char *msg, int size)
1432 {
1433     CmiSetMsgSize(msg, size);
1434 }
1435
1436 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1437 {
1438     gni_return_t        status  =   GNI_RC_SUCCESS;
1439     uint8_t tag;
1440     CONTROL_MSG         *control_msg_tmp;
1441     int                 oob = ( mode & OUT_OF_BAND);
1442     SMSG_QUEUE          *queue;
1443
1444     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1445 #if CMK_USE_OOB
1446     queue = oob? &smsg_oob_queue : &smsg_queue;
1447 #else
1448     queue = &smsg_queue;
1449 #endif
1450
1451     LrtsPrepareEnvelope(msg, size);
1452
1453 #if PRINT_SYH
1454     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1455 #endif 
1456 #if CMK_SMP 
1457     if(size <= SMSG_MAX_MSG)
1458         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1459     else if (size < BIG_MSG) {
1460         control_msg_tmp =  construct_control_msg(size, msg, 0);
1461         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1462     }
1463     else {
1464           CmiSetMsgSeq(msg, 0);
1465           control_msg_tmp =  construct_control_msg(size, msg, 1);
1466           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1467     }
1468 #else   //non-smp, smp(worker sending)
1469     if(size <= SMSG_MAX_MSG)
1470     {
1471         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1472             CmiFree(msg);
1473     }
1474     else if (size < BIG_MSG) {
1475         control_msg_tmp =  construct_control_msg(size, msg, 0);
1476         send_large_messages(queue, destNode, control_msg_tmp, 0);
1477     }
1478     else {
1479 #if     USE_LRTS_MEMPOOL
1480         CmiSetMsgSeq(msg, 0);
1481         control_msg_tmp =  construct_control_msg(size, msg, 1);
1482         send_large_messages(queue, destNode, control_msg_tmp, 0);
1483 #else
1484         control_msg_tmp =  construct_control_msg(size, msg, 0);
1485         send_large_messages(queue, destNode, control_msg_tmp, 0);
1486 #endif
1487     }
1488 #endif
1489     return 0;
1490 }
1491
1492 static void    PumpDatagramConnection();
1493 static      int         event_SetupConnect = 111;
1494 static      int         event_PumpSmsg = 222 ;
1495 static      int         event_PumpTransaction = 333;
1496 static      int         event_PumpRdmaTransaction = 444;
1497 static      int         event_SendBufferSmsg = 444;
1498 static      int         event_SendFmaRdmaMsg = 555;
1499 static      int         event_AdvanceCommunication = 666;
1500 static void registerUserTraceEvents() {
1501 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1502     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1503     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1504     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1505     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1506     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1507     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1508     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1509 #endif
1510 }
1511
1512 static void ProcessDeadlock()
1513 {
1514     static CmiUInt8 *ptr = NULL;
1515     static CmiUInt8  last = 0, mysum, sum;
1516     static int count = 0;
1517     gni_return_t status;
1518     int i;
1519
1520 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1521 //sweep_mempool(CpvAccess(mempool));
1522     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1523     mysum = smsg_send_count + smsg_recv_count;
1524     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1525     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1526     GNI_RC_CHECK("PMI_Allgather", status);
1527     sum = 0;
1528     for (i=0; i<mysize; i++)  sum+= ptr[i];
1529     if (last == 0 || sum == last) 
1530         count++;
1531     else
1532         count = 0;
1533     last = sum;
1534     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1535     if (count == 2) { 
1536         /* detected twice, it is a real deadlock */
1537         if (myrank == 0)  {
1538             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1539             CmiAbort("Fatal> Deadlock detected.");
1540         }
1541
1542     }
1543     _detected_hang = 0;
1544 }
1545
1546 static void CheckProgress()
1547 {
1548     if (smsg_send_count == last_smsg_send_count &&
1549         smsg_recv_count == last_smsg_recv_count ) 
1550     {
1551         _detected_hang = 1;
1552 #if !CMK_SMP
1553         if (_detected_hang) ProcessDeadlock();
1554 #endif
1555
1556     }
1557     else {
1558         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1559         last_smsg_send_count = smsg_send_count;
1560         last_smsg_recv_count = smsg_recv_count;
1561         _detected_hang = 0;
1562     }
1563 }
1564
1565 static void set_limit()
1566 {
1567     //if (!user_set_flag && CmiMyRank() == 0) {
1568     if (CmiMyRank() == 0) {
1569         int mynode = CmiPhysicalNodeID(CmiMyPe());
1570         int numpes = CmiNumPesOnPhysicalNode(mynode);
1571         int numprocesses = numpes / CmiMyNodeSize();
1572         MAX_REG_MEM  = _totalmem / numprocesses;
1573         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1574         if (CmiMyPe() == 0)
1575            printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM, MAX_BUFF_SEND);
1576     }
1577 }
1578
1579 void LrtsPostCommonInit(int everReturn)
1580 {
1581 #if CMK_DIRECT
1582     CmiDirectInit();
1583 #endif
1584 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1585     CpvInitialize(double, projTraceStart);
1586     /* only PE 0 needs to care about registration (to generate sts file). */
1587     //if (CmiMyPe() == 0) 
1588     {
1589         registerMachineUserEventsFunction(&registerUserTraceEvents);
1590     }
1591 #endif
1592
1593 #if CMK_SMP
1594     CmiIdleState *s=CmiNotifyGetState();
1595     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1596     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1597 #else
1598     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1599     if (useDynamicSMSG)
1600     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1601 #endif
1602
1603    /* if (_checkProgress)
1604 #if CMK_SMP
1605     if (CmiMyRank() == 0)
1606 #endif
1607     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1608     */
1609 #if !LARGEPAGE
1610     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1611 #endif
1612 }
1613
1614 /* this is called by worker thread */
1615 void LrtsPostNonLocal(){
1616 #if CMK_SMP_TRACE_COMMTHREAD
1617     double startT, endT;
1618 #endif
1619 #if MULTI_THREAD_SEND
1620     if(mysize == 1) return;
1621 #if CMK_SMP_TRACE_COMMTHREAD
1622     traceEndIdle();
1623 #endif
1624
1625 #if CMK_SMP_TRACE_COMMTHREAD
1626     startT = CmiWallTimer();
1627 #endif
1628     PumpNetworkSmsg();
1629     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1630     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1631 #if CMK_USE_OOB
1632     if (SendBufferMsg(&smsg_oob_queue) == 1)
1633 #endif
1634     {
1635 #if PIGGYBACK_ACK
1636     //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
1637     if (SendBufferMsg(&smsg_queue) == 1) {
1638         //if (count++ % 10 == 0) 
1639         SendBufferMsg(&smsg_ack_queue);
1640     }
1641 #else
1642     SendBufferMsg(&smsg_queue);
1643 #endif
1644     }
1645
1646     SendRdmaMsg();
1647 #if CMK_SMP_TRACE_COMMTHREAD
1648     endT = CmiWallTimer();
1649     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
1650 #endif
1651 #if CMK_SMP_TRACE_COMMTHREAD
1652     traceBeginIdle();
1653 #endif
1654 #endif
1655 }
1656
1657 /* useDynamicSMSG */
1658 static void    PumpDatagramConnection()
1659 {
1660     uint32_t          remote_address;
1661     uint32_t          remote_id;
1662     gni_return_t status;
1663     gni_post_state_t  post_state;
1664     uint64_t          datagram_id;
1665     int i;
1666
1667    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1668    {
1669        if (datagram_id >= mysize) {           /* bound endpoint */
1670            int pe = datagram_id - mysize;
1671            CMI_GNI_LOCK(global_gni_lock)
1672            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1673            CMI_GNI_UNLOCK(global_gni_lock)
1674            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1675            {
1676                CmiAssert(remote_id == pe);
1677                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1678                GNI_RC_CHECK("Dynamic SMSG Init", status);
1679 #if PRINT_SYH
1680                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1681 #endif
1682                CmiAssert(smsg_connected_flag[pe] == 1);
1683                smsg_connected_flag[pe] = 2;
1684            }
1685        }
1686        else {         /* unbound ep */
1687            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1688            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1689            {
1690                CmiAssert(remote_id<mysize);
1691                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1692                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1693                GNI_RC_CHECK("Dynamic SMSG Init", status);
1694 #if PRINT_SYH
1695                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1696 #endif
1697                smsg_connected_flag[remote_id] = 2;
1698
1699                alloc_smsg_attr(&send_smsg_attr);
1700                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1701                GNI_RC_CHECK("post unbound datagram", status);
1702            }
1703        }
1704    }
1705 }
1706
1707 /* pooling CQ to receive network message */
1708 static void PumpNetworkRdmaMsgs()
1709 {
1710     gni_cq_entry_t      event_data;
1711     gni_return_t        status;
1712
1713 }
1714
1715 inline 
1716 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1717 {
1718     RDMA_REQUEST        *rdma_request_msg;
1719     MallocRdmaRequest(rdma_request_msg);
1720     rdma_request_msg->destNode = inst_id;
1721     rdma_request_msg->pd = pd;
1722 #if CMK_SMP
1723     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1724 #else
1725     if(sendRdmaBuf == 0)
1726     {
1727         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1728     }else{
1729         sendRdmaTail->next = rdma_request_msg;
1730         sendRdmaTail =  rdma_request_msg;
1731     }
1732 #endif
1733
1734 }
1735
1736 #if PIGGYBACK_ACK
1737 int processPiggybackAckHeader(void *header)
1738 {
1739     int i;
1740     uint64_t *buf = (uint64_t*)header;
1741     int piggycount = buf[0];
1742 //printf("[%d] got piggyback msg: %d\n", myrank, piggycount);
1743     for (i=0; i<piggycount; i++) {
1744         void *msg = (void*)(buf[i+1]);
1745         CmiAssert(msg != NULL);
1746         DecreaseMsgInSend(msg);
1747         if(NoMsgInSend(msg))
1748             buffered_send_msg -= GetMempoolsize(msg);
1749         CmiFree(msg);
1750     }
1751     return piggycount;
1752 }
1753 #endif
1754
1755 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1756
1757 static void PumpNetworkSmsg()
1758 {
1759     uint64_t            inst_id;
1760     int                 ret;
1761     gni_cq_entry_t      event_data;
1762     gni_return_t        status, status2;
1763     void                *header;
1764     uint8_t             msg_tag;
1765     int                 msg_nbytes;
1766     void                *msg_data;
1767     gni_mem_handle_t    msg_mem_hndl;
1768     gni_smsg_attr_t     *smsg_attr;
1769     gni_smsg_attr_t     *remote_smsg_attr;
1770     int                 init_flag;
1771     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1772     uint64_t            source_addr;
1773     SMSG_QUEUE         *queue = &smsg_queue;
1774 #if     CMK_DIRECT
1775     cmidirectMsg        *direct_msg;
1776 #endif
1777     while(1)
1778     {
1779         CMI_GNI_LOCK(smsg_rx_cq_lock)
1780         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1781         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
1782         if(status != GNI_RC_SUCCESS)
1783             break;
1784         inst_id = GNI_CQ_GET_INST_ID(event_data);
1785         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1786 #if PRINT_SYH
1787         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1788 #endif
1789         if (useDynamicSMSG) {
1790             /* subtle: smsg may come before connection is setup */
1791             while (smsg_connected_flag[inst_id] != 2) 
1792                PumpDatagramConnection();
1793         }
1794         msg_tag = GNI_SMSG_ANY_TAG;
1795         while(1) {
1796             CMI_GNI_LOCK(smsg_mailbox_lock)
1797             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1798             if (status != GNI_RC_SUCCESS)
1799             {
1800                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1801                 break;
1802             }
1803 #if PRINT_SYH
1804             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1805 #endif
1806             /* copy msg out and then put into queue (small message) */
1807             switch (msg_tag) {
1808 #if PIGGYBACK_ACK
1809             case SMALL_DATA_ACK_TAG:
1810             {
1811                 int piggycount = processPiggybackAckHeader(header);
1812                 header = (uint64_t*)header + piggycount + 1;
1813             }
1814 #endif
1815             case SMALL_DATA_TAG:
1816             {
1817                 START_EVENT();
1818                 msg_nbytes = CmiGetMsgSize(header);
1819                 msg_data    = CmiAlloc(msg_nbytes);
1820                 memcpy(msg_data, (char*)header, msg_nbytes);
1821                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1822                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1823                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1824                 handleOneRecvedMsg(msg_nbytes, msg_data);
1825                 break;
1826             }
1827 #if PIGGYBACK_ACK
1828             case LMSG_INIT_ACK_TAG:
1829             {
1830                 int piggycount = processPiggybackAckHeader(header);
1831                 header = (uint64_t*)header + piggycount + 1;
1832             }
1833 #endif
1834             case LMSG_INIT_TAG:
1835             {
1836 #if MULTI_THREAD_SEND
1837                 MallocControlMsg(control_msg_tmp);
1838                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
1839                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1840                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1841                 getLargeMsgRequest(control_msg_tmp, inst_id);
1842                 FreeControlMsg(control_msg_tmp);
1843 #else
1844                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1845                 getLargeMsgRequest(header, inst_id);
1846                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1847 #endif
1848                 break;
1849             }
1850             case ACK_TAG:   //msg fit into mempool
1851             {
1852                 /* Get is done, release message . Now put is not used yet*/
1853                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1854                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1855                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1856 #if ! USE_LRTS_MEMPOOL
1857                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1858 #else
1859                 DecreaseMsgInSend(msg);
1860 #endif
1861                 if(NoMsgInSend(msg))
1862                     buffered_send_msg -= GetMempoolsize(msg);
1863                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1864                 CmiFree(msg);
1865                 break;
1866             }
1867             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1868             {
1869 #if MULTI_THREAD_SEND
1870                 MallocControlMsg(header_tmp);
1871                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
1872                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1873                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1874                    /* FIXME: leak */
1875 #else
1876                 header_tmp = (CONTROL_MSG *) header;
1877                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1878 #endif
1879                 void *msg = (void*)(header_tmp->source_addr);
1880                 int cur_seq = CmiGetMsgSeq(msg);
1881                 int offset = ONE_SEG*(cur_seq+1);
1882                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1883                 buffered_send_msg -= header_tmp->length;
1884                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1885                 if (remain_size < 0) remain_size = 0;
1886                 CmiSetMsgSize(msg, remain_size);
1887                 if(remain_size <= 0) //transaction done
1888                 {
1889                     CmiFree(msg);
1890                 }else if (header_tmp->total_length > offset)
1891                 {
1892                     CmiSetMsgSeq(msg, cur_seq+1);
1893                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1894                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1895                     //send next seg
1896                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1897                          // pipelining
1898                     if (header_tmp->seq_id == 1) {
1899                       int i;
1900                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1901                         int seq = cur_seq+i+2;
1902                         CmiSetMsgSeq(msg, seq-1);
1903                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1904                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1905                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1906                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1907                       }
1908                     }
1909                 }
1910 #if MULTI_THREAD_SEND
1911                 FreeControlMsg(header_tmp);
1912 #else
1913                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1914 #endif
1915                 break;
1916             }
1917 #if CMK_PERSISTENT_COMM
1918             case PUT_DONE_TAG: //persistent message
1919                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1920                 int size = ((CONTROL_MSG *) header)->length;
1921                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1922                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1923                 CmiReference(msg);
1924                 handleOneRecvedMsg(size, msg); 
1925                 break;
1926 #endif
1927 #if CMK_DIRECT
1928             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1929                 //create a trigger message
1930                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
1931                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
1932                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1933                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1934                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
1935                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
1936                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1937                 break;
1938 #endif
1939             default: {
1940                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1941                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1942                 printf("weird tag problem\n");
1943                 CmiAbort("Unknown tag\n");
1944                      }
1945             }               // end switch
1946 #if PRINT_SYH
1947             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1948 #endif
1949             smsg_recv_count ++;
1950             msg_tag = GNI_SMSG_ANY_TAG;
1951         } //endwhile getNext
1952     }   //end while GetEvent
1953     if(status == GNI_RC_ERROR_RESOURCE)
1954     {
1955         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
1956         GNI_RC_CHECK("Smsg_rx_cq full", status);
1957     }
1958 }
1959
1960 static void printDesc(gni_post_descriptor_t *pd)
1961 {
1962     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1963 }
1964 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
1965 {
1966     gni_post_descriptor_t *pd;
1967     gni_return_t        status = GNI_RC_SUCCESS;
1968     
1969     MallocPostDesc(pd);
1970
1971     pd->type = GNI_POST_CQWRITE;
1972     pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
1973     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
1974     pd->cqwrite_value = data;
1975     pd->remote_mem_hndl = mem_hndl;
1976     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
1977
1978     GNI_RC_CHECK("GNI_PostCqWrite", status);
1979
1980 }
1981 // for BIG_MSG called on receiver side for receiving control message
1982 // LMSG_INIT_TAG
1983 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1984 {
1985 #if     USE_LRTS_MEMPOOL
1986     CONTROL_MSG         *request_msg;
1987     gni_return_t        status = GNI_RC_SUCCESS;
1988     void                *msg_data;
1989     gni_post_descriptor_t *pd;
1990     gni_mem_handle_t    msg_mem_hndl;
1991     int source, size, transaction_size, offset = 0;
1992     size_t     register_size = 0;
1993
1994     // initial a get to transfer data from the sender side */
1995     request_msg = (CONTROL_MSG *) header;
1996     size = request_msg->total_length;
1997     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1998     if(request_msg->seq_id < 2)   {
1999         //START_EVENT();
2000         msg_data = CmiAlloc(size);
2001         CmiSetMsgSeq(msg_data, 0);
2002         //TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
2003         _MEMCHECK(msg_data);
2004     }
2005     else {
2006         offset = ONE_SEG*(request_msg->seq_id-1);
2007         msg_data = (char*)request_msg->dest_addr + offset;
2008     }
2009    
2010     MallocPostDesc(pd);
2011     pd->cqwrite_value = request_msg->seq_id;
2012     if( request_msg->seq_id == 0)
2013     {
2014         pd->local_mem_hndl= GetMemHndl(msg_data);
2015         transaction_size = ALIGN64(size);
2016         if(IsMemHndlZero(pd->local_mem_hndl))
2017         {   
2018             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
2019             if(status == GNI_RC_SUCCESS)
2020             {
2021                 pd->local_mem_hndl = GetMemHndl(msg_data);
2022             }
2023             else
2024             {
2025                 SetMemHndlZero(pd->local_mem_hndl);
2026             }
2027         }
2028         if(NoMsgInRecv( (void*)(msg_data)))
2029             register_size = GetMempoolsize((void*)(msg_data));
2030         else
2031             register_size = 0;
2032     }
2033     else{
2034         transaction_size = ALIGN64(request_msg->length);
2035         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
2036         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2037         {
2038             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2039         }
2040     }
2041     pd->first_operand = ALIGN64(size);                   //  total length
2042
2043     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2044         pd->type            = GNI_POST_FMA_GET;
2045     else
2046         pd->type            = GNI_POST_RDMA_GET;
2047     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2048     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2049     pd->length          = transaction_size;
2050     pd->local_addr      = (uint64_t) msg_data;
2051     pd->remote_addr     = request_msg->source_addr + offset;
2052     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2053     pd->src_cq_hndl     = rdma_tx_cqh;
2054     pd->rdma_mode       = 0;
2055     pd->amo_cmd         = 0;
2056
2057     //memory registration success
2058     if(status == GNI_RC_SUCCESS)
2059     {
2060         if(pd->type == GNI_POST_RDMA_GET) 
2061         {
2062             CMI_GNI_LOCK(rdma_tx_cq_lock)
2063             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2064             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2065         }
2066         else
2067         {
2068             CMI_GNI_LOCK(default_tx_cq_lock)
2069             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2070             CMI_GNI_UNLOCK(default_tx_cq_lock)
2071         }
2072
2073         if(status == GNI_RC_SUCCESS )
2074         {
2075             if(pd->cqwrite_value == 0)
2076             {
2077 #if MACHINE_DEBUG_LOG
2078                 buffered_recv_msg += register_size;
2079                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2080 #endif
2081                 IncreaseMsgInRecv(msg_data);
2082             }
2083         }
2084     }else
2085     {
2086         SetMemHndlZero((pd->local_mem_hndl));
2087     }
2088     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2089     {
2090         bufferRdmaMsg(inst_id, pd); 
2091     }else {
2092          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
2093         GNI_RC_CHECK("GetLargeAFter posting", status);
2094     }
2095 #else
2096     CONTROL_MSG         *request_msg;
2097     gni_return_t        status;
2098     void                *msg_data;
2099     gni_post_descriptor_t *pd;
2100     RDMA_REQUEST        *rdma_request_msg;
2101     gni_mem_handle_t    msg_mem_hndl;
2102     //int source;
2103     // initial a get to transfer data from the sender side */
2104     request_msg = (CONTROL_MSG *) header;
2105     msg_data = CmiAlloc(request_msg->length);
2106     _MEMCHECK(msg_data);
2107
2108     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
2109
2110     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2111     {
2112         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2113     }
2114
2115     MallocPostDesc(pd);
2116     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2117         pd->type            = GNI_POST_FMA_GET;
2118     else
2119         pd->type            = GNI_POST_RDMA_GET;
2120     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2121     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2122     pd->length          = ALIGN64(request_msg->length);
2123     pd->local_addr      = (uint64_t) msg_data;
2124     pd->remote_addr     = request_msg->source_addr;
2125     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2126     pd->src_cq_hndl     = rdma_tx_cqh;
2127     pd->rdma_mode       = 0;
2128     pd->amo_cmd         = 0;
2129
2130     //memory registration successful
2131     if(status == GNI_RC_SUCCESS)
2132     {
2133         pd->local_mem_hndl  = msg_mem_hndl;
2134        
2135         if(pd->type == GNI_POST_RDMA_GET) 
2136         {
2137             CMI_GNI_LOCK(rdma_tx_cq_lock)
2138             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2139             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2140         }
2141         else
2142         {
2143             CMI_GNI_LOCK(default_tx_cq_lock)
2144             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2145             CMI_GNI_UNLOCK(default_tx_cq_lock)
2146         }
2147
2148     }else
2149     {
2150         SetMemHndlZero(pd->local_mem_hndl);
2151     }
2152     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2153     {
2154         MallocRdmaRequest(rdma_request_msg);
2155         rdma_request_msg->next = 0;
2156         rdma_request_msg->destNode = inst_id;
2157         rdma_request_msg->pd = pd;
2158         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2159     }else {
2160         GNI_RC_CHECK("AFter posting", status);
2161     }
2162 #endif
2163 }
2164
2165 static void PumpCqWriteTransactions()
2166 {
2167
2168     gni_cq_entry_t          ev;
2169     gni_return_t            status;
2170     void                    *msg;   
2171     while(1) {
2172         //CMI_GNI_LOCK(my_cq_lock) 
2173         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2174         //CMI_GNI_UNLOCK(my_cq_lock)
2175         if(status != GNI_RC_SUCCESS) break;
2176         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2177
2178         DecreaseMsgInSend(msg);
2179 #if ! USE_LRTS_MEMPOOL
2180        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2181 #else
2182         DecreaseMsgInSend(msg);
2183 #endif
2184         if(NoMsgInSend(msg))
2185             buffered_send_msg -= GetMempoolsize(msg);
2186         CmiFree(msg);
2187     };
2188     if(status == GNI_RC_ERROR_RESOURCE)
2189     {
2190         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2191     }
2192 }
2193
2194
2195 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2196 {
2197     gni_cq_entry_t          ev;
2198     gni_return_t            status;
2199     uint64_t                type, inst_id;
2200     gni_post_descriptor_t   *tmp_pd;
2201     MSG_LIST                *ptr;
2202     CONTROL_MSG             *ack_msg_tmp;
2203     ACK_MSG                 *ack_msg;
2204     uint8_t                 msg_tag;
2205 #if CMK_DIRECT
2206     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2207 #endif
2208     SMSG_QUEUE         *queue = &smsg_queue;
2209
2210     while(1) {
2211         CMI_GNI_LOCK(my_cq_lock) 
2212         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2213         CMI_GNI_UNLOCK(my_cq_lock)
2214         if(status != GNI_RC_SUCCESS) break;
2215         
2216         type = GNI_CQ_GET_TYPE(ev);
2217         if (type == GNI_CQ_EVENT_TYPE_POST)
2218         {
2219             inst_id     = GNI_CQ_GET_INST_ID(ev);
2220 #if PRINT_SYH
2221             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2222 #endif
2223             CMI_GNI_LOCK(my_cq_lock)
2224             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2225             CMI_GNI_UNLOCK(my_cq_lock)
2226
2227             switch (tmp_pd->type) {
2228 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2229             case GNI_POST_RDMA_PUT:
2230 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2231                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2232 #endif
2233             case GNI_POST_FMA_PUT:
2234                 if(tmp_pd->amo_cmd == 1) {
2235                     //sender ACK to receiver to trigger it is done
2236                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2237                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2238                     msg_tag = DIRECT_PUT_DONE_TAG;
2239                 }
2240                 else {
2241                     CmiFree((void *)tmp_pd->local_addr);
2242                     MallocControlMsg(ack_msg_tmp);
2243                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2244                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2245                     msg_tag = PUT_DONE_TAG;
2246                 }
2247                 break;
2248 #endif
2249             case GNI_POST_RDMA_GET:
2250             case GNI_POST_FMA_GET:  {
2251 #if  ! USE_LRTS_MEMPOOL
2252                 MallocControlMsg(ack_msg_tmp);
2253                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2254                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2255                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2256                 msg_tag = ACK_TAG;  
2257 #else
2258                 int seq_id = tmp_pd->cqwrite_value;
2259                 if(seq_id > 0)      // BIG_MSG
2260                 {
2261                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2262                     MallocControlMsg(ack_msg_tmp);
2263                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2264                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2265                     ack_msg_tmp->seq_id = seq_id;
2266                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2267                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2268                     ack_msg_tmp->length = tmp_pd->length;
2269                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2270                     msg_tag = BIG_MSG_TAG; 
2271                 } 
2272                 else
2273                 {
2274                     msg_tag = ACK_TAG; 
2275 #if         !CQWRITE
2276                     MallocAckMsg(ack_msg);
2277                     ack_msg->source_addr = tmp_pd->remote_addr;
2278 #endif
2279                 }
2280 #endif
2281                 break;
2282             }
2283             case  GNI_POST_CQWRITE:
2284                    FreePostDesc(tmp_pd);
2285                    continue;
2286             default:
2287                 CmiPrintf("type=%d\n", tmp_pd->type);
2288                 CmiAbort("PumpLocalTransactions: unknown type!");
2289             }      /* end of switch */
2290
2291 #if CMK_DIRECT
2292             if (tmp_pd->amo_cmd == 1) {
2293                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
2294                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2295             }
2296             else
2297 #endif
2298             if (msg_tag == ACK_TAG) {
2299 #if         !CQWRITE
2300 #if ! PIGGYBACK_ACK
2301                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
2302                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2303 #else
2304                 buffer_small_msgs(&smsg_ack_queue, ack_msg, ACK_MSG_SIZE, inst_id, msg_tag);
2305 #endif
2306 #else
2307                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2308 #endif
2309             }
2310             else {
2311                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
2312                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2313             }
2314 #if CMK_PERSISTENT_COMM
2315             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2316 #endif
2317             {
2318                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2319 #if PRINT_SYH
2320                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2321 #endif
2322                     START_EVENT();
2323                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2324                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2325 #if MACHINE_DEBUG_LOG
2326                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2327                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2328                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2329 #endif
2330                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2331                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2332                 }else if(msg_tag == BIG_MSG_TAG){
2333                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2334                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2335                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2336                         START_EVENT();
2337 #if PRINT_SYH
2338                         printf("Pipeline msg done [%d]\n", myrank);
2339 #endif
2340                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2341                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2342                     }
2343                 }
2344             }
2345             FreePostDesc(tmp_pd);
2346         }
2347     } //end while
2348     if(status == GNI_RC_ERROR_RESOURCE)
2349     {
2350         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2351         GNI_RC_CHECK("Smsg_tx_cq full", status);
2352     }
2353 }
2354
2355 static void  SendRdmaMsg()
2356 {
2357     gni_return_t            status = GNI_RC_SUCCESS;
2358     gni_mem_handle_t        msg_mem_hndl;
2359     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2360     RDMA_REQUEST            *pre = 0;
2361     uint64_t                register_size = 0;
2362     void                    *msg;
2363     int                     i;
2364 #if CMK_SMP
2365     int len = PCQueueLength(sendRdmaBuf);
2366     for (i=0; i<len; i++)
2367     {
2368         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2369         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2370         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2371         if (ptr == NULL) break;
2372 #else
2373     ptr = sendRdmaBuf;
2374     while (ptr!=0)
2375     {
2376 #endif 
2377         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2378         gni_post_descriptor_t *pd = ptr->pd;
2379         status = GNI_RC_SUCCESS;
2380         
2381         if(pd->cqwrite_value == 0)
2382         {
2383             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2384             {
2385                 msg = (void*)(pd->local_addr);
2386                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
2387                 if(status == GNI_RC_SUCCESS)
2388                 {
2389                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2390                 }
2391             }else
2392             {
2393                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2394             }
2395             if(NoMsgInRecv( (void*)(pd->local_addr)))
2396                 register_size = GetMempoolsize((void*)(pd->local_addr));
2397             else
2398                 register_size = 0;
2399         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2400         {
2401             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
2402         }
2403         if(status == GNI_RC_SUCCESS)        //mem register good
2404         {
2405             if(pd->type == GNI_POST_RDMA_GET) 
2406             {
2407                 CMI_GNI_LOCK(rdma_tx_cq_lock)
2408                     status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2409                 CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2410             }
2411             else
2412             {
2413                 CMI_GNI_LOCK(default_tx_cq_lock)
2414                     status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2415                 CMI_GNI_UNLOCK(default_tx_cq_lock)
2416             }
2417             
2418             if(status == GNI_RC_SUCCESS)    //post good
2419             {
2420 #if !CMK_SMP
2421                 tmp_ptr = ptr;
2422                 if(pre != 0) {
2423                     pre->next = ptr->next;
2424                 }
2425                 else {
2426                     sendRdmaBuf = ptr->next;
2427                 }
2428                 ptr = ptr->next;
2429                 FreeRdmaRequest(tmp_ptr);
2430 #endif
2431                 if(pd->cqwrite_value == 0)
2432                 {
2433                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2434                 }
2435 #if MACHINE_DEBUG_LOG
2436                 buffered_recv_msg += register_size;
2437                 MACHSTATE(8, "GO request from buffered\n"); 
2438 #endif
2439             }else           // cannot post
2440             {
2441 #if CMK_SMP
2442                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2443 #else
2444                 pre = ptr;
2445                 ptr = ptr->next;
2446 #endif
2447                 break;
2448             }
2449         } else          //memory registration fails
2450         {
2451 #if CMK_SMP
2452             PCQueuePush(sendRdmaBuf, (char*)ptr);
2453 #else
2454             pre = ptr;
2455             ptr = ptr->next;
2456 #endif
2457         }
2458     } //end while
2459 #if ! CMK_SMP
2460     if(ptr == 0)
2461         sendRdmaTail = pre;
2462 #endif
2463 }
2464
2465 // return 1 if all messages are sent
2466 static int SendBufferMsg(SMSG_QUEUE *queue)
2467 {
2468     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2469     CONTROL_MSG         *control_msg_tmp;
2470     gni_return_t        status;
2471     int                 done = 1;
2472     uint64_t            register_size;
2473     void                *register_addr;
2474     int                 index_previous = -1;
2475 #if CMI_EXERT_SEND_CAP
2476     int                 sent_cnt = 0;
2477 #endif
2478
2479 #if CMK_SMP
2480     int          index = 0;
2481 #if ONE_SEND_QUEUE
2482     memset(destpe_avail, 0, mysize * sizeof(char));
2483     for (index=0; index<1; index++)
2484     {
2485         int i, len = PCQueueLength(queue->sendMsgBuf);
2486         for (i=0; i<len; i++) 
2487         {
2488             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2489             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2490             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2491             if(ptr == NULL) break;
2492             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2493                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2494                 continue;
2495             }
2496 #else
2497 #if SMP_LOCKS
2498     int nonempty = PCQueueLength(nonEmptyQueues);
2499     for(index =0; index<nonempty; index++)
2500     {
2501         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2502         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2503         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2504         if(current_list == NULL) break; 
2505         PCQueue current_queue= current_list-> sendSmsgBuf;
2506         CmiLock(current_list->lock);
2507         int i, len = PCQueueLength(current_queue);
2508         current_list->pushed = 0;
2509         CmiUnlock(current_list->lock);
2510 #else
2511     for(index =0; index<mysize; index++)
2512     {
2513         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2514         int i, len = PCQueueLength(current_queue);
2515 #endif
2516         for (i=0; i<len; i++) 
2517         {
2518             CMI_PCQUEUEPOP_LOCK(current_queue)
2519             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2520             CMI_PCQUEUEPOP_UNLOCK(current_queue)
2521             if (ptr == 0) break;
2522 #endif
2523 #else
2524     int index = queue->smsg_head_index;
2525     while(index != -1)
2526     {
2527         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2528         pre = 0;
2529         while(ptr != 0)
2530         {
2531 #endif
2532             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2533             status = GNI_RC_ERROR_RESOURCE;
2534             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2535                 /* connection not exists yet */
2536             }
2537             else
2538             switch(ptr->tag)
2539             {
2540             case SMALL_DATA_TAG:
2541                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2542                 if(status == GNI_RC_SUCCESS)
2543                 {
2544                     CmiFree(ptr->msg);
2545                 }
2546                 break;
2547             case LMSG_INIT_TAG:
2548                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2549                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2550                 break;
2551             case ACK_TAG:
2552                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2553                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2554                 break;
2555             case BIG_MSG_TAG:
2556                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2557                 if(status == GNI_RC_SUCCESS)
2558                 {
2559                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2560                 }
2561                 break;
2562 #if CMK_DIRECT
2563             case DIRECT_PUT_DONE_TAG:
2564                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2565                 if(status == GNI_RC_SUCCESS)
2566                 {
2567                     free((CMK_DIRECT_HEADER*)ptr->msg);
2568                 }
2569                 break;
2570
2571 #endif
2572             default:
2573                 printf("Weird tag\n");
2574                 CmiAbort("should not happen\n");
2575             }       // end switch
2576             if(status == GNI_RC_SUCCESS)
2577             {
2578 #if !CMK_SMP
2579                 tmp_ptr = ptr;
2580                 if(pre)
2581                 {
2582                     ptr = pre ->next = ptr->next;
2583                 }else
2584                 {
2585                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2586                 }
2587                 FreeMsgList(tmp_ptr);
2588 #else
2589                 FreeMsgList(ptr);
2590 #endif
2591 #if PRINT_SYH
2592                 buffered_smsg_counter--;
2593                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2594 #endif
2595 #if CMI_EXERT_SEND_CAP
2596                 sent_cnt++;
2597                 if(sent_cnt == SEND_CAP)
2598                     break;
2599 #endif
2600             }else {
2601 #if CMK_SMP
2602 #if ONE_SEND_QUEUE
2603                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2604 #else
2605                 PCQueuePush(current_queue, (char*)ptr);
2606 #endif
2607 #else
2608                 pre = ptr;
2609                 ptr=ptr->next;
2610 #endif
2611                 done = 0;
2612                 if(status == GNI_RC_ERROR_RESOURCE)
2613                 {
2614 #if CMK_SMP && ONE_SEND_QUEUE 
2615                     destpe_avail[ptr->destNode] = 1;
2616 #else
2617                     break;
2618 #endif
2619                 }
2620             } 
2621         } //end while
2622 #if !CMK_SMP
2623         if(ptr == 0)
2624             queue->smsg_msglist_index[index].tail = pre;
2625         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2626         {
2627             if(index_previous != -1)
2628                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2629             else
2630                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2631         }
2632         else {
2633             index_previous = index;
2634         }
2635         index = queue->smsg_msglist_index[index].next;
2636 #else
2637 #if !ONE_SEND_QUEUE && SMP_LOCKS
2638         CmiLock(current_list->lock);
2639         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2640         {
2641             current_list->pushed = 1;
2642             PCQueuePush(nonEmptyQueues, current_list);
2643         }
2644         CmiUnlock(current_list->lock); 
2645 #endif
2646 #endif
2647
2648 #if CMI_EXERT_SEND_CAP
2649         if(sent_cnt == SEND_CAP)
2650                 break;
2651 #endif
2652     }   // end pooling for all cores
2653     return done;
2654 }
2655
2656 static void ProcessDeadlock();
2657 void LrtsAdvanceCommunication(int whileidle)
2658 {
2659     static int count = 0;
2660     /*  Receive Msg first */
2661 #if CMK_SMP_TRACE_COMMTHREAD
2662     double startT, endT;
2663 #endif
2664     if (useDynamicSMSG && whileidle)
2665     {
2666 #if CMK_SMP_TRACE_COMMTHREAD
2667         startT = CmiWallTimer();
2668 #endif
2669         PumpDatagramConnection();
2670 #if CMK_SMP_TRACE_COMMTHREAD
2671         endT = CmiWallTimer();
2672         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
2673 #endif
2674     }
2675
2676 #if CMK_SMP_TRACE_COMMTHREAD
2677     startT = CmiWallTimer();
2678 #endif
2679     PumpNetworkSmsg();
2680     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2681 #if CMK_SMP_TRACE_COMMTHREAD
2682     endT = CmiWallTimer();
2683     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
2684 #endif
2685
2686 #if CMK_SMP_TRACE_COMMTHREAD
2687     startT = CmiWallTimer();
2688 #endif
2689     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2690     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
2691 #if CMK_SMP_TRACE_COMMTHREAD
2692     endT = CmiWallTimer();
2693     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
2694 #endif
2695
2696 #if CMK_SMP_TRACE_COMMTHREAD
2697     startT = CmiWallTimer();
2698 #endif
2699     PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock);
2700
2701 #if CQWRITE
2702     PumpCqWriteTransactions();
2703 #endif
2704     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
2705 #if CMK_SMP_TRACE_COMMTHREAD
2706     endT = CmiWallTimer();
2707     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
2708 #endif
2709  
2710     /* Send buffered Message */
2711 #if CMK_SMP_TRACE_COMMTHREAD
2712     startT = CmiWallTimer();
2713 #endif
2714 #if CMK_USE_OOB
2715     if (SendBufferMsg(&smsg_oob_queue) == 1)
2716 #endif
2717     {
2718 #if PIGGYBACK_ACK
2719     //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
2720     if (SendBufferMsg(&smsg_queue) == 1 || count++ % 10 == 0) {
2721         STATS_ACK_TIME(SendBufferMsg(&smsg_ack_queue));
2722     }
2723 #else
2724     SendBufferMsg(&smsg_queue);
2725 #endif
2726     }
2727     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2728 #if CMK_SMP_TRACE_COMMTHREAD
2729     endT = CmiWallTimer();
2730     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
2731 #endif
2732
2733 #if CMK_SMP_TRACE_COMMTHREAD
2734     startT = CmiWallTimer();
2735 #endif
2736     SendRdmaMsg();
2737     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2738 #if CMK_SMP_TRACE_COMMTHREAD
2739     endT = CmiWallTimer();
2740     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
2741 #endif
2742
2743 //#if CMK_SMP
2744 //    if (_detected_hang)  ProcessDeadlock();
2745 //#endif
2746 }
2747
2748 /* useDynamicSMSG */
2749 static void _init_dynamic_smsg()
2750 {
2751     gni_return_t status;
2752     uint32_t     vmdh_index = -1;
2753     int i;
2754
2755     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2756     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2757     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2758     for(i=0; i<mysize; i++) {
2759         smsg_connected_flag[i] = 0;
2760         smsg_attr_vector_local[i] = NULL;
2761         smsg_attr_vector_remote[i] = NULL;
2762     }
2763     if(mysize <=512)
2764     {
2765         SMSG_MAX_MSG = 4096;
2766     }else if (mysize <= 4096)
2767     {
2768         SMSG_MAX_MSG = 4096/mysize * 1024;
2769     }else if (mysize <= 16384)
2770     {
2771         SMSG_MAX_MSG = 512;
2772     }else {
2773         SMSG_MAX_MSG = 256;
2774     }
2775
2776     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2777     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2778     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2779     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2780     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2781
2782     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2783     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2784     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2785     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2786     mailbox_list->offset = 0;
2787     mailbox_list->next = 0;
2788     
2789     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2790         mailbox_list->size, smsg_rx_cqh,
2791         GNI_MEM_READWRITE,   
2792         vmdh_index,
2793         &(mailbox_list->mem_hndl));
2794     GNI_RC_CHECK("MEMORY registration for smsg", status);
2795
2796     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
2797     GNI_RC_CHECK("Unbound EP", status);
2798     
2799     alloc_smsg_attr(&send_smsg_attr);
2800
2801     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2802     GNI_RC_CHECK("post unbound datagram", status);
2803
2804       /* always pre-connect to proc 0 */
2805     //if (myrank != 0) connect_to(0);
2806 }
2807
2808 static void _init_static_smsg()
2809 {
2810     gni_smsg_attr_t      *smsg_attr;
2811     gni_smsg_attr_t      remote_smsg_attr;
2812     gni_smsg_attr_t      *smsg_attr_vec;
2813     gni_mem_handle_t     my_smsg_mdh_mailbox;
2814     int      ret, i;
2815     gni_return_t status;
2816     uint32_t              vmdh_index = -1;
2817     mdh_addr_t            base_infor;
2818     mdh_addr_t            *base_addr_vec;
2819     char *env;
2820
2821     if(mysize <=512)
2822     {
2823         SMSG_MAX_MSG = 1024;
2824     }else if (mysize <= 4096)
2825     {
2826         SMSG_MAX_MSG = 1024;
2827     }else if (mysize <= 16384)
2828     {
2829         SMSG_MAX_MSG = 512;
2830     }else {
2831         SMSG_MAX_MSG = 256;
2832     }
2833     
2834     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2835     if (env) SMSG_MAX_MSG = atoi(env);
2836     CmiAssert(SMSG_MAX_MSG > 0);
2837
2838     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2839     
2840     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2841     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2842     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2843     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2844     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2845     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2846     CmiAssert(ret == 0);
2847     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2848     
2849     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2850             smsg_memlen*(mysize), smsg_rx_cqh,
2851             GNI_MEM_READWRITE,   
2852             vmdh_index,
2853             &my_smsg_mdh_mailbox);
2854     register_memory_size += smsg_memlen*(mysize);
2855     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2856
2857     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2858     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2859
2860     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2861     base_infor.mdh =  my_smsg_mdh_mailbox;
2862     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2863
2864     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2865  
2866     for(i=0; i<mysize; i++)
2867     {
2868         if(i==myrank)
2869             continue;
2870         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2871         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2872         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2873         smsg_attr[i].mbox_offset = i*smsg_memlen;
2874         smsg_attr[i].buff_size = smsg_memlen;
2875         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2876         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2877     }
2878
2879     for(i=0; i<mysize; i++)
2880     {
2881         if (myrank == i) continue;
2882
2883         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2884         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2885         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2886         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2887         remote_smsg_attr.buff_size = smsg_memlen;
2888         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2889         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2890
2891         /* initialize the smsg channel */
2892         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2893         GNI_RC_CHECK("SMSG Init", status);
2894     } //end initialization
2895
2896     free(base_addr_vec);
2897     free(smsg_attr);
2898
2899     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2900     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2901
2902
2903 inline
2904 static void _init_send_queue(SMSG_QUEUE *queue)
2905 {
2906      int i;
2907 #if ONE_SEND_QUEUE
2908      queue->sendMsgBuf = PCQueueCreate();
2909      destpe_avail = (char*)malloc(mysize * sizeof(char));
2910 #else
2911      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2912 #if CMK_SMP && SMP_LOCKS
2913      nonEmptyQueues = PCQueueCreate();
2914 #endif
2915      for(i =0; i<mysize; i++)
2916      {
2917 #if CMK_SMP
2918          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2919 #if SMP_LOCKS
2920          queue->smsg_msglist_index[i].pushed = 0;
2921          queue->smsg_msglist_index[i].lock = CmiCreateLock();
2922 #endif
2923 #else
2924          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2925          queue->smsg_msglist_index[i].next = -1;
2926          queue->smsg_head_index = -1;
2927 #endif
2928         
2929      }
2930 #endif
2931 }
2932
2933 inline
2934 static void _init_smsg()
2935 {
2936     if(mysize > 1) {
2937         if (useDynamicSMSG)
2938             _init_dynamic_smsg();
2939         else
2940             _init_static_smsg();
2941     }
2942
2943     _init_send_queue(&smsg_queue);
2944 #if PIGGYBACK_ACK
2945     _init_send_queue(&smsg_ack_queue);
2946 #endif
2947 #if CMK_USE_OOB
2948     _init_send_queue(&smsg_oob_queue);
2949 #endif
2950 }
2951
2952 static void _init_static_msgq()
2953 {
2954     gni_return_t status;
2955     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2956     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2957     msgq_attrs.smsg_q_sz = 1;
2958     msgq_attrs.rcv_pool_sz = 1;
2959     msgq_attrs.num_msgq_eps = 2;
2960     msgq_attrs.nloc_insts = 8;
2961     msgq_attrs.modes = 0;
2962     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2963
2964     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2965     GNI_RC_CHECK("MSGQ Init", status);
2966
2967
2968 }
2969
2970 #if CMK_SMP && STEAL_MEMPOOL
2971 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2972 {
2973     void *pool = NULL;
2974     int i, k;
2975     // check other ranks
2976     for (k=0; k<CmiMyNodeSize()+1; k++) {
2977         i = (CmiMyRank()+k)%CmiMyNodeSize();
2978         if (i==CmiMyRank()) continue;
2979         mempool_type *mptr = CpvAccessOther(mempool, i);
2980         CmiLock(mptr->mempoolLock);
2981         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2982         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2983             CmiUnlock(mptr->mempoolLock);
2984             continue;
2985         }
2986         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2987         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2988             /* search in the free list */
2989           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2990           mempool_header *current = free_header;
2991           while (current) {
2992             if (current->next_free == (char*)header-(char*)mptr) break;
2993             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2994           }
2995           if (current == NULL) {         /* not found in free list */
2996             CmiUnlock(mptr->mempoolLock);
2997             continue;
2998           }
2999 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
3000             /* search the previous memblock, and remove the tail */
3001           mempool_block *ptr = (mempool_block *)mptr;
3002           while (ptr) {
3003             if (ptr->memblock_next == mptr->memblock_tail) break;
3004             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
3005           }
3006           CmiAssert(ptr!=NULL);
3007           ptr->memblock_next = 0;
3008           mptr->memblock_tail = (char*)ptr - (char*)mptr;
3009
3010             /* remove memblock from the free list */
3011           current->next_free = header->next_free;
3012           if (header == free_header) mptr->freelist_head = header->next_free;
3013
3014           CmiUnlock(mptr->mempoolLock);
3015
3016           pool = (void*)tail;
3017           *mem_hndl = tail->mem_hndl;
3018           *size = tail->size;
3019           return pool;
3020         }
3021         CmiUnlock(mptr->mempoolLock);
3022     }
3023
3024       /* steal failed, deregister and free memblock now */
3025     int freed = 0;
3026     for (k=0; k<CmiMyNodeSize()+1; k++) {
3027         i = (CmiMyRank()+k)%CmiMyNodeSize();
3028         mempool_type *mptr = CpvAccessOther(mempool, i);
3029         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
3030
3031         mempool_block *mempools_head = &(mptr->mempools_head);
3032         mempool_block *current = mempools_head;
3033         mempool_block *prev = NULL;
3034
3035         while (current) {
3036           int isfree = 0;
3037           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
3038 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
3039           mempool_header *cur = free_header;
3040           mempool_header *header;
3041           if (current != mempools_head) {
3042             header = (mempool_header*)((char*)current + sizeof(mempool_block));
3043              /* search in free list */
3044             if (header->size == current->size - sizeof(mempool_block)) {
3045               cur = free_header;
3046               while (cur) {
3047                 if (cur->next_free == (char*)header-(char*)mptr) break;
3048                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
3049               }
3050               if (cur != NULL) isfree = 1;
3051             }
3052           }
3053           if (isfree) {
3054               /* remove from free list */
3055             cur->next_free = header->next_free;
3056             if (header == free_header) mptr->freelist_head = header->next_free;
3057              // deregister
3058             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
3059             GNI_RC_CHECK("steal Mempool de-register", status);
3060             mempool_block *ptr = current;
3061             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
3062             prev->memblock_next = current?(char*)current - (char*)mptr:0;
3063 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
3064             freed += ptr->size;
3065             free(ptr);
3066              // try now
3067             if (freed > *size) {
3068               if (pool == NULL) {
3069                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
3070                 CmiAssert(ret == 0);
3071               }
3072               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
3073               if (status == GNI_RC_SUCCESS) {
3074                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
3075 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
3076                 return pool;
3077               }
3078 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
3079             }
3080           }
3081           else {
3082              prev = current;
3083              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
3084           }
3085         }
3086
3087         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
3088     }
3089       /* still no luck registering pool */
3090     if (pool) free(pool);
3091     return NULL;
3092 }
3093 #endif
3094
3095 static CmiUInt8 total_mempool_size = 0;
3096 static CmiUInt8 total_mempool_calls = 0;
3097
3098 #if USE_LRTS_MEMPOOL
3099 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3100 {
3101     void *pool;
3102     int ret;
3103     gni_return_t status = GNI_RC_SUCCESS;
3104
3105     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3106     if (*size < default_size) *size = default_size;
3107 #if LARGEPAGE
3108     // round up to be multiple of _tlbpagesize
3109     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3110     *size = ALIGNHUGEPAGE(*size);
3111 #endif
3112     total_mempool_size += *size;
3113     total_mempool_calls += 1;
3114 #if   !LARGEPAGE
3115     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3116     {
3117         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3118         CmiAbort("alloc_mempool_block");
3119     }
3120 #endif
3121 #if LARGEPAGE
3122     pool = my_get_huge_pages(*size);
3123     ret = pool==NULL;
3124 #else
3125     ret = posix_memalign(&pool, ALIGNBUF, *size);
3126 #endif
3127     if (ret != 0) {
3128 #if CMK_SMP && STEAL_MEMPOOL
3129       pool = steal_mempool_block(size, mem_hndl);
3130       if (pool != NULL) return pool;
3131 #endif
3132       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3133       if (ret == ENOMEM)
3134         CmiAbort("alloc_mempool_block: out of memory.");
3135       else
3136         CmiAbort("alloc_mempool_block: posix_memalign failed");
3137     }
3138 #if LARGEPAGE
3139     CmiMemLock();
3140     register_count++;
3141     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, status);
3142     CmiMemUnlock();
3143     if(status != GNI_RC_SUCCESS) {
3144         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3145 sweep_mempool(CpvAccess(mempool));
3146     }
3147     GNI_RC_CHECK("MEMORY_REGISTER", status);
3148 #else
3149     SetMemHndlZero((*mem_hndl));
3150 #endif
3151     return pool;
3152 }
3153
3154 // ptr is a block head pointer
3155 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3156 {
3157     if(!(IsMemHndlZero(mem_hndl)))
3158     {
3159         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3160     }
3161 #if LARGEPAGE
3162     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3163 #else
3164     free(ptr);
3165 #endif
3166 }
3167 #endif
3168
3169 void LrtsPreCommonInit(int everReturn){
3170 #if USE_LRTS_MEMPOOL
3171     CpvInitialize(mempool_type*, mempool);
3172     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3173     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3174 #endif
3175 }
3176
3177 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3178 {
3179     register int            i;
3180     int                     rc;
3181     int                     device_id = 0;
3182     unsigned int            remote_addr;
3183     gni_cdm_handle_t        cdm_hndl;
3184     gni_return_t            status = GNI_RC_SUCCESS;
3185     uint32_t                vmdh_index = -1;
3186     uint8_t                 ptag;
3187     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3188     int                     first_spawned;
3189     int                     physicalID;
3190     char                   *env;
3191
3192     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3193     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3194     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3195    
3196     status = PMI_Init(&first_spawned);
3197     GNI_RC_CHECK("PMI_Init", status);
3198
3199     status = PMI_Get_size(&mysize);
3200     GNI_RC_CHECK("PMI_Getsize", status);
3201
3202     status = PMI_Get_rank(&myrank);
3203     GNI_RC_CHECK("PMI_getrank", status);
3204
3205     //physicalID = CmiPhysicalNodeID(myrank);
3206     
3207     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3208
3209     *myNodeID = myrank;
3210     *numNodes = mysize;
3211   
3212 #if MULTI_THREAD_SEND
3213     /* Currently, we only consider the case that comm. thread will only recv msgs */
3214     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3215 #endif
3216
3217     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3218     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3219     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3220
3221     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3222     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3223     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3224
3225     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3226     if (env) useDynamicSMSG = 1;
3227     if (!useDynamicSMSG)
3228       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3229     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3230     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3231     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3232     
3233     if(myrank == 0)
3234     {
3235         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3236         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3237     }
3238 #ifdef USE_ONESIDED
3239     onesided_init(NULL, &onesided_hnd);
3240
3241     // this is a GNI test, so use the libonesided bypass functionality
3242     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3243     local_addr = gniGetNicAddress();
3244 #else
3245     ptag = get_ptag();
3246     cookie = get_cookie();
3247 #if 0
3248     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3249 #endif
3250     //Create and attach to the communication  domain */
3251     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3252     GNI_RC_CHECK("GNI_CdmCreate", status);
3253     //* device id The device id is the minor number for the device
3254     //that is assigned to the device by the system when the device is created.
3255     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3256     //where X is the device number 0 default 
3257     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3258     GNI_RC_CHECK("GNI_CdmAttach", status);
3259     local_addr = get_gni_nic_address(0);
3260 #endif
3261     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3262     _MEMCHECK(MPID_UGNI_AllAddr);
3263     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3264     /* create the local completion queue */
3265     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3266     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3267     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3268     
3269     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3270     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3271     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3272
3273     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3274     GNI_RC_CHECK("Create CQ (rx)", status);
3275     
3276     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3277     GNI_RC_CHECK("Create Post CQ (rx)", status);
3278     
3279     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3280     //GNI_RC_CHECK("Create BTE CQ", status);
3281
3282     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3283     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3284     _MEMCHECK(ep_hndl_array);
3285 #if MULTI_THREAD_SEND 
3286     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3287     //default_tx_cq_lock = CmiCreateLock();
3288     rdma_tx_cq_lock = CmiCreateLock();
3289     smsg_rx_cq_lock = CmiCreateLock();
3290     //global_gni_lock  = CmiCreateLock();
3291     //rx_cq_lock  = CmiCreateLock();
3292 #endif
3293     for (i=0; i<mysize; i++) {
3294         if(i == myrank) continue;
3295         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3296         GNI_RC_CHECK("GNI_EpCreate ", status);   
3297         remote_addr = MPID_UGNI_AllAddr[i];
3298         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3299         GNI_RC_CHECK("GNI_EpBind ", status);   
3300     }
3301
3302     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3303     _init_smsg();
3304     PMI_Barrier();
3305
3306 #if     USE_LRTS_MEMPOOL
3307     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3308     if (env) {
3309         _totalmem = CmiReadSize(env);
3310         if (myrank == 0)
3311             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3312     }
3313
3314     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3315     if (env) _mempool_size = CmiReadSize(env);
3316     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3317         _mempool_size = CmiReadSize(env);
3318
3319
3320     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3321     if (env) {
3322         MAX_REG_MEM = CmiReadSize(env);
3323         user_set_flag = 1;
3324     }
3325     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3326         MAX_REG_MEM = CmiReadSize(env);
3327         user_set_flag = 1;
3328     }
3329
3330     env = getenv("CHARM_UGNI_SEND_MAX");
3331     if (env) {
3332         MAX_BUFF_SEND = CmiReadSize(env);
3333         user_set_flag = 1;
3334     }
3335     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3336         MAX_BUFF_SEND = CmiReadSize(env);
3337         user_set_flag = 1;
3338     }
3339
3340     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3341     if (env) {
3342         _mempool_size_limit = CmiReadSize(env);
3343     }
3344
3345     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3346     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3347
3348     if (myrank==0) {
3349         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3350         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3351         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3352             /* memblock can expand to BIG_MSG * 2 size */
3353             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3354             CmiAbort("mempool maximum size is too small. \n");
3355         }
3356 #if MULTI_THREAD_SEND
3357         printf("Charm++> worker thread sending messages\n");
3358 #elif COMM_THREAD_SEND
3359         printf("Charm++> only comm thread send/recv messages\n");
3360 #endif
3361     }
3362
3363 #endif     /* end of USE_LRTS_MEMPOOL */
3364
3365     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3366     if (env) {
3367         BIG_MSG = CmiReadSize(env);
3368         if (BIG_MSG < ONE_SEG)
3369           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3370     }
3371     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3372     if (env) {
3373         BIG_MSG_PIPELINE = atoi(env);
3374     }
3375
3376     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3377     if (env) _checkProgress = 0;
3378     if (mysize == 1) _checkProgress = 0;
3379
3380     
3381     /*
3382     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3383     if (env) 
3384         _tlbpagesize = CmiReadSize(env);
3385     */
3386     /* real gethugepagesize() is only available when hugetlb module linked */
3387     _tlbpagesize = gethugepagesize();
3388     if (myrank == 0) {
3389         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3390     }
3391
3392 #if LARGEPAGE
3393     if (_tlbpagesize == 4096) {
3394         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3395     }
3396 #endif
3397
3398     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3399
3400     /* init DMA buffer for medium message */
3401
3402     //_init_DMA_buffer();
3403     
3404     free(MPID_UGNI_AllAddr);
3405 #if CMK_SMP
3406     sendRdmaBuf = PCQueueCreate();
3407 #else
3408     sendRdmaBuf = 0;
3409 #endif
3410 #if MACHINE_DEBUG_LOG
3411     char ln[200];
3412     sprintf(ln,"debugLog.%d",myrank);
3413     debugLog=fopen(ln,"w");
3414 #endif
3415
3416 //    NTK_Init();
3417 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3418
3419 #if CMK_WITH_STATS
3420     init_comm_stats();
3421 #endif
3422 }
3423
3424 void* LrtsAlloc(int n_bytes, int header)
3425 {
3426     void *ptr = NULL;
3427 #if 0
3428     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3429 #endif
3430     if(n_bytes <= SMSG_MAX_MSG)
3431     {
3432         int totalsize = n_bytes+header;
3433         ptr = malloc(totalsize);
3434     }
3435     else {
3436         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3437 #if     USE_LRTS_MEMPOOL
3438         n_bytes = ALIGN64(n_bytes);
3439         if(n_bytes < BIG_MSG)
3440         {
3441             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3442             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3443         }else 
3444         {
3445 #if LARGEPAGE
3446             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3447             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3448             char *res = my_get_huge_pages(n_bytes);
3449 #else
3450             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3451 #endif
3452             if (res) ptr = res + ALIGNBUF - header;
3453         }
3454 #else
3455         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3456         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3457         ptr = res + ALIGNBUF - header;
3458 #endif
3459     }
3460     return ptr;
3461 }
3462
3463 void  LrtsFree(void *msg)
3464 {
3465     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3466     if (size <= SMSG_MAX_MSG)
3467         free(msg);
3468     else {
3469         size = ALIGN64(size);
3470