disable FMA if using remote events
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/dir.h>
38 #include <sys/stat.h>
39 #include <gni_pub.h>
40 #include <pmi.h>
41 //#include <numatoolkit.h>
42
43 #include "converse.h"
44
45 #if CMK_DIRECT
46 #include "cmidirect.h"
47 #endif
48
49 #define     LARGEPAGE              0
50
51 #if CMK_SMP
52 #define MULTI_THREAD_SEND          0
53 #define COMM_THREAD_SEND           1
54 #endif
55
56 #if MULTI_THREAD_SEND
57 #define CMK_WORKER_SINGLE_TASK     0
58 #endif
59
60 #define REMOTE_EVENT               0
61 #define CQWRITE                    0
62
63 #define CMI_EXERT_SEND_CAP      0
64 #define CMI_EXERT_RECV_CAP      0
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP 16
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP 2
72 #endif
73
74 #define USE_LRTS_MEMPOOL                  1
75
76 #define PRINT_SYH                         0
77
78 // Trace communication thread
79 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
80 #define TRACE_THRESHOLD     0.00005
81 #define CMI_MPI_TRACE_MOREDETAILED 0
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
84 #else
85 #undef CMK_SMP_TRACE_COMMTHREAD
86 #define CMK_SMP_TRACE_COMMTHREAD 0
87 #endif
88
89 #define CMK_TRACE_COMMOVERHEAD 0
90 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
91 #undef CMI_MPI_TRACE_USEREVENTS
92 #define CMI_MPI_TRACE_USEREVENTS 1
93 #else
94 #undef CMK_TRACE_COMMOVERHEAD
95 #define CMK_TRACE_COMMOVERHEAD 0
96 #endif
97
98 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
99 CpvStaticDeclare(double, projTraceStart);
100 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
101 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
102 #else
103 #define  START_EVENT()
104 #define  END_EVENT(x)
105 #endif
106
107 #if USE_LRTS_MEMPOOL
108
109 #define oneMB (1024ll*1024)
110 #define oneGB (1024ll*1024*1024)
111
112 static CmiInt8 _mempool_size = 8*oneMB;
113 static CmiInt8 _expand_mem =  4*oneMB;
114 static CmiInt8 _mempool_size_limit = 0;
115
116 static CmiInt8 _totalmem = 0.8*oneGB;
117
118 #if LARGEPAGE
119 static CmiInt8 BIG_MSG  =  16*oneMB;
120 static CmiInt8 ONE_SEG  =  4*oneMB;
121 #else
122 static CmiInt8 BIG_MSG  =  4*oneMB;
123 static CmiInt8 ONE_SEG  =  2*oneMB;
124 #endif
125 #if MULTI_THREAD_SEND
126 static int BIG_MSG_PIPELINE = 1;
127 #else
128 static int BIG_MSG_PIPELINE = 4;
129 #endif
130
131 // dynamic flow control
132 static CmiInt8 buffered_send_msg = 0;
133 static CmiInt8 register_memory_size = 0;
134
135 #if LARGEPAGE
136 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
137 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
138 static CmiInt8  register_count = 0;
139 #else
140 #if CMK_SMP && COMM_THREAD_SEND 
141 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
142 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
143 #else
144 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
145 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
146 #endif
147
148
149 #endif
150
151 #endif     /* end USE_LRTS_MEMPOOL */
152
153 #if MULTI_THREAD_SEND 
154 #define     CMI_GNI_LOCK(x)       CmiLock(x);
155 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
156 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
157 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
158 #else
159 #define     CMI_GNI_LOCK(x)
160 #define     CMI_GNI_UNLOCK(x)
161 #define     CMI_PCQUEUEPOP_LOCK(Q)   
162 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
163 #endif
164
165 static int _tlbpagesize = 4096;
166
167 //static int _smpd_count  = 0;
168
169 static int   user_set_flag  = 0;
170
171 static int _checkProgress = 1;             /* check deadlock */
172 static int _detected_hang = 0;
173
174 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
175
176 // dynamic SMSG
177 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
178
179 static int avg_smsg_connection = 32;
180 static int                 *smsg_connected_flag= 0;
181 static gni_smsg_attr_t     **smsg_attr_vector_local;
182 static gni_smsg_attr_t     **smsg_attr_vector_remote;
183 static gni_ep_handle_t     ep_hndl_unbound;
184 static gni_smsg_attr_t     send_smsg_attr;
185 static gni_smsg_attr_t     recv_smsg_attr;
186
187 typedef struct _dynamic_smsg_mailbox{
188    void     *mailbox_base;
189    int      size;
190    int      offset;
191    gni_mem_handle_t  mem_hndl;
192    struct      _dynamic_smsg_mailbox  *next;
193 }dynamic_smsg_mailbox_t;
194
195 static dynamic_smsg_mailbox_t  *mailbox_list;
196
197 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
198 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
199
200 #if PRINT_SYH
201 int         lrts_send_msg_id = 0;
202 int         lrts_local_done_msg = 0;
203 int         lrts_send_rdma_success = 0;
204 #endif
205
206 #include "machine.h"
207
208 #include "pcqueue.h"
209
210 #include "mempool.h"
211
212 #if CMK_PERSISTENT_COMM
213 #include "machine-persistent.h"
214 #endif
215
216 //#define  USE_ONESIDED 1
217 #ifdef USE_ONESIDED
218 //onesided implementation is wrong, since no place to restore omdh
219 #include "onesided.h"
220 onesided_hnd_t   onesided_hnd;
221 onesided_md_t    omdh;
222 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
223
224 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
225
226 #else
227 uint8_t   onesided_hnd, omdh;
228
229 #if REMOTE_EVENT || CQWRITE 
230 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
231     if(register_memory_size+size>= MAX_REG_MEM) { \
232         status = GNI_RC_ERROR_NOMEM;} \
233     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
234         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
235 #else
236 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
237         if (register_memory_size + size >= MAX_REG_MEM) { \
238             status = GNI_RC_ERROR_NOMEM; \
239         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
240             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
241 #endif
242
243 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
244     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
245              register_memory_size -= size; \
246          else CmiAbort("MEM_DEregister");  \
247     } while (0)
248 #endif
249
250 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
251 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
252 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
253 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
254 #define   IncreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)++
255 #define   DecreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)--
256 #define   IncreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)++
257 #define   DecreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)--
258 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
259 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
260 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
261 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
262 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
263 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
264
265 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
266 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
267
268 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
269 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
270 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
271 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
272
273 #define ALIGNBUF                64
274
275 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
276 /* If SMSG is not used */
277
278 #define FMA_PER_CORE  1024
279 #define FMA_BUFFER_SIZE 1024
280
281 /* If SMSG is used */
282 static int  SMSG_MAX_MSG = 1024;
283 #define SMSG_MAX_CREDIT 72 
284
285 #define MSGQ_MAXSIZE       2048
286
287 /* large message transfer with FMA or BTE */
288 #if ! REMOTE_EVENT
289 #define LRTS_GNI_RDMA_THRESHOLD  1024 
290 #else
291    /* remote events only work with RDMA */
292 #define LRTS_GNI_RDMA_THRESHOLD  0 
293 #endif
294
295 #if CMK_SMP
296 static int  REMOTE_QUEUE_ENTRIES=163840; 
297 static int LOCAL_QUEUE_ENTRIES=163840; 
298 #else
299 static int  REMOTE_QUEUE_ENTRIES=20480;
300 static int LOCAL_QUEUE_ENTRIES=20480; 
301 #endif
302
303 #define BIG_MSG_TAG             0x26
304 #define PUT_DONE_TAG            0x28
305 #define DIRECT_PUT_DONE_TAG     0x29
306 #define ACK_TAG                 0x30
307 /* SMSG is data message */
308 #define SMALL_DATA_TAG          0x31
309 /* SMSG is a control message to initialize a BTE */
310 #define LMSG_INIT_TAG           0x39 
311
312 #define DEBUG
313 #ifdef GNI_RC_CHECK
314 #undef GNI_RC_CHECK
315 #endif
316 #ifdef DEBUG
317 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
318 #else
319 #define GNI_RC_CHECK(msg,rc)
320 #endif
321
322 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
323 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
324 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
325
326 static int useStaticMSGQ = 0;
327 static int useStaticFMA = 0;
328 static int mysize, myrank;
329 static gni_nic_handle_t   nic_hndl;
330
331 typedef struct {
332     gni_mem_handle_t mdh;
333     uint64_t addr;
334 } mdh_addr_t ;
335 // this is related to dynamic SMSG
336
337 typedef struct mdh_addr_list{
338     gni_mem_handle_t mdh;
339    void *addr;
340     struct mdh_addr_list *next;
341 }mdh_addr_list_t;
342
343 static unsigned int         smsg_memlen;
344 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
345 mdh_addr_t          setup_mem;
346 mdh_addr_t          *smsg_connection_vec = 0;
347 gni_mem_handle_t    smsg_connection_memhndl;
348 static int          smsg_expand_slots = 10;
349 static int          smsg_available_slot = 0;
350 static void         *smsg_mailbox_mempool = 0;
351 mdh_addr_list_t     *smsg_dynamic_list = 0;
352
353 static void             *smsg_mailbox_base;
354 gni_msgq_attr_t         msgq_attrs;
355 gni_msgq_handle_t       msgq_handle;
356 gni_msgq_ep_attr_t      msgq_ep_attrs;
357 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
358
359 /* =====Beginning of Declarations of Machine Specific Variables===== */
360 static int cookie;
361 static int modes = 0;
362 static gni_cq_handle_t       smsg_rx_cqh = NULL;
363 static gni_cq_handle_t       default_tx_cqh = NULL;
364 static gni_cq_handle_t       rdma_tx_cqh = NULL;
365 static gni_cq_handle_t       rdma_rx_cqh = NULL;
366 static gni_cq_handle_t       post_tx_cqh = NULL;
367 static gni_ep_handle_t       *ep_hndl_array;
368
369 static CmiNodeLock           *ep_lock_array;
370 static CmiNodeLock           default_tx_cq_lock; 
371 static CmiNodeLock           rdma_tx_cq_lock; 
372 static CmiNodeLock           global_gni_lock; 
373 static CmiNodeLock           rx_cq_lock;
374 static CmiNodeLock           smsg_mailbox_lock;
375 static CmiNodeLock           smsg_rx_cq_lock;
376 static CmiNodeLock           *mempool_lock;
377 //#define     CMK_WITH_STATS      0
378 typedef struct msg_list
379 {
380     uint32_t destNode;
381     uint32_t size;
382     void *msg;
383     uint8_t tag;
384 #if !CMK_SMP
385     struct msg_list *next;
386 #endif
387 #if CMK_WITH_STATS
388     double  creation_time;
389 #endif
390 }MSG_LIST;
391
392
393 typedef struct control_msg
394 {
395     uint64_t            source_addr;    /* address from the start of buffer  */
396     uint64_t            dest_addr;      /* address from the start of buffer */
397     int                 total_length;   /* total length */
398     int                 length;         /* length of this packet */
399 #if REMOTE_EVENT
400     int                 ack_index;      /* index from integer to address */
401 #endif
402     uint8_t             seq_id;         //big message   0 meaning single message
403     gni_mem_handle_t    source_mem_hndl;
404     struct control_msg *next;
405 } CONTROL_MSG;
406
407 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
408
409 typedef struct ack_msg
410 {
411     uint64_t            source_addr;    /* address from the start of buffer  */
412 #if ! USE_LRTS_MEMPOOL
413     gni_mem_handle_t    source_mem_hndl;
414     int                 length;          /* total length */
415 #endif
416     struct ack_msg     *next;
417 } ACK_MSG;
418
419 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
420
421 #if CMK_DIRECT
422 typedef struct{
423     uint64_t    handler_addr;
424 }CMK_DIRECT_HEADER;
425
426 typedef struct {
427     char core[CmiMsgHeaderSizeBytes];
428     uint64_t handler;
429 }cmidirectMsg;
430
431 //SYH
432 CpvDeclare(int, CmiHandleDirectIdx);
433 void CmiHandleDirectMsg(cmidirectMsg* msg)
434 {
435
436     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
437    (*(_handle->callbackFnPtr))(_handle->callbackData);
438    CmiFree(msg);
439 }
440
441 void CmiDirectInit()
442 {
443     CpvInitialize(int,  CmiHandleDirectIdx);
444     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
445 }
446
447 #endif
448 typedef struct  rmda_msg
449 {
450     int                   destNode;
451 #if REMOTE_EVENT
452     int                   ack_index;
453 #endif
454     gni_post_descriptor_t *pd;
455 #if !CMK_SMP
456     struct  rmda_msg      *next;
457 #endif
458 }RDMA_REQUEST;
459
460
461 #if CMK_SMP
462 #define SMP_LOCKS               0
463 #define ONE_SEND_QUEUE                  0
464 PCQueue sendRdmaBuf;
465 typedef struct  msg_list_index
466 {
467     PCQueue     sendSmsgBuf;
468     int         pushed;
469     CmiNodeLock   lock;
470 } MSG_LIST_INDEX;
471 char                *destpe_avail;
472 #if  !ONE_SEND_QUEUE && SMP_LOCKS
473     PCQueue     nonEmptyQueues;
474 #endif
475 #else         /* non-smp */
476
477 static RDMA_REQUEST        *sendRdmaBuf = 0;
478 static RDMA_REQUEST        *sendRdmaTail = 0;
479 typedef struct  msg_list_index
480 {
481     int         next;
482     MSG_LIST    *sendSmsgBuf;
483     MSG_LIST    *tail;
484 } MSG_LIST_INDEX;
485
486 #endif
487
488 // buffered send queue
489 #if ! ONE_SEND_QUEUE
490 typedef struct smsg_queue
491 {
492     MSG_LIST_INDEX   *smsg_msglist_index;
493     int               smsg_head_index;
494 } SMSG_QUEUE;
495 #else
496 typedef struct smsg_queue
497 {
498     PCQueue       sendMsgBuf;
499 }  SMSG_QUEUE;
500 #endif
501
502 SMSG_QUEUE                  smsg_queue;
503 #if CMK_USE_OOB
504 SMSG_QUEUE                  smsg_oob_queue;
505 #endif
506
507 #if CMK_SMP
508
509 #define FreeMsgList(d)   free(d);
510 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
511
512 #else
513
514 static MSG_LIST       *msglist_freelist=0;
515
516 #define FreeMsgList(d)  \
517   do { \
518   (d)->next = msglist_freelist;\
519   msglist_freelist = d; \
520   } while (0)
521
522 #define MallocMsgList(d) \
523   do {  \
524   d = msglist_freelist;\
525   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
526              _MEMCHECK(d);\
527   } else msglist_freelist = d->next; \
528   d->next =0;  \
529   } while (0)
530
531 #endif
532
533 #if CMK_SMP
534
535 #define FreeControlMsg(d)      free(d);
536 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
537
538 #else
539
540 static CONTROL_MSG    *control_freelist=0;
541
542 #define FreeControlMsg(d)       \
543   do { \
544   (d)->next = control_freelist;\
545   control_freelist = d; \
546   } while (0);
547
548 #define MallocControlMsg(d) \
549   do {  \
550   d = control_freelist;\
551   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
552              _MEMCHECK(d);\
553   } else control_freelist = d->next;  \
554   } while (0);
555
556 #endif
557
558 #if CMK_SMP
559
560 #define FreeAckMsg(d)      free(d);
561 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
562
563 #else
564
565 static ACK_MSG        *ack_freelist=0;
566
567 #define FreeAckMsg(d)       \
568   do { \
569   (d)->next = ack_freelist;\
570   ack_freelist = d; \
571   } while (0)
572
573 #define MallocAckMsg(d) \
574   do { \
575   d = ack_freelist;\
576   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
577              _MEMCHECK(d);\
578   } else ack_freelist = d->next; \
579   } while (0)
580
581 #endif
582
583
584 # if CMK_SMP
585 #define FreeRdmaRequest(d)       free(d);
586 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
587 #else
588
589 static RDMA_REQUEST         *rdma_freelist = NULL;
590
591 #define FreeRdmaRequest(d)       \
592   do {  \
593   (d)->next = rdma_freelist;\
594   rdma_freelist = d;    \
595   } while (0)
596
597 #define MallocRdmaRequest(d) \
598   do {   \
599   d = rdma_freelist;\
600   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
601              _MEMCHECK(d);\
602   } else rdma_freelist = d->next; \
603   d->next =0;   \
604   } while (0)
605 #endif
606
607 /* reuse gni_post_descriptor_t */
608 static gni_post_descriptor_t *post_freelist=0;
609
610 #if  !CMK_SMP
611 #define FreePostDesc(d)       \
612     (d)->next_descr = post_freelist;\
613     post_freelist = d;
614
615 #define MallocPostDesc(d) \
616   d = post_freelist;\
617   if (d==0) { \
618      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
619      d->next_descr = 0;\
620       _MEMCHECK(d);\
621   } else post_freelist = d->next_descr;
622 #else
623
624 #define FreePostDesc(d)     free(d);
625 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
626
627 #endif
628
629
630 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
631 static int      buffered_smsg_counter = 0;
632
633 /* SmsgSend return success but message sent is not confirmed by remote side */
634 static MSG_LIST *buffered_fma_head = 0;
635 static MSG_LIST *buffered_fma_tail = 0;
636
637 /* functions  */
638 #define IsFree(a,ind)  !( a& (1<<(ind) ))
639 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
640 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
641
642 CpvDeclare(mempool_type*, mempool);
643
644 #if REMOTE_EVENT
645 /* ack pool for remote events */
646
647 #define ACK_SHIFT                  19
648 #define ACK_EVENT(idx)             ((idx<<ACK_SHIFT) | myrank)
649 #define ACK_GET_RANK(evt)          (evt & ((1<<ACK_SHIFT)-1))
650 #define ACK_GET_INDEX(evt)         (evt >> ACK_SHIFT)
651
652 struct AckPool {
653 void *addr;
654 int next;
655 };
656
657 static struct AckPool   *ackpool;
658 static int    ackpoolsize;
659 static int    ackpool_freehead;
660 static CmiNodeLock  ackpool_lock;
661
662 #define  GetAckAddress(s)          (ackpool[s].addr)
663
664 static void AckPool_init()
665 {
666     int i;
667     if ((1<<ACK_SHIFT) < mysize) 
668         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
669     ackpoolsize = 2048;
670     ackpool = (struct AckPool *)malloc(ackpoolsize*sizeof(struct AckPool));
671     for (i=0; i<ackpoolsize-1; i++) {
672         ackpool[i].next = i+1;
673     }
674     ackpool[i].next = -1;
675     ackpool_freehead = 0;
676 #if MULTI_THREAD_SEND 
677     ackpool_lock  = CmiCreateLock();
678 #endif
679 }
680
681 static
682 inline int AckPool_getslot(void *addr)
683 {
684     int i;
685     int s;
686     CMI_GNI_LOCK(ackpool_lock);
687     s = ackpool_freehead;
688     if (s == -1) {
689         int newsize = ackpoolsize * 2;
690         printf("[%d] AckPool_getslot expand to: %d\n", myrank, newsize);
691         if (newsize > (1<<(32-ACK_SHIFT))) CmiAbort("AckPool too large");
692         struct AckPool *old_ackpool = ackpool;
693         ackpool = (struct AckPool *)malloc(newsize*sizeof(struct AckPool));
694         memcpy(ackpool, old_ackpool, ackpoolsize*sizeof(struct AckPool));
695         for (i=ackpoolsize; i<newsize-1; i++) {
696             ackpool[i].next = i+1;
697         }
698         ackpool[i].next = -1;
699         ackpool_freehead = ackpoolsize;
700         s = ackpoolsize;
701         ackpoolsize = newsize;
702         free(old_ackpool);
703     }
704     ackpool_freehead = ackpool[s].next;
705     ackpool[s].addr = addr;
706     CMI_GNI_UNLOCK(ackpool_lock);
707     return s;
708 }
709
710 static
711 inline  void AckPool_freeslot(int s)
712 {
713     CmiAssert(s>=0 && s<ackpoolsize);
714     CMI_GNI_LOCK(ackpool_lock);
715     ackpool[s].next = ackpool_freehead;
716     ackpool_freehead = s;
717     CMI_GNI_UNLOCK(ackpool_lock);
718 }
719
720
721 #endif
722
723 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
724 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
725 #define CHARM_MAGIC_NUMBER               126
726
727 #if CMK_ERROR_CHECKING
728 extern unsigned char computeCheckSum(unsigned char *data, int len);
729 static int checksum_flag = 0;
730 #define CMI_SET_CHECKSUM(msg, len)      \
731         if (checksum_flag)  {   \
732           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
733           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
734         }
735 #define CMI_CHECK_CHECKSUM(msg, len)    \
736         if (checksum_flag)      \
737           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
738             CmiAbort("Fatal error: checksum doesn't agree!\n");
739 #else
740 #define CMI_SET_CHECKSUM(msg, len)
741 #define CMI_CHECK_CHECKSUM(msg, len)
742 #endif
743 /* =====End of Definitions of Message-Corruption Related Macros=====*/
744
745 #if CMK_WITH_STATS
746 FILE *counterLog = NULL;
747 typedef struct comm_thread_stats
748 {
749     uint64_t  smsg_data_count;
750     uint64_t  lmsg_init_count;
751     uint64_t  ack_count;
752     uint64_t  big_msg_ack_count;
753     uint64_t  smsg_count;
754     //times of calling SmsgSend
755     uint64_t  try_smsg_data_count;
756     uint64_t  try_lmsg_init_count;
757     uint64_t  try_ack_count;
758     uint64_t  try_big_msg_ack_count;
759     uint64_t  try_smsg_count;
760     
761     double    max_time_in_send_buffered_smsg;
762     double    all_time_in_send_buffered_smsg;
763
764     uint64_t  rdma_count;
765     uint64_t  try_rdma_count;
766     double    max_time_from_control_to_rdma_init;
767     double    all_time_from_control_to_rdma_init;
768
769     double    max_time_from_rdma_init_to_rdma_done;
770     double    all_time_from_rdma_init_to_rdma_done;
771 } Comm_Thread_Stats;
772
773 static Comm_Thread_Stats   comm_stats;
774
775 static void init_comm_stats()
776 {
777   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
778 }
779
780 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
781
782 #define SMSG_SENT_DONE(creation_time, tag)  \
783         if (print_stats) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
784             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
785             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
786             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
787             comm_stats.smsg_count++; \
788             double inbuff_time = CmiWallTimer() - creation_time;   \
789             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
790             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
791         }
792
793 #define SMSG_TRY_SEND(tag)  \
794         if (print_stats){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
795             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
796             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
797             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
798             comm_stats.try_smsg_count++; \
799         }
800
801 #define  RDMA_TRY_SEND()        if (print_stats) {comm_stats.try_rdma_count++;}
802
803 #define  RDMA_TRANS_DONE(x)      \
804          if (print_stats) {  double rdma_trans_time = CmiWallTimer() - x ; \
805              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
806              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
807          }
808
809 #define  RDMA_TRANS_INIT(x)      \
810          if (print_stats) {   comm_stats.rdma_count++;  \
811              double rdma_trans_time = CmiWallTimer() - x ; \
812              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
813              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
814          }
815
816
817 static void print_comm_stats()
818 {
819     fprintf(counterLog, "Node[%d]SMSG time in buffer\t[max:%f\tAverage:%f](milisecond)\n", myrank, 1000*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
820     fprintf(counterLog, "Node[%d]Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n", myrank, 
821             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
822             comm_stats.ack_count, comm_stats.big_msg_ack_count);
823     
824     fprintf(counterLog, "Node[%d]SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n\n", myrank, 
825             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
826             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count);
827
828     fprintf(counterLog, "Node[%d]Rdma Transaction [count:%lld\t calls:%lld]\n", myrank, comm_stats.rdma_count, comm_stats.try_rdma_count);
829     fprintf(counterLog, "Node[%d]Rdma time from control arrives to rdma init [MAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/comm_stats.rdma_count); 
830     fprintf(counterLog, "Node[%d]Rdma time from init to rdma done [MAX:%f\t Average:%f](milisecond)\n\n", myrank, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/comm_stats.rdma_count); 
831 }
832
833 #else
834 #define STATS_ACK_TIME(x)            x
835 #define STATS_SEND_SMSGS_TIME(x)     x
836 #endif
837
838 static int print_stats = 0;
839
840 static void
841 allgather(void *in,void *out, int len)
842 {
843     static int *ivec_ptr=NULL,already_called=0,job_size=0;
844     int i,rc;
845     int my_rank;
846     char *tmp_buf,*out_ptr;
847
848     if(!already_called) {
849
850         rc = PMI_Get_size(&job_size);
851         CmiAssert(rc == PMI_SUCCESS);
852         rc = PMI_Get_rank(&my_rank);
853         CmiAssert(rc == PMI_SUCCESS);
854
855         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
856         CmiAssert(ivec_ptr != NULL);
857
858         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
859         CmiAssert(rc == PMI_SUCCESS);
860
861         already_called = 1;
862
863     }
864
865     tmp_buf = (char *)malloc(job_size * len);
866     CmiAssert(tmp_buf);
867
868     rc = PMI_Allgather(in,tmp_buf,len);
869     CmiAssert(rc == PMI_SUCCESS);
870
871     out_ptr = out;
872
873     for(i=0;i<job_size;i++) {
874
875         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
876
877     }
878
879     free(tmp_buf);
880 }
881
882 static void
883 allgather_2(void *in,void *out, int len)
884 {
885     //PMI_Allgather is out of order
886     int i,rc, extend_len;
887     int  rank_index;
888     char *out_ptr, *out_ref;
889     char *in2;
890
891     extend_len = sizeof(int) + len;
892     in2 = (char*)malloc(extend_len);
893
894     memcpy(in2, &myrank, sizeof(int));
895     memcpy(in2+sizeof(int), in, len);
896
897     out_ptr = (char*)malloc(mysize*extend_len);
898
899     rc = PMI_Allgather(in2, out_ptr, extend_len);
900     GNI_RC_CHECK("allgather", rc);
901
902     out_ref = out;
903
904     for(i=0;i<mysize;i++) {
905         //rank index 
906         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
907         //copy to the rank index slot
908         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
909     }
910
911     free(out_ptr);
912     free(in2);
913
914 }
915
916 static unsigned int get_gni_nic_address(int device_id)
917 {
918     unsigned int address, cpu_id;
919     gni_return_t status;
920     int i, alps_dev_id=-1,alps_address=-1;
921     char *token, *p_ptr;
922
923     p_ptr = getenv("PMI_GNI_DEV_ID");
924     if (!p_ptr) {
925         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
926        
927         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
928     } else {
929         while ((token = strtok(p_ptr,":")) != NULL) {
930             alps_dev_id = atoi(token);
931             if (alps_dev_id == device_id) {
932                 break;
933             }
934             p_ptr = NULL;
935         }
936         CmiAssert(alps_dev_id != -1);
937         p_ptr = getenv("PMI_GNI_LOC_ADDR");
938         CmiAssert(p_ptr != NULL);
939         i = 0;
940         while ((token = strtok(p_ptr,":")) != NULL) {
941             if (i == alps_dev_id) {
942                 alps_address = atoi(token);
943                 break;
944             }
945             p_ptr = NULL;
946             ++i;
947         }
948         CmiAssert(alps_address != -1);
949         address = alps_address;
950     }
951     return address;
952 }
953
954 static uint8_t get_ptag(void)
955 {
956     char *p_ptr, *token;
957     uint8_t ptag;
958
959     p_ptr = getenv("PMI_GNI_PTAG");
960     CmiAssert(p_ptr != NULL);
961     token = strtok(p_ptr, ":");
962     ptag = (uint8_t)atoi(token);
963     return ptag;
964         
965 }
966
967 static uint32_t get_cookie(void)
968 {
969     uint32_t cookie;
970     char *p_ptr, *token;
971
972     p_ptr = getenv("PMI_GNI_COOKIE");
973     CmiAssert(p_ptr != NULL);
974     token = strtok(p_ptr, ":");
975     cookie = (uint32_t)atoi(token);
976
977     return cookie;
978 }
979
980 #if LARGEPAGE
981
982 /* directly mmap memory from hugetlbfs for large pages */
983
984 #include <sys/stat.h>
985 #include <fcntl.h>
986 #include <sys/mman.h>
987 #include <hugetlbfs.h>
988
989 // size must be _tlbpagesize aligned
990 void *my_get_huge_pages(size_t size)
991 {
992     char filename[512];
993     int fd;
994     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
995     void *ptr = NULL;
996
997     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
998     fd = open(filename, O_RDWR | O_CREAT, mode);
999     if (fd == -1) {
1000         CmiAbort("my_get_huge_pages: open filed");
1001     }
1002     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1003     if (ptr == MAP_FAILED) ptr = NULL;
1004 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1005     close(fd);
1006     unlink(filename);
1007     return ptr;
1008 }
1009
1010 void my_free_huge_pages(void *ptr, int size)
1011 {
1012 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1013     int ret = munmap(ptr, size);
1014     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1015 }
1016
1017 #endif
1018
1019 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1020 /* TODO: add any that are related */
1021 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1022
1023
1024 #include "machine-lrts.h"
1025 #include "machine-common-core.c"
1026
1027 /* Network progress function is used to poll the network when for
1028    messages. This flushes receive buffers on some  implementations*/
1029 #if CMK_MACHINE_PROGRESS_DEFINED
1030 void CmiMachineProgressImpl() {
1031 }
1032 #endif
1033
1034 static int SendBufferMsg(SMSG_QUEUE *queue);
1035 static void SendRdmaMsg();
1036 static void PumpNetworkSmsg();
1037 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1038 #if CQWRITE
1039 static void PumpCqWriteTransactions();
1040 #endif
1041 #if REMOTE_EVENT
1042 static void PumpRemoteTransactions();
1043 #endif
1044
1045 #if MACHINE_DEBUG_LOG
1046 FILE *debugLog = NULL;
1047 static CmiInt8 buffered_recv_msg = 0;
1048 int         lrts_smsg_success = 0;
1049 int         lrts_received_msg = 0;
1050 #endif
1051
1052 static void sweep_mempool(mempool_type *mptr)
1053 {
1054     int n = 0;
1055     block_header *current = &(mptr->block_head);
1056
1057     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1058     while( current!= NULL) {
1059         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1060         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1061     }
1062     printf("[n %d] sweep_mempool slot END.\n", myrank);
1063 }
1064
1065 inline
1066 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1067 {
1068     block_header *current = *from;
1069
1070     //while(register_memory_size>= MAX_REG_MEM)
1071     //{
1072         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1073             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1074
1075         *from = current;
1076         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1077         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1078         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1079     //}
1080     return GNI_RC_SUCCESS;
1081 }
1082
1083 inline 
1084 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1085 {
1086     gni_return_t status = GNI_RC_SUCCESS;
1087     //int size = GetMempoolsize(msg);
1088     //void *blockaddr = GetMempoolBlockPtr(msg);
1089     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1090    
1091     block_header *current = &(mptr->block_head);
1092     while(register_memory_size>= MAX_REG_MEM)
1093     {
1094         status = deregisterMemory(mptr, &current);
1095         if (status != GNI_RC_SUCCESS) break;
1096     }
1097     if(register_memory_size>= MAX_REG_MEM) return status;
1098
1099     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1100     while(1)
1101     {
1102         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1103         if(status == GNI_RC_SUCCESS)
1104         {
1105             break;
1106         }
1107         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1108         {
1109             CmiAbort("Memory registor for mempool fails\n");
1110         }
1111         else
1112         {
1113             status = deregisterMemory(mptr, &current);
1114             if (status != GNI_RC_SUCCESS) break;
1115         }
1116     }; 
1117     return status;
1118 }
1119
1120 inline 
1121 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1122 {
1123     static int rank = -1;
1124     int i;
1125     gni_return_t status;
1126     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1127     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1128     mempool_type *mptr;
1129
1130     status = registerFromMempool(mptr1, msg, size, t, cqh);
1131     if (status == GNI_RC_SUCCESS) return status;
1132 #if CMK_SMP 
1133     for (i=0; i<CmiMyNodeSize()+1; i++) {
1134       rank = (rank+1)%(CmiMyNodeSize()+1);
1135       mptr = CpvAccessOther(mempool, rank);
1136       if (mptr == mptr1) continue;
1137       status = registerFromMempool(mptr, msg, size, t, cqh);
1138       if (status == GNI_RC_SUCCESS) return status;
1139     }
1140 #endif
1141     return  GNI_RC_ERROR_RESOURCE;
1142 }
1143
1144 inline
1145 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1146 {
1147     MSG_LIST        *msg_tmp;
1148     MallocMsgList(msg_tmp);
1149     msg_tmp->destNode = destNode;
1150     msg_tmp->size   = size;
1151     msg_tmp->msg    = msg;
1152     msg_tmp->tag    = tag;
1153 #if CMK_WITH_STATS
1154     SMSG_CREATION(msg_tmp)
1155 #endif
1156 #if !CMK_SMP
1157     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1158         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1159         queue->smsg_head_index = destNode;
1160         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1161     }else
1162     {
1163         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1164         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1165     }
1166 #else
1167 #if ONE_SEND_QUEUE
1168     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1169 #else
1170 #if SMP_LOCKS
1171     CmiLock(queue->smsg_msglist_index[destNode].lock);
1172     if(queue->smsg_msglist_index[destNode].pushed == 0)
1173     {
1174         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1175     }
1176     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1177     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1178 #else
1179     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1180 #endif
1181 #endif
1182 #endif
1183 #if PRINT_SYH
1184     buffered_smsg_counter++;
1185 #endif
1186 }
1187
1188 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1189 {
1190     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1191 }
1192
1193 inline
1194 static void setup_smsg_connection(int destNode)
1195 {
1196     mdh_addr_list_t  *new_entry = 0;
1197     gni_post_descriptor_t *pd;
1198     gni_smsg_attr_t      *smsg_attr;
1199     gni_return_t status = GNI_RC_NOT_DONE;
1200     RDMA_REQUEST        *rdma_request_msg;
1201     
1202     if(smsg_available_slot == smsg_expand_slots)
1203     {
1204         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1205         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1206         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1207
1208         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1209             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1210             GNI_MEM_READWRITE,   
1211             -1,
1212             &(new_entry->mdh));
1213         smsg_available_slot = 0; 
1214         new_entry->next = smsg_dynamic_list;
1215         smsg_dynamic_list = new_entry;
1216     }
1217     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1218     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1219     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1220     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1221     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1222     smsg_attr->buff_size = smsg_memlen;
1223     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1224     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1225     smsg_local_attr_vec[destNode] = smsg_attr;
1226     smsg_available_slot++;
1227     MallocPostDesc(pd);
1228     pd->type            = GNI_POST_FMA_PUT;
1229     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1230     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1231     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1232     pd->length          = sizeof(gni_smsg_attr_t);
1233     pd->local_addr      = (uint64_t) smsg_attr;
1234     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1235     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1236     pd->src_cq_hndl     = rdma_tx_cqh;
1237     pd->rdma_mode       = 0;
1238     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1239     print_smsg_attr(smsg_attr);
1240     if(status == GNI_RC_ERROR_RESOURCE )
1241     {
1242         MallocRdmaRequest(rdma_request_msg);
1243         rdma_request_msg->destNode = destNode;
1244         rdma_request_msg->pd = pd;
1245         /* buffer this request */
1246     }
1247 #if PRINT_SYH
1248     if(status != GNI_RC_SUCCESS)
1249        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1250     else
1251         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1252 #endif
1253 }
1254
1255 /* useDynamicSMSG */
1256 inline 
1257 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1258 {
1259     gni_return_t status = GNI_RC_NOT_DONE;
1260
1261     if(mailbox_list->offset == mailbox_list->size)
1262     {
1263         dynamic_smsg_mailbox_t *new_mailbox_entry;
1264         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1265         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1266         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1267         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1268         new_mailbox_entry->offset = 0;
1269         
1270         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1271             new_mailbox_entry->size, smsg_rx_cqh,
1272             GNI_MEM_READWRITE,   
1273             -1,
1274             &(new_mailbox_entry->mem_hndl));
1275
1276         GNI_RC_CHECK("register", status);
1277         new_mailbox_entry->next = mailbox_list;
1278         mailbox_list = new_mailbox_entry;
1279     }
1280     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1281     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1282     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1283     local_smsg_attr->mbox_offset = mailbox_list->offset;
1284     mailbox_list->offset += smsg_memlen;
1285     local_smsg_attr->buff_size = smsg_memlen;
1286     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1287     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1288 }
1289
1290 /* useDynamicSMSG */
1291 inline 
1292 static int connect_to(int destNode)
1293 {
1294     gni_return_t status = GNI_RC_NOT_DONE;
1295     CmiAssert(smsg_connected_flag[destNode] == 0);
1296     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1297     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1298     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1299     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1300     
1301     CMI_GNI_LOCK(global_gni_lock)
1302     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1303     CMI_GNI_UNLOCK(global_gni_lock)
1304     if (status == GNI_RC_ERROR_RESOURCE) {
1305       /* possibly destNode is making connection at the same time */
1306       free(smsg_attr_vector_local[destNode]);
1307       smsg_attr_vector_local[destNode] = NULL;
1308       free(smsg_attr_vector_remote[destNode]);
1309       smsg_attr_vector_remote[destNode] = NULL;
1310       mailbox_list->offset -= smsg_memlen;
1311       return 0;
1312     }
1313     GNI_RC_CHECK("GNI_Post", status);
1314     smsg_connected_flag[destNode] = 1;
1315     return 1;
1316 }
1317
1318 inline 
1319 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1320 {
1321     unsigned int          remote_address;
1322     uint32_t              remote_id;
1323     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1324     gni_smsg_attr_t       *smsg_attr;
1325     gni_post_descriptor_t *pd;
1326     gni_post_state_t      post_state;
1327     char                  *real_data; 
1328
1329     if (useDynamicSMSG) {
1330         switch (smsg_connected_flag[destNode]) {
1331         case 0: 
1332             connect_to(destNode);         /* continue to case 1 */
1333         case 1:                           /* pending connection, do nothing */
1334             status = GNI_RC_NOT_DONE;
1335             if(inbuff ==0)
1336                 buffer_small_msgs(queue, msg, size, destNode, tag);
1337             return status;
1338         }
1339     }
1340 #if CMK_SMP
1341 #if ! ONE_SEND_QUEUE
1342     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1343 #endif
1344     {
1345 #else
1346     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1347     {
1348 #endif
1349         //CMI_GNI_LOCK(smsg_mailbox_lock)
1350         CMI_GNI_LOCK(default_tx_cq_lock)
1351 #if CMK_SMP_TRACE_COMMTHREAD
1352         int oldpe = -1;
1353         int oldeventid = -1;
1354         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1355         { 
1356             START_EVENT();
1357             if ( tag == SMALL_DATA_TAG)
1358                 real_data = (char*)msg; 
1359             else 
1360                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1361             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1362             TRACE_COMM_SET_COMM_MSGID(real_data);
1363         }
1364 #endif
1365 #if REMOTE_EVENT
1366         if (tag == LMSG_INIT_TAG) {
1367             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1368             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1369                 control_msg_tmp->ack_index = AckPool_getslot((void*)control_msg_tmp->source_addr);
1370         }
1371         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1372 #endif
1373 #if     CMK_WITH_STATS
1374         SMSG_TRY_SEND(tag)
1375 #endif
1376 #if CMK_WITH_STATS
1377     double              creation_time;
1378     if (ptr == NULL)
1379         creation_time = CmiWallTimer();
1380     else
1381         creation_time = ptr->creation_time;
1382 #endif
1383
1384     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1385 #if CMK_SMP_TRACE_COMMTHREAD
1386         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1387 #endif
1388         CMI_GNI_UNLOCK(default_tx_cq_lock)
1389         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1390         if(status == GNI_RC_SUCCESS)
1391         {
1392 #if     CMK_WITH_STATS
1393             SMSG_SENT_DONE(creation_time,tag) 
1394 #endif
1395 #if CMK_SMP_TRACE_COMMTHREAD
1396             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1397             { 
1398                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1399             }
1400 #endif
1401         }else
1402             status = GNI_RC_ERROR_RESOURCE;
1403     }
1404     if(status != GNI_RC_SUCCESS && inbuff ==0)
1405         buffer_small_msgs(queue, msg, size, destNode, tag);
1406     return status;
1407 }
1408
1409 inline 
1410 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1411 {
1412     /* construct a control message and send */
1413     CONTROL_MSG         *control_msg_tmp;
1414     MallocControlMsg(control_msg_tmp);
1415     control_msg_tmp->source_addr = (uint64_t)msg;
1416     control_msg_tmp->seq_id    = seqno;
1417     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1418 #if REMOTE_EVENT
1419     control_msg_tmp->ack_index    =  -1;
1420 #endif
1421 #if     USE_LRTS_MEMPOOL
1422     if(size < BIG_MSG)
1423     {
1424         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1425     }
1426     else
1427     {
1428         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1429         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1430         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1431     }
1432 #else
1433     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1434 #endif
1435     return control_msg_tmp;
1436 }
1437
1438 #define BLOCKING_SEND_CONTROL    0
1439
1440 // Large message, send control to receiver, receiver register memory and do a GET, 
1441 // return 1 - send no success
1442 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1443 {
1444     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1445     uint32_t            vmdh_index  = -1;
1446     int                 size;
1447     int                 offset = 0;
1448     uint64_t            source_addr;
1449     int                 register_size; 
1450     void                *msg;
1451
1452     size    =   control_msg_tmp->total_length;
1453     source_addr = control_msg_tmp->source_addr;
1454     register_size = control_msg_tmp->length;
1455
1456 #if  USE_LRTS_MEMPOOL
1457     if( control_msg_tmp->seq_id == 0 ){
1458 #if BLOCKING_SEND_CONTROL
1459         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1460             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1461                 LrtsAdvanceCommunication(0);
1462         }
1463 #endif
1464         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1465         {
1466             msg = (void*)source_addr;
1467             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1468             {
1469                 if(!inbuff)
1470                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1471                 return GNI_RC_ERROR_NOMEM;
1472             }
1473             //register the corresponding mempool
1474             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1475             if(status == GNI_RC_SUCCESS)
1476             {
1477                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1478             }
1479         }else
1480         {
1481             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1482             status = GNI_RC_SUCCESS;
1483         }
1484         if(NoMsgInSend(source_addr))
1485             register_size = GetMempoolsize((void*)(source_addr));
1486         else
1487             register_size = 0;
1488     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1489     {
1490         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1491         source_addr += offset;
1492         size = control_msg_tmp->length;
1493 #if BLOCKING_SEND_CONTROL
1494         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1495             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1496                 LrtsAdvanceCommunication(0);
1497         }
1498 #endif
1499         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1500             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1501             {
1502                 if(!inbuff)
1503                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1504                 return GNI_RC_ERROR_NOMEM;
1505             }
1506             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1507             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1508         }
1509         else
1510         {
1511             status = GNI_RC_SUCCESS;
1512         }
1513         register_size = 0;  
1514     }
1515
1516     if(status == GNI_RC_SUCCESS)
1517     {
1518         status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1519         if(status == GNI_RC_SUCCESS)
1520         {
1521             buffered_send_msg += register_size;
1522             if(control_msg_tmp->seq_id == 0)
1523             {
1524                 IncreaseMsgInSend(source_addr);
1525             }
1526             FreeControlMsg(control_msg_tmp);
1527             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1528         }else
1529             status = GNI_RC_ERROR_RESOURCE;
1530
1531     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1532     {
1533         CmiAbort("Memory registor for large msg\n");
1534     }else 
1535     {
1536         status = GNI_RC_ERROR_NOMEM; 
1537         if(!inbuff)
1538             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1539     }
1540     return status;
1541 #else
1542     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1543     if(status == GNI_RC_SUCCESS)
1544     {
1545         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1546         if(status == GNI_RC_SUCCESS)
1547         {
1548             FreeControlMsg(control_msg_tmp);
1549         }
1550     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1551     {
1552         CmiAbort("Memory registor for large msg\n");
1553     }else 
1554     {
1555         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1556     }
1557     return status;
1558 #endif
1559 }
1560
1561 inline void LrtsPrepareEnvelope(char *msg, int size)
1562 {
1563     CmiSetMsgSize(msg, size);
1564     CMI_SET_CHECKSUM(msg, size);
1565 }
1566
1567 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1568 {
1569     gni_return_t        status  =   GNI_RC_SUCCESS;
1570     uint8_t tag;
1571     CONTROL_MSG         *control_msg_tmp;
1572     int                 oob = ( mode & OUT_OF_BAND);
1573     SMSG_QUEUE          *queue;
1574
1575     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1576 #if CMK_USE_OOB
1577     queue = oob? &smsg_oob_queue : &smsg_queue;
1578 #else
1579     queue = &smsg_queue;
1580 #endif
1581
1582     LrtsPrepareEnvelope(msg, size);
1583
1584 #if PRINT_SYH
1585     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1586 #endif 
1587 #if CMK_SMP 
1588     if(size <= SMSG_MAX_MSG)
1589         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1590     else if (size < BIG_MSG) {
1591         control_msg_tmp =  construct_control_msg(size, msg, 0);
1592         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1593     }
1594     else {
1595           CmiSetMsgSeq(msg, 0);
1596           control_msg_tmp =  construct_control_msg(size, msg, 1);
1597           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1598     }
1599 #else   //non-smp, smp(worker sending)
1600     if(size <= SMSG_MAX_MSG)
1601     {
1602         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1603             CmiFree(msg);
1604     }
1605     else if (size < BIG_MSG) {
1606         control_msg_tmp =  construct_control_msg(size, msg, 0);
1607         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1608     }
1609     else {
1610 #if     USE_LRTS_MEMPOOL
1611         CmiSetMsgSeq(msg, 0);
1612         control_msg_tmp =  construct_control_msg(size, msg, 1);
1613         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1614 #else
1615         control_msg_tmp =  construct_control_msg(size, msg, 0);
1616         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1617 #endif
1618     }
1619 #endif
1620     return 0;
1621 }
1622
1623 static void    PumpDatagramConnection();
1624 static      int         event_SetupConnect = 111;
1625 static      int         event_PumpSmsg = 222 ;
1626 static      int         event_PumpTransaction = 333;
1627 static      int         event_PumpRdmaTransaction = 444;
1628 static      int         event_SendBufferSmsg = 444;
1629 static      int         event_SendFmaRdmaMsg = 555;
1630 static      int         event_AdvanceCommunication = 666;
1631
1632 static void registerUserTraceEvents() {
1633 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1634     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1635     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1636     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1637     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1638     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1639     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1640     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1641 #endif
1642 }
1643
1644 static void ProcessDeadlock()
1645 {
1646     static CmiUInt8 *ptr = NULL;
1647     static CmiUInt8  last = 0, mysum, sum;
1648     static int count = 0;
1649     gni_return_t status;
1650     int i;
1651
1652 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1653 //sweep_mempool(CpvAccess(mempool));
1654     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1655     mysum = smsg_send_count + smsg_recv_count;
1656     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1657     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1658     GNI_RC_CHECK("PMI_Allgather", status);
1659     sum = 0;
1660     for (i=0; i<mysize; i++)  sum+= ptr[i];
1661     if (last == 0 || sum == last) 
1662         count++;
1663     else
1664         count = 0;
1665     last = sum;
1666     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1667     if (count == 2) { 
1668         /* detected twice, it is a real deadlock */
1669         if (myrank == 0)  {
1670             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1671             CmiAbort("Fatal> Deadlock detected.");
1672         }
1673
1674     }
1675     _detected_hang = 0;
1676 }
1677
1678 static void CheckProgress()
1679 {
1680     if (smsg_send_count == last_smsg_send_count &&
1681         smsg_recv_count == last_smsg_recv_count ) 
1682     {
1683         _detected_hang = 1;
1684 #if !CMK_SMP
1685         if (_detected_hang) ProcessDeadlock();
1686 #endif
1687
1688     }
1689     else {
1690         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1691         last_smsg_send_count = smsg_send_count;
1692         last_smsg_recv_count = smsg_recv_count;
1693         _detected_hang = 0;
1694     }
1695 }
1696
1697 static void set_limit()
1698 {
1699     //if (!user_set_flag && CmiMyRank() == 0) {
1700     if (CmiMyRank() == 0) {
1701         int mynode = CmiPhysicalNodeID(CmiMyPe());
1702         int numpes = CmiNumPesOnPhysicalNode(mynode);
1703         int numprocesses = numpes / CmiMyNodeSize();
1704         MAX_REG_MEM  = _totalmem / numprocesses;
1705         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1706         if (CmiMyPe() == 0)
1707            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1708         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1709         {
1710              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1711              CmiAbort("memory registration\n");
1712         }
1713     }
1714 }
1715
1716 void LrtsPostCommonInit(int everReturn)
1717 {
1718 #if CMK_DIRECT
1719     CmiDirectInit();
1720 #endif
1721 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1722     CpvInitialize(double, projTraceStart);
1723     /* only PE 0 needs to care about registration (to generate sts file). */
1724     //if (CmiMyPe() == 0) 
1725     {
1726         registerMachineUserEventsFunction(&registerUserTraceEvents);
1727     }
1728 #endif
1729
1730 #if CMK_SMP
1731     CmiIdleState *s=CmiNotifyGetState();
1732     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1733     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1734 #else
1735     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1736     if (useDynamicSMSG)
1737     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1738 #endif
1739
1740 #if ! LARGEPAGE
1741     if (_checkProgress)
1742 #if CMK_SMP
1743     if (CmiMyRank() == 0)
1744 #endif
1745     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1746 #endif
1747  
1748 #if !LARGEPAGE
1749     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1750 #endif
1751 }
1752
1753 /* this is called by worker thread */
1754 void LrtsPostNonLocal(){
1755 #if CMK_SMP_TRACE_COMMTHREAD
1756     double startT, endT;
1757 #endif
1758 #if MULTI_THREAD_SEND
1759     if(mysize == 1) return;
1760 #if CMK_SMP_TRACE_COMMTHREAD
1761     traceEndIdle();
1762 #endif
1763
1764 #if CMK_SMP_TRACE_COMMTHREAD
1765     startT = CmiWallTimer();
1766 #endif
1767
1768 #if CMK_WORKER_SINGLE_TASK
1769     if (CmiMyRank() % 6 == 0)
1770 #endif
1771     PumpNetworkSmsg();
1772
1773 #if CMK_WORKER_SINGLE_TASK
1774     if (CmiMyRank() % 6 == 1)
1775 #endif
1776     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1777
1778 #if CMK_WORKER_SINGLE_TASK
1779     if (CmiMyRank() % 6 == 2)
1780 #endif
1781     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1782
1783 #if REMOTE_EVENT
1784 #if CMK_WORKER_SINGLE_TASK
1785     if (CmiMyRank() % 6 == 3)
1786 #endif
1787     PumpRemoteTransactions();
1788 #endif
1789
1790 #if CMK_USE_OOB
1791     if (SendBufferMsg(&smsg_oob_queue) == 1)
1792 #endif
1793     {
1794 #if CMK_WORKER_SINGLE_TASK
1795     if (CmiMyRank() % 6 == 4)
1796 #endif
1797         SendBufferMsg(&smsg_queue);
1798     }
1799
1800 #if CMK_WORKER_SINGLE_TASK
1801     if (CmiMyRank() % 6 == 5)
1802 #endif
1803     SendRdmaMsg();
1804
1805 #if CMK_SMP_TRACE_COMMTHREAD
1806     endT = CmiWallTimer();
1807     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
1808 #endif
1809 #if CMK_SMP_TRACE_COMMTHREAD
1810     traceBeginIdle();
1811 #endif
1812 #endif
1813 }
1814
1815 /* useDynamicSMSG */
1816 static void    PumpDatagramConnection()
1817 {
1818     uint32_t          remote_address;
1819     uint32_t          remote_id;
1820     gni_return_t status;
1821     gni_post_state_t  post_state;
1822     uint64_t          datagram_id;
1823     int i;
1824
1825    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1826    {
1827        if (datagram_id >= mysize) {           /* bound endpoint */
1828            int pe = datagram_id - mysize;
1829            CMI_GNI_LOCK(global_gni_lock)
1830            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1831            CMI_GNI_UNLOCK(global_gni_lock)
1832            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1833            {
1834                CmiAssert(remote_id == pe);
1835                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1836                GNI_RC_CHECK("Dynamic SMSG Init", status);
1837 #if PRINT_SYH
1838                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1839 #endif
1840                CmiAssert(smsg_connected_flag[pe] == 1);
1841                smsg_connected_flag[pe] = 2;
1842            }
1843        }
1844        else {         /* unbound ep */
1845            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1846            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1847            {
1848                CmiAssert(remote_id<mysize);
1849                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1850                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1851                GNI_RC_CHECK("Dynamic SMSG Init", status);
1852 #if PRINT_SYH
1853                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1854 #endif
1855                smsg_connected_flag[remote_id] = 2;
1856
1857                alloc_smsg_attr(&send_smsg_attr);
1858                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1859                GNI_RC_CHECK("post unbound datagram", status);
1860            }
1861        }
1862    }
1863 }
1864
1865 /* pooling CQ to receive network message */
1866 static void PumpNetworkRdmaMsgs()
1867 {
1868     gni_cq_entry_t      event_data;
1869     gni_return_t        status;
1870
1871 }
1872
1873 inline 
1874 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
1875 {
1876     RDMA_REQUEST        *rdma_request_msg;
1877     MallocRdmaRequest(rdma_request_msg);
1878     rdma_request_msg->destNode = inst_id;
1879     rdma_request_msg->pd = pd;
1880 #if REMOTE_EVENT
1881     rdma_request_msg->ack_index = ack_index;
1882 #endif
1883 #if CMK_SMP
1884     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1885 #else
1886     if(sendRdmaBuf == 0)
1887     {
1888         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1889     }else{
1890         sendRdmaTail->next = rdma_request_msg;
1891         sendRdmaTail =  rdma_request_msg;
1892     }
1893 #endif
1894
1895 }
1896
1897 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1898
1899 static void PumpNetworkSmsg()
1900 {
1901     uint64_t            inst_id;
1902     int                 ret;
1903     gni_cq_entry_t      event_data;
1904     gni_return_t        status, status2;
1905     void                *header;
1906     uint8_t             msg_tag;
1907     int                 msg_nbytes;
1908     void                *msg_data;
1909     gni_mem_handle_t    msg_mem_hndl;
1910     gni_smsg_attr_t     *smsg_attr;
1911     gni_smsg_attr_t     *remote_smsg_attr;
1912     int                 init_flag;
1913     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1914     uint64_t            source_addr;
1915     SMSG_QUEUE         *queue = &smsg_queue;
1916 #if     CMK_DIRECT
1917     cmidirectMsg        *direct_msg;
1918 #endif
1919     while(1)
1920     {
1921         CMI_GNI_LOCK(smsg_rx_cq_lock)
1922         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1923         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
1924         if(status != GNI_RC_SUCCESS)
1925             break;
1926         inst_id = GNI_CQ_GET_INST_ID(event_data);
1927 #if REMOTE_EVENT
1928         inst_id = ACK_GET_RANK(inst_id);
1929 #endif
1930         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1931 #if PRINT_SYH
1932         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1933 #endif
1934         if (useDynamicSMSG) {
1935             /* subtle: smsg may come before connection is setup */
1936             while (smsg_connected_flag[inst_id] != 2) 
1937                PumpDatagramConnection();
1938         }
1939         msg_tag = GNI_SMSG_ANY_TAG;
1940         while(1) {
1941             CMI_GNI_LOCK(smsg_mailbox_lock)
1942             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1943             if (status != GNI_RC_SUCCESS)
1944             {
1945                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1946                 break;
1947             }
1948 #if PRINT_SYH
1949             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1950 #endif
1951             /* copy msg out and then put into queue (small message) */
1952             switch (msg_tag) {
1953             case SMALL_DATA_TAG:
1954             {
1955                 START_EVENT();
1956                 msg_nbytes = CmiGetMsgSize(header);
1957                 msg_data    = CmiAlloc(msg_nbytes);
1958                 memcpy(msg_data, (char*)header, msg_nbytes);
1959                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1960                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1961                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1962                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
1963                 handleOneRecvedMsg(msg_nbytes, msg_data);
1964                 break;
1965             }
1966             case LMSG_INIT_TAG:
1967             {
1968 #if MULTI_THREAD_SEND
1969                 MallocControlMsg(control_msg_tmp);
1970                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
1971                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1972                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1973                 getLargeMsgRequest(control_msg_tmp, inst_id);
1974                 FreeControlMsg(control_msg_tmp);
1975 #else
1976                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1977                 getLargeMsgRequest(header, inst_id);
1978                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1979 #endif
1980                 break;
1981             }
1982             case ACK_TAG:   //msg fit into mempool
1983             {
1984                 /* Get is done, release message . Now put is not used yet*/
1985                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1986                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1987                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1988 #if ! USE_LRTS_MEMPOOL
1989                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1990 #else
1991                 DecreaseMsgInSend(msg);
1992 #endif
1993                 if(NoMsgInSend(msg))
1994                     buffered_send_msg -= GetMempoolsize(msg);
1995                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1996                 CmiFree(msg);
1997                 break;
1998             }
1999             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2000             {
2001 #if MULTI_THREAD_SEND
2002                 MallocControlMsg(header_tmp);
2003                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2004                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2005 #else
2006                 header_tmp = (CONTROL_MSG *) header;
2007 #endif
2008                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2009                 void *msg = (void*)(header_tmp->source_addr);
2010                 int cur_seq = CmiGetMsgSeq(msg);
2011                 int offset = ONE_SEG*(cur_seq+1);
2012                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2013                 buffered_send_msg -= header_tmp->length;
2014                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2015                 if (remain_size < 0) remain_size = 0;
2016                 CmiSetMsgSize(msg, remain_size);
2017                 if(remain_size <= 0) //transaction done
2018                 {
2019                     CmiFree(msg);
2020                 }else if (header_tmp->total_length > offset)
2021                 {
2022                     CmiSetMsgSeq(msg, cur_seq+1);
2023                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2024                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2025                     //send next seg
2026                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2027                          // pipelining
2028                     if (header_tmp->seq_id == 1) {
2029                       int i;
2030                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2031                         int seq = cur_seq+i+2;
2032                         CmiSetMsgSeq(msg, seq-1);
2033                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2034                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2035                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2036                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2037                       }
2038                     }
2039                 }
2040 #if MULTI_THREAD_SEND
2041                 FreeControlMsg(header_tmp);
2042 #else
2043                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2044 #endif
2045                 break;
2046             }
2047 #if CMK_PERSISTENT_COMM
2048             case PUT_DONE_TAG:  {   //persistent message
2049                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2050                 int size = ((CONTROL_MSG *) header)->length;
2051                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2052                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2053                 CmiReference(msg);
2054                 CMI_CHECK_CHECKSUM(msg, size);
2055                 handleOneRecvedMsg(size, msg); 
2056 #if PRINT_SYH
2057                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2058 #endif
2059                 break;
2060             }
2061 #endif
2062 #if CMK_DIRECT
2063             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2064                 //create a trigger message
2065                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2066                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2067                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2068                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2069                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2070                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2071                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2072                 break;
2073 #endif
2074             default: {
2075                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2076                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2077                 printf("weird tag problem\n");
2078                 CmiAbort("Unknown tag\n");
2079                      }
2080             }               // end switch
2081 #if PRINT_SYH
2082             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2083 #endif
2084             smsg_recv_count ++;
2085             msg_tag = GNI_SMSG_ANY_TAG;
2086         } //endwhile getNext
2087     }   //end while GetEvent
2088     if(status == GNI_RC_ERROR_RESOURCE)
2089     {
2090         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2091         GNI_RC_CHECK("Smsg_rx_cq full", status);
2092     }
2093 }
2094
2095 static void printDesc(gni_post_descriptor_t *pd)
2096 {
2097     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2098 }
2099
2100 #if CQWRITE
2101 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2102 {
2103     gni_post_descriptor_t *pd;
2104     gni_return_t        status = GNI_RC_SUCCESS;
2105     
2106     MallocPostDesc(pd);
2107     pd->type = GNI_POST_CQWRITE;
2108     pd->cq_mode = GNI_CQMODE_SILENT;
2109     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2110     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2111     pd->cqwrite_value = data;
2112     pd->remote_mem_hndl = mem_hndl;
2113     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2114     GNI_RC_CHECK("GNI_PostCqWrite", status);
2115 }
2116 #endif
2117
2118 // register memory for a message
2119 // return mem handle
2120 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2121 {
2122     gni_return_t status = GNI_RC_SUCCESS;
2123
2124     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2125
2126 #if CMK_PERSISTENT_COMM
2127         // persistent message is always registered
2128     if (!IsMemHndlZero(MEMHFIELD(msg))) {
2129         *memh = MEMHFIELD(msg);
2130         return GNI_RC_SUCCESS;
2131     }
2132 #endif
2133     if(seqno == 0)
2134     {
2135         if(IsMemHndlZero((GetMemHndl(msg))))
2136         {
2137             msg = (void*)(msg);
2138             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2139             if(status == GNI_RC_SUCCESS)
2140                 *memh = GetMemHndl(msg);
2141         }
2142         else {
2143             *memh = GetMemHndl(msg);
2144         }
2145     }
2146     else {
2147         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2148         status = registerMemory(msg, size, memh, NULL); 
2149     }
2150     return status;
2151 }
2152
2153 // for BIG_MSG called on receiver side for receiving control message
2154 // LMSG_INIT_TAG
2155 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2156 {
2157 #if     USE_LRTS_MEMPOOL
2158     CONTROL_MSG         *request_msg;
2159     gni_return_t        status = GNI_RC_SUCCESS;
2160     void                *msg_data;
2161     gni_post_descriptor_t *pd;
2162     gni_mem_handle_t    msg_mem_hndl;
2163     int source, size, transaction_size, offset = 0;
2164     size_t     register_size = 0;
2165
2166     // initial a get to transfer data from the sender side */
2167     request_msg = (CONTROL_MSG *) header;
2168     size = request_msg->total_length;
2169     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2170     MallocPostDesc(pd);
2171 #if CMK_WITH_STATS 
2172     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2173 #endif
2174     if(request_msg->seq_id < 2)   {
2175 #if CMK_SMP_TRACE_COMMTHREAD 
2176         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2177 #endif
2178         msg_data = CmiAlloc(size);
2179         CmiSetMsgSeq(msg_data, 0);
2180         _MEMCHECK(msg_data);
2181     }
2182     else {
2183         offset = ONE_SEG*(request_msg->seq_id-1);
2184         msg_data = (char*)request_msg->dest_addr + offset;
2185     }
2186    
2187     pd->cqwrite_value = request_msg->seq_id;
2188
2189     SetMemHndlZero(pd->local_mem_hndl);
2190     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2191     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2192     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2193         if(NoMsgInRecv( (void*)(msg_data)))
2194             register_size = GetMempoolsize((void*)(msg_data));
2195         else
2196             register_size = 0;
2197     }
2198
2199 #if 0
2200     if( request_msg->seq_id == 0)
2201     {
2202         pd->local_mem_hndl= GetMemHndl(msg_data);
2203         transaction_size = ALIGN64(size);
2204         if(IsMemHndlZero(pd->local_mem_hndl))
2205         {   
2206             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)), rdma_rx_cqh);
2207             if(status == GNI_RC_SUCCESS)
2208             {
2209                 pd->local_mem_hndl = GetMemHndl(msg_data);
2210             }
2211             else
2212             {
2213                 SetMemHndlZero(pd->local_mem_hndl);
2214             }
2215         }
2216         if(NoMsgInRecv( (void*)(msg_data)))
2217             register_size = GetMempoolsize((void*)(msg_data));
2218         else
2219             register_size = 0;
2220     }
2221     else{
2222         transaction_size = ALIGN64(request_msg->length);
2223         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl), NULL); 
2224         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2225         {
2226             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2227         }
2228     }
2229 #endif
2230     pd->first_operand = ALIGN64(size);                   //  total length
2231
2232     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2233         pd->type            = GNI_POST_FMA_GET;
2234     else
2235         pd->type            = GNI_POST_RDMA_GET;
2236     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2237     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2238     pd->length          = transaction_size;
2239     pd->local_addr      = (uint64_t) msg_data;
2240     pd->remote_addr     = request_msg->source_addr + offset;
2241     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2242     pd->src_cq_hndl     = rdma_tx_cqh;
2243     pd->rdma_mode       = 0;
2244     pd->amo_cmd         = 0;
2245
2246     //memory registration success
2247     if(status == GNI_RC_SUCCESS)
2248     {
2249         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2250         CMI_GNI_LOCK(lock)
2251 #if REMOTE_EVENT
2252         if( request_msg->seq_id == 0)
2253         {
2254             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2255             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2256             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2257         }
2258 #endif
2259
2260 #if CMK_WITH_STATS
2261         RDMA_TRY_SEND()
2262 #endif
2263         if(pd->type == GNI_POST_RDMA_GET) 
2264         {
2265             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2266         }
2267         else
2268         {
2269             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2270         }
2271         CMI_GNI_UNLOCK(lock)
2272
2273         if(status == GNI_RC_SUCCESS )
2274         {
2275             if(pd->cqwrite_value == 0)
2276             {
2277 #if MACHINE_DEBUG_LOG
2278                 buffered_recv_msg += register_size;
2279                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2280 #endif
2281                 IncreaseMsgInRecv(msg_data);
2282 #if CMK_SMP_TRACE_COMMTHREAD 
2283                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2284 #endif
2285             }
2286 #if  CMK_WITH_STATS
2287                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2288                 RDMA_TRANS_INIT(pd->sync_flag_addr/1000000.0)
2289 #endif
2290         }
2291     }else
2292     {
2293         SetMemHndlZero((pd->local_mem_hndl));
2294     }
2295     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2296     {
2297 #if REMOTE_EVENT
2298         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2299 #else
2300         bufferRdmaMsg(inst_id, pd, -1); 
2301 #endif
2302     }else {
2303          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
2304         GNI_RC_CHECK("GetLargeAFter posting", status);
2305     }
2306 #else
2307     CONTROL_MSG         *request_msg;
2308     gni_return_t        status;
2309     void                *msg_data;
2310     gni_post_descriptor_t *pd;
2311     RDMA_REQUEST        *rdma_request_msg;
2312     gni_mem_handle_t    msg_mem_hndl;
2313     //int source;
2314     // initial a get to transfer data from the sender side */
2315     request_msg = (CONTROL_MSG *) header;
2316     msg_data = CmiAlloc(request_msg->length);
2317     _MEMCHECK(msg_data);
2318
2319     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2320
2321     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2322     {
2323         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2324     }
2325
2326     MallocPostDesc(pd);
2327     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2328         pd->type            = GNI_POST_FMA_GET;
2329     else
2330         pd->type            = GNI_POST_RDMA_GET;
2331     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2332     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2333     pd->length          = ALIGN64(request_msg->length);
2334     pd->local_addr      = (uint64_t) msg_data;
2335     pd->remote_addr     = request_msg->source_addr;
2336     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2337     pd->src_cq_hndl     = rdma_tx_cqh;
2338     pd->rdma_mode       = 0;
2339     pd->amo_cmd         = 0;
2340
2341     //memory registration successful
2342     if(status == GNI_RC_SUCCESS)
2343     {
2344         pd->local_mem_hndl  = msg_mem_hndl;
2345        
2346         if(pd->type == GNI_POST_RDMA_GET) 
2347         {
2348             CMI_GNI_LOCK(rdma_tx_cq_lock)
2349             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2350             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2351         }
2352         else
2353         {
2354             CMI_GNI_LOCK(default_tx_cq_lock)
2355             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2356             CMI_GNI_UNLOCK(default_tx_cq_lock)
2357         }
2358
2359     }else
2360     {
2361         SetMemHndlZero(pd->local_mem_hndl);
2362     }
2363     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2364     {
2365         MallocRdmaRequest(rdma_request_msg);
2366         rdma_request_msg->next = 0;
2367         rdma_request_msg->destNode = inst_id;
2368         rdma_request_msg->pd = pd;
2369         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2370     }else {
2371         GNI_RC_CHECK("AFter posting", status);
2372     }
2373 #endif
2374 }
2375
2376 #if CQWRITE
2377 static void PumpCqWriteTransactions()
2378 {
2379
2380     gni_cq_entry_t          ev;
2381     gni_return_t            status;
2382     void                    *msg;   
2383     while(1) {
2384         //CMI_GNI_LOCK(my_cq_lock) 
2385         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2386         //CMI_GNI_UNLOCK(my_cq_lock)
2387         if(status != GNI_RC_SUCCESS) break;
2388         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2389
2390         DecreaseMsgInSend(msg);
2391 #if ! USE_LRTS_MEMPOOL
2392        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2393 #else
2394         DecreaseMsgInSend(msg);
2395 #endif
2396         if(NoMsgInSend(msg))
2397             buffered_send_msg -= GetMempoolsize(msg);
2398         CmiFree(msg);
2399     };
2400     if(status == GNI_RC_ERROR_RESOURCE)
2401     {
2402         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2403     }
2404 }
2405 #endif
2406
2407 #if REMOTE_EVENT
2408 static void PumpRemoteTransactions()
2409 {
2410     gni_cq_entry_t          ev;
2411     gni_return_t            status;
2412     void                    *msg;   
2413     int                     slot;
2414
2415     while(1) {
2416         CMI_GNI_LOCK(global_gni_lock)
2417         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2418         CMI_GNI_UNLOCK(global_gni_lock)
2419         if(status != GNI_RC_SUCCESS) {
2420             break;
2421         }
2422
2423         slot = GNI_CQ_GET_INST_ID(ev);
2424         slot = ACK_GET_INDEX(slot);
2425         //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
2426
2427         //CMI_GNI_LOCK(ackpool_lock);
2428         msg = GetAckAddress(slot);
2429         //CMI_GNI_UNLOCK(ackpool_lock);
2430
2431         DecreaseMsgInSend(msg);
2432 #if ! USE_LRTS_MEMPOOL
2433        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2434 #else
2435         DecreaseMsgInSend(msg);
2436 #endif
2437         if(NoMsgInSend(msg))
2438             buffered_send_msg -= GetMempoolsize(msg);
2439         CmiFree(msg);
2440         AckPool_freeslot(slot);
2441     }
2442     if(status == GNI_RC_ERROR_RESOURCE)
2443     {
2444         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2445     }
2446 }
2447 #endif
2448
2449 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2450 {
2451     gni_cq_entry_t          ev;
2452     gni_return_t            status;
2453     uint64_t                type, inst_id;
2454     gni_post_descriptor_t   *tmp_pd;
2455     MSG_LIST                *ptr;
2456     CONTROL_MSG             *ack_msg_tmp;
2457     ACK_MSG                 *ack_msg;
2458     uint8_t                 msg_tag;
2459 #if CMK_DIRECT
2460     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2461 #endif
2462     SMSG_QUEUE         *queue = &smsg_queue;
2463
2464     while(1) {
2465         CMI_GNI_LOCK(my_cq_lock) 
2466         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2467         CMI_GNI_UNLOCK(my_cq_lock)
2468         if(status != GNI_RC_SUCCESS) break;
2469         
2470         type = GNI_CQ_GET_TYPE(ev);
2471         if (type == GNI_CQ_EVENT_TYPE_POST)
2472         {
2473             inst_id     = GNI_CQ_GET_INST_ID(ev);
2474 #if PRINT_SYH
2475             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2476 #endif
2477             CMI_GNI_LOCK(my_cq_lock)
2478             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2479             CMI_GNI_UNLOCK(my_cq_lock)
2480
2481             switch (tmp_pd->type) {
2482 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2483             case GNI_POST_RDMA_PUT:
2484 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2485                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2486 #endif
2487             case GNI_POST_FMA_PUT:
2488                 if(tmp_pd->amo_cmd == 1) {
2489 #if CMK_DIRECT
2490                     //sender ACK to receiver to trigger it is done
2491                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2492                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2493                     msg_tag = DIRECT_PUT_DONE_TAG;
2494 #endif
2495                 }
2496                 else {
2497                     CmiFree((void *)tmp_pd->local_addr);
2498                     MallocControlMsg(ack_msg_tmp);
2499                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2500                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2501                     ack_msg_tmp->length  = tmp_pd->length;
2502                     msg_tag = PUT_DONE_TAG;
2503                 }
2504                 break;
2505 #endif
2506             case GNI_POST_RDMA_GET:
2507             case GNI_POST_FMA_GET:  {
2508 #if  ! USE_LRTS_MEMPOOL
2509                 MallocControlMsg(ack_msg_tmp);
2510                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2511                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2512                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2513                 msg_tag = ACK_TAG;  
2514 #else
2515 #if CMK_WITH_STATS
2516                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2517 #endif
2518                 int seq_id = tmp_pd->cqwrite_value;
2519                 if(seq_id > 0)      // BIG_MSG
2520                 {
2521                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2522                     MallocControlMsg(ack_msg_tmp);
2523                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2524                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2525                     ack_msg_tmp->seq_id = seq_id;
2526                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2527                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2528                     ack_msg_tmp->length = tmp_pd->length;
2529                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2530                     msg_tag = BIG_MSG_TAG; 
2531                 } 
2532                 else
2533                 {
2534                     msg_tag = ACK_TAG; 
2535 #if  !REMOTE_EVENT && !CQWRITE
2536                     MallocAckMsg(ack_msg);
2537                     ack_msg->source_addr = tmp_pd->remote_addr;
2538 #endif
2539                 }
2540 #endif
2541                 break;
2542             }
2543             case  GNI_POST_CQWRITE:
2544                    FreePostDesc(tmp_pd);
2545                    continue;
2546             default:
2547                 CmiPrintf("type=%d\n", tmp_pd->type);
2548                 CmiAbort("PumpLocalTransactions: unknown type!");
2549             }      /* end of switch */
2550
2551 #if CMK_DIRECT
2552             if (tmp_pd->amo_cmd == 1) {
2553                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2554                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2555             }
2556             else
2557 #endif
2558             if (msg_tag == ACK_TAG) {
2559 #if !REMOTE_EVENT
2560 #if   !CQWRITE
2561                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2562                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2563 #else
2564                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2565 #endif
2566 #endif
2567             }
2568             else {
2569                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2570                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2571             }
2572 #if CMK_PERSISTENT_COMM
2573             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2574 #endif
2575             {
2576                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2577 #if PRINT_SYH
2578                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2579 #endif
2580                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2581                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2582
2583                     START_EVENT();
2584                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2585                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2586 #if MACHINE_DEBUG_LOG
2587                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2588                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2589                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2590 #endif
2591                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2592                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2593                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2594                 }else if(msg_tag == BIG_MSG_TAG){
2595                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2596                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2597                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2598                         START_EVENT();
2599 #if PRINT_SYH
2600                         printf("Pipeline msg done [%d]\n", myrank);
2601 #endif
2602 #if                 CMK_SMP_TRACE_COMMTHREAD
2603                         if( tmp_pd->cqwrite_value == 1)
2604                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2605 #endif
2606                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2607                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2608                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2609                     }
2610                 }
2611             }
2612             FreePostDesc(tmp_pd);
2613         }
2614     } //end while
2615     if(status == GNI_RC_ERROR_RESOURCE)
2616     {
2617         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2618         GNI_RC_CHECK("Smsg_tx_cq full", status);
2619     }
2620 }
2621
2622 static void  SendRdmaMsg()
2623 {
2624     gni_return_t            status = GNI_RC_SUCCESS;
2625     gni_mem_handle_t        msg_mem_hndl;
2626     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2627     RDMA_REQUEST            *pre = 0;
2628     uint64_t                register_size = 0;
2629     void                    *msg;
2630     int                     i;
2631 #if CMK_SMP
2632     int len = PCQueueLength(sendRdmaBuf);
2633     for (i=0; i<len; i++)
2634     {
2635         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2636         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2637         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2638         if (ptr == NULL) break;
2639 #else
2640     ptr = sendRdmaBuf;
2641     while (ptr!=0)
2642     {
2643 #endif 
2644         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2645         gni_post_descriptor_t *pd = ptr->pd;
2646         status = GNI_RC_SUCCESS;
2647         
2648         msg = (void*)(pd->local_addr);
2649         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2650         if(pd->cqwrite_value == 0) {
2651             if(NoMsgInRecv(msg))
2652                 register_size = GetMempoolsize((void*)(pd->local_addr));
2653             else
2654                 register_size = 0;
2655         }
2656         else
2657             register_size = 0;
2658 #if 0
2659         if(pd->cqwrite_value == 0)
2660         {
2661             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2662             {
2663                 msg = (void*)(pd->local_addr);
2664                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2665                 if(status == GNI_RC_SUCCESS)
2666                 {
2667                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2668                 }
2669             }else
2670             {
2671                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2672             }
2673             if(NoMsgInRecv( (void*)(pd->local_addr)))
2674                 register_size = GetMempoolsize((void*)(pd->local_addr));
2675             else
2676                 register_size = 0;
2677         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2678         {
2679             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl), NULL); 
2680         }
2681 #endif
2682         if(status == GNI_RC_SUCCESS)        //mem register good
2683         {
2684             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2685             CMI_GNI_LOCK(lock);
2686 #if REMOTE_EVENT
2687             if( pd->cqwrite_value == 0)
2688             {
2689                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2690                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2691                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2692             }
2693 #endif
2694 #if CMK_WITH_STATS
2695             RDMA_TRY_SEND()
2696 #endif
2697             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2698             {
2699                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2700             }
2701             else
2702             {
2703                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2704             }
2705             CMI_GNI_UNLOCK(lock);
2706             
2707             if(status == GNI_RC_SUCCESS)    //post good
2708             {
2709 #if !CMK_SMP
2710                 tmp_ptr = ptr;
2711                 if(pre != 0) {
2712                     pre->next = ptr->next;
2713                 }
2714                 else {
2715                     sendRdmaBuf = ptr->next;
2716                 }
2717                 ptr = ptr->next;
2718                 FreeRdmaRequest(tmp_ptr);
2719 #endif
2720                 if(pd->cqwrite_value == 0)
2721                 {
2722 #if CMK_SMP_TRACE_COMMTHREAD 
2723                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2724 #endif
2725                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2726                 }
2727 #if  CMK_WITH_STATS
2728                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2729                 RDMA_TRANS_INIT(pd->sync_flag_addr/1000000.0)
2730 #endif
2731 #if MACHINE_DEBUG_LOG
2732                 buffered_recv_msg += register_size;
2733                 MACHSTATE(8, "GO request from buffered\n"); 
2734 #endif
2735 #if PRINT_SYH
2736                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
2737 #endif
2738             }else           // cannot post
2739             {
2740 #if CMK_SMP
2741                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2742 #else
2743                 pre = ptr;
2744                 ptr = ptr->next;
2745 #endif
2746 #if PRINT_SYH
2747                 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank, pd->cqwrite_value);
2748 #endif
2749                 break;
2750             }
2751         } else          //memory registration fails
2752         {
2753 #if CMK_SMP
2754             PCQueuePush(sendRdmaBuf, (char*)ptr);
2755 #else
2756             pre = ptr;
2757             ptr = ptr->next;
2758 #endif
2759         }
2760     } //end while
2761 #if ! CMK_SMP
2762     if(ptr == 0)
2763         sendRdmaTail = pre;
2764 #endif
2765 }
2766
2767 // return 1 if all messages are sent
2768 static int SendBufferMsg(SMSG_QUEUE *queue)
2769 {
2770     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2771     CONTROL_MSG         *control_msg_tmp;
2772     gni_return_t        status;
2773     int                 done = 1;
2774     uint64_t            register_size;
2775     void                *register_addr;
2776     int                 index_previous = -1;
2777 #if CMI_EXERT_SEND_CAP
2778     int                 sent_cnt = 0;
2779 #endif
2780
2781 #if CMK_SMP
2782     int          index = 0;
2783 #if ONE_SEND_QUEUE
2784     memset(destpe_avail, 0, mysize * sizeof(char));
2785     for (index=0; index<1; index++)
2786     {
2787         int i, len = PCQueueLength(queue->sendMsgBuf);
2788         for (i=0; i<len; i++) 
2789         {
2790             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2791             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2792             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2793             if(ptr == NULL) break;
2794             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2795                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2796                 continue;
2797             }
2798 #else
2799 #if SMP_LOCKS
2800     int nonempty = PCQueueLength(nonEmptyQueues);
2801     for(index =0; index<nonempty; index++)
2802     {
2803         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2804         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2805         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2806         if(current_list == NULL) break; 
2807         PCQueue current_queue= current_list-> sendSmsgBuf;
2808         CmiLock(current_list->lock);
2809         int i, len = PCQueueLength(current_queue);
2810         current_list->pushed = 0;
2811         CmiUnlock(current_list->lock);
2812 #else
2813     for(index =0; index<mysize; index++)
2814     {
2815         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2816         int i, len = PCQueueLength(current_queue);
2817 #endif
2818         for (i=0; i<len; i++) 
2819         {
2820             CMI_PCQUEUEPOP_LOCK(current_queue)
2821             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2822             CMI_PCQUEUEPOP_UNLOCK(current_queue)
2823             if (ptr == 0) break;
2824 #endif
2825 #else
2826     int index = queue->smsg_head_index;
2827     while(index != -1)
2828     {
2829         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2830         pre = 0;
2831         while(ptr != 0)
2832         {
2833 #endif
2834             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2835             status = GNI_RC_ERROR_RESOURCE;
2836             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2837                 /* connection not exists yet */
2838             }
2839             else
2840             switch(ptr->tag)
2841             {
2842             case SMALL_DATA_TAG:
2843                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2844                 if(status == GNI_RC_SUCCESS)
2845                 {
2846                     CmiFree(ptr->msg);
2847                 }
2848                 break;
2849             case LMSG_INIT_TAG:
2850                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2851                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
2852                 break;
2853             case ACK_TAG:
2854                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2855                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2856                 break;
2857             case BIG_MSG_TAG:
2858                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2859                 if(status == GNI_RC_SUCCESS)
2860                 {
2861                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2862                 }
2863                 break;
2864 #if CMK_DIRECT
2865             case DIRECT_PUT_DONE_TAG:
2866                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
2867                 if(status == GNI_RC_SUCCESS)
2868                 {
2869                     free((CMK_DIRECT_HEADER*)ptr->msg);
2870                 }
2871                 break;
2872
2873 #endif
2874             default:
2875                 printf("Weird tag\n");
2876                 CmiAbort("should not happen\n");
2877             }       // end switch
2878             if(status == GNI_RC_SUCCESS)
2879             {
2880 #if PRINT_SYH
2881                 buffered_smsg_counter--;
2882                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2883 #endif
2884 #if !CMK_SMP
2885                 tmp_ptr = ptr;
2886                 if(pre)
2887                 {
2888                     ptr = pre ->next = ptr->next;
2889                 }else
2890                 {
2891                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2892                 }
2893                 FreeMsgList(tmp_ptr);
2894 #else
2895                 FreeMsgList(ptr);
2896 #endif
2897 #if CMI_EXERT_SEND_CAP
2898                 sent_cnt++;
2899                 if(sent_cnt == SEND_CAP)
2900                     break;
2901 #endif
2902             }else {
2903 #if CMK_SMP
2904 #if ONE_SEND_QUEUE
2905                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2906 #else
2907                 PCQueuePush(current_queue, (char*)ptr);
2908 #endif
2909 #else
2910                 pre = ptr;
2911                 ptr=ptr->next;
2912 #endif
2913                 done = 0;
2914                 if(status == GNI_RC_ERROR_RESOURCE)
2915                 {
2916 #if CMK_SMP && ONE_SEND_QUEUE 
2917                     destpe_avail[ptr->destNode] = 1;
2918 #else
2919                     break;
2920 #endif
2921                 }
2922             } 
2923         } //end while
2924 #if !CMK_SMP
2925         if(ptr == 0)
2926             queue->smsg_msglist_index[index].tail = pre;
2927         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2928         {
2929             if(index_previous != -1)
2930                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2931             else
2932                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2933         }
2934         else {
2935             index_previous = index;
2936         }
2937         index = queue->smsg_msglist_index[index].next;
2938 #else
2939 #if !ONE_SEND_QUEUE && SMP_LOCKS
2940         CmiLock(current_list->lock);
2941         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2942         {
2943             current_list->pushed = 1;
2944             PCQueuePush(nonEmptyQueues, current_list);
2945         }
2946         CmiUnlock(current_list->lock); 
2947 #endif
2948 #endif
2949
2950 #if CMI_EXERT_SEND_CAP
2951         if(sent_cnt == SEND_CAP)
2952                 break;
2953 #endif
2954     }   // end pooling for all cores
2955     return done;
2956 }
2957
2958 static void ProcessDeadlock();
2959 void LrtsAdvanceCommunication(int whileidle)
2960 {
2961     static int count = 0;
2962     /*  Receive Msg first */
2963 #if CMK_SMP_TRACE_COMMTHREAD
2964     double startT, endT;
2965 #endif
2966     if (useDynamicSMSG && whileidle)
2967     {
2968 #if CMK_SMP_TRACE_COMMTHREAD
2969         startT = CmiWallTimer();
2970 #endif
2971         PumpDatagramConnection();
2972 #if CMK_SMP_TRACE_COMMTHREAD
2973         endT = CmiWallTimer();
2974         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
2975 #endif
2976     }
2977
2978 #if CMK_SMP_TRACE_COMMTHREAD
2979     startT = CmiWallTimer();
2980 #endif
2981     PumpNetworkSmsg();
2982     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2983 #if CMK_SMP_TRACE_COMMTHREAD
2984     endT = CmiWallTimer();
2985     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
2986 #endif
2987
2988 #if CMK_SMP_TRACE_COMMTHREAD
2989     startT = CmiWallTimer();
2990 #endif
2991     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2992     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
2993 #if CMK_SMP_TRACE_COMMTHREAD
2994     endT = CmiWallTimer();
2995     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
2996 #endif
2997
2998 #if CMK_SMP_TRACE_COMMTHREAD
2999     startT = CmiWallTimer();
3000 #endif
3001     PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock);
3002
3003 #if CQWRITE
3004     PumpCqWriteTransactions();
3005 #endif
3006
3007 #if REMOTE_EVENT
3008     PumpRemoteTransactions();
3009 #endif
3010
3011     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3012 #if CMK_SMP_TRACE_COMMTHREAD
3013     endT = CmiWallTimer();
3014     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3015 #endif
3016  
3017     /* Send buffered Message */
3018 #if CMK_SMP_TRACE_COMMTHREAD
3019     startT = CmiWallTimer();
3020 #endif
3021 #if CMK_USE_OOB
3022     if (SendBufferMsg(&smsg_oob_queue) == 1)
3023 #endif
3024     {
3025     SendBufferMsg(&smsg_queue);
3026     }
3027     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3028 #if CMK_SMP_TRACE_COMMTHREAD
3029     endT = CmiWallTimer();
3030     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3031 #endif
3032
3033 #if CMK_SMP_TRACE_COMMTHREAD
3034     startT = CmiWallTimer();
3035 #endif
3036     SendRdmaMsg();
3037     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3038 #if CMK_SMP_TRACE_COMMTHREAD
3039     endT = CmiWallTimer();
3040     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3041 #endif
3042
3043 #if CMK_SMP && ! LARGEPAGE
3044     if (_detected_hang)  ProcessDeadlock();
3045 #endif
3046 }
3047
3048 /* useDynamicSMSG */
3049 static void _init_dynamic_smsg()
3050 {
3051     gni_return_t status;
3052     uint32_t     vmdh_index = -1;
3053     int i;
3054
3055     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3056     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3057     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3058     for(i=0; i<mysize; i++) {
3059         smsg_connected_flag[i] = 0;
3060         smsg_attr_vector_local[i] = NULL;
3061         smsg_attr_vector_remote[i] = NULL;
3062     }
3063     if(mysize <=512)
3064     {
3065         SMSG_MAX_MSG = 4096;
3066     }else if (mysize <= 4096)
3067     {
3068         SMSG_MAX_MSG = 4096/mysize * 1024;
3069     }else if (mysize <= 16384)
3070     {
3071         SMSG_MAX_MSG = 512;
3072     }else {
3073         SMSG_MAX_MSG = 256;
3074     }
3075
3076     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3077     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3078     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3079     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3080     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3081
3082     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3083     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3084     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3085     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3086     mailbox_list->offset = 0;
3087     mailbox_list->next = 0;
3088     
3089     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3090         mailbox_list->size, smsg_rx_cqh,
3091         GNI_MEM_READWRITE,   
3092         vmdh_index,
3093         &(mailbox_list->mem_hndl));
3094     GNI_RC_CHECK("MEMORY registration for smsg", status);
3095
3096     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3097     GNI_RC_CHECK("Unbound EP", status);
3098     
3099     alloc_smsg_attr(&send_smsg_attr);
3100
3101     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3102     GNI_RC_CHECK("post unbound datagram", status);
3103
3104       /* always pre-connect to proc 0 */
3105     //if (myrank != 0) connect_to(0);
3106 }
3107
3108 static void _init_static_smsg()
3109 {
3110     gni_smsg_attr_t      *smsg_attr;
3111     gni_smsg_attr_t      remote_smsg_attr;
3112     gni_smsg_attr_t      *smsg_attr_vec;
3113     gni_mem_handle_t     my_smsg_mdh_mailbox;
3114     int      ret, i;
3115     gni_return_t status;
3116     uint32_t              vmdh_index = -1;
3117     mdh_addr_t            base_infor;
3118     mdh_addr_t            *base_addr_vec;
3119     char *env;
3120
3121     if(mysize <=512)
3122     {
3123         SMSG_MAX_MSG = 1024;
3124     }else if (mysize <= 4096)
3125     {
3126         SMSG_MAX_MSG = 1024;
3127     }else if (mysize <= 16384)
3128     {
3129         SMSG_MAX_MSG = 512;
3130     }else {
3131         SMSG_MAX_MSG = 256;
3132     }
3133     
3134     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3135     if (env) SMSG_MAX_MSG = atoi(env);
3136     CmiAssert(SMSG_MAX_MSG > 0);
3137
3138     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3139     
3140     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3141     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3142     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3143     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3144     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3145     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3146     CmiAssert(ret == 0);
3147     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3148     
3149     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3150             smsg_memlen*(mysize), smsg_rx_cqh,
3151             GNI_MEM_READWRITE,   
3152             vmdh_index,
3153             &my_smsg_mdh_mailbox);
3154     register_memory_size += smsg_memlen*(mysize);
3155     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3156
3157     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3158     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3159
3160     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3161     base_infor.mdh =  my_smsg_mdh_mailbox;
3162     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3163
3164     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3165  
3166     for(i=0; i<mysize; i++)
3167     {
3168         if(i==myrank)
3169             continue;
3170         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3171         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3172         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3173         smsg_attr[i].mbox_offset = i*smsg_memlen;
3174         smsg_attr[i].buff_size = smsg_memlen;
3175         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3176         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3177     }
3178
3179     for(i=0; i<mysize; i++)
3180     {
3181         if (myrank == i) continue;
3182
3183         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3184         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3185         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3186         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3187         remote_smsg_attr.buff_size = smsg_memlen;
3188         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3189         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3190
3191         /* initialize the smsg channel */
3192         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3193         GNI_RC_CHECK("SMSG Init", status);
3194     } //end initialization
3195
3196     free(base_addr_vec);
3197     free(smsg_attr);
3198
3199     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3200     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3201
3202
3203 inline
3204 static void _init_send_queue(SMSG_QUEUE *queue)
3205 {
3206      int i;
3207 #if ONE_SEND_QUEUE
3208      queue->sendMsgBuf = PCQueueCreate();
3209      destpe_avail = (char*)malloc(mysize * sizeof(char));
3210 #else
3211      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3212 #if CMK_SMP && SMP_LOCKS
3213      nonEmptyQueues = PCQueueCreate();
3214 #endif
3215      for(i =0; i<mysize; i++)
3216      {
3217 #if CMK_SMP
3218          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3219 #if SMP_LOCKS
3220          queue->smsg_msglist_index[i].pushed = 0;
3221          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3222 #endif
3223 #else
3224          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3225          queue->smsg_msglist_index[i].next = -1;
3226          queue->smsg_head_index = -1;
3227 #endif
3228         
3229      }
3230 #endif
3231 }
3232
3233 inline
3234 static void _init_smsg()
3235 {
3236     if(mysize > 1) {
3237         if (useDynamicSMSG)
3238             _init_dynamic_smsg();
3239         else
3240             _init_static_smsg();
3241     }
3242
3243     _init_send_queue(&smsg_queue);
3244 #if CMK_USE_OOB
3245     _init_send_queue(&smsg_oob_queue);
3246 #endif
3247 }
3248
3249 static void _init_static_msgq()
3250 {
3251     gni_return_t status;
3252     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3253     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3254     msgq_attrs.smsg_q_sz = 1;
3255     msgq_attrs.rcv_pool_sz = 1;
3256     msgq_attrs.num_msgq_eps = 2;
3257     msgq_attrs.nloc_insts = 8;
3258     msgq_attrs.modes = 0;
3259     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3260
3261     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3262     GNI_RC_CHECK("MSGQ Init", status);
3263
3264
3265 }
3266
3267
3268 static CmiUInt8 total_mempool_size = 0;
3269 static CmiUInt8 total_mempool_calls = 0;
3270
3271 #if USE_LRTS_MEMPOOL
3272 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3273 {
3274     void *pool;
3275     int ret;
3276     gni_return_t status = GNI_RC_SUCCESS;
3277
3278     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3279     if (*size < default_size) *size = default_size;
3280 #if LARGEPAGE
3281     // round up to be multiple of _tlbpagesize
3282     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3283     *size = ALIGNHUGEPAGE(*size);
3284 #endif
3285     total_mempool_size += *size;
3286     total_mempool_calls += 1;
3287 #if   !LARGEPAGE
3288     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3289     {
3290         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3291         CmiAbort("alloc_mempool_block");
3292     }
3293 #endif
3294 #if LARGEPAGE
3295     pool = my_get_huge_pages(*size);
3296     ret = pool==NULL;
3297 #else
3298     ret = posix_memalign(&pool, ALIGNBUF, *size);
3299 #endif
3300     if (ret != 0) {
3301 #if CMK_SMP && STEAL_MEMPOOL
3302       pool = steal_mempool_block(size, mem_hndl);
3303       if (pool != NULL) return pool;
3304 #endif
3305       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3306       if (ret == ENOMEM)
3307         CmiAbort("alloc_mempool_block: out of memory.");
3308       else
3309         CmiAbort("alloc_mempool_block: posix_memalign failed");
3310     }
3311 #if LARGEPAGE
3312     CmiMemLock();
3313     register_count++;
3314     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3315     CmiMemUnlock();
3316     if(status != GNI_RC_SUCCESS) {
3317         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3318 sweep_mempool(CpvAccess(mempool));
3319     }
3320     GNI_RC_CHECK("MEMORY_REGISTER", status);
3321 #else
3322     SetMemHndlZero((*mem_hndl));
3323 #endif
3324     return pool;
3325 }
3326
3327 // ptr is a block head pointer
3328 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3329 {
3330     if(!(IsMemHndlZero(mem_hndl)))
3331     {
3332         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3333     }
3334 #if LARGEPAGE
3335     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3336 #else
3337     free(ptr);
3338 #endif
3339 }
3340 #endif
3341
3342 void LrtsPreCommonInit(int everReturn){
3343 #if USE_LRTS_MEMPOOL
3344     CpvInitialize(mempool_type*, mempool);
3345     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3346     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3347 #endif
3348 }
3349
3350 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3351 {
3352     register int            i;
3353     int                     rc;
3354     int                     device_id = 0;
3355     unsigned int            remote_addr;
3356     gni_cdm_handle_t        cdm_hndl;
3357     gni_return_t            status = GNI_RC_SUCCESS;
3358     uint32_t                vmdh_index = -1;
3359     uint8_t                 ptag;
3360     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3361     int                     first_spawned;
3362     int                     physicalID;
3363     char                   *env;
3364
3365     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3366     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3367     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3368    
3369     status = PMI_Init(&first_spawned);
3370     GNI_RC_CHECK("PMI_Init", status);
3371
3372     status = PMI_Get_size(&mysize);
3373     GNI_RC_CHECK("PMI_Getsize", status);
3374
3375     status = PMI_Get_rank(&myrank);
3376     GNI_RC_CHECK("PMI_getrank", status);
3377
3378     //physicalID = CmiPhysicalNodeID(myrank);
3379     
3380     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3381
3382     *myNodeID = myrank;
3383     *numNodes = mysize;
3384   
3385 #if MULTI_THREAD_SEND
3386     /* Currently, we only consider the case that comm. thread will only recv msgs */
3387     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3388 #endif
3389
3390     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3391     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3392     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3393
3394     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3395     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3396     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3397
3398     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3399     if (env) useDynamicSMSG = 1;
3400     if (!useDynamicSMSG)
3401       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3402     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3403     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3404     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3405     
3406     if(myrank == 0)
3407     {
3408         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3409         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3410     }
3411 #ifdef USE_ONESIDED
3412     onesided_init(NULL, &onesided_hnd);
3413
3414     // this is a GNI test, so use the libonesided bypass functionality
3415     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3416     local_addr = gniGetNicAddress();
3417 #else
3418     ptag = get_ptag();
3419     cookie = get_cookie();
3420 #if 0
3421     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3422 #endif
3423     //Create and attach to the communication  domain */
3424     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3425     GNI_RC_CHECK("GNI_CdmCreate", status);
3426     //* device id The device id is the minor number for the device
3427     //that is assigned to the device by the system when the device is created.
3428     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3429     //where X is the device number 0 default 
3430     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3431     GNI_RC_CHECK("GNI_CdmAttach", status);
3432     local_addr = get_gni_nic_address(0);
3433 #endif
3434     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3435     _MEMCHECK(MPID_UGNI_AllAddr);
3436     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3437     /* create the local completion queue */
3438     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3439     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3440     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3441     
3442     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3443     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3444     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3445
3446     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3447     GNI_RC_CHECK("Create CQ (rx)", status);
3448     
3449     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3450     GNI_RC_CHECK("Create Post CQ (rx)", status);
3451     
3452     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3453     //GNI_RC_CHECK("Create BTE CQ", status);
3454
3455     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3456     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3457     _MEMCHECK(ep_hndl_array);
3458 #if MULTI_THREAD_SEND 
3459     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3460     //default_tx_cq_lock = CmiCreateLock();
3461     rdma_tx_cq_lock = CmiCreateLock();
3462     smsg_rx_cq_lock = CmiCreateLock();
3463     //global_gni_lock  = CmiCreateLock();
3464     //rx_cq_lock  = CmiCreateLock();
3465 #endif
3466     for (i=0; i<mysize; i++) {
3467         if(i == myrank) continue;
3468         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3469         GNI_RC_CHECK("GNI_EpCreate ", status);   
3470         remote_addr = MPID_UGNI_AllAddr[i];
3471         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3472         GNI_RC_CHECK("GNI_EpBind ", status);   
3473     }
3474
3475     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3476     _init_smsg();
3477     PMI_Barrier();
3478
3479 #if     USE_LRTS_MEMPOOL
3480     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3481     if (env) {
3482      &