make ackpool general
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/dir.h>
38 #include <sys/stat.h>
39 #include <gni_pub.h>
40 #include <pmi.h>
41 //#include <numatoolkit.h>
42
43 #include "converse.h"
44
45 #if CMK_DIRECT
46 #include "cmidirect.h"
47 #endif
48
49 #define     LARGEPAGE              0
50
51 #if CMK_SMP
52 #define MULTI_THREAD_SEND          0
53 #define COMM_THREAD_SEND           1
54 #endif
55
56 #if MULTI_THREAD_SEND
57 #define CMK_WORKER_SINGLE_TASK     0
58 #endif
59
60 #define REMOTE_EVENT               0
61 #define CQWRITE                    0
62
63 #define CMI_EXERT_SEND_CAP      0
64 #define CMI_EXERT_RECV_CAP      0
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP 16
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP 2
72 #endif
73
74 #define USE_LRTS_MEMPOOL                  1
75
76 #define PRINT_SYH                         0
77
78 // Trace communication thread
79 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
80 #define TRACE_THRESHOLD     0.00005
81 #define CMI_MPI_TRACE_MOREDETAILED 0
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
84 #else
85 #undef CMK_SMP_TRACE_COMMTHREAD
86 #define CMK_SMP_TRACE_COMMTHREAD 0
87 #endif
88
89 #define CMK_TRACE_COMMOVERHEAD 0
90 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
91 #undef CMI_MPI_TRACE_USEREVENTS
92 #define CMI_MPI_TRACE_USEREVENTS 1
93 #else
94 #undef CMK_TRACE_COMMOVERHEAD
95 #define CMK_TRACE_COMMOVERHEAD 0
96 #endif
97
98 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
99 CpvStaticDeclare(double, projTraceStart);
100 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
101 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
102 #else
103 #define  START_EVENT()
104 #define  END_EVENT(x)
105 #endif
106
107 #if USE_LRTS_MEMPOOL
108
109 #define oneMB (1024ll*1024)
110 #define oneGB (1024ll*1024*1024)
111
112 static CmiInt8 _mempool_size = 8*oneMB;
113 static CmiInt8 _expand_mem =  4*oneMB;
114 static CmiInt8 _mempool_size_limit = 0;
115
116 static CmiInt8 _totalmem = 0.8*oneGB;
117
118 #if LARGEPAGE
119 static CmiInt8 BIG_MSG  =  16*oneMB;
120 static CmiInt8 ONE_SEG  =  4*oneMB;
121 #else
122 static CmiInt8 BIG_MSG  =  4*oneMB;
123 static CmiInt8 ONE_SEG  =  2*oneMB;
124 #endif
125 #if MULTI_THREAD_SEND
126 static int BIG_MSG_PIPELINE = 1;
127 #else
128 static int BIG_MSG_PIPELINE = 4;
129 #endif
130
131 // dynamic flow control
132 static CmiInt8 buffered_send_msg = 0;
133 static CmiInt8 register_memory_size = 0;
134
135 #if LARGEPAGE
136 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
137 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
138 static CmiInt8  register_count = 0;
139 #else
140 #if CMK_SMP && COMM_THREAD_SEND 
141 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
142 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
143 #else
144 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
145 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
146 #endif
147
148
149 #endif
150
151 #endif     /* end USE_LRTS_MEMPOOL */
152
153 #if MULTI_THREAD_SEND 
154 #define     CMI_GNI_LOCK(x)       CmiLock(x);
155 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
156 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
157 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
158 #else
159 #define     CMI_GNI_LOCK(x)
160 #define     CMI_GNI_UNLOCK(x)
161 #define     CMI_PCQUEUEPOP_LOCK(Q)   
162 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
163 #endif
164
165 static int _tlbpagesize = 4096;
166
167 //static int _smpd_count  = 0;
168
169 static int   user_set_flag  = 0;
170
171 static int _checkProgress = 1;             /* check deadlock */
172 static int _detected_hang = 0;
173
174 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
175
176 // dynamic SMSG
177 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
178
179 static int avg_smsg_connection = 32;
180 static int                 *smsg_connected_flag= 0;
181 static gni_smsg_attr_t     **smsg_attr_vector_local;
182 static gni_smsg_attr_t     **smsg_attr_vector_remote;
183 static gni_ep_handle_t     ep_hndl_unbound;
184 static gni_smsg_attr_t     send_smsg_attr;
185 static gni_smsg_attr_t     recv_smsg_attr;
186
187 typedef struct _dynamic_smsg_mailbox{
188    void     *mailbox_base;
189    int      size;
190    int      offset;
191    gni_mem_handle_t  mem_hndl;
192    struct      _dynamic_smsg_mailbox  *next;
193 }dynamic_smsg_mailbox_t;
194
195 static dynamic_smsg_mailbox_t  *mailbox_list;
196
197 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
198 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
199
200 #if PRINT_SYH
201 int         lrts_send_msg_id = 0;
202 int         lrts_local_done_msg = 0;
203 int         lrts_send_rdma_success = 0;
204 #endif
205
206 #include "machine.h"
207
208 #include "pcqueue.h"
209
210 #include "mempool.h"
211
212 #if CMK_PERSISTENT_COMM
213 #include "machine-persistent.h"
214 #endif
215
216 //#define  USE_ONESIDED 1
217 #ifdef USE_ONESIDED
218 //onesided implementation is wrong, since no place to restore omdh
219 #include "onesided.h"
220 onesided_hnd_t   onesided_hnd;
221 onesided_md_t    omdh;
222 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
223
224 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
225
226 #else
227 uint8_t   onesided_hnd, omdh;
228
229 #if REMOTE_EVENT || CQWRITE 
230 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
231     if(register_memory_size+size>= MAX_REG_MEM) { \
232         status = GNI_RC_ERROR_NOMEM;} \
233     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
234         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
235 #else
236 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
237         if (register_memory_size + size >= MAX_REG_MEM) { \
238             status = GNI_RC_ERROR_NOMEM; \
239         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
240             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
241 #endif
242
243 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
244     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
245              register_memory_size -= size; \
246          else CmiAbort("MEM_DEregister");  \
247     } while (0)
248 #endif
249
250 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
251 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
252 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
253 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
254 #define   IncreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)++
255 #define   DecreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)--
256 #define   IncreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)++
257 #define   DecreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)--
258 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
259 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
260 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
261 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
262 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
263 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
264
265 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
266 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
267
268 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
269 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
270 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
271 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
272
273 #define ALIGNBUF                64
274
275 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
276 /* If SMSG is not used */
277
278 #define FMA_PER_CORE  1024
279 #define FMA_BUFFER_SIZE 1024
280
281 /* If SMSG is used */
282 static int  SMSG_MAX_MSG = 1024;
283 #define SMSG_MAX_CREDIT 72 
284
285 #define MSGQ_MAXSIZE       2048
286
287 /* large message transfer with FMA or BTE */
288 #if ! REMOTE_EVENT
289 #define LRTS_GNI_RDMA_THRESHOLD  1024 
290 #else
291    /* remote events only work with RDMA */
292 #define LRTS_GNI_RDMA_THRESHOLD  0 
293 #endif
294
295 #if CMK_SMP
296 static int  REMOTE_QUEUE_ENTRIES=163840; 
297 static int LOCAL_QUEUE_ENTRIES=163840; 
298 #else
299 static int  REMOTE_QUEUE_ENTRIES=20480;
300 static int LOCAL_QUEUE_ENTRIES=20480; 
301 #endif
302
303 #define BIG_MSG_TAG             0x26
304 #define PUT_DONE_TAG            0x28
305 #define DIRECT_PUT_DONE_TAG     0x29
306 #define ACK_TAG                 0x30
307 /* SMSG is data message */
308 #define SMALL_DATA_TAG          0x31
309 /* SMSG is a control message to initialize a BTE */
310 #define LMSG_INIT_TAG           0x39 
311
312 #define DEBUG
313 #ifdef GNI_RC_CHECK
314 #undef GNI_RC_CHECK
315 #endif
316 #ifdef DEBUG
317 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
318 #else
319 #define GNI_RC_CHECK(msg,rc)
320 #endif
321
322 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
323 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
324 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
325
326 static int useStaticMSGQ = 0;
327 static int useStaticFMA = 0;
328 static int mysize, myrank;
329 static gni_nic_handle_t   nic_hndl;
330
331 typedef struct {
332     gni_mem_handle_t mdh;
333     uint64_t addr;
334 } mdh_addr_t ;
335 // this is related to dynamic SMSG
336
337 typedef struct mdh_addr_list{
338     gni_mem_handle_t mdh;
339    void *addr;
340     struct mdh_addr_list *next;
341 }mdh_addr_list_t;
342
343 static unsigned int         smsg_memlen;
344 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
345 mdh_addr_t          setup_mem;
346 mdh_addr_t          *smsg_connection_vec = 0;
347 gni_mem_handle_t    smsg_connection_memhndl;
348 static int          smsg_expand_slots = 10;
349 static int          smsg_available_slot = 0;
350 static void         *smsg_mailbox_mempool = 0;
351 mdh_addr_list_t     *smsg_dynamic_list = 0;
352
353 static void             *smsg_mailbox_base;
354 gni_msgq_attr_t         msgq_attrs;
355 gni_msgq_handle_t       msgq_handle;
356 gni_msgq_ep_attr_t      msgq_ep_attrs;
357 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
358
359 /* =====Beginning of Declarations of Machine Specific Variables===== */
360 static int cookie;
361 static int modes = 0;
362 static gni_cq_handle_t       smsg_rx_cqh = NULL;
363 static gni_cq_handle_t       default_tx_cqh = NULL;
364 static gni_cq_handle_t       rdma_tx_cqh = NULL;
365 static gni_cq_handle_t       rdma_rx_cqh = NULL;
366 static gni_cq_handle_t       post_tx_cqh = NULL;
367 static gni_ep_handle_t       *ep_hndl_array;
368
369 static CmiNodeLock           *ep_lock_array;
370 static CmiNodeLock           default_tx_cq_lock; 
371 static CmiNodeLock           rdma_tx_cq_lock; 
372 static CmiNodeLock           global_gni_lock; 
373 static CmiNodeLock           rx_cq_lock;
374 static CmiNodeLock           smsg_mailbox_lock;
375 static CmiNodeLock           smsg_rx_cq_lock;
376 static CmiNodeLock           *mempool_lock;
377 //#define     CMK_WITH_STATS      0
378 typedef struct msg_list
379 {
380     uint32_t destNode;
381     uint32_t size;
382     void *msg;
383     uint8_t tag;
384 #if !CMK_SMP
385     struct msg_list *next;
386 #endif
387 #if CMK_WITH_STATS
388     double  creation_time;
389 #endif
390 }MSG_LIST;
391
392
393 typedef struct control_msg
394 {
395     uint64_t            source_addr;    /* address from the start of buffer  */
396     uint64_t            dest_addr;      /* address from the start of buffer */
397     int                 total_length;   /* total length */
398     int                 length;         /* length of this packet */
399 #if REMOTE_EVENT
400     int                 ack_index;      /* index from integer to address */
401 #endif
402     uint8_t             seq_id;         //big message   0 meaning single message
403     gni_mem_handle_t    source_mem_hndl;
404     struct control_msg *next;
405 } CONTROL_MSG;
406
407 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
408
409 typedef struct ack_msg
410 {
411     uint64_t            source_addr;    /* address from the start of buffer  */
412 #if ! USE_LRTS_MEMPOOL
413     gni_mem_handle_t    source_mem_hndl;
414     int                 length;          /* total length */
415 #endif
416     struct ack_msg     *next;
417 } ACK_MSG;
418
419 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
420
421 #if CMK_DIRECT
422 typedef struct{
423     uint64_t    handler_addr;
424 }CMK_DIRECT_HEADER;
425
426 typedef struct {
427     char core[CmiMsgHeaderSizeBytes];
428     uint64_t handler;
429 }cmidirectMsg;
430
431 //SYH
432 CpvDeclare(int, CmiHandleDirectIdx);
433 void CmiHandleDirectMsg(cmidirectMsg* msg)
434 {
435
436     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
437    (*(_handle->callbackFnPtr))(_handle->callbackData);
438    CmiFree(msg);
439 }
440
441 void CmiDirectInit()
442 {
443     CpvInitialize(int,  CmiHandleDirectIdx);
444     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
445 }
446
447 #endif
448 typedef struct  rmda_msg
449 {
450     int                   destNode;
451 #if REMOTE_EVENT
452     int                   ack_index;
453 #endif
454     gni_post_descriptor_t *pd;
455 #if !CMK_SMP
456     struct  rmda_msg      *next;
457 #endif
458 }RDMA_REQUEST;
459
460
461 #if CMK_SMP
462 #define SMP_LOCKS               0
463 #define ONE_SEND_QUEUE                  0
464 PCQueue sendRdmaBuf;
465 typedef struct  msg_list_index
466 {
467     PCQueue     sendSmsgBuf;
468     int         pushed;
469     CmiNodeLock   lock;
470 } MSG_LIST_INDEX;
471 char                *destpe_avail;
472 #if  !ONE_SEND_QUEUE && SMP_LOCKS
473     PCQueue     nonEmptyQueues;
474 #endif
475 #else         /* non-smp */
476
477 static RDMA_REQUEST        *sendRdmaBuf = 0;
478 static RDMA_REQUEST        *sendRdmaTail = 0;
479 typedef struct  msg_list_index
480 {
481     int         next;
482     MSG_LIST    *sendSmsgBuf;
483     MSG_LIST    *tail;
484 } MSG_LIST_INDEX;
485
486 #endif
487
488 // buffered send queue
489 #if ! ONE_SEND_QUEUE
490 typedef struct smsg_queue
491 {
492     MSG_LIST_INDEX   *smsg_msglist_index;
493     int               smsg_head_index;
494 } SMSG_QUEUE;
495 #else
496 typedef struct smsg_queue
497 {
498     PCQueue       sendMsgBuf;
499 }  SMSG_QUEUE;
500 #endif
501
502 SMSG_QUEUE                  smsg_queue;
503 #if CMK_USE_OOB
504 SMSG_QUEUE                  smsg_oob_queue;
505 #endif
506
507 #if CMK_SMP
508
509 #define FreeMsgList(d)   free(d);
510 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
511
512 #else
513
514 static MSG_LIST       *msglist_freelist=0;
515
516 #define FreeMsgList(d)  \
517   do { \
518   (d)->next = msglist_freelist;\
519   msglist_freelist = d; \
520   } while (0)
521
522 #define MallocMsgList(d) \
523   do {  \
524   d = msglist_freelist;\
525   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
526              _MEMCHECK(d);\
527   } else msglist_freelist = d->next; \
528   d->next =0;  \
529   } while (0)
530
531 #endif
532
533 #if CMK_SMP
534
535 #define FreeControlMsg(d)      free(d);
536 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
537
538 #else
539
540 static CONTROL_MSG    *control_freelist=0;
541
542 #define FreeControlMsg(d)       \
543   do { \
544   (d)->next = control_freelist;\
545   control_freelist = d; \
546   } while (0);
547
548 #define MallocControlMsg(d) \
549   do {  \
550   d = control_freelist;\
551   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
552              _MEMCHECK(d);\
553   } else control_freelist = d->next;  \
554   } while (0);
555
556 #endif
557
558 #if CMK_SMP
559
560 #define FreeAckMsg(d)      free(d);
561 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
562
563 #else
564
565 static ACK_MSG        *ack_freelist=0;
566
567 #define FreeAckMsg(d)       \
568   do { \
569   (d)->next = ack_freelist;\
570   ack_freelist = d; \
571   } while (0)
572
573 #define MallocAckMsg(d) \
574   do { \
575   d = ack_freelist;\
576   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
577              _MEMCHECK(d);\
578   } else ack_freelist = d->next; \
579   } while (0)
580
581 #endif
582
583
584 # if CMK_SMP
585 #define FreeRdmaRequest(d)       free(d);
586 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
587 #else
588
589 static RDMA_REQUEST         *rdma_freelist = NULL;
590
591 #define FreeRdmaRequest(d)       \
592   do {  \
593   (d)->next = rdma_freelist;\
594   rdma_freelist = d;    \
595   } while (0)
596
597 #define MallocRdmaRequest(d) \
598   do {   \
599   d = rdma_freelist;\
600   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
601              _MEMCHECK(d);\
602   } else rdma_freelist = d->next; \
603   d->next =0;   \
604   } while (0)
605 #endif
606
607 /* reuse gni_post_descriptor_t */
608 static gni_post_descriptor_t *post_freelist=0;
609
610 #if  !CMK_SMP
611 #define FreePostDesc(d)       \
612     (d)->next_descr = post_freelist;\
613     post_freelist = d;
614
615 #define MallocPostDesc(d) \
616   d = post_freelist;\
617   if (d==0) { \
618      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
619      d->next_descr = 0;\
620       _MEMCHECK(d);\
621   } else post_freelist = d->next_descr;
622 #else
623
624 #define FreePostDesc(d)     free(d);
625 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
626
627 #endif
628
629
630 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
631 static int      buffered_smsg_counter = 0;
632
633 /* SmsgSend return success but message sent is not confirmed by remote side */
634 static MSG_LIST *buffered_fma_head = 0;
635 static MSG_LIST *buffered_fma_tail = 0;
636
637 /* functions  */
638 #define IsFree(a,ind)  !( a& (1<<(ind) ))
639 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
640 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
641
642 CpvDeclare(mempool_type*, mempool);
643
644 #if REMOTE_EVENT
645 /* ack pool for remote events */
646
647 #define ACK_SHIFT                  19
648 #define ACK_EVENT(idx)             ((idx<<ACK_SHIFT) | myrank)
649 #define ACK_GET_RANK(evt)          (evt & ((1<<ACK_SHIFT)-1))
650 #define ACK_GET_INDEX(evt)         (evt >> ACK_SHIFT)
651
652 struct IndexStruct {
653 void *addr;
654 int next;
655 };
656
657 typedef struct IndexPool {
658     struct IndexStruct   *indexes;
659     int                   size;
660     int                   freehead;
661     CmiNodeLock           lock;
662 } IndexPool;
663
664 static IndexPool  ackPool;
665
666
667 #define  GetAckAddress(s)          (ackPool.indexes[s].addr)
668
669 static void IndexPool_init(IndexPool *pool)
670 {
671     int i;
672     if ((1<<ACK_SHIFT) < mysize) 
673         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
674     pool->size = 2048;
675     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
676     for (i=0; i<pool->size-1; i++) {
677         pool->indexes[i].next = i+1;
678     }
679     pool->indexes[i].next = -1;
680     pool->freehead = 0;
681 #if MULTI_THREAD_SEND 
682     pool->lock  = CmiCreateLock();
683 #endif
684 }
685
686 static
687 inline int IndexPool_getslot(IndexPool *pool, void *addr)
688 {
689     int i;
690     int s;
691     CMI_GNI_LOCK(pool->lock);
692     s = pool->freehead;
693     if (s == -1) {
694         int newsize = pool->size * 2;
695         printf("[%d] AckPool_getslot expand to: %d\n", myrank, newsize);
696         if (newsize > (1<<(32-ACK_SHIFT))) CmiAbort("AckPool too large");
697         struct IndexStruct *old_ackpool = pool->indexes;
698         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
699         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
700         for (i=pool->size; i<newsize-1; i++) {
701             pool->indexes[i].next = i+1;
702         }
703         pool->indexes[i].next = -1;
704         pool->freehead = pool->size;
705         s = pool->size;
706         pool->size = newsize;
707         free(old_ackpool);
708     }
709     pool->freehead = pool->indexes[s].next;
710     pool->indexes[s].addr = addr;
711     CMI_GNI_UNLOCK(pool->lock);
712     return s;
713 }
714
715 static
716 inline  void IndexPool_freeslot(IndexPool *pool, int s)
717 {
718     CmiAssert(s>=0 && s<pool->size);
719     CMI_GNI_LOCK(pool->lock);
720     pool->indexes[s].next = pool->freehead;
721     pool->freehead = s;
722     CMI_GNI_UNLOCK(pool->lock);
723 }
724
725
726 #endif
727
728 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
729 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
730 #define CHARM_MAGIC_NUMBER               126
731
732 #if CMK_ERROR_CHECKING
733 extern unsigned char computeCheckSum(unsigned char *data, int len);
734 static int checksum_flag = 0;
735 #define CMI_SET_CHECKSUM(msg, len)      \
736         if (checksum_flag)  {   \
737           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
738           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
739         }
740 #define CMI_CHECK_CHECKSUM(msg, len)    \
741         if (checksum_flag)      \
742           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
743             CmiAbort("Fatal error: checksum doesn't agree!\n");
744 #else
745 #define CMI_SET_CHECKSUM(msg, len)
746 #define CMI_CHECK_CHECKSUM(msg, len)
747 #endif
748 /* =====End of Definitions of Message-Corruption Related Macros=====*/
749
750 #if CMK_WITH_STATS
751 FILE *counterLog = NULL;
752 typedef struct comm_thread_stats
753 {
754     uint64_t  smsg_data_count;
755     uint64_t  lmsg_init_count;
756     uint64_t  ack_count;
757     uint64_t  big_msg_ack_count;
758     uint64_t  smsg_count;
759     //times of calling SmsgSend
760     uint64_t  try_smsg_data_count;
761     uint64_t  try_lmsg_init_count;
762     uint64_t  try_ack_count;
763     uint64_t  try_big_msg_ack_count;
764     uint64_t  try_smsg_count;
765     
766     double    max_time_in_send_buffered_smsg;
767     double    all_time_in_send_buffered_smsg;
768
769     uint64_t  rdma_count;
770     uint64_t  try_rdma_count;
771     double    max_time_from_control_to_rdma_init;
772     double    all_time_from_control_to_rdma_init;
773
774     double    max_time_from_rdma_init_to_rdma_done;
775     double    all_time_from_rdma_init_to_rdma_done;
776 } Comm_Thread_Stats;
777
778 static Comm_Thread_Stats   comm_stats;
779
780 static void init_comm_stats()
781 {
782   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
783 }
784
785 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
786
787 #define SMSG_SENT_DONE(creation_time, tag)  \
788         if (print_stats) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
789             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
790             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
791             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
792             comm_stats.smsg_count++; \
793             double inbuff_time = CmiWallTimer() - creation_time;   \
794             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
795             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
796         }
797
798 #define SMSG_TRY_SEND(tag)  \
799         if (print_stats){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
800             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
801             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
802             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
803             comm_stats.try_smsg_count++; \
804         }
805
806 #define  RDMA_TRY_SEND()        if (print_stats) {comm_stats.try_rdma_count++;}
807
808 #define  RDMA_TRANS_DONE(x)      \
809          if (print_stats) {  double rdma_trans_time = CmiWallTimer() - x ; \
810              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
811              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
812          }
813
814 #define  RDMA_TRANS_INIT(x)      \
815          if (print_stats) {   comm_stats.rdma_count++;  \
816              double rdma_trans_time = CmiWallTimer() - x ; \
817              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
818              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
819          }
820
821
822 static void print_comm_stats()
823 {
824     fprintf(counterLog, "Node[%d]SMSG time in buffer\t[max:%f\tAverage:%f](milisecond)\n", myrank, 1000*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
825     fprintf(counterLog, "Node[%d]Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n", myrank, 
826             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
827             comm_stats.ack_count, comm_stats.big_msg_ack_count);
828     
829     fprintf(counterLog, "Node[%d]SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n\n", myrank, 
830             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
831             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count);
832
833     fprintf(counterLog, "Node[%d]Rdma Transaction [count:%lld\t calls:%lld]\n", myrank, comm_stats.rdma_count, comm_stats.try_rdma_count);
834     fprintf(counterLog, "Node[%d]Rdma time from control arrives to rdma init [MAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/comm_stats.rdma_count); 
835     fprintf(counterLog, "Node[%d]Rdma time from init to rdma done [MAX:%f\t Average:%f](milisecond)\n\n", myrank, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/comm_stats.rdma_count); 
836 }
837
838 #else
839 #define STATS_ACK_TIME(x)            x
840 #define STATS_SEND_SMSGS_TIME(x)     x
841 #endif
842
843 static int print_stats = 0;
844
845 static void
846 allgather(void *in,void *out, int len)
847 {
848     static int *ivec_ptr=NULL,already_called=0,job_size=0;
849     int i,rc;
850     int my_rank;
851     char *tmp_buf,*out_ptr;
852
853     if(!already_called) {
854
855         rc = PMI_Get_size(&job_size);
856         CmiAssert(rc == PMI_SUCCESS);
857         rc = PMI_Get_rank(&my_rank);
858         CmiAssert(rc == PMI_SUCCESS);
859
860         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
861         CmiAssert(ivec_ptr != NULL);
862
863         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
864         CmiAssert(rc == PMI_SUCCESS);
865
866         already_called = 1;
867
868     }
869
870     tmp_buf = (char *)malloc(job_size * len);
871     CmiAssert(tmp_buf);
872
873     rc = PMI_Allgather(in,tmp_buf,len);
874     CmiAssert(rc == PMI_SUCCESS);
875
876     out_ptr = out;
877
878     for(i=0;i<job_size;i++) {
879
880         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
881
882     }
883
884     free(tmp_buf);
885 }
886
887 static void
888 allgather_2(void *in,void *out, int len)
889 {
890     //PMI_Allgather is out of order
891     int i,rc, extend_len;
892     int  rank_index;
893     char *out_ptr, *out_ref;
894     char *in2;
895
896     extend_len = sizeof(int) + len;
897     in2 = (char*)malloc(extend_len);
898
899     memcpy(in2, &myrank, sizeof(int));
900     memcpy(in2+sizeof(int), in, len);
901
902     out_ptr = (char*)malloc(mysize*extend_len);
903
904     rc = PMI_Allgather(in2, out_ptr, extend_len);
905     GNI_RC_CHECK("allgather", rc);
906
907     out_ref = out;
908
909     for(i=0;i<mysize;i++) {
910         //rank index 
911         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
912         //copy to the rank index slot
913         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
914     }
915
916     free(out_ptr);
917     free(in2);
918
919 }
920
921 static unsigned int get_gni_nic_address(int device_id)
922 {
923     unsigned int address, cpu_id;
924     gni_return_t status;
925     int i, alps_dev_id=-1,alps_address=-1;
926     char *token, *p_ptr;
927
928     p_ptr = getenv("PMI_GNI_DEV_ID");
929     if (!p_ptr) {
930         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
931        
932         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
933     } else {
934         while ((token = strtok(p_ptr,":")) != NULL) {
935             alps_dev_id = atoi(token);
936             if (alps_dev_id == device_id) {
937                 break;
938             }
939             p_ptr = NULL;
940         }
941         CmiAssert(alps_dev_id != -1);
942         p_ptr = getenv("PMI_GNI_LOC_ADDR");
943         CmiAssert(p_ptr != NULL);
944         i = 0;
945         while ((token = strtok(p_ptr,":")) != NULL) {
946             if (i == alps_dev_id) {
947                 alps_address = atoi(token);
948                 break;
949             }
950             p_ptr = NULL;
951             ++i;
952         }
953         CmiAssert(alps_address != -1);
954         address = alps_address;
955     }
956     return address;
957 }
958
959 static uint8_t get_ptag(void)
960 {
961     char *p_ptr, *token;
962     uint8_t ptag;
963
964     p_ptr = getenv("PMI_GNI_PTAG");
965     CmiAssert(p_ptr != NULL);
966     token = strtok(p_ptr, ":");
967     ptag = (uint8_t)atoi(token);
968     return ptag;
969         
970 }
971
972 static uint32_t get_cookie(void)
973 {
974     uint32_t cookie;
975     char *p_ptr, *token;
976
977     p_ptr = getenv("PMI_GNI_COOKIE");
978     CmiAssert(p_ptr != NULL);
979     token = strtok(p_ptr, ":");
980     cookie = (uint32_t)atoi(token);
981
982     return cookie;
983 }
984
985 #if LARGEPAGE
986
987 /* directly mmap memory from hugetlbfs for large pages */
988
989 #include <sys/stat.h>
990 #include <fcntl.h>
991 #include <sys/mman.h>
992 #include <hugetlbfs.h>
993
994 // size must be _tlbpagesize aligned
995 void *my_get_huge_pages(size_t size)
996 {
997     char filename[512];
998     int fd;
999     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1000     void *ptr = NULL;
1001
1002     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1003     fd = open(filename, O_RDWR | O_CREAT, mode);
1004     if (fd == -1) {
1005         CmiAbort("my_get_huge_pages: open filed");
1006     }
1007     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1008     if (ptr == MAP_FAILED) ptr = NULL;
1009 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1010     close(fd);
1011     unlink(filename);
1012     return ptr;
1013 }
1014
1015 void my_free_huge_pages(void *ptr, int size)
1016 {
1017 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1018     int ret = munmap(ptr, size);
1019     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1020 }
1021
1022 #endif
1023
1024 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1025 /* TODO: add any that are related */
1026 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1027
1028
1029 #include "machine-lrts.h"
1030 #include "machine-common-core.c"
1031
1032 /* Network progress function is used to poll the network when for
1033    messages. This flushes receive buffers on some  implementations*/
1034 #if CMK_MACHINE_PROGRESS_DEFINED
1035 void CmiMachineProgressImpl() {
1036 }
1037 #endif
1038
1039 static int SendBufferMsg(SMSG_QUEUE *queue);
1040 static void SendRdmaMsg();
1041 static void PumpNetworkSmsg();
1042 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1043 #if CQWRITE
1044 static void PumpCqWriteTransactions();
1045 #endif
1046 #if REMOTE_EVENT
1047 static void PumpRemoteTransactions();
1048 #endif
1049
1050 #if MACHINE_DEBUG_LOG
1051 FILE *debugLog = NULL;
1052 static CmiInt8 buffered_recv_msg = 0;
1053 int         lrts_smsg_success = 0;
1054 int         lrts_received_msg = 0;
1055 #endif
1056
1057 static void sweep_mempool(mempool_type *mptr)
1058 {
1059     int n = 0;
1060     block_header *current = &(mptr->block_head);
1061
1062     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1063     while( current!= NULL) {
1064         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1065         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1066     }
1067     printf("[n %d] sweep_mempool slot END.\n", myrank);
1068 }
1069
1070 inline
1071 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1072 {
1073     block_header *current = *from;
1074
1075     //while(register_memory_size>= MAX_REG_MEM)
1076     //{
1077         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1078             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1079
1080         *from = current;
1081         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1082         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1083         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1084     //}
1085     return GNI_RC_SUCCESS;
1086 }
1087
1088 inline 
1089 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1090 {
1091     gni_return_t status = GNI_RC_SUCCESS;
1092     //int size = GetMempoolsize(msg);
1093     //void *blockaddr = GetMempoolBlockPtr(msg);
1094     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1095    
1096     block_header *current = &(mptr->block_head);
1097     while(register_memory_size>= MAX_REG_MEM)
1098     {
1099         status = deregisterMemory(mptr, &current);
1100         if (status != GNI_RC_SUCCESS) break;
1101     }
1102     if(register_memory_size>= MAX_REG_MEM) return status;
1103
1104     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1105     while(1)
1106     {
1107         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1108         if(status == GNI_RC_SUCCESS)
1109         {
1110             break;
1111         }
1112         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1113         {
1114             CmiAbort("Memory registor for mempool fails\n");
1115         }
1116         else
1117         {
1118             status = deregisterMemory(mptr, &current);
1119             if (status != GNI_RC_SUCCESS) break;
1120         }
1121     }; 
1122     return status;
1123 }
1124
1125 inline 
1126 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1127 {
1128     static int rank = -1;
1129     int i;
1130     gni_return_t status;
1131     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1132     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1133     mempool_type *mptr;
1134
1135     status = registerFromMempool(mptr1, msg, size, t, cqh);
1136     if (status == GNI_RC_SUCCESS) return status;
1137 #if CMK_SMP 
1138     for (i=0; i<CmiMyNodeSize()+1; i++) {
1139       rank = (rank+1)%(CmiMyNodeSize()+1);
1140       mptr = CpvAccessOther(mempool, rank);
1141       if (mptr == mptr1) continue;
1142       status = registerFromMempool(mptr, msg, size, t, cqh);
1143       if (status == GNI_RC_SUCCESS) return status;
1144     }
1145 #endif
1146     return  GNI_RC_ERROR_RESOURCE;
1147 }
1148
1149 inline
1150 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1151 {
1152     MSG_LIST        *msg_tmp;
1153     MallocMsgList(msg_tmp);
1154     msg_tmp->destNode = destNode;
1155     msg_tmp->size   = size;
1156     msg_tmp->msg    = msg;
1157     msg_tmp->tag    = tag;
1158 #if CMK_WITH_STATS
1159     SMSG_CREATION(msg_tmp)
1160 #endif
1161 #if !CMK_SMP
1162     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1163         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1164         queue->smsg_head_index = destNode;
1165         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1166     }else
1167     {
1168         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1169         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1170     }
1171 #else
1172 #if ONE_SEND_QUEUE
1173     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1174 #else
1175 #if SMP_LOCKS
1176     CmiLock(queue->smsg_msglist_index[destNode].lock);
1177     if(queue->smsg_msglist_index[destNode].pushed == 0)
1178     {
1179         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1180     }
1181     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1182     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1183 #else
1184     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1185 #endif
1186 #endif
1187 #endif
1188 #if PRINT_SYH
1189     buffered_smsg_counter++;
1190 #endif
1191 }
1192
1193 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1194 {
1195     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1196 }
1197
1198 inline
1199 static void setup_smsg_connection(int destNode)
1200 {
1201     mdh_addr_list_t  *new_entry = 0;
1202     gni_post_descriptor_t *pd;
1203     gni_smsg_attr_t      *smsg_attr;
1204     gni_return_t status = GNI_RC_NOT_DONE;
1205     RDMA_REQUEST        *rdma_request_msg;
1206     
1207     if(smsg_available_slot == smsg_expand_slots)
1208     {
1209         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1210         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1211         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1212
1213         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1214             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1215             GNI_MEM_READWRITE,   
1216             -1,
1217             &(new_entry->mdh));
1218         smsg_available_slot = 0; 
1219         new_entry->next = smsg_dynamic_list;
1220         smsg_dynamic_list = new_entry;
1221     }
1222     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1223     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1224     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1225     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1226     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1227     smsg_attr->buff_size = smsg_memlen;
1228     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1229     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1230     smsg_local_attr_vec[destNode] = smsg_attr;
1231     smsg_available_slot++;
1232     MallocPostDesc(pd);
1233     pd->type            = GNI_POST_FMA_PUT;
1234     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1235     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1236     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1237     pd->length          = sizeof(gni_smsg_attr_t);
1238     pd->local_addr      = (uint64_t) smsg_attr;
1239     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1240     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1241     pd->src_cq_hndl     = rdma_tx_cqh;
1242     pd->rdma_mode       = 0;
1243     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1244     print_smsg_attr(smsg_attr);
1245     if(status == GNI_RC_ERROR_RESOURCE )
1246     {
1247         MallocRdmaRequest(rdma_request_msg);
1248         rdma_request_msg->destNode = destNode;
1249         rdma_request_msg->pd = pd;
1250         /* buffer this request */
1251     }
1252 #if PRINT_SYH
1253     if(status != GNI_RC_SUCCESS)
1254        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1255     else
1256         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1257 #endif
1258 }
1259
1260 /* useDynamicSMSG */
1261 inline 
1262 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1263 {
1264     gni_return_t status = GNI_RC_NOT_DONE;
1265
1266     if(mailbox_list->offset == mailbox_list->size)
1267     {
1268         dynamic_smsg_mailbox_t *new_mailbox_entry;
1269         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1270         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1271         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1272         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1273         new_mailbox_entry->offset = 0;
1274         
1275         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1276             new_mailbox_entry->size, smsg_rx_cqh,
1277             GNI_MEM_READWRITE,   
1278             -1,
1279             &(new_mailbox_entry->mem_hndl));
1280
1281         GNI_RC_CHECK("register", status);
1282         new_mailbox_entry->next = mailbox_list;
1283         mailbox_list = new_mailbox_entry;
1284     }
1285     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1286     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1287     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1288     local_smsg_attr->mbox_offset = mailbox_list->offset;
1289     mailbox_list->offset += smsg_memlen;
1290     local_smsg_attr->buff_size = smsg_memlen;
1291     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1292     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1293 }
1294
1295 /* useDynamicSMSG */
1296 inline 
1297 static int connect_to(int destNode)
1298 {
1299     gni_return_t status = GNI_RC_NOT_DONE;
1300     CmiAssert(smsg_connected_flag[destNode] == 0);
1301     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1302     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1303     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1304     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1305     
1306     CMI_GNI_LOCK(global_gni_lock)
1307     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1308     CMI_GNI_UNLOCK(global_gni_lock)
1309     if (status == GNI_RC_ERROR_RESOURCE) {
1310       /* possibly destNode is making connection at the same time */
1311       free(smsg_attr_vector_local[destNode]);
1312       smsg_attr_vector_local[destNode] = NULL;
1313       free(smsg_attr_vector_remote[destNode]);
1314       smsg_attr_vector_remote[destNode] = NULL;
1315       mailbox_list->offset -= smsg_memlen;
1316       return 0;
1317     }
1318     GNI_RC_CHECK("GNI_Post", status);
1319     smsg_connected_flag[destNode] = 1;
1320     return 1;
1321 }
1322
1323 inline 
1324 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1325 {
1326     unsigned int          remote_address;
1327     uint32_t              remote_id;
1328     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1329     gni_smsg_attr_t       *smsg_attr;
1330     gni_post_descriptor_t *pd;
1331     gni_post_state_t      post_state;
1332     char                  *real_data; 
1333
1334     if (useDynamicSMSG) {
1335         switch (smsg_connected_flag[destNode]) {
1336         case 0: 
1337             connect_to(destNode);         /* continue to case 1 */
1338         case 1:                           /* pending connection, do nothing */
1339             status = GNI_RC_NOT_DONE;
1340             if(inbuff ==0)
1341                 buffer_small_msgs(queue, msg, size, destNode, tag);
1342             return status;
1343         }
1344     }
1345 #if CMK_SMP
1346 #if ! ONE_SEND_QUEUE
1347     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1348 #endif
1349     {
1350 #else
1351     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1352     {
1353 #endif
1354         //CMI_GNI_LOCK(smsg_mailbox_lock)
1355         CMI_GNI_LOCK(default_tx_cq_lock)
1356 #if CMK_SMP_TRACE_COMMTHREAD
1357         int oldpe = -1;
1358         int oldeventid = -1;
1359         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1360         { 
1361             START_EVENT();
1362             if ( tag == SMALL_DATA_TAG)
1363                 real_data = (char*)msg; 
1364             else 
1365                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1366             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1367             TRACE_COMM_SET_COMM_MSGID(real_data);
1368         }
1369 #endif
1370 #if REMOTE_EVENT
1371         if (tag == LMSG_INIT_TAG) {
1372             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1373             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1374                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr);
1375         }
1376         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1377 #endif
1378 #if     CMK_WITH_STATS
1379         SMSG_TRY_SEND(tag)
1380 #endif
1381 #if CMK_WITH_STATS
1382     double              creation_time;
1383     if (ptr == NULL)
1384         creation_time = CmiWallTimer();
1385     else
1386         creation_time = ptr->creation_time;
1387 #endif
1388
1389     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1390 #if CMK_SMP_TRACE_COMMTHREAD
1391         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1392 #endif
1393         CMI_GNI_UNLOCK(default_tx_cq_lock)
1394         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1395         if(status == GNI_RC_SUCCESS)
1396         {
1397 #if     CMK_WITH_STATS
1398             SMSG_SENT_DONE(creation_time,tag) 
1399 #endif
1400 #if CMK_SMP_TRACE_COMMTHREAD
1401             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1402             { 
1403                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1404             }
1405 #endif
1406         }else
1407             status = GNI_RC_ERROR_RESOURCE;
1408     }
1409     if(status != GNI_RC_SUCCESS && inbuff ==0)
1410         buffer_small_msgs(queue, msg, size, destNode, tag);
1411     return status;
1412 }
1413
1414 inline 
1415 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1416 {
1417     /* construct a control message and send */
1418     CONTROL_MSG         *control_msg_tmp;
1419     MallocControlMsg(control_msg_tmp);
1420     control_msg_tmp->source_addr = (uint64_t)msg;
1421     control_msg_tmp->seq_id    = seqno;
1422     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1423 #if REMOTE_EVENT
1424     control_msg_tmp->ack_index    =  -1;
1425 #endif
1426 #if     USE_LRTS_MEMPOOL
1427     if(size < BIG_MSG)
1428     {
1429         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1430     }
1431     else
1432     {
1433         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1434         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1435         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1436     }
1437 #else
1438     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1439 #endif
1440     return control_msg_tmp;
1441 }
1442
1443 #define BLOCKING_SEND_CONTROL    0
1444
1445 // Large message, send control to receiver, receiver register memory and do a GET, 
1446 // return 1 - send no success
1447 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1448 {
1449     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1450     uint32_t            vmdh_index  = -1;
1451     int                 size;
1452     int                 offset = 0;
1453     uint64_t            source_addr;
1454     int                 register_size; 
1455     void                *msg;
1456
1457     size    =   control_msg_tmp->total_length;
1458     source_addr = control_msg_tmp->source_addr;
1459     register_size = control_msg_tmp->length;
1460
1461 #if  USE_LRTS_MEMPOOL
1462     if( control_msg_tmp->seq_id == 0 ){
1463 #if BLOCKING_SEND_CONTROL
1464         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1465             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1466                 LrtsAdvanceCommunication(0);
1467         }
1468 #endif
1469         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1470         {
1471             msg = (void*)source_addr;
1472             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1473             {
1474                 if(!inbuff)
1475                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1476                 return GNI_RC_ERROR_NOMEM;
1477             }
1478             //register the corresponding mempool
1479             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1480             if(status == GNI_RC_SUCCESS)
1481             {
1482                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1483             }
1484         }else
1485         {
1486             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1487             status = GNI_RC_SUCCESS;
1488         }
1489         if(NoMsgInSend(source_addr))
1490             register_size = GetMempoolsize((void*)(source_addr));
1491         else
1492             register_size = 0;
1493     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1494     {
1495         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1496         source_addr += offset;
1497         size = control_msg_tmp->length;
1498 #if BLOCKING_SEND_CONTROL
1499         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1500             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1501                 LrtsAdvanceCommunication(0);
1502         }
1503 #endif
1504         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1505             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1506             {
1507                 if(!inbuff)
1508                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1509                 return GNI_RC_ERROR_NOMEM;
1510             }
1511             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1512             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1513         }
1514         else
1515         {
1516             status = GNI_RC_SUCCESS;
1517         }
1518         register_size = 0;  
1519     }
1520
1521     if(status == GNI_RC_SUCCESS)
1522     {
1523         status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1524         if(status == GNI_RC_SUCCESS)
1525         {
1526             buffered_send_msg += register_size;
1527             if(control_msg_tmp->seq_id == 0)
1528             {
1529                 IncreaseMsgInSend(source_addr);
1530             }
1531             FreeControlMsg(control_msg_tmp);
1532             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1533         }else
1534             status = GNI_RC_ERROR_RESOURCE;
1535
1536     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1537     {
1538         CmiAbort("Memory registor for large msg\n");
1539     }else 
1540     {
1541         status = GNI_RC_ERROR_NOMEM; 
1542         if(!inbuff)
1543             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1544     }
1545     return status;
1546 #else
1547     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1548     if(status == GNI_RC_SUCCESS)
1549     {
1550         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1551         if(status == GNI_RC_SUCCESS)
1552         {
1553             FreeControlMsg(control_msg_tmp);
1554         }
1555     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1556     {
1557         CmiAbort("Memory registor for large msg\n");
1558     }else 
1559     {
1560         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1561     }
1562     return status;
1563 #endif
1564 }
1565
1566 inline void LrtsPrepareEnvelope(char *msg, int size)
1567 {
1568     CmiSetMsgSize(msg, size);
1569     CMI_SET_CHECKSUM(msg, size);
1570 }
1571
1572 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1573 {
1574     gni_return_t        status  =   GNI_RC_SUCCESS;
1575     uint8_t tag;
1576     CONTROL_MSG         *control_msg_tmp;
1577     int                 oob = ( mode & OUT_OF_BAND);
1578     SMSG_QUEUE          *queue;
1579
1580     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1581 #if CMK_USE_OOB
1582     queue = oob? &smsg_oob_queue : &smsg_queue;
1583 #else
1584     queue = &smsg_queue;
1585 #endif
1586
1587     LrtsPrepareEnvelope(msg, size);
1588
1589 #if PRINT_SYH
1590     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1591 #endif 
1592 #if CMK_SMP 
1593     if(size <= SMSG_MAX_MSG)
1594         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1595     else if (size < BIG_MSG) {
1596         control_msg_tmp =  construct_control_msg(size, msg, 0);
1597         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1598     }
1599     else {
1600           CmiSetMsgSeq(msg, 0);
1601           control_msg_tmp =  construct_control_msg(size, msg, 1);
1602           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1603     }
1604 #else   //non-smp, smp(worker sending)
1605     if(size <= SMSG_MAX_MSG)
1606     {
1607         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1608             CmiFree(msg);
1609     }
1610     else if (size < BIG_MSG) {
1611         control_msg_tmp =  construct_control_msg(size, msg, 0);
1612         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1613     }
1614     else {
1615 #if     USE_LRTS_MEMPOOL
1616         CmiSetMsgSeq(msg, 0);
1617         control_msg_tmp =  construct_control_msg(size, msg, 1);
1618         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1619 #else
1620         control_msg_tmp =  construct_control_msg(size, msg, 0);
1621         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1622 #endif
1623     }
1624 #endif
1625     return 0;
1626 }
1627
1628 static void    PumpDatagramConnection();
1629 static      int         event_SetupConnect = 111;
1630 static      int         event_PumpSmsg = 222 ;
1631 static      int         event_PumpTransaction = 333;
1632 static      int         event_PumpRdmaTransaction = 444;
1633 static      int         event_SendBufferSmsg = 444;
1634 static      int         event_SendFmaRdmaMsg = 555;
1635 static      int         event_AdvanceCommunication = 666;
1636
1637 static void registerUserTraceEvents() {
1638 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1639     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1640     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1641     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1642     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1643     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1644     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1645     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1646 #endif
1647 }
1648
1649 static void ProcessDeadlock()
1650 {
1651     static CmiUInt8 *ptr = NULL;
1652     static CmiUInt8  last = 0, mysum, sum;
1653     static int count = 0;
1654     gni_return_t status;
1655     int i;
1656
1657 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1658 //sweep_mempool(CpvAccess(mempool));
1659     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1660     mysum = smsg_send_count + smsg_recv_count;
1661     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1662     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1663     GNI_RC_CHECK("PMI_Allgather", status);
1664     sum = 0;
1665     for (i=0; i<mysize; i++)  sum+= ptr[i];
1666     if (last == 0 || sum == last) 
1667         count++;
1668     else
1669         count = 0;
1670     last = sum;
1671     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1672     if (count == 2) { 
1673         /* detected twice, it is a real deadlock */
1674         if (myrank == 0)  {
1675             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1676             CmiAbort("Fatal> Deadlock detected.");
1677         }
1678
1679     }
1680     _detected_hang = 0;
1681 }
1682
1683 static void CheckProgress()
1684 {
1685     if (smsg_send_count == last_smsg_send_count &&
1686         smsg_recv_count == last_smsg_recv_count ) 
1687     {
1688         _detected_hang = 1;
1689 #if !CMK_SMP
1690         if (_detected_hang) ProcessDeadlock();
1691 #endif
1692
1693     }
1694     else {
1695         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1696         last_smsg_send_count = smsg_send_count;
1697         last_smsg_recv_count = smsg_recv_count;
1698         _detected_hang = 0;
1699     }
1700 }
1701
1702 static void set_limit()
1703 {
1704     //if (!user_set_flag && CmiMyRank() == 0) {
1705     if (CmiMyRank() == 0) {
1706         int mynode = CmiPhysicalNodeID(CmiMyPe());
1707         int numpes = CmiNumPesOnPhysicalNode(mynode);
1708         int numprocesses = numpes / CmiMyNodeSize();
1709         MAX_REG_MEM  = _totalmem / numprocesses;
1710         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1711         if (CmiMyPe() == 0)
1712            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1713         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1714         {
1715              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1716              CmiAbort("memory registration\n");
1717         }
1718     }
1719 }
1720
1721 void LrtsPostCommonInit(int everReturn)
1722 {
1723 #if CMK_DIRECT
1724     CmiDirectInit();
1725 #endif
1726 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1727     CpvInitialize(double, projTraceStart);
1728     /* only PE 0 needs to care about registration (to generate sts file). */
1729     //if (CmiMyPe() == 0) 
1730     {
1731         registerMachineUserEventsFunction(&registerUserTraceEvents);
1732     }
1733 #endif
1734
1735 #if CMK_SMP
1736     CmiIdleState *s=CmiNotifyGetState();
1737     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1738     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1739 #else
1740     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1741     if (useDynamicSMSG)
1742     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1743 #endif
1744
1745 #if ! LARGEPAGE
1746     if (_checkProgress)
1747 #if CMK_SMP
1748     if (CmiMyRank() == 0)
1749 #endif
1750     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1751 #endif
1752  
1753 #if !LARGEPAGE
1754     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1755 #endif
1756 }
1757
1758 /* this is called by worker thread */
1759 void LrtsPostNonLocal(){
1760 #if CMK_SMP_TRACE_COMMTHREAD
1761     double startT, endT;
1762 #endif
1763 #if MULTI_THREAD_SEND
1764     if(mysize == 1) return;
1765 #if CMK_SMP_TRACE_COMMTHREAD
1766     traceEndIdle();
1767 #endif
1768
1769 #if CMK_SMP_TRACE_COMMTHREAD
1770     startT = CmiWallTimer();
1771 #endif
1772
1773 #if CMK_WORKER_SINGLE_TASK
1774     if (CmiMyRank() % 6 == 0)
1775 #endif
1776     PumpNetworkSmsg();
1777
1778 #if CMK_WORKER_SINGLE_TASK
1779     if (CmiMyRank() % 6 == 1)
1780 #endif
1781     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1782
1783 #if CMK_WORKER_SINGLE_TASK
1784     if (CmiMyRank() % 6 == 2)
1785 #endif
1786     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1787
1788 #if REMOTE_EVENT
1789 #if CMK_WORKER_SINGLE_TASK
1790     if (CmiMyRank() % 6 == 3)
1791 #endif
1792     PumpRemoteTransactions();
1793 #endif
1794
1795 #if CMK_USE_OOB
1796     if (SendBufferMsg(&smsg_oob_queue) == 1)
1797 #endif
1798     {
1799 #if CMK_WORKER_SINGLE_TASK
1800     if (CmiMyRank() % 6 == 4)
1801 #endif
1802         SendBufferMsg(&smsg_queue);
1803     }
1804
1805 #if CMK_WORKER_SINGLE_TASK
1806     if (CmiMyRank() % 6 == 5)
1807 #endif
1808     SendRdmaMsg();
1809
1810 #if CMK_SMP_TRACE_COMMTHREAD
1811     endT = CmiWallTimer();
1812     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
1813 #endif
1814 #if CMK_SMP_TRACE_COMMTHREAD
1815     traceBeginIdle();
1816 #endif
1817 #endif
1818 }
1819
1820 /* useDynamicSMSG */
1821 static void    PumpDatagramConnection()
1822 {
1823     uint32_t          remote_address;
1824     uint32_t          remote_id;
1825     gni_return_t status;
1826     gni_post_state_t  post_state;
1827     uint64_t          datagram_id;
1828     int i;
1829
1830    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1831    {
1832        if (datagram_id >= mysize) {           /* bound endpoint */
1833            int pe = datagram_id - mysize;
1834            CMI_GNI_LOCK(global_gni_lock)
1835            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1836            CMI_GNI_UNLOCK(global_gni_lock)
1837            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1838            {
1839                CmiAssert(remote_id == pe);
1840                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1841                GNI_RC_CHECK("Dynamic SMSG Init", status);
1842 #if PRINT_SYH
1843                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1844 #endif
1845                CmiAssert(smsg_connected_flag[pe] == 1);
1846                smsg_connected_flag[pe] = 2;
1847            }
1848        }
1849        else {         /* unbound ep */
1850            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1851            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1852            {
1853                CmiAssert(remote_id<mysize);
1854                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1855                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1856                GNI_RC_CHECK("Dynamic SMSG Init", status);
1857 #if PRINT_SYH
1858                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1859 #endif
1860                smsg_connected_flag[remote_id] = 2;
1861
1862                alloc_smsg_attr(&send_smsg_attr);
1863                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1864                GNI_RC_CHECK("post unbound datagram", status);
1865            }
1866        }
1867    }
1868 }
1869
1870 /* pooling CQ to receive network message */
1871 static void PumpNetworkRdmaMsgs()
1872 {
1873     gni_cq_entry_t      event_data;
1874     gni_return_t        status;
1875
1876 }
1877
1878 inline 
1879 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
1880 {
1881     RDMA_REQUEST        *rdma_request_msg;
1882     MallocRdmaRequest(rdma_request_msg);
1883     rdma_request_msg->destNode = inst_id;
1884     rdma_request_msg->pd = pd;
1885 #if REMOTE_EVENT
1886     rdma_request_msg->ack_index = ack_index;
1887 #endif
1888 #if CMK_SMP
1889     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1890 #else
1891     if(sendRdmaBuf == 0)
1892     {
1893         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1894     }else{
1895         sendRdmaTail->next = rdma_request_msg;
1896         sendRdmaTail =  rdma_request_msg;
1897     }
1898 #endif
1899
1900 }
1901
1902 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1903
1904 static void PumpNetworkSmsg()
1905 {
1906     uint64_t            inst_id;
1907     int                 ret;
1908     gni_cq_entry_t      event_data;
1909     gni_return_t        status, status2;
1910     void                *header;
1911     uint8_t             msg_tag;
1912     int                 msg_nbytes;
1913     void                *msg_data;
1914     gni_mem_handle_t    msg_mem_hndl;
1915     gni_smsg_attr_t     *smsg_attr;
1916     gni_smsg_attr_t     *remote_smsg_attr;
1917     int                 init_flag;
1918     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1919     uint64_t            source_addr;
1920     SMSG_QUEUE         *queue = &smsg_queue;
1921 #if     CMK_DIRECT
1922     cmidirectMsg        *direct_msg;
1923 #endif
1924     while(1)
1925     {
1926         CMI_GNI_LOCK(smsg_rx_cq_lock)
1927         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1928         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
1929         if(status != GNI_RC_SUCCESS)
1930             break;
1931         inst_id = GNI_CQ_GET_INST_ID(event_data);
1932 #if REMOTE_EVENT
1933         inst_id = ACK_GET_RANK(inst_id);
1934 #endif
1935         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1936 #if PRINT_SYH
1937         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1938 #endif
1939         if (useDynamicSMSG) {
1940             /* subtle: smsg may come before connection is setup */
1941             while (smsg_connected_flag[inst_id] != 2) 
1942                PumpDatagramConnection();
1943         }
1944         msg_tag = GNI_SMSG_ANY_TAG;
1945         while(1) {
1946             CMI_GNI_LOCK(smsg_mailbox_lock)
1947             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1948             if (status != GNI_RC_SUCCESS)
1949             {
1950                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1951                 break;
1952             }
1953 #if PRINT_SYH
1954             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1955 #endif
1956             /* copy msg out and then put into queue (small message) */
1957             switch (msg_tag) {
1958             case SMALL_DATA_TAG:
1959             {
1960                 START_EVENT();
1961                 msg_nbytes = CmiGetMsgSize(header);
1962                 msg_data    = CmiAlloc(msg_nbytes);
1963                 memcpy(msg_data, (char*)header, msg_nbytes);
1964                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1965                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1966                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1967                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
1968                 handleOneRecvedMsg(msg_nbytes, msg_data);
1969                 break;
1970             }
1971             case LMSG_INIT_TAG:
1972             {
1973 #if MULTI_THREAD_SEND
1974                 MallocControlMsg(control_msg_tmp);
1975                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
1976                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1977                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1978                 getLargeMsgRequest(control_msg_tmp, inst_id);
1979                 FreeControlMsg(control_msg_tmp);
1980 #else
1981                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1982                 getLargeMsgRequest(header, inst_id);
1983                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1984 #endif
1985                 break;
1986             }
1987             case ACK_TAG:   //msg fit into mempool
1988             {
1989                 /* Get is done, release message . Now put is not used yet*/
1990                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1991                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1992                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1993 #if ! USE_LRTS_MEMPOOL
1994                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1995 #else
1996                 DecreaseMsgInSend(msg);
1997 #endif
1998                 if(NoMsgInSend(msg))
1999                     buffered_send_msg -= GetMempoolsize(msg);
2000                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2001                 CmiFree(msg);
2002                 break;
2003             }
2004             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2005             {
2006 #if MULTI_THREAD_SEND
2007                 MallocControlMsg(header_tmp);
2008                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2009                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2010 #else
2011                 header_tmp = (CONTROL_MSG *) header;
2012 #endif
2013                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2014                 void *msg = (void*)(header_tmp->source_addr);
2015                 int cur_seq = CmiGetMsgSeq(msg);
2016                 int offset = ONE_SEG*(cur_seq+1);
2017                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2018                 buffered_send_msg -= header_tmp->length;
2019                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2020                 if (remain_size < 0) remain_size = 0;
2021                 CmiSetMsgSize(msg, remain_size);
2022                 if(remain_size <= 0) //transaction done
2023                 {
2024                     CmiFree(msg);
2025                 }else if (header_tmp->total_length > offset)
2026                 {
2027                     CmiSetMsgSeq(msg, cur_seq+1);
2028                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2029                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2030                     //send next seg
2031                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2032                          // pipelining
2033                     if (header_tmp->seq_id == 1) {
2034                       int i;
2035                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2036                         int seq = cur_seq+i+2;
2037                         CmiSetMsgSeq(msg, seq-1);
2038                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2039                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2040                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2041                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2042                       }
2043                     }
2044                 }
2045 #if MULTI_THREAD_SEND
2046                 FreeControlMsg(header_tmp);
2047 #else
2048                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2049 #endif
2050                 break;
2051             }
2052 #if CMK_PERSISTENT_COMM
2053             case PUT_DONE_TAG:  {   //persistent message
2054                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2055                 int size = ((CONTROL_MSG *) header)->length;
2056                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2057                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2058                 CmiReference(msg);
2059                 CMI_CHECK_CHECKSUM(msg, size);
2060                 handleOneRecvedMsg(size, msg); 
2061 #if PRINT_SYH
2062                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2063 #endif
2064                 break;
2065             }
2066 #endif
2067 #if CMK_DIRECT
2068             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2069                 //create a trigger message
2070                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2071                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2072                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2073                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2074                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2075                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2076                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2077                 break;
2078 #endif
2079             default: {
2080                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2081                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2082                 printf("weird tag problem\n");
2083                 CmiAbort("Unknown tag\n");
2084                      }
2085             }               // end switch
2086 #if PRINT_SYH
2087             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2088 #endif
2089             smsg_recv_count ++;
2090             msg_tag = GNI_SMSG_ANY_TAG;
2091         } //endwhile getNext
2092     }   //end while GetEvent
2093     if(status == GNI_RC_ERROR_RESOURCE)
2094     {
2095         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2096         GNI_RC_CHECK("Smsg_rx_cq full", status);
2097     }
2098 }
2099
2100 static void printDesc(gni_post_descriptor_t *pd)
2101 {
2102     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2103 }
2104
2105 #if CQWRITE
2106 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2107 {
2108     gni_post_descriptor_t *pd;
2109     gni_return_t        status = GNI_RC_SUCCESS;
2110     
2111     MallocPostDesc(pd);
2112     pd->type = GNI_POST_CQWRITE;
2113     pd->cq_mode = GNI_CQMODE_SILENT;
2114     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2115     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2116     pd->cqwrite_value = data;
2117     pd->remote_mem_hndl = mem_hndl;
2118     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2119     GNI_RC_CHECK("GNI_PostCqWrite", status);
2120 }
2121 #endif
2122
2123 // register memory for a message
2124 // return mem handle
2125 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2126 {
2127     gni_return_t status = GNI_RC_SUCCESS;
2128
2129     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2130
2131 #if CMK_PERSISTENT_COMM
2132         // persistent message is always registered
2133         // BIG_MSG small pieces do not have malloc chunk header
2134     if (seqno == PERSIST_SEQ && !IsMemHndlZero(MEMHFIELD(msg))) {
2135         *memh = MEMHFIELD(msg);
2136         return GNI_RC_SUCCESS;
2137     }
2138 #endif
2139     if(seqno == 0)
2140     {
2141         if(IsMemHndlZero((GetMemHndl(msg))))
2142         {
2143             msg = (void*)(msg);
2144             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2145             if(status == GNI_RC_SUCCESS)
2146                 *memh = GetMemHndl(msg);
2147         }
2148         else {
2149             *memh = GetMemHndl(msg);
2150         }
2151     }
2152     else {
2153         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2154         status = registerMemory(msg, size, memh, NULL); 
2155     }
2156     return status;
2157 }
2158
2159 // for BIG_MSG called on receiver side for receiving control message
2160 // LMSG_INIT_TAG
2161 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2162 {
2163 #if     USE_LRTS_MEMPOOL
2164     CONTROL_MSG         *request_msg;
2165     gni_return_t        status = GNI_RC_SUCCESS;
2166     void                *msg_data;
2167     gni_post_descriptor_t *pd;
2168     gni_mem_handle_t    msg_mem_hndl;
2169     int                 size, transaction_size, offset = 0;
2170     size_t              register_size = 0;
2171
2172     // initial a get to transfer data from the sender side */
2173     request_msg = (CONTROL_MSG *) header;
2174     size = request_msg->total_length;
2175     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2176     MallocPostDesc(pd);
2177 #if CMK_WITH_STATS 
2178     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2179 #endif
2180     if(request_msg->seq_id < 2)   {
2181 #if CMK_SMP_TRACE_COMMTHREAD 
2182         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2183 #endif
2184         msg_data = CmiAlloc(size);
2185         CmiSetMsgSeq(msg_data, 0);
2186         _MEMCHECK(msg_data);
2187     }
2188     else {
2189         offset = ONE_SEG*(request_msg->seq_id-1);
2190         msg_data = (char*)request_msg->dest_addr + offset;
2191     }
2192    
2193     pd->cqwrite_value = request_msg->seq_id;
2194
2195     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2196     SetMemHndlZero(pd->local_mem_hndl);
2197     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2198     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2199         if(NoMsgInRecv( (void*)(msg_data)))
2200             register_size = GetMempoolsize((void*)(msg_data));
2201     }
2202
2203     pd->first_operand = ALIGN64(size);                   //  total length
2204
2205     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2206         pd->type            = GNI_POST_FMA_GET;
2207     else
2208         pd->type            = GNI_POST_RDMA_GET;
2209     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2210     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2211     pd->length          = transaction_size;
2212     pd->local_addr      = (uint64_t) msg_data;
2213     pd->remote_addr     = request_msg->source_addr + offset;
2214     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2215     pd->src_cq_hndl     = rdma_tx_cqh;
2216     pd->rdma_mode       = 0;
2217     pd->amo_cmd         = 0;
2218
2219     //memory registration success
2220     if(status == GNI_RC_SUCCESS)
2221     {
2222         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2223         CMI_GNI_LOCK(lock)
2224 #if REMOTE_EVENT
2225         if( request_msg->seq_id == 0)
2226         {
2227             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2228             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2229             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2230         }
2231 #endif
2232
2233 #if CMK_WITH_STATS
2234         RDMA_TRY_SEND()
2235 #endif
2236         if(pd->type == GNI_POST_RDMA_GET) 
2237         {
2238             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2239         }
2240         else
2241         {
2242             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2243         }
2244         CMI_GNI_UNLOCK(lock)
2245
2246         if(status == GNI_RC_SUCCESS )
2247         {
2248             if(pd->cqwrite_value == 0)
2249             {
2250 #if MACHINE_DEBUG_LOG
2251                 buffered_recv_msg += register_size;
2252                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2253 #endif
2254                 IncreaseMsgInRecv(msg_data);
2255 #if CMK_SMP_TRACE_COMMTHREAD 
2256                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2257 #endif
2258             }
2259 #if  CMK_WITH_STATS
2260                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2261                 RDMA_TRANS_INIT(pd->sync_flag_addr/1000000.0)
2262 #endif
2263         }
2264     }else
2265     {
2266         SetMemHndlZero((pd->local_mem_hndl));
2267     }
2268     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2269     {
2270 #if REMOTE_EVENT
2271         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2272 #else
2273         bufferRdmaMsg(inst_id, pd, -1); 
2274 #endif
2275     }else if (status != GNI_RC_SUCCESS) {
2276         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2277         GNI_RC_CHECK("GetLargeAFter posting", status);
2278     }
2279 #else
2280     CONTROL_MSG         *request_msg;
2281     gni_return_t        status;
2282     void                *msg_data;
2283     gni_post_descriptor_t *pd;
2284     RDMA_REQUEST        *rdma_request_msg;
2285     gni_mem_handle_t    msg_mem_hndl;
2286     //int source;
2287     // initial a get to transfer data from the sender side */
2288     request_msg = (CONTROL_MSG *) header;
2289     msg_data = CmiAlloc(request_msg->length);
2290     _MEMCHECK(msg_data);
2291
2292     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2293
2294     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2295     {
2296         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2297     }
2298
2299     MallocPostDesc(pd);
2300     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2301         pd->type            = GNI_POST_FMA_GET;
2302     else
2303         pd->type            = GNI_POST_RDMA_GET;
2304     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2305     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2306     pd->length          = ALIGN64(request_msg->length);
2307     pd->local_addr      = (uint64_t) msg_data;
2308     pd->remote_addr     = request_msg->source_addr;
2309     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2310     pd->src_cq_hndl     = rdma_tx_cqh;
2311     pd->rdma_mode       = 0;
2312     pd->amo_cmd         = 0;
2313
2314     //memory registration successful
2315     if(status == GNI_RC_SUCCESS)
2316     {
2317         pd->local_mem_hndl  = msg_mem_hndl;
2318        
2319         if(pd->type == GNI_POST_RDMA_GET) 
2320         {
2321             CMI_GNI_LOCK(rdma_tx_cq_lock)
2322             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2323             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2324         }
2325         else
2326         {
2327             CMI_GNI_LOCK(default_tx_cq_lock)
2328             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2329             CMI_GNI_UNLOCK(default_tx_cq_lock)
2330         }
2331
2332     }else
2333     {
2334         SetMemHndlZero(pd->local_mem_hndl);
2335     }
2336     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2337     {
2338         MallocRdmaRequest(rdma_request_msg);
2339         rdma_request_msg->next = 0;
2340         rdma_request_msg->destNode = inst_id;
2341         rdma_request_msg->pd = pd;
2342         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2343     }else {
2344         GNI_RC_CHECK("AFter posting", status);
2345     }
2346 #endif
2347 }
2348
2349 #if CQWRITE
2350 static void PumpCqWriteTransactions()
2351 {
2352
2353     gni_cq_entry_t          ev;
2354     gni_return_t            status;
2355     void                    *msg;  
2356     int                     msg_size;
2357     while(1) {
2358         //CMI_GNI_LOCK(my_cq_lock) 
2359         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2360         //CMI_GNI_UNLOCK(my_cq_lock)
2361         if(status != GNI_RC_SUCCESS) break;
2362         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2363 #if CMK_PERSISTENT_COMM
2364 #if PRINT_SYH
2365         printf(" %d CQ write event %p\n", myrank, msg);
2366 #endif
2367         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2368 #if PRINT_SYH
2369             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2370 #endif
2371             CmiReference(msg);
2372             msg_size = CmiGetMsgSize(msg);
2373             CMI_CHECK_CHECKSUM(msg, msg_size);
2374             handleOneRecvedMsg(msg_size, msg); 
2375             continue;
2376         }
2377 #endif
2378         DecreaseMsgInSend(msg);
2379 #if ! USE_LRTS_MEMPOOL
2380        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2381 #else
2382         DecreaseMsgInSend(msg);
2383 #endif
2384         if(NoMsgInSend(msg))
2385             buffered_send_msg -= GetMempoolsize(msg);
2386         CmiFree(msg);
2387     };
2388     if(status == GNI_RC_ERROR_RESOURCE)
2389     {
2390         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2391     }
2392 }
2393 #endif
2394
2395 #if REMOTE_EVENT
2396 static void PumpRemoteTransactions()
2397 {
2398     gni_cq_entry_t          ev;
2399     gni_return_t            status;
2400     void                    *msg;   
2401     int                     slot;
2402
2403     while(1) {
2404         CMI_GNI_LOCK(global_gni_lock)
2405         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2406         CMI_GNI_UNLOCK(global_gni_lock)
2407         if(status != GNI_RC_SUCCESS) {
2408             break;
2409         }
2410
2411         slot = GNI_CQ_GET_INST_ID(ev);
2412         slot = ACK_GET_INDEX(slot);
2413         //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
2414
2415         //CMI_GNI_LOCK(ackpool_lock);
2416         msg = GetAckAddress(slot);
2417         //CMI_GNI_UNLOCK(ackpool_lock);
2418
2419         DecreaseMsgInSend(msg);
2420 #if ! USE_LRTS_MEMPOOL
2421        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2422 #else
2423         DecreaseMsgInSend(msg);
2424 #endif
2425         if(NoMsgInSend(msg))
2426             buffered_send_msg -= GetMempoolsize(msg);
2427         CmiFree(msg);
2428         IndexPool_freeslot(&ackPool, slot);
2429     }
2430     if(status == GNI_RC_ERROR_RESOURCE)
2431     {
2432         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2433     }
2434 }
2435 #endif
2436
2437 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2438 {
2439     gni_cq_entry_t          ev;
2440     gni_return_t            status;
2441     uint64_t                type, inst_id;
2442     gni_post_descriptor_t   *tmp_pd;
2443     MSG_LIST                *ptr;
2444     CONTROL_MSG             *ack_msg_tmp;
2445     ACK_MSG                 *ack_msg;
2446     uint8_t                 msg_tag;
2447 #if CMK_DIRECT
2448     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2449 #endif
2450     SMSG_QUEUE         *queue = &smsg_queue;
2451
2452     while(1) {
2453         CMI_GNI_LOCK(my_cq_lock) 
2454         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2455         CMI_GNI_UNLOCK(my_cq_lock)
2456         if(status != GNI_RC_SUCCESS) break;
2457         
2458         type = GNI_CQ_GET_TYPE(ev);
2459         if (type == GNI_CQ_EVENT_TYPE_POST)
2460         {
2461             inst_id     = GNI_CQ_GET_INST_ID(ev);
2462 #if PRINT_SYH
2463             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2464 #endif
2465             CMI_GNI_LOCK(my_cq_lock)
2466             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2467             CMI_GNI_UNLOCK(my_cq_lock)
2468
2469             switch (tmp_pd->type) {
2470 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2471             case GNI_POST_RDMA_PUT:
2472 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2473                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2474 #endif
2475             case GNI_POST_FMA_PUT:
2476                 if(tmp_pd->amo_cmd == 1) {
2477 #if CMK_DIRECT
2478                     //sender ACK to receiver to trigger it is done
2479                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2480                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2481                     msg_tag = DIRECT_PUT_DONE_TAG;
2482 #endif
2483                 }
2484                 else {
2485                     CmiFree((void *)tmp_pd->local_addr);
2486 #if     !CQWRITE
2487                     MallocControlMsg(ack_msg_tmp);
2488                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2489                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2490                     ack_msg_tmp->length  = tmp_pd->length;
2491                     msg_tag = PUT_DONE_TAG;
2492 #else
2493                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2494                     FreePostDesc(tmp_pd);
2495                     continue;
2496 #endif
2497                 }
2498                 break;
2499 #endif
2500             case GNI_POST_RDMA_GET:
2501             case GNI_POST_FMA_GET:  {
2502 #if  ! USE_LRTS_MEMPOOL
2503                 MallocControlMsg(ack_msg_tmp);
2504                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2505                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2506                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2507                 msg_tag = ACK_TAG;  
2508 #else
2509 #if CMK_WITH_STATS
2510                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2511 #endif
2512                 int seq_id = tmp_pd->cqwrite_value;
2513                 if(seq_id > 0)      // BIG_MSG
2514                 {
2515                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2516                     MallocControlMsg(ack_msg_tmp);
2517                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2518                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2519                     ack_msg_tmp->seq_id = seq_id;
2520                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2521                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2522                     ack_msg_tmp->length = tmp_pd->length;
2523                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2524                     msg_tag = BIG_MSG_TAG; 
2525                 } 
2526                 else
2527                 {
2528                     msg_tag = ACK_TAG; 
2529 #if  !REMOTE_EVENT && !CQWRITE
2530                     MallocAckMsg(ack_msg);
2531                     ack_msg->source_addr = tmp_pd->remote_addr;
2532 #endif
2533                 }
2534 #endif
2535                 break;
2536             }
2537             case  GNI_POST_CQWRITE:
2538                    FreePostDesc(tmp_pd);
2539                    continue;
2540             default:
2541                 CmiPrintf("type=%d\n", tmp_pd->type);
2542                 CmiAbort("PumpLocalTransactions: unknown type!");
2543             }      /* end of switch */
2544
2545 #if CMK_DIRECT
2546             if (tmp_pd->amo_cmd == 1) {
2547                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2548                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2549             }
2550             else
2551 #endif
2552             if (msg_tag == ACK_TAG) {
2553 #if !REMOTE_EVENT
2554 #if   !CQWRITE
2555                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2556                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2557 #else
2558                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2559 #endif
2560 #endif
2561             }
2562             else {
2563                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2564                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2565             }
2566 #if CMK_PERSISTENT_COMM
2567             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2568 #endif
2569             {
2570                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2571 #if PRINT_SYH
2572                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2573 #endif
2574                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2575                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2576
2577                     START_EVENT();
2578                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2579                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2580 #if MACHINE_DEBUG_LOG
2581                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2582                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2583                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2584 #endif
2585                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2586                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2587                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2588                 }else if(msg_tag == BIG_MSG_TAG){
2589                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2590                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2591                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2592                         START_EVENT();
2593 #if PRINT_SYH
2594                         printf("Pipeline msg done [%d]\n", myrank);
2595 #endif
2596 #if                 CMK_SMP_TRACE_COMMTHREAD
2597                         if( tmp_pd->cqwrite_value == 1)
2598                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2599 #endif
2600                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2601                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2602                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2603                     }
2604                 }
2605             }
2606             FreePostDesc(tmp_pd);
2607         }
2608     } //end while
2609     if(status == GNI_RC_ERROR_RESOURCE)
2610     {
2611         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2612         GNI_RC_CHECK("Smsg_tx_cq full", status);
2613     }
2614 }
2615
2616 static void  SendRdmaMsg()
2617 {
2618     gni_return_t            status = GNI_RC_SUCCESS;
2619     gni_mem_handle_t        msg_mem_hndl;
2620     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2621     RDMA_REQUEST            *pre = 0;
2622     uint64_t                register_size = 0;
2623     void                    *msg;
2624     int                     i;
2625 #if CMK_SMP
2626     int len = PCQueueLength(sendRdmaBuf);
2627     for (i=0; i<len; i++)
2628     {
2629         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2630         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2631         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2632         if (ptr == NULL) break;
2633 #else
2634     ptr = sendRdmaBuf;
2635     while (ptr!=0)
2636     {
2637 #endif 
2638         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2639         gni_post_descriptor_t *pd = ptr->pd;
2640         
2641         msg = (void*)(pd->local_addr);
2642         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2643         register_size = 0;
2644         if(pd->cqwrite_value == 0) {
2645             if(NoMsgInRecv(msg))
2646                 register_size = GetMempoolsize(msg);
2647         }
2648
2649         if(status == GNI_RC_SUCCESS)        //mem register good
2650         {
2651             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2652             CMI_GNI_LOCK(lock);
2653 #if REMOTE_EVENT
2654             if( pd->cqwrite_value == 0)
2655             {
2656                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2657                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2658                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2659             }
2660 #endif
2661 #if CMK_WITH_STATS
2662             RDMA_TRY_SEND()
2663 #endif
2664             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2665             {
2666                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2667             }
2668             else
2669             {
2670                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2671             }
2672             CMI_GNI_UNLOCK(lock);
2673             
2674             if(status == GNI_RC_SUCCESS)    //post good
2675             {
2676 #if !CMK_SMP
2677                 tmp_ptr = ptr;
2678                 if(pre != 0) {
2679                     pre->next = ptr->next;
2680                 }
2681                 else {
2682                     sendRdmaBuf = ptr->next;
2683                 }
2684                 ptr = ptr->next;
2685                 FreeRdmaRequest(tmp_ptr);
2686 #endif
2687                 if(pd->cqwrite_value == 0)
2688                 {
2689 #if CMK_SMP_TRACE_COMMTHREAD 
2690                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2691 #endif
2692                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2693                 }
2694 #if  CMK_WITH_STATS
2695                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2696                 RDMA_TRANS_INIT(pd->sync_flag_addr/1000000.0)
2697 #endif
2698 #if MACHINE_DEBUG_LOG
2699                 buffered_recv_msg += register_size;
2700                 MACHSTATE(8, "GO request from buffered\n"); 
2701 #endif
2702 #if PRINT_SYH
2703                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
2704 #endif
2705             }else           // cannot post
2706             {
2707 #if CMK_SMP
2708                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2709 #else
2710                 pre = ptr;
2711                 ptr = ptr->next;
2712 #endif
2713 #if PRINT_SYH
2714                 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank, pd->cqwrite_value);
2715 #endif
2716                 break;
2717             }
2718         } else          //memory registration fails
2719         {
2720 #if CMK_SMP
2721             PCQueuePush(sendRdmaBuf, (char*)ptr);
2722 #else
2723             pre = ptr;
2724             ptr = ptr->next;
2725 #endif
2726         }
2727     } //end while
2728 #if ! CMK_SMP
2729     if(ptr == 0)
2730         sendRdmaTail = pre;
2731 #endif
2732 }
2733
2734 // return 1 if all messages are sent
2735 static int SendBufferMsg(SMSG_QUEUE *queue)
2736 {
2737     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2738     CONTROL_MSG         *control_msg_tmp;
2739     gni_return_t        status;
2740     int                 done = 1;
2741     uint64_t            register_size;
2742     void                *register_addr;
2743     int                 index_previous = -1;
2744 #if CMI_EXERT_SEND_CAP
2745     int                 sent_cnt = 0;
2746 #endif
2747
2748 #if CMK_SMP
2749     int          index = 0;
2750 #if ONE_SEND_QUEUE
2751     memset(destpe_avail, 0, mysize * sizeof(char));
2752     for (index=0; index<1; index++)
2753     {
2754         int i, len = PCQueueLength(queue->sendMsgBuf);
2755         for (i=0; i<len; i++) 
2756         {
2757             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2758             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2759             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2760             if(ptr == NULL) break;
2761             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2762                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2763                 continue;
2764             }
2765 #else
2766 #if SMP_LOCKS
2767     int nonempty = PCQueueLength(nonEmptyQueues);
2768     for(index =0; index<nonempty; index++)
2769     {
2770         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2771         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2772         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2773         if(current_list == NULL) break; 
2774         PCQueue current_queue= current_list-> sendSmsgBuf;
2775         CmiLock(current_list->lock);
2776         int i, len = PCQueueLength(current_queue);
2777         current_list->pushed = 0;
2778         CmiUnlock(current_list->lock);
2779 #else
2780     for(index =0; index<mysize; index++)
2781     {
2782         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2783         int i, len = PCQueueLength(current_queue);
2784 #endif
2785         for (i=0; i<len; i++) 
2786         {
2787             CMI_PCQUEUEPOP_LOCK(current_queue)
2788             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2789             CMI_PCQUEUEPOP_UNLOCK(current_queue)
2790             if (ptr == 0) break;
2791 #endif
2792 #else
2793     int index = queue->smsg_head_index;
2794     while(index != -1)
2795     {
2796         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2797         pre = 0;
2798         while(ptr != 0)
2799         {
2800 #endif
2801             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2802             status = GNI_RC_ERROR_RESOURCE;
2803             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2804                 /* connection not exists yet */
2805             }
2806             else
2807             switch(ptr->tag)
2808             {
2809             case SMALL_DATA_TAG:
2810                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2811                 if(status == GNI_RC_SUCCESS)
2812                 {
2813                     CmiFree(ptr->msg);
2814                 }
2815                 break;
2816             case LMSG_INIT_TAG:
2817                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2818                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
2819                 break;
2820             case ACK_TAG:
2821                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2822                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2823                 break;
2824             case BIG_MSG_TAG:
2825                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2826                 if(status == GNI_RC_SUCCESS)
2827                 {
2828                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2829                 }
2830                 break;
2831 #if CMK_DIRECT
2832             case DIRECT_PUT_DONE_TAG:
2833                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
2834                 if(status == GNI_RC_SUCCESS)
2835                 {
2836                     free((CMK_DIRECT_HEADER*)ptr->msg);
2837                 }
2838                 break;
2839
2840 #endif
2841             default:
2842                 printf("Weird tag\n");
2843                 CmiAbort("should not happen\n");
2844             }       // end switch
2845             if(status == GNI_RC_SUCCESS)
2846             {
2847 #if PRINT_SYH
2848                 buffered_smsg_counter--;
2849                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2850 #endif
2851 #if !CMK_SMP
2852                 tmp_ptr = ptr;
2853                 if(pre)
2854                 {
2855                     ptr = pre ->next = ptr->next;
2856                 }else
2857                 {
2858                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2859                 }
2860                 FreeMsgList(tmp_ptr);
2861 #else
2862                 FreeMsgList(ptr);
2863 #endif
2864 #if CMI_EXERT_SEND_CAP
2865                 sent_cnt++;
2866                 if(sent_cnt == SEND_CAP)
2867                     break;
2868 #endif
2869             }else {
2870 #if CMK_SMP
2871 #if ONE_SEND_QUEUE
2872                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2873 #else
2874                 PCQueuePush(current_queue, (char*)ptr);
2875 #endif
2876 #else
2877                 pre = ptr;
2878                 ptr=ptr->next;
2879 #endif
2880                 done = 0;
2881                 if(status == GNI_RC_ERROR_RESOURCE)
2882                 {
2883 #if CMK_SMP && ONE_SEND_QUEUE 
2884                     destpe_avail[ptr->destNode] = 1;
2885 #else
2886                     break;
2887 #endif
2888                 }
2889             } 
2890         } //end while
2891 #if !CMK_SMP
2892         if(ptr == 0)
2893             queue->smsg_msglist_index[index].tail = pre;
2894         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2895         {
2896             if(index_previous != -1)
2897                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2898             else
2899                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2900         }
2901         else {
2902             index_previous = index;
2903         }
2904         index = queue->smsg_msglist_index[index].next;
2905 #else
2906 #if !ONE_SEND_QUEUE && SMP_LOCKS
2907         CmiLock(current_list->lock);
2908         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2909         {
2910             current_list->pushed = 1;
2911             PCQueuePush(nonEmptyQueues, current_list);
2912         }
2913         CmiUnlock(current_list->lock); 
2914 #endif
2915 #endif
2916
2917 #if CMI_EXERT_SEND_CAP
2918         if(sent_cnt == SEND_CAP)
2919                 break;
2920 #endif
2921     }   // end pooling for all cores
2922     return done;
2923 }
2924
2925 static void ProcessDeadlock();
2926 void LrtsAdvanceCommunication(int whileidle)
2927 {
2928     static int count = 0;
2929     /*  Receive Msg first */
2930 #if CMK_SMP_TRACE_COMMTHREAD
2931     double startT, endT;
2932 #endif
2933     if (useDynamicSMSG && whileidle)
2934     {
2935 #if CMK_SMP_TRACE_COMMTHREAD
2936         startT = CmiWallTimer();
2937 #endif
2938         PumpDatagramConnection();
2939 #if CMK_SMP_TRACE_COMMTHREAD
2940         endT = CmiWallTimer();
2941         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
2942 #endif
2943     }
2944
2945 #if CMK_SMP_TRACE_COMMTHREAD
2946     startT = CmiWallTimer();
2947 #endif
2948     PumpNetworkSmsg();
2949     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2950 #if CMK_SMP_TRACE_COMMTHREAD
2951     endT = CmiWallTimer();
2952     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
2953 #endif
2954
2955 #if CMK_SMP_TRACE_COMMTHREAD
2956     startT = CmiWallTimer();
2957 #endif
2958     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2959     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
2960 #if CMK_SMP_TRACE_COMMTHREAD
2961     endT = CmiWallTimer();
2962     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
2963 #endif
2964
2965 #if CMK_SMP_TRACE_COMMTHREAD
2966     startT = CmiWallTimer();
2967 #endif
2968     PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock);
2969
2970 #if CQWRITE
2971     PumpCqWriteTransactions();
2972 #endif
2973
2974 #if REMOTE_EVENT
2975     PumpRemoteTransactions();
2976 #endif
2977
2978     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
2979 #if CMK_SMP_TRACE_COMMTHREAD
2980     endT = CmiWallTimer();
2981     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
2982 #endif
2983  
2984     /* Send buffered Message */
2985 #if CMK_SMP_TRACE_COMMTHREAD
2986     startT = CmiWallTimer();
2987 #endif
2988 #if CMK_USE_OOB
2989     if (SendBufferMsg(&smsg_oob_queue) == 1)
2990 #endif
2991     {
2992     SendBufferMsg(&smsg_queue);
2993     }
2994     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2995 #if CMK_SMP_TRACE_COMMTHREAD
2996     endT = CmiWallTimer();
2997     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
2998 #endif
2999
3000 #if CMK_SMP_TRACE_COMMTHREAD
3001     startT = CmiWallTimer();
3002 #endif
3003     SendRdmaMsg();
3004     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3005 #if CMK_SMP_TRACE_COMMTHREAD
3006     endT = CmiWallTimer();
3007     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3008 #endif
3009
3010 #if CMK_SMP && ! LARGEPAGE
3011     if (_detected_hang)  ProcessDeadlock();
3012 #endif
3013 }
3014
3015 /* useDynamicSMSG */
3016 static void _init_dynamic_smsg()
3017 {
3018     gni_return_t status;
3019     uint32_t     vmdh_index = -1;
3020     int i;
3021
3022     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3023     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3024     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3025     for(i=0; i<mysize; i++) {
3026         smsg_connected_flag[i] = 0;
3027         smsg_attr_vector_local[i] = NULL;
3028         smsg_attr_vector_remote[i] = NULL;
3029     }
3030     if(mysize <=512)
3031     {
3032         SMSG_MAX_MSG = 4096;
3033     }else if (mysize <= 4096)
3034     {
3035         SMSG_MAX_MSG = 4096/mysize * 1024;
3036     }else if (mysize <= 16384)
3037     {
3038         SMSG_MAX_MSG = 512;
3039     }else {
3040         SMSG_MAX_MSG = 256;
3041     }
3042
3043     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3044     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3045     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3046     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3047     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3048
3049     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3050     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3051     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3052     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3053     mailbox_list->offset = 0;
3054     mailbox_list->next = 0;
3055     
3056     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3057         mailbox_list->size, smsg_rx_cqh,
3058         GNI_MEM_READWRITE,   
3059         vmdh_index,
3060         &(mailbox_list->mem_hndl));
3061     GNI_RC_CHECK("MEMORY registration for smsg", status);
3062
3063     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3064     GNI_RC_CHECK("Unbound EP", status);
3065     
3066     alloc_smsg_attr(&send_smsg_attr);
3067
3068     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3069     GNI_RC_CHECK("post unbound datagram", status);
3070
3071       /* always pre-connect to proc 0 */
3072     //if (myrank != 0) connect_to(0);
3073 }
3074
3075 static void _init_static_smsg()
3076 {
3077     gni_smsg_attr_t      *smsg_attr;
3078     gni_smsg_attr_t      remote_smsg_attr;
3079     gni_smsg_attr_t      *smsg_attr_vec;
3080     gni_mem_handle_t     my_smsg_mdh_mailbox;
3081     int      ret, i;
3082     gni_return_t status;
3083     uint32_t              vmdh_index = -1;
3084     mdh_addr_t            base_infor;
3085     mdh_addr_t            *base_addr_vec;
3086     char *env;
3087
3088     if(mysize <=512)
3089     {
3090         SMSG_MAX_MSG = 1024;
3091     }else if (mysize <= 4096)
3092     {
3093         SMSG_MAX_MSG = 1024;
3094     }else if (mysize <= 16384)
3095     {
3096         SMSG_MAX_MSG = 512;
3097     }else {
3098         SMSG_MAX_MSG = 256;
3099     }
3100     
3101     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3102     if (env) SMSG_MAX_MSG = atoi(env);
3103     CmiAssert(SMSG_MAX_MSG > 0);
3104
3105     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3106     
3107     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3108     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3109     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3110     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3111     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3112     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3113     CmiAssert(ret == 0);
3114     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3115     
3116     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3117             smsg_memlen*(mysize), smsg_rx_cqh,
3118             GNI_MEM_READWRITE,   
3119             vmdh_index,
3120             &my_smsg_mdh_mailbox);
3121     register_memory_size += smsg_memlen*(mysize);
3122     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3123
3124     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3125     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3126
3127     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3128     base_infor.mdh =  my_smsg_mdh_mailbox;
3129     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3130
3131     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3132  
3133     for(i=0; i<mysize; i++)
3134     {
3135         if(i==myrank)
3136             continue;
3137         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3138         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3139         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3140         smsg_attr[i].mbox_offset = i*smsg_memlen;
3141         smsg_attr[i].buff_size = smsg_memlen;
3142         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3143         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3144     }
3145
3146     for(i=0; i<mysize; i++)
3147     {
3148         if (myrank == i) continue;
3149
3150         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3151         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3152         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3153         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3154         remote_smsg_attr.buff_size = smsg_memlen;
3155         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3156         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3157
3158         /* initialize the smsg channel */
3159         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3160         GNI_RC_CHECK("SMSG Init", status);
3161     } //end initialization
3162
3163     free(base_addr_vec);
3164     free(smsg_attr);
3165
3166     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3167     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3168
3169
3170 inline
3171 static void _init_send_queue(SMSG_QUEUE *queue)
3172 {
3173      int i;
3174 #if ONE_SEND_QUEUE
3175      queue->sendMsgBuf = PCQueueCreate();
3176      destpe_avail = (char*)malloc(mysize * sizeof(char));
3177 #else
3178      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3179 #if CMK_SMP && SMP_LOCKS
3180      nonEmptyQueues = PCQueueCreate();
3181 #endif
3182      for(i =0; i<mysize; i++)
3183      {
3184 #if CMK_SMP
3185          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3186 #if SMP_LOCKS
3187          queue->smsg_msglist_index[i].pushed = 0;
3188          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3189 #endif
3190 #else
3191          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3192          queue->smsg_msglist_index[i].next = -1;
3193          queue->smsg_head_index = -1;
3194 #endif
3195         
3196      }
3197 #endif
3198 }
3199
3200 inline
3201 static void _init_smsg()
3202 {
3203     if(mysize > 1) {
3204         if (useDynamicSMSG)
3205             _init_dynamic_smsg();
3206         else
3207             _init_static_smsg();
3208     }
3209
3210     _init_send_queue(&smsg_queue);
3211 #if CMK_USE_OOB
3212     _init_send_queue(&smsg_oob_queue);
3213 #endif
3214 }
3215
3216 static void _init_static_msgq()
3217 {
3218     gni_return_t status;
3219     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3220     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3221     msgq_attrs.smsg_q_sz = 1;
3222     msgq_attrs.rcv_pool_sz = 1;
3223     msgq_attrs.num_msgq_eps = 2;
3224     msgq_attrs.nloc_insts = 8;
3225     msgq_attrs.modes = 0;
3226     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3227
3228     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3229     GNI_RC_CHECK("MSGQ Init", status);
3230
3231
3232 }
3233
3234
3235 static CmiUInt8 total_mempool_size = 0;
3236 static CmiUInt8 total_mempool_calls = 0;
3237
3238 #if USE_LRTS_MEMPOOL
3239 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3240 {
3241     void *pool;
3242     int ret;
3243     gni_return_t status = GNI_RC_SUCCESS;
3244
3245     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3246     if (*size < default_size) *size = default_size;
3247 #if LARGEPAGE
3248     // round up to be multiple of _tlbpagesize
3249     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3250     *size = ALIGNHUGEPAGE(*size);
3251 #endif
3252     total_mempool_size += *size;
3253     total_mempool_calls += 1;
3254 #if   !LARGEPAGE
3255     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3256     {
3257         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3258         CmiAbort("alloc_mempool_block");
3259     }
3260 #endif
3261 #if LARGEPAGE
3262     pool = my_get_huge_pages(*size);
3263     ret = pool==NULL;
3264 #else
3265     ret = posix_memalign(&pool, ALIGNBUF, *size);
3266 #endif
3267     if (ret != 0) {
3268 #if CMK_SMP && STEAL_MEMPOOL
3269       pool = steal_mempool_block(size, mem_hndl);
3270       if (pool != NULL) return pool;
3271 #endif
3272       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3273       if (ret == ENOMEM)
3274         CmiAbort("alloc_mempool_block: out of memory.");
3275       else
3276         CmiAbort("alloc_mempool_block: posix_memalign failed");
3277     }
3278 #if LARGEPAGE
3279     CmiMemLock();
3280     register_count++;
3281     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3282     CmiMemUnlock();
3283     if(status != GNI_RC_SUCCESS) {
3284         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3285 sweep_mempool(CpvAccess(mempool));
3286     }
3287     GNI_RC_CHECK("MEMORY_REGISTER", status);
3288 #else
3289     SetMemHndlZero((*mem_hndl));
3290 #endif
3291     return pool;
3292 }
3293
3294 // ptr is a block head pointer
3295 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3296 {
3297     if(!(IsMemHndlZero(mem_hndl)))
3298     {
3299         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3300     }
3301 #if LARGEPAGE
3302     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3303 #else
3304     free(ptr);
3305 #endif
3306 }
3307 #endif
3308
3309 void LrtsPreCommonInit(int everReturn){
3310 #if USE_LRTS_MEMPOOL
3311     CpvInitialize(mempool_type*, mempool);
3312     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3313     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3314 #endif
3315 }
3316
3317 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3318 {
3319     register int            i;
3320     int                     rc;
3321     int                     device_id = 0;
3322     unsigned int            remote_addr;
3323     gni_cdm_handle_t        cdm_hndl;
3324     gni_return_t            status = GNI_RC_SUCCESS;
3325     uint32_t                vmdh_index = -1;
3326     uint8_t                 ptag;
3327     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3328     int                     first_spawned;
3329     int                     physicalID;
3330     char                   *env;
3331
3332     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3333     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3334     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3335    
3336     status = PMI_Init(&first_spawned);
3337     GNI_RC_CHECK("PMI_Init", status);
3338
3339     status = PMI_Get_size(&mysize);
3340     GNI_RC_CHECK("PMI_Getsize", status);
3341
3342     status = PMI_Get_rank(&myrank);
3343     GNI_RC_CHECK("PMI_getrank", status);
3344
3345     //physicalID = CmiPhysicalNodeID(myrank);
3346     
3347     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3348
3349     *myNodeID = myrank;
3350     *numNodes = mysize;
3351   
3352 #if MULTI_THREAD_SEND
3353     /* Currently, we only consider the case that comm. thread will only recv msgs */
3354     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3355 #endif
3356
3357     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3358     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3359     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3360
3361     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3362     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3363     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3364
3365     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3366     if (env) useDynamicSMSG = 1;
3367     if (!useDynamicSMSG)
3368       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3369     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3370     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3371     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3372     
3373     if(myrank == 0)
3374     {
3375         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3376         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3377     }
3378 #ifdef USE_ONESIDED
3379     onesided_init(NULL, &onesided_hnd);
3380
3381     // this is a GNI test, so use the libonesided bypass functionality
3382     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3383     local_addr = gniGetNicAddress();
3384 #else
3385     ptag = get_ptag();
3386     cookie = get_cookie();
3387 #if 0
3388     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3389 #endif
3390     //Create and attach to the communication  domain */
3391     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3392     GNI_RC_CHECK("GNI_CdmCreate", status);
3393     //* device id The device id is the minor number for the device
3394     //that is assigned to the device by the system when the device is created.
3395     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3396     //where X is the device number 0 default 
3397     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3398     GNI_RC_CHECK("GNI_CdmAttach", status);
3399     local_addr = get_gni_nic_address(0);
3400 #endif
3401     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3402     _MEMCHECK(MPID_UGNI_AllAddr);
3403     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3404     /* create the local completion queue */
3405     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3406     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3407     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3408     
3409     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3410     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3411     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3412
3413     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3414     GNI_RC_CHECK("Create CQ (rx)", status);
3415     
3416     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3417     GNI_RC_CHECK("Create Post CQ (rx)", status);
3418     
3419     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3420     //GNI_RC_CHECK("Create BTE CQ", status);
3421
3422     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3423     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3424     _MEMCHECK(ep_hndl_array);
3425 #if MULTI_THREAD_SEND 
3426     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3427     //default_tx_cq_lock = CmiCreateLock();
3428     rdma_tx_cq_lock = CmiCreateLock();
3429     smsg_rx_cq_lock = CmiCreateLock();
3430     //global_gni_lock  = CmiCreateLock();
3431     //rx_cq_lock  = CmiCreateLock();
3432 #endif
3433     for (i=0; i<mysize; i++) {
3434         if(i == myrank) continue;
3435         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3436         GNI_RC_CHECK("GNI_EpCreate ", status);   
3437         remote_addr = MPID_UGNI_AllAddr[i];
3438         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3439         GNI_RC_CHECK("GNI_EpBind ", status);   
3440     }
3441
3442     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3443     _init_smsg();
3444     PMI_Barrier();
3445
3446 #if     USE_LRTS_MEMPOOL
3447     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3448     if (env) {
3449         _totalmem = CmiReadSize(env);
3450         if (myrank == 0)
3451             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3452     }
3453
3454     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3455     if (env) _mempool_size = CmiReadSize(env);
3456     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3457         _mempool_size = CmiReadSize(env);
3458
3459
3460     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3461     if (env) {
3462         MAX_REG_MEM = CmiReadSize(env);
3463         user_set_flag = 1;
3464     }
3465     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3466         MAX_REG_MEM = CmiReadSize(env);
3467         user_set_flag = 1;
3468     }
3469
3470     env = getenv("CHARM_UGNI_SEND_MAX");
3471     if (env) {
3472         MAX_BUFF_SEND = CmiReadSize(env);
3473         user_set_flag = 1;
3474     }
3475     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3476         MAX_BUFF_SEND = CmiReadSize(env);
3477         user_set_flag = 1;
3478     }
3479
3480     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3481     if (env) {
3482         _mempool_size_limit = CmiReadSize(env);
3483     }
3484