a new mode that both worker and comm thread send and receive messages.
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <gni_pub.h>
38 #include <pmi.h>
39
40 //#include <numatoolkit.h>
41
42 #include "converse.h"
43
44 #define     LARGEPAGE             0
45
46 #if LARGEPAGE
47 #include <hugetlbfs.h>
48 #endif
49
50 #if CMK_DIRECT
51 #include "cmidirect.h"
52 #endif
53 #define PRINT_SYH  0
54
55 #define USE_LRTS_MEMPOOL                  1
56
57 #define REMOTE_EVENT                      0
58
59 #define CMI_EXERT_SEND_CAP      0
60 #define CMI_EXERT_RECV_CAP      0
61
62 #if CMI_EXERT_SEND_CAP
63 #define SEND_CAP 16
64 #endif
65
66 #if CMI_EXERT_RECV_CAP
67 #define RECV_CAP 2
68 #endif
69
70 #if CMK_SMP
71 #if LARGEPAGE
72 #define MULTI_THREAD_SEND 1
73 #else
74 #define COMM_THREAD_SEND 1
75 #endif
76 #endif
77
78 #if CMK_SMP
79 #define PIGGYBACK_ACK              0
80 #endif
81
82 // Trace communication thread
83 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
84 #define TRACE_THRESHOLD     0.00005
85 #define CMI_MPI_TRACE_MOREDETAILED 0
86 #undef CMI_MPI_TRACE_USEREVENTS
87 #define CMI_MPI_TRACE_USEREVENTS 1
88 #else
89 #undef CMK_SMP_TRACE_COMMTHREAD
90 #define CMK_SMP_TRACE_COMMTHREAD 0
91 #endif
92
93 #define CMK_TRACE_COMMOVERHEAD 0
94 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
95 #undef CMI_MPI_TRACE_USEREVENTS
96 #define CMI_MPI_TRACE_USEREVENTS 1
97 #else
98 #undef CMK_TRACE_COMMOVERHEAD
99 #define CMK_TRACE_COMMOVERHEAD 0
100 #endif
101
102 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
103 CpvStaticDeclare(double, projTraceStart);
104 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
105 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
106 #else
107 #define  START_EVENT()
108 #define  END_EVENT(x)
109 #endif
110
111 #if USE_LRTS_MEMPOOL
112
113 #define oneMB (1024ll*1024)
114 #define oneGB (1024ll*1024*1024)
115
116 static CmiInt8 _mempool_size = 8*oneMB;
117 static CmiInt8 _expand_mem =  4*oneMB;
118 static CmiInt8 _mempool_size_limit = 0;
119
120 static CmiInt8 _totalmem = 0.8*oneGB;
121
122 #if LARGEPAGE
123 static int BIG_MSG  =  16*oneMB;
124 static int ONE_SEG  =  4*oneMB;
125 #else
126 static int BIG_MSG  =  4*oneMB;
127 static int ONE_SEG  =  2*oneMB;
128 #endif
129 #if MULTI_THREAD_SEND
130 static int BIG_MSG_PIPELINE = 1;
131 #else
132 static int BIG_MSG_PIPELINE = 4;
133 #endif
134
135 // dynamic flow control
136 static CmiInt8 buffered_send_msg = 0;
137 static CmiInt8 register_memory_size = 0;
138
139 #if LARGEPAGE
140 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
141 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
142 static CmiInt8  register_count = 0;
143 #else
144 #if CMK_SMP && COMM_THREAD_SEND 
145 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
146 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
147 #else
148 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
149 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
150 #endif
151
152
153 #endif
154
155 #endif     /* end USE_LRTS_MEMPOOL */
156
157 #if MULTI_THREAD_SEND
158 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
159 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
160 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
161 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
162 #else
163 #define     CMI_GNI_LOCK
164 #define     CMI_GNI_UNLOCK
165 #define     CMI_PCQUEUEPOP_LOCK(Q)   
166 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
167 #endif
168
169 static int _tlbpagesize = 4096;
170
171 //static int _smpd_count  = 0;
172
173 static int   user_set_flag  = 0;
174
175 static int _checkProgress = 1;             /* check deadlock */
176 static int _detected_hang = 0;
177
178 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
179
180 // dynamic SMSG
181 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
182
183 static int avg_smsg_connection = 32;
184 static int                 *smsg_connected_flag= 0;
185 static gni_smsg_attr_t     **smsg_attr_vector_local;
186 static gni_smsg_attr_t     **smsg_attr_vector_remote;
187 static gni_ep_handle_t     ep_hndl_unbound;
188 static gni_smsg_attr_t     send_smsg_attr;
189 static gni_smsg_attr_t     recv_smsg_attr;
190
191 typedef struct _dynamic_smsg_mailbox{
192    void     *mailbox_base;
193    int      size;
194    int      offset;
195    gni_mem_handle_t  mem_hndl;
196    struct      _dynamic_smsg_mailbox  *next;
197 }dynamic_smsg_mailbox_t;
198
199 static dynamic_smsg_mailbox_t  *mailbox_list;
200
201 int         rdma_id = 0;
202
203 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
204 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
205
206 #if PRINT_SYH
207 int         lrts_send_msg_id = 0;
208 int         lrts_local_done_msg = 0;
209 int         lrts_send_rdma_success = 0;
210 #endif
211
212 #include "machine.h"
213
214 #include "pcqueue.h"
215
216 #include "mempool.h"
217
218 #if CMK_PERSISTENT_COMM
219 #include "machine-persistent.h"
220 #endif
221
222 //#define  USE_ONESIDED 1
223 #ifdef USE_ONESIDED
224 //onesided implementation is wrong, since no place to restore omdh
225 #include "onesided.h"
226 onesided_hnd_t   onesided_hnd;
227 onesided_md_t    omdh;
228 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
229
230 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
231
232 #else
233 uint8_t   onesided_hnd, omdh;
234 #if REMOTE_EVENT
235 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
236          status = GNI_RC_ERROR_NOMEM;} \
237         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
238                 if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
239 #else
240 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
241     do {   \
242         if (register_memory_size + size >= MAX_REG_MEM) { \
243             status = GNI_RC_ERROR_NOMEM; \
244         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
245             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
246     } while(0)
247 #endif
248 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
249     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
250              register_memory_size -= size; \
251          else CmiAbort("MEM_DEregister");  \
252     } while (0)
253 #endif
254
255 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
256 #define   IncreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)++
257 #define   DecreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)--
258 #define   IncreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)++
259 #define   DecreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)--
260 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
261 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
262 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
263 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
264 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
265 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
266 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
267 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
268 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
269
270 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
271 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
272
273 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
274 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
275 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
276 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
277
278 #define ALIGNBUF                64
279
280 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
281 /* If SMSG is not used */
282
283 #define FMA_PER_CORE  1024
284 #define FMA_BUFFER_SIZE 1024
285
286 /* If SMSG is used */
287 static int  SMSG_MAX_MSG = 1024;
288 #define SMSG_MAX_CREDIT 72 
289
290 #define MSGQ_MAXSIZE       2048
291 /* large message transfer with FMA or BTE */
292 #define LRTS_GNI_RDMA_THRESHOLD  1024 
293
294 #if CMK_SMP
295 static int  REMOTE_QUEUE_ENTRIES=163840; 
296 static int LOCAL_QUEUE_ENTRIES=163840; 
297 #else
298 static int  REMOTE_QUEUE_ENTRIES=20480;
299 static int LOCAL_QUEUE_ENTRIES=20480; 
300 #endif
301
302 #define BIG_MSG_TAG             0x26
303 #define PUT_DONE_TAG            0x28
304 #define DIRECT_PUT_DONE_TAG     0x29
305 #define ACK_TAG                 0x30
306 /* SMSG is data message */
307 #define SMALL_DATA_TAG          0x31
308 #define SMALL_DATA_ACK_TAG      0x32
309 /* SMSG is a control message to initialize a BTE */
310 #define LMSG_INIT_TAG           0x39 
311 #define LMSG_INIT_ACK_TAG       0x3a 
312
313 #define DEBUG
314 #ifdef GNI_RC_CHECK
315 #undef GNI_RC_CHECK
316 #endif
317 #ifdef DEBUG
318 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
319 #else
320 #define GNI_RC_CHECK(msg,rc)
321 #endif
322
323 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
324 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
325 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
326
327 static int useStaticMSGQ = 0;
328 static int useStaticFMA = 0;
329 static int mysize, myrank;
330 gni_nic_handle_t      nic_hndl;
331
332 typedef struct {
333     gni_mem_handle_t mdh;
334     uint64_t addr;
335 } mdh_addr_t ;
336 // this is related to dynamic SMSG
337
338 typedef struct mdh_addr_list{
339     gni_mem_handle_t mdh;
340    void *addr;
341     struct mdh_addr_list *next;
342 }mdh_addr_list_t;
343
344 static unsigned int         smsg_memlen;
345 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
346 mdh_addr_t          setup_mem;
347 mdh_addr_t          *smsg_connection_vec = 0;
348 gni_mem_handle_t    smsg_connection_memhndl;
349 static int          smsg_expand_slots = 10;
350 static int          smsg_available_slot = 0;
351 static void         *smsg_mailbox_mempool = 0;
352 mdh_addr_list_t     *smsg_dynamic_list = 0;
353
354 static void             *smsg_mailbox_base;
355 gni_msgq_attr_t         msgq_attrs;
356 gni_msgq_handle_t       msgq_handle;
357 gni_msgq_ep_attr_t      msgq_ep_attrs;
358 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
359
360 /* =====Beginning of Declarations of Machine Specific Variables===== */
361 static int cookie;
362 static int modes = 0;
363 static gni_cq_handle_t       smsg_rx_cqh = NULL;
364 static gni_cq_handle_t       smsg_tx_cqh = NULL;
365 static gni_cq_handle_t       post_rx_cqh = NULL;
366 static gni_cq_handle_t       post_tx_cqh = NULL;
367 static gni_ep_handle_t       *ep_hndl_array;
368 #if CMK_SMP && MULTI_THREAD_SEND
369 static CmiNodeLock           *ep_lock_array;
370 static CmiNodeLock           tx_cq_lock; 
371 static CmiNodeLock           rx_cq_lock;
372 static CmiNodeLock           *mempool_lock;
373 #endif
374
375 typedef struct msg_list
376 {
377     uint32_t destNode;
378     uint32_t size;
379     void *msg;
380     uint8_t tag;
381 #if !CMK_SMP
382     struct msg_list *next;
383 #endif
384 }MSG_LIST;
385
386
387 typedef struct control_msg
388 {
389     uint64_t            source_addr;    /* address from the start of buffer  */
390     uint64_t            dest_addr;      /* address from the start of buffer */
391     int                 total_length;   /* total length */
392     int                 length;         /* length of this packet */
393     uint8_t             seq_id;         //big message   0 meaning single message
394     gni_mem_handle_t    source_mem_hndl;
395     struct control_msg *next;
396 } CONTROL_MSG;
397
398 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
399
400 typedef struct ack_msg
401 {
402     uint64_t            source_addr;    /* address from the start of buffer  */
403 #if ! USE_LRTS_MEMPOOL
404     gni_mem_handle_t    source_mem_hndl;
405     int                 length;          /* total length */
406 #endif
407     struct ack_msg     *next;
408 } ACK_MSG;
409
410 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
411
412 #if CMK_DIRECT
413 typedef struct{
414     uint64_t    handler_addr;
415 }CMK_DIRECT_HEADER;
416
417 typedef struct {
418     char core[CmiMsgHeaderSizeBytes];
419     uint64_t handler;
420 }cmidirectMsg;
421
422 //SYH
423 CpvDeclare(int, CmiHandleDirectIdx);
424 void CmiHandleDirectMsg(cmidirectMsg* msg)
425 {
426
427     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
428    (*(_handle->callbackFnPtr))(_handle->callbackData);
429    CmiFree(msg);
430 }
431
432 void CmiDirectInit()
433 {
434     CpvInitialize(int,  CmiHandleDirectIdx);
435     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
436 }
437
438 #endif
439 typedef struct  rmda_msg
440 {
441     int                   destNode;
442     gni_post_descriptor_t *pd;
443 #if !CMK_SMP
444     struct  rmda_msg      *next;
445 #endif
446 }RDMA_REQUEST;
447
448
449 #if CMK_SMP
450 #define SMP_LOCKS               0
451 #define ONE_SEND_QUEUE                  0
452 PCQueue sendRdmaBuf;
453 typedef struct  msg_list_index
454 {
455     PCQueue     sendSmsgBuf;
456     int         pushed;
457     CmiNodeLock   lock;
458 } MSG_LIST_INDEX;
459 char                *destpe_avail;
460 #if  !ONE_SEND_QUEUE && SMP_LOCKS
461     PCQueue     nonEmptyQueues;
462 #endif
463 #else         /* non-smp */
464
465 static RDMA_REQUEST        *sendRdmaBuf = 0;
466 static RDMA_REQUEST        *sendRdmaTail = 0;
467 typedef struct  msg_list_index
468 {
469     int         next;
470     MSG_LIST    *sendSmsgBuf;
471     MSG_LIST    *tail;
472 } MSG_LIST_INDEX;
473
474 #endif
475
476 // buffered send queue
477 #if ! ONE_SEND_QUEUE
478 typedef struct smsg_queue
479 {
480     MSG_LIST_INDEX   *smsg_msglist_index;
481     int               smsg_head_index;
482 } SMSG_QUEUE;
483 #else
484 typedef struct smsg_queue
485 {
486     PCQueue       sendMsgBuf;
487 }  SMSG_QUEUE;
488 #endif
489
490 SMSG_QUEUE                  smsg_queue;
491 #if PIGGYBACK_ACK
492 SMSG_QUEUE                  smsg_ack_queue;
493 #endif
494 #if CMK_USE_OOB
495 SMSG_QUEUE                  smsg_oob_queue;
496 #endif
497
498 #if CMK_SMP
499
500 #define FreeMsgList(d)   free(d);
501 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
502
503 #else
504
505 static MSG_LIST       *msglist_freelist=0;
506
507 #define FreeMsgList(d)  \
508   do { \
509   (d)->next = msglist_freelist;\
510   msglist_freelist = d; \
511   } while (0)
512
513 #define MallocMsgList(d) \
514   do {  \
515   d = msglist_freelist;\
516   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
517              _MEMCHECK(d);\
518   } else msglist_freelist = d->next; \
519   d->next =0;  \
520   } while (0)
521
522 #endif
523
524 #if CMK_SMP
525
526 #define FreeControlMsg(d)      free(d);
527 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
528
529 #else
530
531 static CONTROL_MSG    *control_freelist=0;
532
533 #define FreeControlMsg(d)       \
534   do { \
535   (d)->next = control_freelist;\
536   control_freelist = d; \
537   } while (0);
538
539 #define MallocControlMsg(d) \
540   do {  \
541   d = control_freelist;\
542   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
543              _MEMCHECK(d);\
544   } else control_freelist = d->next;  \
545   } while (0);
546
547 #endif
548
549 #if CMK_SMP
550
551 #define FreeAckMsg(d)      free(d);
552 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
553
554 #else
555
556 static ACK_MSG        *ack_freelist=0;
557
558 #define FreeAckMsg(d)       \
559   do { \
560   (d)->next = ack_freelist;\
561   ack_freelist = d; \
562   } while (0)
563
564 #define MallocAckMsg(d) \
565   do { \
566   d = ack_freelist;\
567   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
568              _MEMCHECK(d);\
569   } else ack_freelist = d->next; \
570   } while (0)
571
572 #endif
573
574
575 # if CMK_SMP
576 #define FreeRdmaRequest(d)       free(d);
577 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
578 #else
579
580 static RDMA_REQUEST         *rdma_freelist = NULL;
581
582 #define FreeRdmaRequest(d)       \
583   do {  \
584   (d)->next = rdma_freelist;\
585   rdma_freelist = d;    \
586   } while (0)
587
588 #define MallocRdmaRequest(d) \
589   do {   \
590   d = rdma_freelist;\
591   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
592              _MEMCHECK(d);\
593   } else rdma_freelist = d->next; \
594   d->next =0;   \
595   } while (0)
596 #endif
597
598 /* reuse gni_post_descriptor_t */
599 static gni_post_descriptor_t *post_freelist=0;
600
601 #if  !CMK_SMP
602 #define FreePostDesc(d)       \
603     (d)->next_descr = post_freelist;\
604     post_freelist = d;
605
606 #define MallocPostDesc(d) \
607   d = post_freelist;\
608   if (d==0) { \
609      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
610      d->next_descr = 0;\
611       _MEMCHECK(d);\
612   } else post_freelist = d->next_descr;
613 #else
614
615 #define FreePostDesc(d)     free(d);
616 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
617
618 #endif
619
620
621 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
622 static int      buffered_smsg_counter = 0;
623
624 /* SmsgSend return success but message sent is not confirmed by remote side */
625 static MSG_LIST *buffered_fma_head = 0;
626 static MSG_LIST *buffered_fma_tail = 0;
627
628 /* functions  */
629 #define IsFree(a,ind)  !( a& (1<<(ind) ))
630 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
631 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
632
633 CpvDeclare(mempool_type*, mempool);
634
635 /* get the upper bound of log 2 */
636 int mylog2(int size)
637 {
638     int op = size;
639     unsigned int ret=0;
640     unsigned int mask = 0;
641     int i;
642     while(op>0)
643     {
644         op = op >> 1;
645         ret++;
646
647     }
648     for(i=1; i<ret; i++)
649     {
650         mask = mask << 1;
651         mask +=1;
652     }
653
654     ret -= ((size &mask) ? 0:1);
655     return ret;
656 }
657
658 static void
659 allgather(void *in,void *out, int len)
660 {
661     static int *ivec_ptr=NULL,already_called=0,job_size=0;
662     int i,rc;
663     int my_rank;
664     char *tmp_buf,*out_ptr;
665
666     if(!already_called) {
667
668         rc = PMI_Get_size(&job_size);
669         CmiAssert(rc == PMI_SUCCESS);
670         rc = PMI_Get_rank(&my_rank);
671         CmiAssert(rc == PMI_SUCCESS);
672
673         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
674         CmiAssert(ivec_ptr != NULL);
675
676         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
677         CmiAssert(rc == PMI_SUCCESS);
678
679         already_called = 1;
680
681     }
682
683     tmp_buf = (char *)malloc(job_size * len);
684     CmiAssert(tmp_buf);
685
686     rc = PMI_Allgather(in,tmp_buf,len);
687     CmiAssert(rc == PMI_SUCCESS);
688
689     out_ptr = out;
690
691     for(i=0;i<job_size;i++) {
692
693         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
694
695     }
696
697     free(tmp_buf);
698 }
699
700 static void
701 allgather_2(void *in,void *out, int len)
702 {
703     //PMI_Allgather is out of order
704     int i,rc, extend_len;
705     int  rank_index;
706     char *out_ptr, *out_ref;
707     char *in2;
708
709     extend_len = sizeof(int) + len;
710     in2 = (char*)malloc(extend_len);
711
712     memcpy(in2, &myrank, sizeof(int));
713     memcpy(in2+sizeof(int), in, len);
714
715     out_ptr = (char*)malloc(mysize*extend_len);
716
717     rc = PMI_Allgather(in2, out_ptr, extend_len);
718     GNI_RC_CHECK("allgather", rc);
719
720     out_ref = out;
721
722     for(i=0;i<mysize;i++) {
723         //rank index 
724         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
725         //copy to the rank index slot
726         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
727     }
728
729     free(out_ptr);
730     free(in2);
731
732 }
733
734 static unsigned int get_gni_nic_address(int device_id)
735 {
736     unsigned int address, cpu_id;
737     gni_return_t status;
738     int i, alps_dev_id=-1,alps_address=-1;
739     char *token, *p_ptr;
740
741     p_ptr = getenv("PMI_GNI_DEV_ID");
742     if (!p_ptr) {
743         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
744        
745         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
746     } else {
747         while ((token = strtok(p_ptr,":")) != NULL) {
748             alps_dev_id = atoi(token);
749             if (alps_dev_id == device_id) {
750                 break;
751             }
752             p_ptr = NULL;
753         }
754         CmiAssert(alps_dev_id != -1);
755         p_ptr = getenv("PMI_GNI_LOC_ADDR");
756         CmiAssert(p_ptr != NULL);
757         i = 0;
758         while ((token = strtok(p_ptr,":")) != NULL) {
759             if (i == alps_dev_id) {
760                 alps_address = atoi(token);
761                 break;
762             }
763             p_ptr = NULL;
764             ++i;
765         }
766         CmiAssert(alps_address != -1);
767         address = alps_address;
768     }
769     return address;
770 }
771
772 static uint8_t get_ptag(void)
773 {
774     char *p_ptr, *token;
775     uint8_t ptag;
776
777     p_ptr = getenv("PMI_GNI_PTAG");
778     CmiAssert(p_ptr != NULL);
779     token = strtok(p_ptr, ":");
780     ptag = (uint8_t)atoi(token);
781     return ptag;
782         
783 }
784
785 static uint32_t get_cookie(void)
786 {
787     uint32_t cookie;
788     char *p_ptr, *token;
789
790     p_ptr = getenv("PMI_GNI_COOKIE");
791     CmiAssert(p_ptr != NULL);
792     token = strtok(p_ptr, ":");
793     cookie = (uint32_t)atoi(token);
794
795     return cookie;
796 }
797
798 #if LARGEPAGE
799 #include <sys/stat.h>
800 #include <fcntl.h>
801 #include <sys/mman.h>
802
803 // size must be _tlbpagesize aligned
804 void *my_get_huge_pages(size_t size)
805 {
806     char filename[512];
807     int fd;
808     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
809     void *ptr = NULL;
810
811     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
812     fd = open(filename, O_RDWR | O_CREAT, mode);
813     if (fd == -1) {
814         CmiAbort("my_get_huge_pages: open filed");
815     }
816     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
817     if (ptr == MAP_FAILED) ptr = NULL;
818 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
819     close(fd);
820     unlink(filename);
821     return ptr;
822 }
823
824 void my_free_huge_pages(void *ptr, int size)
825 {
826 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
827     int ret = munmap(ptr, size);
828     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
829 }
830
831 #endif
832
833 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
834 /* TODO: add any that are related */
835 /* =====End of Definitions of Message-Corruption Related Macros=====*/
836
837
838 #include "machine-lrts.h"
839 #include "machine-common-core.c"
840
841 /* Network progress function is used to poll the network when for
842    messages. This flushes receive buffers on some  implementations*/
843 #if CMK_MACHINE_PROGRESS_DEFINED
844 void CmiMachineProgressImpl() {
845 }
846 #endif
847
848 static void SendRdmaMsg();
849 static void PumpNetworkSmsg();
850 static void PumpLocalRdmaTransactions();
851 static int SendBufferMsg(SMSG_QUEUE *queue);
852
853 #if MACHINE_DEBUG_LOG
854 FILE *debugLog = NULL;
855 static CmiInt8 buffered_recv_msg = 0;
856 int         lrts_smsg_success = 0;
857 int         lrts_received_msg = 0;
858 #endif
859
860 static void sweep_mempool(mempool_type *mptr)
861 {
862     int n = 0;
863     block_header *current = &(mptr->block_head);
864
865     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
866     while( current!= NULL) {
867         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
868         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
869     }
870     printf("[n %d] sweep_mempool slot END.\n", myrank);
871 }
872
873 inline
874 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
875 {
876     block_header *current = *from;
877
878     //while(register_memory_size>= MAX_REG_MEM)
879     //{
880         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
881             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
882
883         *from = current;
884         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
885         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
886         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
887     //}
888     return GNI_RC_SUCCESS;
889 }
890
891 inline 
892 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl)
893 {
894     gni_return_t status = GNI_RC_SUCCESS;
895     //int size = GetMempoolsize(msg);
896     //void *blockaddr = GetMempoolBlockPtr(msg);
897     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
898    
899     block_header *current = &(mptr->block_head);
900     while(register_memory_size>= MAX_REG_MEM)
901     {
902         status = deregisterMemory(mptr, &current);
903         if (status != GNI_RC_SUCCESS) break;
904     }
905     if(register_memory_size>= MAX_REG_MEM) return status;
906
907     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
908     while(1)
909     {
910         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
911         if(status == GNI_RC_SUCCESS)
912         {
913             break;
914         }
915         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
916         {
917             CmiAbort("Memory registor for mempool fails\n");
918         }
919         else
920         {
921             status = deregisterMemory(mptr, &current);
922             if (status != GNI_RC_SUCCESS) break;
923         }
924     }; 
925     return status;
926 }
927
928 inline 
929 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t)
930 {
931     static int rank = -1;
932     int i;
933     gni_return_t status;
934     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
935     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
936     mempool_type *mptr;
937
938     status = registerFromMempool(mptr1, msg, size, t);
939     if (status == GNI_RC_SUCCESS) return status;
940 #if CMK_SMP 
941     for (i=0; i<CmiMyNodeSize()+1; i++) {
942       rank = (rank+1)%(CmiMyNodeSize()+1);
943       mptr = CpvAccessOther(mempool, rank);
944       if (mptr == mptr1) continue;
945       status = registerFromMempool(mptr, msg, size, t);
946       if (status == GNI_RC_SUCCESS) return status;
947     }
948 #endif
949     return  GNI_RC_ERROR_RESOURCE;
950 }
951
952 inline
953 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
954 {
955     MSG_LIST        *msg_tmp;
956     MallocMsgList(msg_tmp);
957     msg_tmp->destNode = destNode;
958     msg_tmp->size   = size;
959     msg_tmp->msg    = msg;
960     msg_tmp->tag    = tag;
961
962 #if !CMK_SMP
963     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
964         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
965         queue->smsg_head_index = destNode;
966         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
967     }else
968     {
969         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
970         queue->smsg_msglist_index[destNode].tail = msg_tmp;
971     }
972 #else
973 #if ONE_SEND_QUEUE
974     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
975 #else
976 #if SMP_LOCKS
977     CmiLock(queue->smsg_msglist_index[destNode].lock);
978     if(queue->smsg_msglist_index[destNode].pushed == 0)
979     {
980         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
981     }
982     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
983     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
984 #else
985     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
986 #endif
987 #endif
988 #endif
989 #if PRINT_SYH
990     buffered_smsg_counter++;
991 #endif
992 }
993
994 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
995 {
996     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
997 }
998
999 inline
1000 static void setup_smsg_connection(int destNode)
1001 {
1002     mdh_addr_list_t  *new_entry = 0;
1003     gni_post_descriptor_t *pd;
1004     gni_smsg_attr_t      *smsg_attr;
1005     gni_return_t status = GNI_RC_NOT_DONE;
1006     RDMA_REQUEST        *rdma_request_msg;
1007     
1008     if(smsg_available_slot == smsg_expand_slots)
1009     {
1010         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1011         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1012         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1013
1014         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1015             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1016             GNI_MEM_READWRITE,   
1017             -1,
1018             &(new_entry->mdh));
1019         smsg_available_slot = 0; 
1020         new_entry->next = smsg_dynamic_list;
1021         smsg_dynamic_list = new_entry;
1022     }
1023     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1024     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1025     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1026     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1027     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1028     smsg_attr->buff_size = smsg_memlen;
1029     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1030     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1031     smsg_local_attr_vec[destNode] = smsg_attr;
1032     smsg_available_slot++;
1033     MallocPostDesc(pd);
1034     pd->type            = GNI_POST_FMA_PUT;
1035     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1036     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1037     pd->length          = sizeof(gni_smsg_attr_t);
1038     pd->local_addr      = (uint64_t) smsg_attr;
1039     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1040     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1041     pd->src_cq_hndl     = 0;
1042     pd->rdma_mode       = 0;
1043     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1044     print_smsg_attr(smsg_attr);
1045     if(status == GNI_RC_ERROR_RESOURCE )
1046     {
1047         MallocRdmaRequest(rdma_request_msg);
1048         rdma_request_msg->destNode = destNode;
1049         rdma_request_msg->pd = pd;
1050         /* buffer this request */
1051     }
1052 #if PRINT_SYH
1053     if(status != GNI_RC_SUCCESS)
1054        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1055     else
1056         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1057 #endif
1058 }
1059
1060 /* useDynamicSMSG */
1061 inline 
1062 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1063 {
1064     gni_return_t status = GNI_RC_NOT_DONE;
1065
1066     if(mailbox_list->offset == mailbox_list->size)
1067     {
1068         dynamic_smsg_mailbox_t *new_mailbox_entry;
1069         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1070         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1071         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1072         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1073         new_mailbox_entry->offset = 0;
1074         
1075         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1076             new_mailbox_entry->size, smsg_rx_cqh,
1077             GNI_MEM_READWRITE,   
1078             -1,
1079             &(new_mailbox_entry->mem_hndl));
1080
1081         GNI_RC_CHECK("register", status);
1082         new_mailbox_entry->next = mailbox_list;
1083         mailbox_list = new_mailbox_entry;
1084     }
1085     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1086     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1087     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1088     local_smsg_attr->mbox_offset = mailbox_list->offset;
1089     mailbox_list->offset += smsg_memlen;
1090     local_smsg_attr->buff_size = smsg_memlen;
1091     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1092     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1093 }
1094
1095 /* useDynamicSMSG */
1096 inline 
1097 static int connect_to(int destNode)
1098 {
1099     gni_return_t status = GNI_RC_NOT_DONE;
1100     CmiAssert(smsg_connected_flag[destNode] == 0);
1101     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1102     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1103     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1104     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1105     
1106     CMI_GNI_LOCK
1107     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1108     CMI_GNI_UNLOCK
1109     if (status == GNI_RC_ERROR_RESOURCE) {
1110       /* possibly destNode is making connection at the same time */
1111       free(smsg_attr_vector_local[destNode]);
1112       smsg_attr_vector_local[destNode] = NULL;
1113       free(smsg_attr_vector_remote[destNode]);
1114       smsg_attr_vector_remote[destNode] = NULL;
1115       mailbox_list->offset -= smsg_memlen;
1116       return 0;
1117     }
1118     GNI_RC_CHECK("GNI_Post", status);
1119     smsg_connected_flag[destNode] = 1;
1120     return 1;
1121 }
1122
1123 #if PIGGYBACK_ACK
1124 static void * piggyback_ack(int destNode, int msgsize, int *count)
1125 {
1126     int i;
1127     if (PCQueueEmpty(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)) return NULL;
1128     int len = PCQueueLength(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
1129     int piggycount = (SMSG_MAX_MSG - msgsize)/sizeof(uint64_t);
1130     if (piggycount > len+1) piggycount = len + 1;
1131     if (piggycount <= 5) return NULL;
1132     uint64_t * buf = (uint64_t*)CmiTmpAlloc(piggycount * sizeof(uint64_t));
1133     CmiAssert(buf != NULL);
1134     buf[0] = piggycount-1;
1135 //printf("[%d] piggyback_ack: %d\n", myrank, piggycount);
1136     for (i=0; i<piggycount-1; i++) {
1137         MSG_LIST *ptr = (MSG_LIST*)PCQueuePop(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
1138         CmiAssert(ptr != NULL);
1139         ACK_MSG *msg = ptr->msg;
1140         buf[i+1] = msg->source_addr;
1141         FreeAckMsg(msg);
1142         FreeMsgList(ptr);
1143     }
1144     *count = piggycount;
1145     return buf;
1146 }
1147
1148
1149 static void piggyback_ack_done(int destNode, uint64_t *buf, int done)
1150 {
1151     if (!done)
1152     {
1153         int i;
1154         for (i=0; i<buf[0]; i++) {
1155             MSG_LIST *msg_tmp;
1156             MallocMsgList(msg_tmp);
1157             ACK_MSG  *ack_msg;
1158             MallocAckMsg(ack_msg);
1159             ack_msg->source_addr = buf[i+1];
1160             msg_tmp->size = ACK_MSG_SIZE;
1161             msg_tmp->msg = ack_msg;
1162             msg_tmp->tag = ACK_TAG;
1163             msg_tmp->destNode = destNode;
1164             PCQueuePush(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1165         }
1166     }
1167     CmiTmpFree(buf);
1168 }
1169 #endif
1170
1171 inline 
1172 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
1173 {
1174     unsigned int          remote_address;
1175     uint32_t              remote_id;
1176     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1177     gni_smsg_attr_t       *smsg_attr;
1178     gni_post_descriptor_t *pd;
1179     gni_post_state_t      post_state;
1180     char                  *real_data; 
1181
1182     if (useDynamicSMSG) {
1183         switch (smsg_connected_flag[destNode]) {
1184         case 0: 
1185             connect_to(destNode);         /* continue to case 1 */
1186         case 1:                           /* pending connection, do nothing */
1187             status = GNI_RC_NOT_DONE;
1188             if(inbuff ==0)
1189                 buffer_small_msgs(queue, msg, size, destNode, tag);
1190             return status;
1191         }
1192     }
1193 #if CMK_SMP
1194 #if ! ONE_SEND_QUEUE
1195     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1196 #endif
1197     {
1198 #else
1199     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1200     {
1201 #endif
1202         uint64_t *buf = NULL;
1203         int bufsize = 0;
1204 #if PIGGYBACK_ACK
1205         if (tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG) {
1206             int nack = 0;
1207             buf = piggyback_ack(destNode, size, &nack);
1208             if (buf) {
1209                 tag = (tag == SMALL_DATA_TAG) ? SMALL_DATA_ACK_TAG : LMSG_INIT_ACK_TAG;
1210                 bufsize = nack * sizeof(uint64_t);
1211             }
1212         }
1213 #endif
1214         CMI_GNI_LOCK
1215 #if CMK_SMP_TRACE_COMMTHREAD
1216         int oldpe = -1;
1217         int oldeventid = -1;
1218         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1219         { 
1220             START_EVENT();
1221             if ( tag == SMALL_DATA_TAG)
1222                 real_data = (char*)msg; 
1223             else 
1224                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1225             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1226             TRACE_COMM_SET_COMM_MSGID(real_data);
1227         }
1228 #endif
1229         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
1230 #if CMK_SMP_TRACE_COMMTHREAD
1231         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1232 #endif
1233         CMI_GNI_UNLOCK
1234         if(status == GNI_RC_SUCCESS)
1235         {
1236 #if CMK_SMP_TRACE_COMMTHREAD
1237             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == SMALL_DATA_ACK_TAG || tag == LMSG_INIT_ACK_TAG)
1238             { 
1239                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1240             }
1241 #endif
1242             smsg_send_count ++;
1243         }else
1244             status = GNI_RC_ERROR_RESOURCE;
1245 #if PIGGYBACK_ACK
1246         if (buf) {
1247             piggyback_ack_done(destNode, buf, status==GNI_RC_SUCCESS);
1248             tag = (tag == SMALL_DATA_ACK_TAG) ? SMALL_DATA_TAG : LMSG_INIT_TAG;
1249         }
1250 #endif
1251     }
1252     if(status != GNI_RC_SUCCESS && inbuff ==0)
1253         buffer_small_msgs(queue, msg, size, destNode, tag);
1254     return status;
1255 }
1256
1257 inline 
1258 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1259 {
1260     /* construct a control message and send */
1261     CONTROL_MSG         *control_msg_tmp;
1262     MallocControlMsg(control_msg_tmp);
1263     control_msg_tmp->source_addr = (uint64_t)msg;
1264     control_msg_tmp->seq_id    = seqno;
1265     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1266 #if     USE_LRTS_MEMPOOL
1267     if(size < BIG_MSG)
1268     {
1269         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1270     }
1271     else
1272     {
1273         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1274         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1275         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1276     }
1277 #else
1278     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1279 #endif
1280     return control_msg_tmp;
1281 }
1282
1283 #define BLOCKING_SEND_CONTROL    0
1284
1285 // Large message, send control to receiver, receiver register memory and do a GET, 
1286 // return 1 - send no success
1287 inline
1288 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1289 {
1290     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1291     uint32_t            vmdh_index  = -1;
1292     int                 size;
1293     int                 offset = 0;
1294     uint64_t            source_addr;
1295     int                 register_size; 
1296     void                *msg;
1297
1298     size    =   control_msg_tmp->total_length;
1299     source_addr = control_msg_tmp->source_addr;
1300     register_size = control_msg_tmp->length;
1301
1302 #if  USE_LRTS_MEMPOOL
1303     if( control_msg_tmp->seq_id == 0 ){
1304 #if BLOCKING_SEND_CONTROL
1305         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1306             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1307                 LrtsAdvanceCommunication(0);
1308         }
1309 #endif
1310         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1311         {
1312             msg = (void*)source_addr;
1313             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1314             {
1315                 if(!inbuff)
1316                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1317                 return GNI_RC_ERROR_NOMEM;
1318             }
1319             //register the corresponding mempool
1320             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1321             if(status == GNI_RC_SUCCESS)
1322             {
1323                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1324             }
1325         }else
1326         {
1327             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1328             status = GNI_RC_SUCCESS;
1329         }
1330         if(NoMsgInSend( control_msg_tmp->source_addr))
1331             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1332         else
1333             register_size = 0;
1334     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1335     {
1336         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1337         source_addr += offset;
1338         size = control_msg_tmp->length;
1339 #if BLOCKING_SEND_CONTROL
1340         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1341             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1342                 LrtsAdvanceCommunication(0);
1343         }
1344 #endif
1345         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1346             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1347             {
1348                 if(!inbuff)
1349                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1350                 return GNI_RC_ERROR_NOMEM;
1351             }
1352             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1353             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1354         }
1355         else
1356         {
1357             status = GNI_RC_SUCCESS;
1358         }
1359         register_size = 0;  
1360     }
1361
1362     if(status == GNI_RC_SUCCESS)
1363     {
1364         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff);  
1365         if(status == GNI_RC_SUCCESS)
1366         {
1367             buffered_send_msg += register_size;
1368             if(control_msg_tmp->seq_id == 0)
1369             {
1370                 IncreaseMsgInSend(source_addr);
1371             }
1372             FreeControlMsg(control_msg_tmp);
1373             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1374         }else
1375             status = GNI_RC_ERROR_RESOURCE;
1376
1377     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1378     {
1379         CmiAbort("Memory registor for large msg\n");
1380     }else 
1381     {
1382         status = GNI_RC_ERROR_NOMEM; 
1383         if(!inbuff)
1384             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1385     }
1386     return status;
1387 #else
1388     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1389     if(status == GNI_RC_SUCCESS)
1390     {
1391         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0);  
1392         if(status == GNI_RC_SUCCESS)
1393         {
1394             FreeControlMsg(control_msg_tmp);
1395         }
1396     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1397     {
1398         CmiAbort("Memory registor for large msg\n");
1399     }else 
1400     {
1401         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1402     }
1403     return status;
1404 #endif
1405 }
1406
1407 inline void LrtsPrepareEnvelope(char *msg, int size)
1408 {
1409     CmiSetMsgSize(msg, size);
1410 }
1411
1412 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1413 {
1414     gni_return_t        status  =   GNI_RC_SUCCESS;
1415     uint8_t tag;
1416     CONTROL_MSG         *control_msg_tmp;
1417     int                 oob = ( mode & OUT_OF_BAND);
1418     SMSG_QUEUE          *queue;
1419
1420     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1421 #if CMK_USE_OOB
1422     queue = oob? &smsg_oob_queue : &smsg_queue;
1423 #else
1424     queue = &smsg_queue;
1425 #endif
1426
1427     LrtsPrepareEnvelope(msg, size);
1428
1429 #if PRINT_SYH
1430     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1431 #endif 
1432 #if CMK_SMP && COMM_THREAD_SEND
1433     if(size <= SMSG_MAX_MSG)
1434         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1435     else if (size < BIG_MSG) {
1436         control_msg_tmp =  construct_control_msg(size, msg, 0);
1437         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1438     }
1439     else {
1440           CmiSetMsgSeq(msg, 0);
1441           control_msg_tmp =  construct_control_msg(size, msg, 1);
1442           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1443     }
1444 #else   //non-smp, smp(worker sending)
1445     if(size <= SMSG_MAX_MSG)
1446     {
1447         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1448             CmiFree(msg);
1449     }
1450     else if (size < BIG_MSG) {
1451         control_msg_tmp =  construct_control_msg(size, msg, 0);
1452         send_large_messages(queue, destNode, control_msg_tmp, 0);
1453     }
1454     else {
1455 #if     USE_LRTS_MEMPOOL
1456         CmiSetMsgSeq(msg, 0);
1457         control_msg_tmp =  construct_control_msg(size, msg, 1);
1458         send_large_messages(queue, destNode, control_msg_tmp, 0);
1459 #else
1460         control_msg_tmp =  construct_control_msg(size, msg, 0);
1461         send_large_messages(queue, destNode, control_msg_tmp, 0);
1462 #endif
1463     }
1464 #endif
1465     return 0;
1466 }
1467
1468 static void    PumpDatagramConnection();
1469 static void registerUserTraceEvents() {
1470 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1471     traceRegisterUserEvent("setting up connections", 10);
1472     traceRegisterUserEvent("Receiving small msgs", 20);
1473     traceRegisterUserEvent("Release local transaction", 30);
1474     traceRegisterUserEvent("Sending buffered small msgs", 40);
1475     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1476 #endif
1477 }
1478
1479 static void ProcessDeadlock()
1480 {
1481     static CmiUInt8 *ptr = NULL;
1482     static CmiUInt8  last = 0, mysum, sum;
1483     static int count = 0;
1484     gni_return_t status;
1485     int i;
1486
1487 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1488 //sweep_mempool(CpvAccess(mempool));
1489     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1490     mysum = smsg_send_count + smsg_recv_count;
1491     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1492     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1493     GNI_RC_CHECK("PMI_Allgather", status);
1494     sum = 0;
1495     for (i=0; i<mysize; i++)  sum+= ptr[i];
1496     if (last == 0 || sum == last) 
1497         count++;
1498     else
1499         count = 0;
1500     last = sum;
1501     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1502     if (count == 2) { 
1503         /* detected twice, it is a real deadlock */
1504         if (myrank == 0)  {
1505             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1506             CmiAbort("Fatal> Deadlock detected.");
1507         }
1508
1509     }
1510     _detected_hang = 0;
1511 }
1512
1513 static void CheckProgress()
1514 {
1515     if (smsg_send_count == last_smsg_send_count &&
1516         smsg_recv_count == last_smsg_recv_count ) 
1517     {
1518         _detected_hang = 1;
1519 #if !CMK_SMP
1520         if (_detected_hang) ProcessDeadlock();
1521 #endif
1522
1523     }
1524     else {
1525         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1526         last_smsg_send_count = smsg_send_count;
1527         last_smsg_recv_count = smsg_recv_count;
1528         _detected_hang = 0;
1529     }
1530 }
1531
1532 static void set_limit()
1533 {
1534     //if (!user_set_flag && CmiMyRank() == 0) {
1535     if (CmiMyRank() == 0) {
1536         int mynode = CmiPhysicalNodeID(CmiMyPe());
1537         int numpes = CmiNumPesOnPhysicalNode(mynode);
1538         int numprocesses = numpes / CmiMyNodeSize();
1539         MAX_REG_MEM  = _totalmem / numprocesses;
1540         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1541         if (CmiMyPe() == 0)
1542            printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM, MAX_BUFF_SEND);
1543     }
1544 }
1545
1546 void LrtsPostCommonInit(int everReturn)
1547 {
1548 #if CMK_DIRECT
1549     CmiDirectInit();
1550 #endif
1551 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1552     CpvInitialize(double, projTraceStart);
1553     /* only PE 0 needs to care about registration (to generate sts file). */
1554     if (CmiMyPe() == 0) {
1555         registerMachineUserEventsFunction(&registerUserTraceEvents);
1556     }
1557 #endif
1558
1559 #if CMK_SMP
1560     CmiIdleState *s=CmiNotifyGetState();
1561     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1562     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1563 #else
1564     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1565     if (useDynamicSMSG)
1566     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1567 #endif
1568
1569     if (_checkProgress)
1570 #if CMK_SMP
1571     if (CmiMyRank() == 0)
1572 #endif
1573     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1574 #if !LARGEPAGE
1575     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1576 #endif
1577 }
1578
1579 /* this is called by worker thread */
1580 void LrtsPostNonLocal(){
1581 #if MULTI_THREAD_SEND
1582     if(mysize == 1) return;
1583     //printf("[%d,%d] worker call communication\n", CmiMyNode(), CmiMyRank());
1584     PumpNetworkSmsg();
1585     PumpLocalRdmaTransactions();
1586     
1587 #if CMK_USE_OOB
1588     if (SendBufferMsg(&smsg_oob_queue) == 1)
1589 #endif
1590     {
1591 #if PIGGYBACK_ACK
1592     //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
1593     if (SendBufferMsg(&smsg_queue) == 1) {
1594         //if (count++ % 10 == 0) 
1595         SendBufferMsg(&smsg_ack_queue);
1596     }
1597 #else
1598     SendBufferMsg(&smsg_queue);
1599 #endif
1600     }
1601
1602     SendRdmaMsg();
1603     //LrtsAdvanceCommunication(1);
1604 #endif
1605 }
1606
1607 /* useDynamicSMSG */
1608 static void    PumpDatagramConnection()
1609 {
1610     uint32_t          remote_address;
1611     uint32_t          remote_id;
1612     gni_return_t status;
1613     gni_post_state_t  post_state;
1614     uint64_t          datagram_id;
1615     int i;
1616
1617    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1618    {
1619        if (datagram_id >= mysize) {           /* bound endpoint */
1620            int pe = datagram_id - mysize;
1621            CMI_GNI_LOCK
1622            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1623            CMI_GNI_UNLOCK
1624            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1625            {
1626                CmiAssert(remote_id == pe);
1627                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1628                GNI_RC_CHECK("Dynamic SMSG Init", status);
1629 #if PRINT_SYH
1630                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1631 #endif
1632                CmiAssert(smsg_connected_flag[pe] == 1);
1633                smsg_connected_flag[pe] = 2;
1634            }
1635        }
1636        else {         /* unbound ep */
1637            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1638            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1639            {
1640                CmiAssert(remote_id<mysize);
1641                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1642                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1643                GNI_RC_CHECK("Dynamic SMSG Init", status);
1644 #if PRINT_SYH
1645                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1646 #endif
1647                smsg_connected_flag[remote_id] = 2;
1648
1649                alloc_smsg_attr(&send_smsg_attr);
1650                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1651                GNI_RC_CHECK("post unbound datagram", status);
1652            }
1653        }
1654    }
1655 }
1656
1657 /* pooling CQ to receive network message */
1658 static void PumpNetworkRdmaMsgs()
1659 {
1660     gni_cq_entry_t      event_data;
1661     gni_return_t        status;
1662
1663 }
1664
1665 inline 
1666 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1667 {
1668     RDMA_REQUEST        *rdma_request_msg;
1669     MallocRdmaRequest(rdma_request_msg);
1670     rdma_request_msg->destNode = inst_id;
1671     rdma_request_msg->pd = pd;
1672 #if CMK_SMP
1673     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1674 #else
1675     if(sendRdmaBuf == 0)
1676     {
1677         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1678     }else{
1679         sendRdmaTail->next = rdma_request_msg;
1680         sendRdmaTail =  rdma_request_msg;
1681     }
1682 #endif
1683
1684 }
1685
1686 #if PIGGYBACK_ACK
1687 int processPiggybackAckHeader(void *header)
1688 {
1689     int i;
1690     uint64_t *buf = (uint64_t*)header;
1691     int piggycount = buf[0];
1692 //printf("[%d] got piggyback msg: %d\n", myrank, piggycount);
1693     for (i=0; i<piggycount; i++) {
1694         void *msg = (void*)(buf[i+1]);
1695         CmiAssert(msg != NULL);
1696         DecreaseMsgInSend(msg);
1697         if(NoMsgInSend(msg))
1698             buffered_send_msg -= GetMempoolsize(msg);
1699         CmiFree(msg);
1700     }
1701     return piggycount;
1702 }
1703 #endif
1704
1705 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1706
1707 static void PumpNetworkSmsg()
1708 {
1709     uint64_t            inst_id;
1710     int                 ret;
1711     gni_cq_entry_t      event_data;
1712     gni_return_t        status, status2;
1713     void                *header;
1714     uint8_t             msg_tag;
1715     int                 msg_nbytes;
1716     void                *msg_data;
1717     gni_mem_handle_t    msg_mem_hndl;
1718     gni_smsg_attr_t     *smsg_attr;
1719     gni_smsg_attr_t     *remote_smsg_attr;
1720     int                 init_flag;
1721     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1722     uint64_t            source_addr;
1723     SMSG_QUEUE         *queue = &smsg_queue;
1724 #if     CMK_DIRECT
1725     cmidirectMsg        *direct_msg;
1726 #endif
1727     while(1)
1728     {
1729         CMI_GNI_LOCK
1730         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1731         CMI_GNI_UNLOCK
1732         if(status != GNI_RC_SUCCESS)
1733             break;
1734         inst_id = GNI_CQ_GET_INST_ID(event_data);
1735         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1736 #if PRINT_SYH
1737         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1738 #endif
1739         if (useDynamicSMSG) {
1740             /* subtle: smsg may come before connection is setup */
1741             while (smsg_connected_flag[inst_id] != 2) 
1742                PumpDatagramConnection();
1743         }
1744         msg_tag = GNI_SMSG_ANY_TAG;
1745         while(1) {
1746             CMI_GNI_LOCK
1747             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1748             if (status != GNI_RC_SUCCESS)
1749             {
1750                 CMI_GNI_UNLOCK
1751                 break;
1752             }
1753 #if PRINT_SYH
1754             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1755 #endif
1756             /* copy msg out and then put into queue (small message) */
1757             switch (msg_tag) {
1758 #if PIGGYBACK_ACK
1759             case SMALL_DATA_ACK_TAG:
1760             {
1761                 int piggycount = processPiggybackAckHeader(header);
1762                 header = (uint64_t*)header + piggycount + 1;
1763                 msg_nbytes -= (piggycount+1)*sizeof(uint64_t);
1764             }
1765 #endif
1766             case SMALL_DATA_TAG:
1767             {
1768                 START_EVENT();
1769                 msg_nbytes = CmiGetMsgSize(header);
1770                 msg_data    = CmiAlloc(msg_nbytes);
1771                 memcpy(msg_data, (char*)header, msg_nbytes);
1772                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1773                 CMI_GNI_UNLOCK
1774 #if CMK_SMP_TRACE_COMMTHREAD
1775                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1776 #endif
1777                 handleOneRecvedMsg(msg_nbytes, msg_data);
1778                 break;
1779             }
1780 #if PIGGYBACK_ACK
1781             case LMSG_INIT_ACK_TAG:
1782             {
1783                 int piggycount = processPiggybackAckHeader(header);
1784                 header = (uint64_t*)header + piggycount + 1;
1785                 msg_nbytes -= (piggycount+1)*sizeof(uint64_t);
1786             }
1787 #endif
1788             case LMSG_INIT_TAG:
1789             {
1790                 MallocControlMsg(control_msg_tmp);
1791                 memcpy(control_msg_tmp, header, sizeof(CONTROL_MSG));
1792                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1793                 CMI_GNI_UNLOCK
1794                 getLargeMsgRequest(control_msg_tmp, inst_id);
1795                 break;
1796             }
1797             case ACK_TAG:   //msg fit into mempool
1798             {
1799                 /* Get is done, release message . Now put is not used yet*/
1800                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1801                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1802                 CMI_GNI_UNLOCK
1803 #if ! USE_LRTS_MEMPOOL
1804                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1805 #else
1806                 DecreaseMsgInSend(msg);
1807 #endif
1808                 if(NoMsgInSend(msg))
1809                     buffered_send_msg -= GetMempoolsize(msg);
1810                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1811                 CmiFree(msg);
1812                 break;
1813             }
1814             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1815             {
1816                 MallocControlMsg(header_tmp);
1817                 memcpy(header_tmp, header, sizeof(CONTROL_MSG));
1818                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1819                 CMI_GNI_UNLOCK
1820                 void *msg = (void*)(header_tmp->source_addr);
1821                 int cur_seq = CmiGetMsgSeq(msg);
1822                 int offset = ONE_SEG*(cur_seq+1);
1823                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1824                 buffered_send_msg -= header_tmp->length;
1825                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1826                 if (remain_size < 0) remain_size = 0;
1827                 CmiSetMsgSize(msg, remain_size);
1828                 if(remain_size <= 0) //transaction done
1829                 {
1830                     CmiFree(msg);
1831                 }else if (header_tmp->total_length > offset)
1832                 {
1833                     CmiSetMsgSeq(msg, cur_seq+1);
1834                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1835                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1836                     //send next seg
1837                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1838                          // pipelining
1839                     if (header_tmp->seq_id == 1) {
1840                       int i;
1841                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1842                         int seq = cur_seq+i+2;
1843                         CmiSetMsgSeq(msg, seq-1);
1844                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1845                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1846                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1847                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1848                       }
1849                     }
1850                 }
1851                 break;
1852             }
1853 #if CMK_PERSISTENT_COMM
1854             case PUT_DONE_TAG: //persistent message
1855                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1856                 int size = ((CONTROL_MSG *) header)->length;
1857                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1858                 CMI_GNI_UNLOCK
1859                 CmiReference(msg);
1860                 handleOneRecvedMsg(size, msg); 
1861                 break;
1862 #endif
1863 #if CMK_DIRECT
1864             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1865                 //create a trigger message
1866                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
1867                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
1868                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1869                 CMI_GNI_UNLOCK
1870                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
1871                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
1872                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1873                 break;
1874 #endif
1875             default: {
1876                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1877                 CMI_GNI_UNLOCK
1878                 printf("weird tag problem\n");
1879                 CmiAbort("Unknown tag\n");
1880                      }
1881             }               // end switch
1882 #if PRINT_SYH
1883             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1884 #endif
1885             smsg_recv_count ++;
1886             msg_tag = GNI_SMSG_ANY_TAG;
1887         } //endwhile getNext
1888     }   //end while GetEvent
1889     if(status == GNI_RC_ERROR_RESOURCE)
1890     {
1891         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
1892         GNI_RC_CHECK("Smsg_rx_cq full", status);
1893     }
1894 }
1895
1896 static void printDesc(gni_post_descriptor_t *pd)
1897 {
1898     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1899 }
1900
1901 // for BIG_MSG called on receiver side for receiving control message
1902 // LMSG_INIT_TAG
1903 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1904 {
1905 #if     USE_LRTS_MEMPOOL
1906     CONTROL_MSG         *request_msg;
1907     gni_return_t        status = GNI_RC_SUCCESS;
1908     void                *msg_data;
1909     gni_post_descriptor_t *pd;
1910     gni_mem_handle_t    msg_mem_hndl;
1911     int source, size, transaction_size, offset = 0;
1912     size_t     register_size = 0;
1913
1914     // initial a get to transfer data from the sender side */
1915     request_msg = (CONTROL_MSG *) header;
1916     size = request_msg->total_length;
1917     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1918     if(request_msg->seq_id < 2)   {
1919         msg_data = CmiAlloc(size);
1920         CmiSetMsgSeq(msg_data, 0);
1921         _MEMCHECK(msg_data);
1922     }
1923     else {
1924         offset = ONE_SEG*(request_msg->seq_id-1);
1925         msg_data = (char*)request_msg->dest_addr + offset;
1926     }
1927    
1928     MallocPostDesc(pd);
1929     pd->cqwrite_value = request_msg->seq_id;
1930     if( request_msg->seq_id == 0)
1931     {
1932         pd->local_mem_hndl= GetMemHndl(msg_data);
1933         transaction_size = ALIGN64(size);
1934         if(IsMemHndlZero(pd->local_mem_hndl))
1935         {   
1936             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1937             if(status == GNI_RC_SUCCESS)
1938             {
1939                 pd->local_mem_hndl = GetMemHndl(msg_data);
1940             }
1941             else
1942             {
1943                 SetMemHndlZero(pd->local_mem_hndl);
1944             }
1945         }
1946         if(NoMsgInRecv( (void*)(msg_data)))
1947             register_size = GetMempoolsize((void*)(msg_data));
1948         else
1949             register_size = 0;
1950     }
1951     else{
1952         transaction_size = ALIGN64(request_msg->length);
1953         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1954         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1955         {
1956             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1957         }
1958     }
1959     pd->first_operand = ALIGN64(size);                   //  total length
1960
1961     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1962         pd->type            = GNI_POST_FMA_GET;
1963     else
1964         pd->type            = GNI_POST_RDMA_GET;
1965 #if REMOTE_EVENT
1966     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1967 #else
1968     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1969 #endif
1970     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1971     pd->length          = transaction_size;
1972     pd->local_addr      = (uint64_t) msg_data;
1973     pd->remote_addr     = request_msg->source_addr + offset;
1974     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1975     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1976     pd->rdma_mode       = 0;
1977     pd->amo_cmd         = 0;
1978
1979     //memory registration success
1980     if(status == GNI_RC_SUCCESS)
1981     {
1982         CMI_GNI_LOCK
1983         if(pd->type == GNI_POST_RDMA_GET) 
1984             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1985         else
1986             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1987         CMI_GNI_UNLOCK
1988
1989         if(status == GNI_RC_SUCCESS )
1990         {
1991             if(pd->cqwrite_value == 0)
1992             {
1993 #if MACHINE_DEBUG_LOG
1994                 buffered_recv_msg += register_size;
1995                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1996 #endif
1997                 IncreaseMsgInRecv(msg_data);
1998             }
1999         }
2000     }else
2001     {
2002         SetMemHndlZero((pd->local_mem_hndl));
2003     }
2004     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2005     {
2006         bufferRdmaMsg(inst_id, pd); 
2007     }else {
2008          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
2009         GNI_RC_CHECK("GetLargeAFter posting", status);
2010     }
2011 #else
2012     CONTROL_MSG         *request_msg;
2013     gni_return_t        status;
2014     void                *msg_data;
2015     gni_post_descriptor_t *pd;
2016     RDMA_REQUEST        *rdma_request_msg;
2017     gni_mem_handle_t    msg_mem_hndl;
2018     //int source;
2019     // initial a get to transfer data from the sender side */
2020     request_msg = (CONTROL_MSG *) header;
2021     msg_data = CmiAlloc(request_msg->length);
2022     _MEMCHECK(msg_data);
2023
2024     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
2025
2026     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2027     {
2028         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2029     }
2030
2031     MallocPostDesc(pd);
2032     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2033         pd->type            = GNI_POST_FMA_GET;
2034     else
2035         pd->type            = GNI_POST_RDMA_GET;
2036 #if REMOTE_EVENT
2037     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
2038 #else
2039     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2040 #endif
2041     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2042     pd->length          = ALIGN64(request_msg->length);
2043     pd->local_addr      = (uint64_t) msg_data;
2044     pd->remote_addr     = request_msg->source_addr;
2045     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2046     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
2047     pd->rdma_mode       = 0;
2048     pd->amo_cmd         = 0;
2049
2050     //memory registration successful
2051     if(status == GNI_RC_SUCCESS)
2052     {
2053         pd->local_mem_hndl  = msg_mem_hndl;
2054         CMI_GNI_LOCK
2055         if(pd->type == GNI_POST_RDMA_GET) 
2056             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2057         else
2058             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2059         CMI_GNI_UNLOCK
2060     }else
2061     {
2062         SetMemHndlZero(pd->local_mem_hndl);
2063     }
2064     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2065     {
2066         MallocRdmaRequest(rdma_request_msg);
2067         rdma_request_msg->next = 0;
2068         rdma_request_msg->destNode = inst_id;
2069         rdma_request_msg->pd = pd;
2070         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2071     }else {
2072         GNI_RC_CHECK("AFter posting", status);
2073     }
2074 #endif
2075 }
2076
2077 static void PumpLocalRdmaTransactions()
2078 {
2079     gni_cq_entry_t          ev;
2080     gni_return_t            status;
2081     uint64_t                type, inst_id;
2082     gni_post_descriptor_t   *tmp_pd;
2083     MSG_LIST                *ptr;
2084     CONTROL_MSG             *ack_msg_tmp;
2085     ACK_MSG                 *ack_msg;
2086     uint8_t                 msg_tag;
2087 #if CMK_DIRECT
2088     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2089 #endif
2090     SMSG_QUEUE         *queue = &smsg_queue;
2091
2092     while(1) {
2093         CMI_GNI_LOCK 
2094         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
2095         CMI_GNI_UNLOCK
2096         if(status != GNI_RC_SUCCESS) break;
2097         
2098         type = GNI_CQ_GET_TYPE(ev);
2099         if (type == GNI_CQ_EVENT_TYPE_POST)
2100         {
2101             inst_id     = GNI_CQ_GET_INST_ID(ev);
2102 #if PRINT_SYH
2103             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2104 #endif
2105             CMI_GNI_LOCK
2106             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
2107             CMI_GNI_UNLOCK
2108
2109             switch (tmp_pd->type) {
2110 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2111             case GNI_POST_RDMA_PUT:
2112 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2113                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2114 #endif
2115             case GNI_POST_FMA_PUT:
2116                 if(tmp_pd->amo_cmd == 1) {
2117                     //sender ACK to receiver to trigger it is done
2118                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2119                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2120                     msg_tag = DIRECT_PUT_DONE_TAG;
2121                 }
2122                 else {
2123                     CmiFree((void *)tmp_pd->local_addr);
2124                     MallocControlMsg(ack_msg_tmp);
2125                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2126                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2127                     msg_tag = PUT_DONE_TAG;
2128                 }
2129                 break;
2130 #endif
2131             case GNI_POST_RDMA_GET:
2132             case GNI_POST_FMA_GET:  {
2133 #if  ! USE_LRTS_MEMPOOL
2134                 MallocControlMsg(ack_msg_tmp);
2135                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2136                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2137                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2138                 msg_tag = ACK_TAG;  
2139 #else
2140                 int seq_id = tmp_pd->cqwrite_value;
2141                 if(seq_id > 0)      // BIG_MSG
2142                 {
2143                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2144                     MallocControlMsg(ack_msg_tmp);
2145                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2146                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2147                     ack_msg_tmp->seq_id = seq_id;
2148                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2149                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2150                     ack_msg_tmp->length = tmp_pd->length;
2151                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2152                     msg_tag = BIG_MSG_TAG; 
2153                 } 
2154                 else
2155                 {
2156                     MallocAckMsg(ack_msg);
2157                     ack_msg->source_addr = tmp_pd->remote_addr;
2158                     msg_tag = ACK_TAG;  
2159                     // ack_msg_tmp->dest_addr = tmp_pd->local_addr; ???
2160                 }
2161 #endif
2162                 break;
2163             }
2164             default:
2165                 CmiPrintf("type=%d\n", tmp_pd->type);
2166                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
2167             }      /* end of switch */
2168
2169 #if CMK_DIRECT
2170             if (tmp_pd->amo_cmd == 1) {
2171                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
2172                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2173             }
2174             else
2175 #endif
2176             if (msg_tag == ACK_TAG) {
2177 #if ! PIGGYBACK_ACK
2178                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
2179                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2180 #else
2181                 buffer_small_msgs(&smsg_ack_queue, ack_msg, ACK_MSG_SIZE, inst_id, msg_tag);
2182 #endif
2183             }
2184             else {
2185                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
2186                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2187             }
2188 #if CMK_PERSISTENT_COMM
2189             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2190 #endif
2191             {
2192                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2193 #if PRINT_SYH
2194                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2195 #endif
2196                     START_EVENT();
2197                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2198                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2199 #if MACHINE_DEBUG_LOG
2200                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2201                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2202                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2203 #endif
2204 #if CMK_SMP_TRACE_COMMTHREAD
2205                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2206 #endif
2207                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2208                 }else if(msg_tag == BIG_MSG_TAG){
2209                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2210                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2211                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2212                         START_EVENT();
2213 #if PRINT_SYH
2214                         printf("Pipeline msg done [%d]\n", myrank);
2215 #endif
2216 #if CMK_SMP_TRACE_COMMTHREAD
2217                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2218 #endif
2219                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2220                     }
2221                 }
2222             }
2223             FreePostDesc(tmp_pd);
2224         }
2225     } //end while
2226     if(status == GNI_RC_ERROR_RESOURCE)
2227     {
2228         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2229         GNI_RC_CHECK("Smsg_tx_cq full", status);
2230     }
2231 }
2232
2233 static void  SendRdmaMsg()
2234 {
2235     gni_return_t            status = GNI_RC_SUCCESS;
2236     gni_mem_handle_t        msg_mem_hndl;
2237     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2238     RDMA_REQUEST            *pre = 0;
2239     uint64_t                register_size = 0;
2240     void                    *msg;
2241     int                     i;
2242 #if CMK_SMP
2243     int len = PCQueueLength(sendRdmaBuf);
2244     for (i=0; i<len; i++)
2245     {
2246         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2247         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2248         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2249         if (ptr == NULL) break;
2250 #else
2251     ptr = sendRdmaBuf;
2252     while (ptr!=0)
2253     {
2254 #endif 
2255         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2256         gni_post_descriptor_t *pd = ptr->pd;
2257         status = GNI_RC_SUCCESS;
2258         
2259         if(pd->cqwrite_value == 0)
2260         {
2261             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2262             {
2263                 msg = (void*)(pd->local_addr);
2264                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
2265                 if(status == GNI_RC_SUCCESS)
2266                 {
2267                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2268                 }
2269             }else
2270             {
2271                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2272             }
2273             if(NoMsgInRecv( (void*)(pd->local_addr)))
2274                 register_size = GetMempoolsize((void*)(pd->local_addr));
2275             else
2276                 register_size = 0;
2277         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2278         {
2279             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
2280         }
2281         if(status == GNI_RC_SUCCESS)        //mem register good
2282         {
2283             CMI_GNI_LOCK
2284             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2285                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2286             else
2287                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2288             CMI_GNI_UNLOCK
2289             if(status == GNI_RC_SUCCESS)    //post good
2290             {
2291 #if !CMK_SMP
2292                 tmp_ptr = ptr;
2293                 if(pre != 0) {
2294                     pre->next = ptr->next;
2295                 }
2296                 else {
2297                     sendRdmaBuf = ptr->next;
2298                 }
2299                 ptr = ptr->next;
2300                 FreeRdmaRequest(tmp_ptr);
2301 #endif
2302                 if(pd->cqwrite_value == 0)
2303                 {
2304                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2305                 }
2306 #if MACHINE_DEBUG_LOG
2307                 buffered_recv_msg += register_size;
2308                 MACHSTATE(8, "GO request from buffered\n"); 
2309 #endif
2310             }else           // cannot post
2311             {
2312 #if CMK_SMP
2313                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2314 #else
2315                 pre = ptr;
2316                 ptr = ptr->next;
2317 #endif
2318                 break;
2319             }
2320         } else          //memory registration fails
2321         {
2322 #if CMK_SMP
2323             PCQueuePush(sendRdmaBuf, (char*)ptr);
2324 #else
2325             pre = ptr;
2326             ptr = ptr->next;
2327 #endif
2328         }
2329     } //end while
2330 #if ! CMK_SMP
2331     if(ptr == 0)
2332         sendRdmaTail = pre;
2333 #endif
2334 }
2335
2336 // return 1 if all messages are sent
2337 static int SendBufferMsg(SMSG_QUEUE *queue)
2338 {
2339     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2340     CONTROL_MSG         *control_msg_tmp;
2341     gni_return_t        status;
2342     int                 done = 1;
2343     uint64_t            register_size;
2344     void                *register_addr;
2345     int                 index_previous = -1;
2346 #if CMI_EXERT_SEND_CAP
2347     int                 sent_cnt = 0;
2348 #endif
2349
2350 #if CMK_SMP
2351     int          index = 0;
2352 #if ONE_SEND_QUEUE
2353     memset(destpe_avail, 0, mysize * sizeof(char));
2354     for (index=0; index<1; index++)
2355     {
2356         int i, len = PCQueueLength(queue->sendMsgBuf);
2357         for (i=0; i<len; i++) 
2358         {
2359             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2360             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2361             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2362             if(ptr == NULL) break;
2363             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2364                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2365                 continue;
2366             }
2367 #else
2368 #if SMP_LOCKS
2369     int nonempty = PCQueueLength(nonEmptyQueues);
2370     for(index =0; index<nonempty; index++)
2371     {
2372         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2373         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2374         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2375         if(current_list == NULL) break; 
2376         PCQueue current_queue= current_list-> sendSmsgBuf;
2377         CmiLock(current_list->lock);
2378         int i, len = PCQueueLength(current_queue);
2379         current_list->pushed = 0;
2380         CmiUnlock(current_list->lock);
2381 #else
2382     for(index =0; index<mysize; index++)
2383     {
2384         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2385         int i, len = PCQueueLength(current_queue);
2386 #endif
2387         for (i=0; i<len; i++) 
2388         {
2389             CMI_PCQUEUEPOP_LOCK(current_queue)
2390             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2391             CMI_PCQUEUEPOP_UNLOCK(current_queue)
2392             if (ptr == 0) break;
2393 #endif
2394 #else
2395     int index = queue->smsg_head_index;
2396     while(index != -1)
2397     {
2398         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2399         pre = 0;
2400         while(ptr != 0)
2401         {
2402 #endif
2403             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2404             status = GNI_RC_ERROR_RESOURCE;
2405             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2406                 /* connection not exists yet */
2407             }
2408             else
2409             switch(ptr->tag)
2410             {
2411             case SMALL_DATA_TAG:
2412                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2413                 if(status == GNI_RC_SUCCESS)
2414                 {
2415                     CmiFree(ptr->msg);
2416                 }
2417                 break;
2418             case LMSG_INIT_TAG:
2419                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2420                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2421                 break;
2422             case ACK_TAG:
2423                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2424                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2425                 break;
2426             case BIG_MSG_TAG:
2427                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2428                 if(status == GNI_RC_SUCCESS)
2429                 {
2430                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2431                 }
2432                 break;
2433 #if CMK_DIRECT
2434             case DIRECT_PUT_DONE_TAG:
2435                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2436                 if(status == GNI_RC_SUCCESS)
2437                 {
2438                     free((CMK_DIRECT_HEADER*)ptr->msg);
2439                 }
2440                 break;
2441
2442 #endif
2443             default:
2444                 printf("Weird tag\n");
2445                 CmiAbort("should not happen\n");
2446             }       // end switch
2447             if(status == GNI_RC_SUCCESS)
2448             {
2449 #if !CMK_SMP
2450                 tmp_ptr = ptr;
2451                 if(pre)
2452                 {
2453                     ptr = pre ->next = ptr->next;
2454                 }else
2455                 {
2456                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2457                 }
2458                 FreeMsgList(tmp_ptr);
2459 #else
2460                 FreeMsgList(ptr);
2461 #endif
2462 #if PRINT_SYH
2463                 buffered_smsg_counter--;
2464                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2465 #endif
2466 #if CMI_EXERT_SEND_CAP
2467                 sent_cnt++;
2468                 if(sent_cnt == SEND_CAP)
2469                     break;
2470 #endif
2471             }else {
2472 #if CMK_SMP
2473 #if ONE_SEND_QUEUE
2474                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2475 #else
2476                 PCQueuePush(current_queue, (char*)ptr);
2477 #endif
2478 #else
2479                 pre = ptr;
2480                 ptr=ptr->next;
2481 #endif
2482                 done = 0;
2483                 if(status == GNI_RC_ERROR_RESOURCE)
2484                 {
2485 #if CMK_SMP && ONE_SEND_QUEUE 
2486                     destpe_avail[ptr->destNode] = 1;
2487 #else
2488                     break;
2489 #endif
2490                 }
2491             } 
2492         } //end while
2493 #if !CMK_SMP
2494         if(ptr == 0)
2495             queue->smsg_msglist_index[index].tail = pre;
2496         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2497         {
2498             if(index_previous != -1)
2499                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2500             else
2501                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2502         }
2503         else {
2504             index_previous = index;
2505         }
2506         index = queue->smsg_msglist_index[index].next;
2507 #else
2508 #if !ONE_SEND_QUEUE && SMP_LOCKS
2509         CmiLock(current_list->lock);
2510         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2511         {
2512             current_list->pushed = 1;
2513             PCQueuePush(nonEmptyQueues, current_list);
2514         }
2515         CmiUnlock(current_list->lock); 
2516 #endif
2517 #endif
2518
2519 #if CMI_EXERT_SEND_CAP
2520         if(sent_cnt == SEND_CAP)
2521                 break;
2522 #endif
2523     }   // end pooling for all cores
2524     return done;
2525 }
2526
2527 static void ProcessDeadlock();
2528 void LrtsAdvanceCommunication(int whileidle)
2529 {
2530     static int count = 0;
2531     /*  Receive Msg first */
2532 #if CMK_SMP_TRACE_COMMTHREAD
2533     double startT, endT;
2534 #endif
2535     if (useDynamicSMSG && whileidle)
2536     {
2537 #if CMK_SMP_TRACE_COMMTHREAD
2538         startT = CmiWallTimer();
2539 #endif
2540         PumpDatagramConnection();
2541 #if CMK_SMP_TRACE_COMMTHREAD
2542         endT = CmiWallTimer();
2543         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2544 #endif
2545     }
2546
2547 #if CMK_SMP_TRACE_COMMTHREAD
2548     startT = CmiWallTimer();
2549 #endif
2550     PumpNetworkSmsg();
2551     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2552 #if CMK_SMP_TRACE_COMMTHREAD
2553     endT = CmiWallTimer();
2554     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2555 #endif
2556
2557 #if CMK_SMP_TRACE_COMMTHREAD
2558     startT = CmiWallTimer();
2559 #endif
2560     PumpLocalRdmaTransactions();
2561     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2562 #if CMK_SMP_TRACE_COMMTHREAD
2563     endT = CmiWallTimer();
2564     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2565 #endif
2566     /* Send buffered Message */
2567 #if CMK_SMP_TRACE_COMMTHREAD
2568     startT = CmiWallTimer();
2569 #endif
2570 #if CMK_USE_OOB
2571     if (SendBufferMsg(&smsg_oob_queue) == 1)
2572 #endif
2573     {
2574 #if PIGGYBACK_ACK
2575     //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
2576     if (SendBufferMsg(&smsg_queue) == 1) {
2577         //if (count++ % 10 == 0) 
2578         SendBufferMsg(&smsg_ack_queue);
2579     }
2580 #else
2581     SendBufferMsg(&smsg_queue);
2582 #endif
2583     }
2584     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2585 #if CMK_SMP_TRACE_COMMTHREAD
2586     endT = CmiWallTimer();
2587     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2588 #endif
2589
2590 #if CMK_SMP_TRACE_COMMTHREAD
2591     startT = CmiWallTimer();
2592 #endif
2593     SendRdmaMsg();
2594     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2595 #if CMK_SMP_TRACE_COMMTHREAD
2596     endT = CmiWallTimer();
2597     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2598 #endif
2599
2600 #if CMK_SMP
2601     if (_detected_hang)  ProcessDeadlock();
2602 #endif
2603 }
2604
2605 /* useDynamicSMSG */
2606 static void _init_dynamic_smsg()
2607 {
2608     gni_return_t status;
2609     uint32_t     vmdh_index = -1;
2610     int i;
2611
2612     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2613     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2614     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2615     for(i=0; i<mysize; i++) {
2616         smsg_connected_flag[i] = 0;
2617         smsg_attr_vector_local[i] = NULL;
2618         smsg_attr_vector_remote[i] = NULL;
2619     }
2620     if(mysize <=512)
2621     {
2622         SMSG_MAX_MSG = 4096;
2623     }else if (mysize <= 4096)
2624     {
2625         SMSG_MAX_MSG = 4096/mysize * 1024;
2626     }else if (mysize <= 16384)
2627     {
2628         SMSG_MAX_MSG = 512;
2629     }else {
2630         SMSG_MAX_MSG = 256;
2631     }
2632
2633     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2634     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2635     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2636     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2637     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2638
2639     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2640     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2641     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2642     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2643     mailbox_list->offset = 0;
2644     mailbox_list->next = 0;
2645     
2646     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2647         mailbox_list->size, smsg_rx_cqh,
2648         GNI_MEM_READWRITE,   
2649         vmdh_index,
2650         &(mailbox_list->mem_hndl));
2651     GNI_RC_CHECK("MEMORY registration for smsg", status);
2652
2653     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2654     GNI_RC_CHECK("Unbound EP", status);
2655     
2656     alloc_smsg_attr(&send_smsg_attr);
2657
2658     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2659     GNI_RC_CHECK("post unbound datagram", status);
2660
2661       /* always pre-connect to proc 0 */
2662     //if (myrank != 0) connect_to(0);
2663 }
2664
2665 static void _init_static_smsg()
2666 {
2667     gni_smsg_attr_t      *smsg_attr;
2668     gni_smsg_attr_t      remote_smsg_attr;
2669     gni_smsg_attr_t      *smsg_attr_vec;
2670     gni_mem_handle_t     my_smsg_mdh_mailbox;
2671     int      ret, i;
2672     gni_return_t status;
2673     uint32_t              vmdh_index = -1;
2674     mdh_addr_t            base_infor;
2675     mdh_addr_t            *base_addr_vec;
2676     char *env;
2677
2678     if(mysize <=512)
2679     {
2680         SMSG_MAX_MSG = 1024;
2681     }else if (mysize <= 4096)
2682     {
2683         SMSG_MAX_MSG = 1024;
2684     }else if (mysize <= 16384)
2685     {
2686         SMSG_MAX_MSG = 512;
2687     }else {
2688         SMSG_MAX_MSG = 256;
2689     }
2690     
2691     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2692     if (env) SMSG_MAX_MSG = atoi(env);
2693     CmiAssert(SMSG_MAX_MSG > 0);
2694
2695     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2696     
2697     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2698     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2699     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2700     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2701     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2702     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2703     CmiAssert(ret == 0);
2704     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2705     
2706     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2707             smsg_memlen*(mysize), smsg_rx_cqh,
2708             GNI_MEM_READWRITE,   
2709             vmdh_index,
2710             &my_smsg_mdh_mailbox);
2711     register_memory_size += smsg_memlen*(mysize);
2712     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2713
2714     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2715     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2716
2717     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2718     base_infor.mdh =  my_smsg_mdh_mailbox;
2719     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2720
2721     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2722  
2723     for(i=0; i<mysize; i++)
2724     {
2725         if(i==myrank)
2726             continue;
2727         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2728         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2729         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2730         smsg_attr[i].mbox_offset = i*smsg_memlen;
2731         smsg_attr[i].buff_size = smsg_memlen;
2732         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2733         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2734     }
2735
2736     for(i=0; i<mysize; i++)
2737     {
2738         if (myrank == i) continue;
2739
2740         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2741         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2742         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2743         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2744         remote_smsg_attr.buff_size = smsg_memlen;
2745         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2746         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2747
2748         /* initialize the smsg channel */
2749         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2750         GNI_RC_CHECK("SMSG Init", status);
2751     } //end initialization
2752
2753     free(base_addr_vec);
2754     free(smsg_attr);
2755
2756     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2757     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2758
2759
2760 inline
2761 static void _init_send_queue(SMSG_QUEUE *queue)
2762 {
2763      int i;
2764 #if ONE_SEND_QUEUE
2765      queue->sendMsgBuf = PCQueueCreate();
2766      destpe_avail = (char*)malloc(mysize * sizeof(char));
2767 #else
2768      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2769 #if CMK_SMP && SMP_LOCKS
2770      nonEmptyQueues = PCQueueCreate();
2771 #endif
2772      for(i =0; i<mysize; i++)
2773      {
2774 #if CMK_SMP
2775          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2776 #if SMP_LOCKS
2777          queue->smsg_msglist_index[i].pushed = 0;
2778          queue->smsg_msglist_index[i].lock = CmiCreateLock();
2779 #endif
2780 #else
2781          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2782          queue->smsg_msglist_index[i].next = -1;
2783          queue->smsg_head_index = -1;
2784 #endif
2785         
2786      }
2787 #endif
2788 }
2789
2790 inline
2791 static void _init_smsg()
2792 {
2793     if(mysize > 1) {
2794         if (useDynamicSMSG)
2795             _init_dynamic_smsg();
2796         else
2797             _init_static_smsg();
2798     }
2799
2800     _init_send_queue(&smsg_queue);
2801 #if PIGGYBACK_ACK
2802     _init_send_queue(&smsg_ack_queue);
2803 #endif
2804 #if CMK_USE_OOB
2805     _init_send_queue(&smsg_oob_queue);
2806 #endif
2807 }
2808
2809 static void _init_static_msgq()
2810 {
2811     gni_return_t status;
2812     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2813     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2814     msgq_attrs.smsg_q_sz = 1;
2815     msgq_attrs.rcv_pool_sz = 1;
2816     msgq_attrs.num_msgq_eps = 2;
2817     msgq_attrs.nloc_insts = 8;
2818     msgq_attrs.modes = 0;
2819     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2820
2821     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2822     GNI_RC_CHECK("MSGQ Init", status);
2823
2824
2825 }
2826
2827 #if CMK_SMP && STEAL_MEMPOOL
2828 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2829 {
2830     void *pool = NULL;
2831     int i, k;
2832     // check other ranks
2833     for (k=0; k<CmiMyNodeSize()+1; k++) {
2834         i = (CmiMyRank()+k)%CmiMyNodeSize();
2835         if (i==CmiMyRank()) continue;
2836         mempool_type *mptr = CpvAccessOther(mempool, i);
2837         CmiLock(mptr->mempoolLock);
2838         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2839         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2840             CmiUnlock(mptr->mempoolLock);
2841             continue;
2842         }
2843         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2844         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2845             /* search in the free list */
2846           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2847           mempool_header *current = free_header;
2848           while (current) {
2849             if (current->next_free == (char*)header-(char*)mptr) break;
2850             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2851           }
2852           if (current == NULL) {         /* not found in free list */
2853             CmiUnlock(mptr->mempoolLock);
2854             continue;
2855           }
2856 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2857             /* search the previous memblock, and remove the tail */
2858           mempool_block *ptr = (mempool_block *)mptr;
2859           while (ptr) {
2860             if (ptr->memblock_next == mptr->memblock_tail) break;
2861             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2862           }
2863           CmiAssert(ptr!=NULL);
2864           ptr->memblock_next = 0;
2865           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2866
2867             /* remove memblock from the free list */
2868           current->next_free = header->next_free;
2869           if (header == free_header) mptr->freelist_head = header->next_free;
2870
2871           CmiUnlock(mptr->mempoolLock);
2872
2873           pool = (void*)tail;
2874           *mem_hndl = tail->mem_hndl;
2875           *size = tail->size;
2876           return pool;
2877         }
2878         CmiUnlock(mptr->mempoolLock);
2879     }
2880
2881       /* steal failed, deregister and free memblock now */
2882     int freed = 0;
2883     for (k=0; k<CmiMyNodeSize()+1; k++) {
2884         i = (CmiMyRank()+k)%CmiMyNodeSize();
2885         mempool_type *mptr = CpvAccessOther(mempool, i);
2886         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2887
2888         mempool_block *mempools_head = &(mptr->mempools_head);
2889         mempool_block *current = mempools_head;
2890         mempool_block *prev = NULL;
2891
2892         while (current) {
2893           int isfree = 0;
2894           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2895 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2896           mempool_header *cur = free_header;
2897           mempool_header *header;
2898           if (current != mempools_head) {
2899             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2900              /* search in free list */
2901             if (header->size == current->size - sizeof(mempool_block)) {
2902               cur = free_header;
2903               while (cur) {
2904                 if (cur->next_free == (char*)header-(char*)mptr) break;
2905                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2906               }
2907               if (cur != NULL) isfree = 1;
2908             }
2909           }
2910           if (isfree) {
2911               /* remove from free list */
2912             cur->next_free = header->next_free;
2913             if (header == free_header) mptr->freelist_head = header->next_free;
2914              // deregister
2915             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2916             GNI_RC_CHECK("steal Mempool de-register", status);
2917             mempool_block *ptr = current;
2918             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2919             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2920 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2921             freed += ptr->size;
2922             free(ptr);
2923              // try now
2924             if (freed > *size) {
2925               if (pool == NULL) {
2926                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2927                 CmiAssert(ret == 0);
2928               }
2929               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2930               if (status == GNI_RC_SUCCESS) {
2931                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2932 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2933                 return pool;
2934               }
2935 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2936             }
2937           }
2938           else {
2939              prev = current;
2940              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2941           }
2942         }
2943
2944         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2945     }
2946       /* still no luck registering pool */
2947     if (pool) free(pool);
2948     return NULL;
2949 }
2950 #endif
2951
2952 static CmiUInt8 total_mempool_size = 0;
2953 static CmiUInt8 total_mempool_calls = 0;
2954
2955 #if USE_LRTS_MEMPOOL
2956 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2957 {
2958     void *pool;
2959     int ret;
2960     gni_return_t status = GNI_RC_SUCCESS;
2961
2962     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
2963     if (*size < default_size) *size = default_size;
2964 #if LARGEPAGE
2965     // round up to be multiple of _tlbpagesize
2966     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
2967     *size = ALIGNHUGEPAGE(*size);
2968 #endif
2969     total_mempool_size += *size;
2970     total_mempool_calls += 1;
2971 #if   !LARGEPAGE
2972     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
2973     {
2974         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
2975         CmiAbort("alloc_mempool_block");
2976     }
2977 #endif
2978 #if LARGEPAGE
2979     pool = my_get_huge_pages(*size);
2980     ret = pool==NULL;
2981 #else
2982     ret = posix_memalign(&pool, ALIGNBUF, *size);
2983 #endif
2984     if (ret != 0) {
2985 #if CMK_SMP && STEAL_MEMPOOL
2986       pool = steal_mempool_block(size, mem_hndl);
2987       if (pool != NULL) return pool;
2988 #endif
2989       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2990       if (ret == ENOMEM)
2991         CmiAbort("alloc_mempool_block: out of memory.");
2992       else
2993         CmiAbort("alloc_mempool_block: posix_memalign failed");
2994     }
2995 #if LARGEPAGE
2996     CmiMemLock();
2997     register_count++;
2998     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, status);
2999     CmiMemUnlock();
3000     if(status != GNI_RC_SUCCESS) {
3001         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3002 sweep_mempool(CpvAccess(mempool));
3003     }
3004     GNI_RC_CHECK("MEMORY_REGISTER", status);
3005 #else
3006     SetMemHndlZero((*mem_hndl));
3007 #endif
3008     return pool;
3009 }
3010
3011 // ptr is a block head pointer
3012 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3013 {
3014     if(!(IsMemHndlZero(mem_hndl)))
3015     {
3016         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3017     }
3018 #if LARGEPAGE
3019     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3020 #else
3021     free(ptr);
3022 #endif
3023 }
3024 #endif
3025
3026 void LrtsPreCommonInit(int everReturn){
3027 #if USE_LRTS_MEMPOOL
3028     CpvInitialize(mempool_type*, mempool);
3029     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3030     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3031 #endif
3032 }
3033
3034 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3035 {
3036     register int            i;
3037     int                     rc;
3038     int                     device_id = 0;
3039     unsigned int            remote_addr;
3040     gni_cdm_handle_t        cdm_hndl;
3041     gni_return_t            status = GNI_RC_SUCCESS;
3042     uint32_t                vmdh_index = -1;
3043     uint8_t                 ptag;
3044     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3045     int                     first_spawned;
3046     int                     physicalID;
3047     char                   *env;
3048
3049     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3050     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3051     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3052    
3053     status = PMI_Init(&first_spawned);
3054     GNI_RC_CHECK("PMI_Init", status);
3055
3056     status = PMI_Get_size(&mysize);
3057     GNI_RC_CHECK("PMI_Getsize", status);
3058
3059     status = PMI_Get_rank(&myrank);
3060     GNI_RC_CHECK("PMI_getrank", status);
3061
3062     //physicalID = CmiPhysicalNodeID(myrank);
3063     
3064     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3065
3066     *myNodeID = myrank;
3067     *numNodes = mysize;
3068   
3069 #if MULTI_THREAD_SEND
3070     /* Currently, we only consider the case that comm. thread will only recv msgs */
3071     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3072 #endif
3073
3074     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3075     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3076     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3077
3078     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3079     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3080     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3081
3082     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3083     if (env) useDynamicSMSG = 1;
3084     if (!useDynamicSMSG)
3085       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3086     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3087     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3088     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3089     
3090     if(myrank == 0)
3091     {
3092         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3093         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3094     }
3095 #ifdef USE_ONESIDED
3096     onesided_init(NULL, &onesided_hnd);
3097
3098     // this is a GNI test, so use the libonesided bypass functionality
3099     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3100     local_addr = gniGetNicAddress();
3101 #else
3102     ptag = get_ptag();
3103     cookie = get_cookie();
3104 #if 0
3105     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3106 #endif
3107     //Create and attach to the communication  domain */
3108     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3109     GNI_RC_CHECK("GNI_CdmCreate", status);
3110     //* device id The device id is the minor number for the device
3111     //that is assigned to the device by the system when the device is created.
3112     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3113     //where X is the device number 0 default 
3114     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3115     GNI_RC_CHECK("GNI_CdmAttach", status);
3116     local_addr = get_gni_nic_address(0);
3117 #endif
3118     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3119     _MEMCHECK(MPID_UGNI_AllAddr);
3120     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3121     /* create the local completion queue */
3122     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3123     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
3124     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3125     
3126     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
3127     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
3128     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3129
3130     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3131     GNI_RC_CHECK("Create CQ (rx)", status);
3132     
3133     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
3134     //GNI_RC_CHECK("Create Post CQ (rx)", status);
3135     
3136     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3137     //GNI_RC_CHECK("Create BTE CQ", status);
3138
3139     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3140     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3141     _MEMCHECK(ep_hndl_array);
3142 #if MULTI_THREAD_SEND 
3143     tx_cq_lock  = CmiCreateLock();
3144     rx_cq_lock  = CmiCreateLock();
3145 #endif
3146     for (i=0; i<mysize; i++) {
3147         if(i == myrank) continue;
3148         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
3149         GNI_RC_CHECK("GNI_EpCreate ", status);   
3150         remote_addr = MPID_UGNI_AllAddr[i];
3151         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3152         GNI_RC_CHECK("GNI_EpBind ", status);   
3153     }
3154
3155     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3156     _init_smsg();
3157     PMI_Barrier();
3158
3159 #if     USE_LRTS_MEMPOOL
3160     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3161     if (env) {
3162         _totalmem = CmiReadSize(env);
3163         if (myrank == 0)
3164             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3165     }
3166
3167     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3168     if (env) _mempool_size = CmiReadSize(env);
3169     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3170         _mempool_size = CmiReadSize(env);
3171
3172
3173     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3174     if (env) {
3175         MAX_REG_MEM = CmiReadSize(env);
3176         user_set_flag = 1;
3177     }
3178     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3179         MAX_REG_MEM = CmiReadSize(env);
3180         user_set_flag = 1;
3181     }
3182
3183     env = getenv("CHARM_UGNI_SEND_MAX");
3184     if (env) {
3185         MAX_BUFF_SEND = CmiReadSize(env);
3186         user_set_flag = 1;
3187     }
3188     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3189         MAX_BUFF_SEND = CmiReadSize(env);
3190         user_set_flag = 1;
3191     }
3192
3193     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3194     if (env) {
3195         _mempool_size_limit = CmiReadSize(env);
3196     }
3197
3198     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3199     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3200
3201     if (myrank==0) {
3202         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3203         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3204         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3205             /* memblock can expand to BIG_MSG * 2 size */
3206             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3207             CmiAbort("mempool maximum size is too small. \n");
3208         }
3209 #if CMK_SMP && MULTI_THREAD_SEND
3210         printf("Charm++> worker thread sending messages\n");
3211 #elif CMK_SMP && COMM_THREAD_SEND
3212         printf("Charm++> only comm thread send/recv messages\n");
3213 #endif
3214     }
3215
3216 #endif     /* end of USE_LRTS_MEMPOOL */
3217
3218     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3219     if (env) {
3220         BIG_MSG = CmiReadSize(env);
3221         if (BIG_MSG < ONE_SEG)
3222           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3223     }
3224     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3225     if (env) {
3226         BIG_MSG_PIPELINE = atoi(env);
3227     }
3228
3229     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3230     if (env) _checkProgress = 0;
3231     if (mysize == 1) _checkProgress = 0;
3232
3233     
3234     /*
3235     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3236     if (env) 
3237         _tlbpagesize = CmiReadSize(env);
3238     */
3239     /* real gethugepagesize() is only available when hugetlb module linked */
3240     _tlbpagesize = gethugepagesize();
3241     if (myrank == 0) {
3242         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3243     }
3244
3245 #if LARGEPAGE
3246     if (_tlbpagesize == 4096) {
3247         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3248     }
3249 #endif
3250
3251     /* init DMA buffer for medium message */
3252
3253     //_init_DMA_buffer();
3254     
3255     free(MPID_UGNI_AllAddr);
3256 #if CMK_SMP
3257     sendRdmaBuf = PCQueueCreate();
3258 #else
3259     sendRdmaBuf = 0;
3260 #endif
3261 #if MACHINE_DEBUG_LOG
3262     char ln[200];
3263     sprintf(ln,"debugLog.%d",myrank);
3264     debugLog=fopen(ln,"w");
3265 #endif
3266
3267 //    NTK_Init();
3268 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3269 }
3270
3271 void* LrtsAlloc(int n_bytes, int header)
3272 {
3273     void *ptr = NULL;
3274 #if 0
3275     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3276 #endif
3277     if(n_bytes <= SMSG_MAX_MSG)
3278     {
3279         int totalsize = n_bytes+header;
3280         ptr = malloc(totalsize);
3281     }
3282     else {
3283         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3284 #if     USE_LRTS_MEMPOOL
3285         n_bytes = ALIGN64(n_bytes);
3286         if(n_bytes < BIG_MSG)
3287         {
3288             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3289             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3290         }else 
3291         {
3292 #if LARGEPAGE
3293             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3294             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3295             char *res = my_get_huge_pages(n_bytes);
3296 #else
3297             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3298 #endif
3299             if (res) ptr = res + ALIGNBUF - header;
3300         }
3301 #else
3302         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3303         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3304         ptr = res + ALIGNBUF - header;
3305 #endif
3306     }
3307     return ptr;
3308 }
3309
3310 void  LrtsFree(void *msg)
3311 {
3312     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3313     if (size <= SMSG_MAX_MSG)
3314         free(msg);
3315     else {
3316         size = ALIGN64(size);
3317         if(size>=BIG_MSG)
3318         {
3319 #if LARGEPAGE
3320             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3321             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3322 #else
3323             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3324 #endif
3325         }
3326         else {
3327 #if    USE_LRTS_MEMPOOL
3328 #if CMK_SMP
3329             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3330 #else
3331             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3332 #endif
3333 #else
3334             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3335 #endif
3336         }
3337     }
3338 }
3339
3340 void LrtsExit()
3341 {
3342     /* free memory ? */
3343 #if USE_LRTS_MEMPOOL
3344     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
3345     mempool_destroy(CpvAccess(mempool));
3346 #endif
3347     PMI_Finalize();
3348     exit(0);
3349 }
3350
3351 void LrtsDrainResources()
3352 {
3353     if(mysize == 1) return;
3354     while (
3355 #if CMK_USE_OOB
3356            !SendBufferMsg(&smsg_oob_queue) ||
3357 #endif
3358            !SendBufferMsg(&smsg_queue) 
3359 #if PIGGYBACK_ACK
3360         || !SendBufferMsg(&smsg_ack_queue)
3361 #endif
3362           )
3363     {
3364         if (useDynamicSMSG)
3365             PumpDatagramConnection();
3366         PumpNetworkSmsg();
3367         PumpLocalRdmaTransactions();
3368         SendRdmaMsg();
3369     }
3370     PMI_Barrier();
3371 }
3372
3373 void LrtsAbort(const char *message) {
3374     printf("CmiAbort is calling on PE:%d\n", myrank);
3375     CmiPrintStackTrace(0);
3376     PMI_Abort(-1, message);
3377 }
3378
3379 /**************************  TIMER FUNCTIONS **************************/
3380 #if CMK_TIMER_USE_SPECIAL
3381 /* MPI calls are not threadsafe, even the timer on some machines */
3382 static CmiNodeLock  timerLock = 0;
3383 static int _absoluteTime = 0;
3384 static int _is_global = 0;
3385 static struct timespec start_ts;
3386
3387 inline int CmiTimerIsSynchronized() {
3388     return 0;
3389 }
3390
3391 inline int CmiTimerAbsolute() {
3392     return _absoluteTime;
3393 }
3394
3395 double CmiStartTimer() {
3396     return 0.0;
3397 }
3398
3399 double CmiInitTime() {
3400     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3401 }
3402
3403 void CmiTimerInit(char **argv) {
3404     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3405     if (_absoluteTime && CmiMyPe() == 0)
3406         printf("Charm++> absolute  timer is used\n");
3407     
3408     _is_global = CmiTimerIsSynchronized();
3409
3410
3411     if (_is_global) {
3412         if (CmiMyRank() == 0) {
3413             clock_gettime(CLOCK_MONOTONIC, &start_ts);
3414         }
3415     } else { /* we don't have a synchronous timer, set our own start time */
3416         CmiBarrier();
3417         CmiBarrier();
3418         CmiBarrier();
3419         clock_gettime(CLOCK_MONOTONIC, &start_ts);
3420     }
3421     CmiNodeAllBarrier();          /* for smp */
3422 }
3423
3424 /**
3425  * Since the timerLock is never created, and is
3426  * always NULL, then all the if-condition inside
3427  * the timer functions could be disabled right
3428  * now in the case of SMP.
3429  */
3430 double CmiTimer(void) {
3431     struct timespec now_ts;
3432     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3433     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3434         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3435 }
3436
3437 double CmiWallTimer(void) {
3438     struct timespec now_ts;
3439     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3440     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3441         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
3442 }
3443
3444 double CmiCpuTimer(void) {
3445     struct timespec now_ts;
3446     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3447     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3448         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3449 }
3450
3451 #endif
3452 /************Barrier Related Functions****************/
3453
3454 int CmiBarrier()
3455 {
3456     gni_return_t status;
3457
3458 #if CMK_SMP
3459     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
3460     CmiNodeAllBarrier();
3461     if (CmiMyRank() == CmiMyNodeSize())
3462 #else
3463     if (CmiMyRank() == 0)
3464 #endif
3465     {
3466         /**
3467          *  The call of CmiBarrier is usually before the initialization
3468          *  of trace module of Charm++, therefore, the START_EVENT
3469          *  and END_EVENT are disabled here. -Chao Mei
3470          */
3471         /*START_EVENT();*/
3472         status = PMI_Barrier();
3473         GNI_RC_CHECK("PMI_Barrier", status);
3474         /*END_EVENT(10);*/
3475     }
3476     CmiNodeAllBarrier();
3477     return 0;
3478
3479 }
3480 #if CMK_DIRECT
3481 #include "machine-cmidirect.c"
3482 #endif