added CQWrite support for persistent message
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/dir.h>
38 #include <sys/stat.h>
39 #include <gni_pub.h>
40 #include <pmi.h>
41 //#include <numatoolkit.h>
42
43 #include "converse.h"
44
45 #if CMK_DIRECT
46 #include "cmidirect.h"
47 #endif
48
49 #define     LARGEPAGE              0
50
51 #if CMK_SMP
52 #define MULTI_THREAD_SEND          0
53 #define COMM_THREAD_SEND           1
54 #endif
55
56 #if MULTI_THREAD_SEND
57 #define CMK_WORKER_SINGLE_TASK     0
58 #endif
59
60 #define REMOTE_EVENT               0
61 #define CQWRITE                    0
62
63 #define CMI_EXERT_SEND_CAP      0
64 #define CMI_EXERT_RECV_CAP      0
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP 16
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP 2
72 #endif
73
74 #define USE_LRTS_MEMPOOL                  1
75
76 #define PRINT_SYH                         0
77
78 // Trace communication thread
79 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
80 #define TRACE_THRESHOLD     0.00005
81 #define CMI_MPI_TRACE_MOREDETAILED 0
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
84 #else
85 #undef CMK_SMP_TRACE_COMMTHREAD
86 #define CMK_SMP_TRACE_COMMTHREAD 0
87 #endif
88
89 #define CMK_TRACE_COMMOVERHEAD 0
90 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
91 #undef CMI_MPI_TRACE_USEREVENTS
92 #define CMI_MPI_TRACE_USEREVENTS 1
93 #else
94 #undef CMK_TRACE_COMMOVERHEAD
95 #define CMK_TRACE_COMMOVERHEAD 0
96 #endif
97
98 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
99 CpvStaticDeclare(double, projTraceStart);
100 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
101 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
102 #else
103 #define  START_EVENT()
104 #define  END_EVENT(x)
105 #endif
106
107 #if USE_LRTS_MEMPOOL
108
109 #define oneMB (1024ll*1024)
110 #define oneGB (1024ll*1024*1024)
111
112 static CmiInt8 _mempool_size = 8*oneMB;
113 static CmiInt8 _expand_mem =  4*oneMB;
114 static CmiInt8 _mempool_size_limit = 0;
115
116 static CmiInt8 _totalmem = 0.8*oneGB;
117
118 #if LARGEPAGE
119 static CmiInt8 BIG_MSG  =  16*oneMB;
120 static CmiInt8 ONE_SEG  =  4*oneMB;
121 #else
122 static CmiInt8 BIG_MSG  =  4*oneMB;
123 static CmiInt8 ONE_SEG  =  2*oneMB;
124 #endif
125 #if MULTI_THREAD_SEND
126 static int BIG_MSG_PIPELINE = 1;
127 #else
128 static int BIG_MSG_PIPELINE = 4;
129 #endif
130
131 // dynamic flow control
132 static CmiInt8 buffered_send_msg = 0;
133 static CmiInt8 register_memory_size = 0;
134
135 #if LARGEPAGE
136 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
137 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
138 static CmiInt8  register_count = 0;
139 #else
140 #if CMK_SMP && COMM_THREAD_SEND 
141 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
142 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
143 #else
144 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
145 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
146 #endif
147
148
149 #endif
150
151 #endif     /* end USE_LRTS_MEMPOOL */
152
153 #if MULTI_THREAD_SEND 
154 #define     CMI_GNI_LOCK(x)       CmiLock(x);
155 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
156 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
157 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
158 #else
159 #define     CMI_GNI_LOCK(x)
160 #define     CMI_GNI_UNLOCK(x)
161 #define     CMI_PCQUEUEPOP_LOCK(Q)   
162 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
163 #endif
164
165 static int _tlbpagesize = 4096;
166
167 //static int _smpd_count  = 0;
168
169 static int   user_set_flag  = 0;
170
171 static int _checkProgress = 1;             /* check deadlock */
172 static int _detected_hang = 0;
173
174 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
175
176 // dynamic SMSG
177 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
178
179 static int avg_smsg_connection = 32;
180 static int                 *smsg_connected_flag= 0;
181 static gni_smsg_attr_t     **smsg_attr_vector_local;
182 static gni_smsg_attr_t     **smsg_attr_vector_remote;
183 static gni_ep_handle_t     ep_hndl_unbound;
184 static gni_smsg_attr_t     send_smsg_attr;
185 static gni_smsg_attr_t     recv_smsg_attr;
186
187 typedef struct _dynamic_smsg_mailbox{
188    void     *mailbox_base;
189    int      size;
190    int      offset;
191    gni_mem_handle_t  mem_hndl;
192    struct      _dynamic_smsg_mailbox  *next;
193 }dynamic_smsg_mailbox_t;
194
195 static dynamic_smsg_mailbox_t  *mailbox_list;
196
197 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
198 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
199
200 #if PRINT_SYH
201 int         lrts_send_msg_id = 0;
202 int         lrts_local_done_msg = 0;
203 int         lrts_send_rdma_success = 0;
204 #endif
205
206 #include "machine.h"
207
208 #include "pcqueue.h"
209
210 #include "mempool.h"
211
212 #if CMK_PERSISTENT_COMM
213 #include "machine-persistent.h"
214 #endif
215
216 //#define  USE_ONESIDED 1
217 #ifdef USE_ONESIDED
218 //onesided implementation is wrong, since no place to restore omdh
219 #include "onesided.h"
220 onesided_hnd_t   onesided_hnd;
221 onesided_md_t    omdh;
222 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
223
224 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
225
226 #else
227 uint8_t   onesided_hnd, omdh;
228
229 #if REMOTE_EVENT || CQWRITE 
230 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
231     if(register_memory_size+size>= MAX_REG_MEM) { \
232         status = GNI_RC_ERROR_NOMEM;} \
233     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
234         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
235 #else
236 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
237         if (register_memory_size + size >= MAX_REG_MEM) { \
238             status = GNI_RC_ERROR_NOMEM; \
239         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
240             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
241 #endif
242
243 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
244     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
245              register_memory_size -= size; \
246          else CmiAbort("MEM_DEregister");  \
247     } while (0)
248 #endif
249
250 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
251 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
252 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
253 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
254 #define   IncreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)++
255 #define   DecreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)--
256 #define   IncreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)++
257 #define   DecreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)--
258 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
259 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
260 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
261 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
262 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
263 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
264
265 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
266 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
267
268 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
269 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
270 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
271 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
272
273 #define ALIGNBUF                64
274
275 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
276 /* If SMSG is not used */
277
278 #define FMA_PER_CORE  1024
279 #define FMA_BUFFER_SIZE 1024
280
281 /* If SMSG is used */
282 static int  SMSG_MAX_MSG = 1024;
283 #define SMSG_MAX_CREDIT 72 
284
285 #define MSGQ_MAXSIZE       2048
286 /* large message transfer with FMA or BTE */
287 #define LRTS_GNI_RDMA_THRESHOLD  1024 
288
289 #if CMK_SMP
290 static int  REMOTE_QUEUE_ENTRIES=163840; 
291 static int LOCAL_QUEUE_ENTRIES=163840; 
292 #else
293 static int  REMOTE_QUEUE_ENTRIES=20480;
294 static int LOCAL_QUEUE_ENTRIES=20480; 
295 #endif
296
297 #define BIG_MSG_TAG             0x26
298 #define PUT_DONE_TAG            0x28
299 #define DIRECT_PUT_DONE_TAG     0x29
300 #define ACK_TAG                 0x30
301 /* SMSG is data message */
302 #define SMALL_DATA_TAG          0x31
303 /* SMSG is a control message to initialize a BTE */
304 #define LMSG_INIT_TAG           0x39 
305
306 #define DEBUG
307 #ifdef GNI_RC_CHECK
308 #undef GNI_RC_CHECK
309 #endif
310 #ifdef DEBUG
311 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
312 #else
313 #define GNI_RC_CHECK(msg,rc)
314 #endif
315
316 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
317 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
318 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
319
320 static int useStaticMSGQ = 0;
321 static int useStaticFMA = 0;
322 static int mysize, myrank;
323 static gni_nic_handle_t   nic_hndl;
324
325 typedef struct {
326     gni_mem_handle_t mdh;
327     uint64_t addr;
328 } mdh_addr_t ;
329 // this is related to dynamic SMSG
330
331 typedef struct mdh_addr_list{
332     gni_mem_handle_t mdh;
333    void *addr;
334     struct mdh_addr_list *next;
335 }mdh_addr_list_t;
336
337 static unsigned int         smsg_memlen;
338 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
339 mdh_addr_t          setup_mem;
340 mdh_addr_t          *smsg_connection_vec = 0;
341 gni_mem_handle_t    smsg_connection_memhndl;
342 static int          smsg_expand_slots = 10;
343 static int          smsg_available_slot = 0;
344 static void         *smsg_mailbox_mempool = 0;
345 mdh_addr_list_t     *smsg_dynamic_list = 0;
346
347 static void             *smsg_mailbox_base;
348 gni_msgq_attr_t         msgq_attrs;
349 gni_msgq_handle_t       msgq_handle;
350 gni_msgq_ep_attr_t      msgq_ep_attrs;
351 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
352
353 /* =====Beginning of Declarations of Machine Specific Variables===== */
354 static int cookie;
355 static int modes = 0;
356 static gni_cq_handle_t       smsg_rx_cqh = NULL;
357 static gni_cq_handle_t       default_tx_cqh = NULL;
358 static gni_cq_handle_t       rdma_tx_cqh = NULL;
359 static gni_cq_handle_t       rdma_rx_cqh = NULL;
360 static gni_cq_handle_t       post_tx_cqh = NULL;
361 static gni_ep_handle_t       *ep_hndl_array;
362
363 static CmiNodeLock           *ep_lock_array;
364 static CmiNodeLock           default_tx_cq_lock; 
365 static CmiNodeLock           rdma_tx_cq_lock; 
366 static CmiNodeLock           global_gni_lock; 
367 static CmiNodeLock           rx_cq_lock;
368 static CmiNodeLock           smsg_mailbox_lock;
369 static CmiNodeLock           smsg_rx_cq_lock;
370 static CmiNodeLock           *mempool_lock;
371 //#define     CMK_WITH_STATS      0
372 typedef struct msg_list
373 {
374     uint32_t destNode;
375     uint32_t size;
376     void *msg;
377     uint8_t tag;
378 #if !CMK_SMP
379     struct msg_list *next;
380 #endif
381 #if CMK_WITH_STATS
382     double  creation_time;
383 #endif
384 }MSG_LIST;
385
386
387 typedef struct control_msg
388 {
389     uint64_t            source_addr;    /* address from the start of buffer  */
390     uint64_t            dest_addr;      /* address from the start of buffer */
391     int                 total_length;   /* total length */
392     int                 length;         /* length of this packet */
393 #if REMOTE_EVENT
394     int                 ack_index;      /* index from integer to address */
395 #endif
396     uint8_t             seq_id;         //big message   0 meaning single message
397     gni_mem_handle_t    source_mem_hndl;
398     struct control_msg *next;
399 } CONTROL_MSG;
400
401 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
402
403 typedef struct ack_msg
404 {
405     uint64_t            source_addr;    /* address from the start of buffer  */
406 #if ! USE_LRTS_MEMPOOL
407     gni_mem_handle_t    source_mem_hndl;
408     int                 length;          /* total length */
409 #endif
410     struct ack_msg     *next;
411 } ACK_MSG;
412
413 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
414
415 #if CMK_DIRECT
416 typedef struct{
417     uint64_t    handler_addr;
418 }CMK_DIRECT_HEADER;
419
420 typedef struct {
421     char core[CmiMsgHeaderSizeBytes];
422     uint64_t handler;
423 }cmidirectMsg;
424
425 //SYH
426 CpvDeclare(int, CmiHandleDirectIdx);
427 void CmiHandleDirectMsg(cmidirectMsg* msg)
428 {
429
430     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
431    (*(_handle->callbackFnPtr))(_handle->callbackData);
432    CmiFree(msg);
433 }
434
435 void CmiDirectInit()
436 {
437     CpvInitialize(int,  CmiHandleDirectIdx);
438     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
439 }
440
441 #endif
442 typedef struct  rmda_msg
443 {
444     int                   destNode;
445 #if REMOTE_EVENT
446     int                   ack_index;
447 #endif
448     gni_post_descriptor_t *pd;
449 #if !CMK_SMP
450     struct  rmda_msg      *next;
451 #endif
452 }RDMA_REQUEST;
453
454
455 #if CMK_SMP
456 #define SMP_LOCKS               0
457 #define ONE_SEND_QUEUE                  0
458 PCQueue sendRdmaBuf;
459 typedef struct  msg_list_index
460 {
461     PCQueue     sendSmsgBuf;
462     int         pushed;
463     CmiNodeLock   lock;
464 } MSG_LIST_INDEX;
465 char                *destpe_avail;
466 #if  !ONE_SEND_QUEUE && SMP_LOCKS
467     PCQueue     nonEmptyQueues;
468 #endif
469 #else         /* non-smp */
470
471 static RDMA_REQUEST        *sendRdmaBuf = 0;
472 static RDMA_REQUEST        *sendRdmaTail = 0;
473 typedef struct  msg_list_index
474 {
475     int         next;
476     MSG_LIST    *sendSmsgBuf;
477     MSG_LIST    *tail;
478 } MSG_LIST_INDEX;
479
480 #endif
481
482 // buffered send queue
483 #if ! ONE_SEND_QUEUE
484 typedef struct smsg_queue
485 {
486     MSG_LIST_INDEX   *smsg_msglist_index;
487     int               smsg_head_index;
488 } SMSG_QUEUE;
489 #else
490 typedef struct smsg_queue
491 {
492     PCQueue       sendMsgBuf;
493 }  SMSG_QUEUE;
494 #endif
495
496 SMSG_QUEUE                  smsg_queue;
497 #if CMK_USE_OOB
498 SMSG_QUEUE                  smsg_oob_queue;
499 #endif
500
501 #if CMK_SMP
502
503 #define FreeMsgList(d)   free(d);
504 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
505
506 #else
507
508 static MSG_LIST       *msglist_freelist=0;
509
510 #define FreeMsgList(d)  \
511   do { \
512   (d)->next = msglist_freelist;\
513   msglist_freelist = d; \
514   } while (0)
515
516 #define MallocMsgList(d) \
517   do {  \
518   d = msglist_freelist;\
519   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
520              _MEMCHECK(d);\
521   } else msglist_freelist = d->next; \
522   d->next =0;  \
523   } while (0)
524
525 #endif
526
527 #if CMK_SMP
528
529 #define FreeControlMsg(d)      free(d);
530 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
531
532 #else
533
534 static CONTROL_MSG    *control_freelist=0;
535
536 #define FreeControlMsg(d)       \
537   do { \
538   (d)->next = control_freelist;\
539   control_freelist = d; \
540   } while (0);
541
542 #define MallocControlMsg(d) \
543   do {  \
544   d = control_freelist;\
545   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
546              _MEMCHECK(d);\
547   } else control_freelist = d->next;  \
548   } while (0);
549
550 #endif
551
552 #if CMK_SMP
553
554 #define FreeAckMsg(d)      free(d);
555 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
556
557 #else
558
559 static ACK_MSG        *ack_freelist=0;
560
561 #define FreeAckMsg(d)       \
562   do { \
563   (d)->next = ack_freelist;\
564   ack_freelist = d; \
565   } while (0)
566
567 #define MallocAckMsg(d) \
568   do { \
569   d = ack_freelist;\
570   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
571              _MEMCHECK(d);\
572   } else ack_freelist = d->next; \
573   } while (0)
574
575 #endif
576
577
578 # if CMK_SMP
579 #define FreeRdmaRequest(d)       free(d);
580 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
581 #else
582
583 static RDMA_REQUEST         *rdma_freelist = NULL;
584
585 #define FreeRdmaRequest(d)       \
586   do {  \
587   (d)->next = rdma_freelist;\
588   rdma_freelist = d;    \
589   } while (0)
590
591 #define MallocRdmaRequest(d) \
592   do {   \
593   d = rdma_freelist;\
594   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
595              _MEMCHECK(d);\
596   } else rdma_freelist = d->next; \
597   d->next =0;   \
598   } while (0)
599 #endif
600
601 /* reuse gni_post_descriptor_t */
602 static gni_post_descriptor_t *post_freelist=0;
603
604 #if  !CMK_SMP
605 #define FreePostDesc(d)       \
606     (d)->next_descr = post_freelist;\
607     post_freelist = d;
608
609 #define MallocPostDesc(d) \
610   d = post_freelist;\
611   if (d==0) { \
612      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
613      d->next_descr = 0;\
614       _MEMCHECK(d);\
615   } else post_freelist = d->next_descr;
616 #else
617
618 #define FreePostDesc(d)     free(d);
619 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
620
621 #endif
622
623
624 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
625 static int      buffered_smsg_counter = 0;
626
627 /* SmsgSend return success but message sent is not confirmed by remote side */
628 static MSG_LIST *buffered_fma_head = 0;
629 static MSG_LIST *buffered_fma_tail = 0;
630
631 /* functions  */
632 #define IsFree(a,ind)  !( a& (1<<(ind) ))
633 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
634 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
635
636 CpvDeclare(mempool_type*, mempool);
637
638 #if REMOTE_EVENT
639 /* ack pool for remote events */
640
641 #define ACK_SHIFT                  19
642 #define ACK_EVENT(idx)             ((idx<<ACK_SHIFT) | myrank)
643 #define ACK_GET_RANK(evt)          (evt & ((1<<ACK_SHIFT)-1))
644 #define ACK_GET_INDEX(evt)         (evt >> ACK_SHIFT)
645
646 struct AckPool {
647 void *addr;
648 int next;
649 };
650
651 static struct AckPool   *ackpool;
652 static int    ackpoolsize;
653 static int    ackpool_freehead;
654 static CmiNodeLock  ackpool_lock;
655
656 #define  GetAckAddress(s)          (ackpool[s].addr)
657
658 static void AckPool_init()
659 {
660     int i;
661     if ((1<<ACK_SHIFT) < mysize) 
662         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
663     ackpoolsize = 2048;
664     ackpool = (struct AckPool *)malloc(ackpoolsize*sizeof(struct AckPool));
665     for (i=0; i<ackpoolsize-1; i++) {
666         ackpool[i].next = i+1;
667     }
668     ackpool[i].next = -1;
669     ackpool_freehead = 0;
670 #if MULTI_THREAD_SEND 
671     ackpool_lock  = CmiCreateLock();
672 #endif
673 }
674
675 static
676 inline int AckPool_getslot(void *addr)
677 {
678     int i;
679     int s;
680     CMI_GNI_LOCK(ackpool_lock);
681     s = ackpool_freehead;
682     if (s == -1) {
683         int newsize = ackpoolsize * 2;
684         printf("[%d] AckPool_getslot expand to: %d\n", myrank, newsize);
685         if (newsize > (1<<(32-ACK_SHIFT))) CmiAbort("AckPool too large");
686         struct AckPool *old_ackpool = ackpool;
687         ackpool = (struct AckPool *)malloc(newsize*sizeof(struct AckPool));
688         memcpy(ackpool, old_ackpool, ackpoolsize*sizeof(struct AckPool));
689         for (i=ackpoolsize; i<newsize-1; i++) {
690             ackpool[i].next = i+1;
691         }
692         ackpool[i].next = -1;
693         ackpool_freehead = ackpoolsize;
694         s = ackpoolsize;
695         ackpoolsize = newsize;
696         free(old_ackpool);
697     }
698     ackpool_freehead = ackpool[s].next;
699     ackpool[s].addr = addr;
700     CMI_GNI_UNLOCK(ackpool_lock);
701     return s;
702 }
703
704 static
705 inline  void AckPool_freeslot(int s)
706 {
707     CmiAssert(s>=0 && s<ackpoolsize);
708     CMI_GNI_LOCK(ackpool_lock);
709     ackpool[s].next = ackpool_freehead;
710     ackpool_freehead = s;
711     CMI_GNI_UNLOCK(ackpool_lock);
712 }
713
714
715 #endif
716
717 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
718 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
719 #define CHARM_MAGIC_NUMBER               126
720
721 #if CMK_ERROR_CHECKING
722 extern unsigned char computeCheckSum(unsigned char *data, int len);
723 static int checksum_flag = 0;
724 #define CMI_SET_CHECKSUM(msg, len)      \
725         if (checksum_flag)  {   \
726           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
727           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
728         }
729 #define CMI_CHECK_CHECKSUM(msg, len)    \
730         if (checksum_flag)      \
731           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
732             CmiAbort("Fatal error: checksum doesn't agree!\n");
733 #else
734 #define CMI_SET_CHECKSUM(msg, len)
735 #define CMI_CHECK_CHECKSUM(msg, len)
736 #endif
737 /* =====End of Definitions of Message-Corruption Related Macros=====*/
738
739 #if CMK_WITH_STATS
740 FILE *counterLog = NULL;
741 typedef struct comm_thread_stats
742 {
743     uint64_t  smsg_data_count;
744     uint64_t  lmsg_init_count;
745     uint64_t  ack_count;
746     uint64_t  big_msg_ack_count;
747     uint64_t  smsg_count;
748     //times of calling SmsgSend
749     uint64_t  try_smsg_data_count;
750     uint64_t  try_lmsg_init_count;
751     uint64_t  try_ack_count;
752     uint64_t  try_big_msg_ack_count;
753     uint64_t  try_smsg_count;
754     
755     double    max_time_in_send_buffered_smsg;
756     double    all_time_in_send_buffered_smsg;
757
758     uint64_t  rdma_count;
759     uint64_t  try_rdma_count;
760     double    max_time_from_control_to_rdma_init;
761     double    all_time_from_control_to_rdma_init;
762
763     double    max_time_from_rdma_init_to_rdma_done;
764     double    all_time_from_rdma_init_to_rdma_done;
765 } Comm_Thread_Stats;
766
767 static Comm_Thread_Stats   comm_stats;
768
769 static void init_comm_stats()
770 {
771   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
772 }
773
774 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
775
776 #define SMSG_SENT_DONE(creation_time, tag)  \
777         if (print_stats) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
778             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
779             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
780             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
781             comm_stats.smsg_count++; \
782             double inbuff_time = CmiWallTimer() - creation_time;   \
783             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
784             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
785         }
786
787 #define SMSG_TRY_SEND(tag)  \
788         if (print_stats){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
789             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
790             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
791             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
792             comm_stats.try_smsg_count++; \
793         }
794
795 #define  RDMA_TRY_SEND()        if (print_stats) {comm_stats.try_rdma_count++;}
796
797 #define  RDMA_TRANS_DONE(x)      \
798          if (print_stats) {  double rdma_trans_time = CmiWallTimer() - x ; \
799              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
800              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
801          }
802
803 #define  RDMA_TRANS_INIT(x)      \
804          if (print_stats) {   comm_stats.rdma_count++;  \
805              double rdma_trans_time = CmiWallTimer() - x ; \
806              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
807              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
808          }
809
810
811 static void print_comm_stats()
812 {
813     fprintf(counterLog, "Node[%d]SMSG time in buffer\t[max:%f\tAverage:%f](milisecond)\n", myrank, 1000*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
814     fprintf(counterLog, "Node[%d]Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n", myrank, 
815             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
816             comm_stats.ack_count, comm_stats.big_msg_ack_count);
817     
818     fprintf(counterLog, "Node[%d]SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n\n", myrank, 
819             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
820             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count);
821
822     fprintf(counterLog, "Node[%d]Rdma Transaction [count:%lld\t calls:%lld]\n", myrank, comm_stats.rdma_count, comm_stats.try_rdma_count);
823     fprintf(counterLog, "Node[%d]Rdma time from control arrives to rdma init [MAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/comm_stats.rdma_count); 
824     fprintf(counterLog, "Node[%d]Rdma time from init to rdma done [MAX:%f\t Average:%f](milisecond)\n\n", myrank, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/comm_stats.rdma_count); 
825 }
826
827 #else
828 #define STATS_ACK_TIME(x)            x
829 #define STATS_SEND_SMSGS_TIME(x)     x
830 #endif
831
832 static int print_stats = 0;
833
834 static void
835 allgather(void *in,void *out, int len)
836 {
837     static int *ivec_ptr=NULL,already_called=0,job_size=0;
838     int i,rc;
839     int my_rank;
840     char *tmp_buf,*out_ptr;
841
842     if(!already_called) {
843
844         rc = PMI_Get_size(&job_size);
845         CmiAssert(rc == PMI_SUCCESS);
846         rc = PMI_Get_rank(&my_rank);
847         CmiAssert(rc == PMI_SUCCESS);
848
849         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
850         CmiAssert(ivec_ptr != NULL);
851
852         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
853         CmiAssert(rc == PMI_SUCCESS);
854
855         already_called = 1;
856
857     }
858
859     tmp_buf = (char *)malloc(job_size * len);
860     CmiAssert(tmp_buf);
861
862     rc = PMI_Allgather(in,tmp_buf,len);
863     CmiAssert(rc == PMI_SUCCESS);
864
865     out_ptr = out;
866
867     for(i=0;i<job_size;i++) {
868
869         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
870
871     }
872
873     free(tmp_buf);
874 }
875
876 static void
877 allgather_2(void *in,void *out, int len)
878 {
879     //PMI_Allgather is out of order
880     int i,rc, extend_len;
881     int  rank_index;
882     char *out_ptr, *out_ref;
883     char *in2;
884
885     extend_len = sizeof(int) + len;
886     in2 = (char*)malloc(extend_len);
887
888     memcpy(in2, &myrank, sizeof(int));
889     memcpy(in2+sizeof(int), in, len);
890
891     out_ptr = (char*)malloc(mysize*extend_len);
892
893     rc = PMI_Allgather(in2, out_ptr, extend_len);
894     GNI_RC_CHECK("allgather", rc);
895
896     out_ref = out;
897
898     for(i=0;i<mysize;i++) {
899         //rank index 
900         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
901         //copy to the rank index slot
902         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
903     }
904
905     free(out_ptr);
906     free(in2);
907
908 }
909
910 static unsigned int get_gni_nic_address(int device_id)
911 {
912     unsigned int address, cpu_id;
913     gni_return_t status;
914     int i, alps_dev_id=-1,alps_address=-1;
915     char *token, *p_ptr;
916
917     p_ptr = getenv("PMI_GNI_DEV_ID");
918     if (!p_ptr) {
919         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
920        
921         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
922     } else {
923         while ((token = strtok(p_ptr,":")) != NULL) {
924             alps_dev_id = atoi(token);
925             if (alps_dev_id == device_id) {
926                 break;
927             }
928             p_ptr = NULL;
929         }
930         CmiAssert(alps_dev_id != -1);
931         p_ptr = getenv("PMI_GNI_LOC_ADDR");
932         CmiAssert(p_ptr != NULL);
933         i = 0;
934         while ((token = strtok(p_ptr,":")) != NULL) {
935             if (i == alps_dev_id) {
936                 alps_address = atoi(token);
937                 break;
938             }
939             p_ptr = NULL;
940             ++i;
941         }
942         CmiAssert(alps_address != -1);
943         address = alps_address;
944     }
945     return address;
946 }
947
948 static uint8_t get_ptag(void)
949 {
950     char *p_ptr, *token;
951     uint8_t ptag;
952
953     p_ptr = getenv("PMI_GNI_PTAG");
954     CmiAssert(p_ptr != NULL);
955     token = strtok(p_ptr, ":");
956     ptag = (uint8_t)atoi(token);
957     return ptag;
958         
959 }
960
961 static uint32_t get_cookie(void)
962 {
963     uint32_t cookie;
964     char *p_ptr, *token;
965
966     p_ptr = getenv("PMI_GNI_COOKIE");
967     CmiAssert(p_ptr != NULL);
968     token = strtok(p_ptr, ":");
969     cookie = (uint32_t)atoi(token);
970
971     return cookie;
972 }
973
974 #if LARGEPAGE
975
976 /* directly mmap memory from hugetlbfs for large pages */
977
978 #include <sys/stat.h>
979 #include <fcntl.h>
980 #include <sys/mman.h>
981 #include <hugetlbfs.h>
982
983 // size must be _tlbpagesize aligned
984 void *my_get_huge_pages(size_t size)
985 {
986     char filename[512];
987     int fd;
988     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
989     void *ptr = NULL;
990
991     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
992     fd = open(filename, O_RDWR | O_CREAT, mode);
993     if (fd == -1) {
994         CmiAbort("my_get_huge_pages: open filed");
995     }
996     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
997     if (ptr == MAP_FAILED) ptr = NULL;
998 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
999     close(fd);
1000     unlink(filename);
1001     return ptr;
1002 }
1003
1004 void my_free_huge_pages(void *ptr, int size)
1005 {
1006 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1007     int ret = munmap(ptr, size);
1008     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1009 }
1010
1011 #endif
1012
1013 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1014 /* TODO: add any that are related */
1015 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1016
1017
1018 #include "machine-lrts.h"
1019 #include "machine-common-core.c"
1020
1021 /* Network progress function is used to poll the network when for
1022    messages. This flushes receive buffers on some  implementations*/
1023 #if CMK_MACHINE_PROGRESS_DEFINED
1024 void CmiMachineProgressImpl() {
1025 }
1026 #endif
1027
1028 static int SendBufferMsg(SMSG_QUEUE *queue);
1029 static void SendRdmaMsg();
1030 static void PumpNetworkSmsg();
1031 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1032 #if CQWRITE
1033 static void PumpCqWriteTransactions();
1034 #endif
1035 #if REMOTE_EVENT
1036 static void PumpRemoteTransactions();
1037 #endif
1038
1039 #if MACHINE_DEBUG_LOG
1040 FILE *debugLog = NULL;
1041 static CmiInt8 buffered_recv_msg = 0;
1042 int         lrts_smsg_success = 0;
1043 int         lrts_received_msg = 0;
1044 #endif
1045
1046 static void sweep_mempool(mempool_type *mptr)
1047 {
1048     int n = 0;
1049     block_header *current = &(mptr->block_head);
1050
1051     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1052     while( current!= NULL) {
1053         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1054         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1055     }
1056     printf("[n %d] sweep_mempool slot END.\n", myrank);
1057 }
1058
1059 inline
1060 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1061 {
1062     block_header *current = *from;
1063
1064     //while(register_memory_size>= MAX_REG_MEM)
1065     //{
1066         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1067             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1068
1069         *from = current;
1070         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1071         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1072         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1073     //}
1074     return GNI_RC_SUCCESS;
1075 }
1076
1077 inline 
1078 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1079 {
1080     gni_return_t status = GNI_RC_SUCCESS;
1081     //int size = GetMempoolsize(msg);
1082     //void *blockaddr = GetMempoolBlockPtr(msg);
1083     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1084    
1085     block_header *current = &(mptr->block_head);
1086     while(register_memory_size>= MAX_REG_MEM)
1087     {
1088         status = deregisterMemory(mptr, &current);
1089         if (status != GNI_RC_SUCCESS) break;
1090     }
1091     if(register_memory_size>= MAX_REG_MEM) return status;
1092
1093     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1094     while(1)
1095     {
1096         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1097         if(status == GNI_RC_SUCCESS)
1098         {
1099             break;
1100         }
1101         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1102         {
1103             CmiAbort("Memory registor for mempool fails\n");
1104         }
1105         else
1106         {
1107             status = deregisterMemory(mptr, &current);
1108             if (status != GNI_RC_SUCCESS) break;
1109         }
1110     }; 
1111     return status;
1112 }
1113
1114 inline 
1115 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1116 {
1117     static int rank = -1;
1118     int i;
1119     gni_return_t status;
1120     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1121     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1122     mempool_type *mptr;
1123
1124     status = registerFromMempool(mptr1, msg, size, t, cqh);
1125     if (status == GNI_RC_SUCCESS) return status;
1126 #if CMK_SMP 
1127     for (i=0; i<CmiMyNodeSize()+1; i++) {
1128       rank = (rank+1)%(CmiMyNodeSize()+1);
1129       mptr = CpvAccessOther(mempool, rank);
1130       if (mptr == mptr1) continue;
1131       status = registerFromMempool(mptr, msg, size, t, cqh);
1132       if (status == GNI_RC_SUCCESS) return status;
1133     }
1134 #endif
1135     return  GNI_RC_ERROR_RESOURCE;
1136 }
1137
1138 inline
1139 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1140 {
1141     MSG_LIST        *msg_tmp;
1142     MallocMsgList(msg_tmp);
1143     msg_tmp->destNode = destNode;
1144     msg_tmp->size   = size;
1145     msg_tmp->msg    = msg;
1146     msg_tmp->tag    = tag;
1147 #if CMK_WITH_STATS
1148     SMSG_CREATION(msg_tmp)
1149 #endif
1150 #if !CMK_SMP
1151     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1152         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1153         queue->smsg_head_index = destNode;
1154         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1155     }else
1156     {
1157         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1158         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1159     }
1160 #else
1161 #if ONE_SEND_QUEUE
1162     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1163 #else
1164 #if SMP_LOCKS
1165     CmiLock(queue->smsg_msglist_index[destNode].lock);
1166     if(queue->smsg_msglist_index[destNode].pushed == 0)
1167     {
1168         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1169     }
1170     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1171     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1172 #else
1173     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1174 #endif
1175 #endif
1176 #endif
1177 #if PRINT_SYH
1178     buffered_smsg_counter++;
1179 #endif
1180 }
1181
1182 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1183 {
1184     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1185 }
1186
1187 inline
1188 static void setup_smsg_connection(int destNode)
1189 {
1190     mdh_addr_list_t  *new_entry = 0;
1191     gni_post_descriptor_t *pd;
1192     gni_smsg_attr_t      *smsg_attr;
1193     gni_return_t status = GNI_RC_NOT_DONE;
1194     RDMA_REQUEST        *rdma_request_msg;
1195     
1196     if(smsg_available_slot == smsg_expand_slots)
1197     {
1198         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1199         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1200         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1201
1202         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1203             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1204             GNI_MEM_READWRITE,   
1205             -1,
1206             &(new_entry->mdh));
1207         smsg_available_slot = 0; 
1208         new_entry->next = smsg_dynamic_list;
1209         smsg_dynamic_list = new_entry;
1210     }
1211     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1212     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1213     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1214     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1215     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1216     smsg_attr->buff_size = smsg_memlen;
1217     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1218     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1219     smsg_local_attr_vec[destNode] = smsg_attr;
1220     smsg_available_slot++;
1221     MallocPostDesc(pd);
1222     pd->type            = GNI_POST_FMA_PUT;
1223     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1224     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1225     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1226     pd->length          = sizeof(gni_smsg_attr_t);
1227     pd->local_addr      = (uint64_t) smsg_attr;
1228     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1229     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1230     pd->src_cq_hndl     = rdma_tx_cqh;
1231     pd->rdma_mode       = 0;
1232     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1233     print_smsg_attr(smsg_attr);
1234     if(status == GNI_RC_ERROR_RESOURCE )
1235     {
1236         MallocRdmaRequest(rdma_request_msg);
1237         rdma_request_msg->destNode = destNode;
1238         rdma_request_msg->pd = pd;
1239         /* buffer this request */
1240     }
1241 #if PRINT_SYH
1242     if(status != GNI_RC_SUCCESS)
1243        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1244     else
1245         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1246 #endif
1247 }
1248
1249 /* useDynamicSMSG */
1250 inline 
1251 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1252 {
1253     gni_return_t status = GNI_RC_NOT_DONE;
1254
1255     if(mailbox_list->offset == mailbox_list->size)
1256     {
1257         dynamic_smsg_mailbox_t *new_mailbox_entry;
1258         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1259         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1260         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1261         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1262         new_mailbox_entry->offset = 0;
1263         
1264         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1265             new_mailbox_entry->size, smsg_rx_cqh,
1266             GNI_MEM_READWRITE,   
1267             -1,
1268             &(new_mailbox_entry->mem_hndl));
1269
1270         GNI_RC_CHECK("register", status);
1271         new_mailbox_entry->next = mailbox_list;
1272         mailbox_list = new_mailbox_entry;
1273     }
1274     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1275     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1276     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1277     local_smsg_attr->mbox_offset = mailbox_list->offset;
1278     mailbox_list->offset += smsg_memlen;
1279     local_smsg_attr->buff_size = smsg_memlen;
1280     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1281     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1282 }
1283
1284 /* useDynamicSMSG */
1285 inline 
1286 static int connect_to(int destNode)
1287 {
1288     gni_return_t status = GNI_RC_NOT_DONE;
1289     CmiAssert(smsg_connected_flag[destNode] == 0);
1290     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1291     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1292     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1293     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1294     
1295     CMI_GNI_LOCK(global_gni_lock)
1296     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1297     CMI_GNI_UNLOCK(global_gni_lock)
1298     if (status == GNI_RC_ERROR_RESOURCE) {
1299       /* possibly destNode is making connection at the same time */
1300       free(smsg_attr_vector_local[destNode]);
1301       smsg_attr_vector_local[destNode] = NULL;
1302       free(smsg_attr_vector_remote[destNode]);
1303       smsg_attr_vector_remote[destNode] = NULL;
1304       mailbox_list->offset -= smsg_memlen;
1305       return 0;
1306     }
1307     GNI_RC_CHECK("GNI_Post", status);
1308     smsg_connected_flag[destNode] = 1;
1309     return 1;
1310 }
1311
1312 inline 
1313 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1314 {
1315     unsigned int          remote_address;
1316     uint32_t              remote_id;
1317     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1318     gni_smsg_attr_t       *smsg_attr;
1319     gni_post_descriptor_t *pd;
1320     gni_post_state_t      post_state;
1321     char                  *real_data; 
1322
1323     if (useDynamicSMSG) {
1324         switch (smsg_connected_flag[destNode]) {
1325         case 0: 
1326             connect_to(destNode);         /* continue to case 1 */
1327         case 1:                           /* pending connection, do nothing */
1328             status = GNI_RC_NOT_DONE;
1329             if(inbuff ==0)
1330                 buffer_small_msgs(queue, msg, size, destNode, tag);
1331             return status;
1332         }
1333     }
1334 #if CMK_SMP
1335 #if ! ONE_SEND_QUEUE
1336     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1337 #endif
1338     {
1339 #else
1340     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1341     {
1342 #endif
1343         //CMI_GNI_LOCK(smsg_mailbox_lock)
1344         CMI_GNI_LOCK(default_tx_cq_lock)
1345 #if CMK_SMP_TRACE_COMMTHREAD
1346         int oldpe = -1;
1347         int oldeventid = -1;
1348         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1349         { 
1350             START_EVENT();
1351             if ( tag == SMALL_DATA_TAG)
1352                 real_data = (char*)msg; 
1353             else 
1354                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1355             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1356             TRACE_COMM_SET_COMM_MSGID(real_data);
1357         }
1358 #endif
1359 #if REMOTE_EVENT
1360         if (tag == LMSG_INIT_TAG) {
1361             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1362             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1363                 control_msg_tmp->ack_index = AckPool_getslot((void*)control_msg_tmp->source_addr);
1364         }
1365         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1366 #endif
1367 #if     CMK_WITH_STATS
1368         SMSG_TRY_SEND(tag)
1369 #endif
1370 #if CMK_WITH_STATS
1371     double              creation_time;
1372     if (ptr == NULL)
1373         creation_time = CmiWallTimer();
1374     else
1375         creation_time = ptr->creation_time;
1376 #endif
1377
1378     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1379 #if CMK_SMP_TRACE_COMMTHREAD
1380         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1381 #endif
1382         CMI_GNI_UNLOCK(default_tx_cq_lock)
1383         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1384         if(status == GNI_RC_SUCCESS)
1385         {
1386 #if     CMK_WITH_STATS
1387             SMSG_SENT_DONE(creation_time,tag) 
1388 #endif
1389 #if CMK_SMP_TRACE_COMMTHREAD
1390             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1391             { 
1392                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1393             }
1394 #endif
1395         }else
1396             status = GNI_RC_ERROR_RESOURCE;
1397     }
1398     if(status != GNI_RC_SUCCESS && inbuff ==0)
1399         buffer_small_msgs(queue, msg, size, destNode, tag);
1400     return status;
1401 }
1402
1403 inline 
1404 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1405 {
1406     /* construct a control message and send */
1407     CONTROL_MSG         *control_msg_tmp;
1408     MallocControlMsg(control_msg_tmp);
1409     control_msg_tmp->source_addr = (uint64_t)msg;
1410     control_msg_tmp->seq_id    = seqno;
1411     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1412 #if REMOTE_EVENT
1413     control_msg_tmp->ack_index    =  -1;
1414 #endif
1415 #if     USE_LRTS_MEMPOOL
1416     if(size < BIG_MSG)
1417     {
1418         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1419     }
1420     else
1421     {
1422         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1423         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1424         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1425     }
1426 #else
1427     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1428 #endif
1429     return control_msg_tmp;
1430 }
1431
1432 #define BLOCKING_SEND_CONTROL    0
1433
1434 // Large message, send control to receiver, receiver register memory and do a GET, 
1435 // return 1 - send no success
1436 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1437 {
1438     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1439     uint32_t            vmdh_index  = -1;
1440     int                 size;
1441     int                 offset = 0;
1442     uint64_t            source_addr;
1443     int                 register_size; 
1444     void                *msg;
1445
1446     size    =   control_msg_tmp->total_length;
1447     source_addr = control_msg_tmp->source_addr;
1448     register_size = control_msg_tmp->length;
1449
1450 #if  USE_LRTS_MEMPOOL
1451     if( control_msg_tmp->seq_id == 0 ){
1452 #if BLOCKING_SEND_CONTROL
1453         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1454             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1455                 LrtsAdvanceCommunication(0);
1456         }
1457 #endif
1458         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1459         {
1460             msg = (void*)source_addr;
1461             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1462             {
1463                 if(!inbuff)
1464                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1465                 return GNI_RC_ERROR_NOMEM;
1466             }
1467             //register the corresponding mempool
1468             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1469             if(status == GNI_RC_SUCCESS)
1470             {
1471                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1472             }
1473         }else
1474         {
1475             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1476             status = GNI_RC_SUCCESS;
1477         }
1478         if(NoMsgInSend(source_addr))
1479             register_size = GetMempoolsize((void*)(source_addr));
1480         else
1481             register_size = 0;
1482     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1483     {
1484         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1485         source_addr += offset;
1486         size = control_msg_tmp->length;
1487 #if BLOCKING_SEND_CONTROL
1488         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1489             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1490                 LrtsAdvanceCommunication(0);
1491         }
1492 #endif
1493         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1494             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1495             {
1496                 if(!inbuff)
1497                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1498                 return GNI_RC_ERROR_NOMEM;
1499             }
1500             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1501             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1502         }
1503         else
1504         {
1505             status = GNI_RC_SUCCESS;
1506         }
1507         register_size = 0;  
1508     }
1509
1510     if(status == GNI_RC_SUCCESS)
1511     {
1512         status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1513         if(status == GNI_RC_SUCCESS)
1514         {
1515             buffered_send_msg += register_size;
1516             if(control_msg_tmp->seq_id == 0)
1517             {
1518                 IncreaseMsgInSend(source_addr);
1519             }
1520             FreeControlMsg(control_msg_tmp);
1521             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1522         }else
1523             status = GNI_RC_ERROR_RESOURCE;
1524
1525     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1526     {
1527         CmiAbort("Memory registor for large msg\n");
1528     }else 
1529     {
1530         status = GNI_RC_ERROR_NOMEM; 
1531         if(!inbuff)
1532             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1533     }
1534     return status;
1535 #else
1536     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1537     if(status == GNI_RC_SUCCESS)
1538     {
1539         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1540         if(status == GNI_RC_SUCCESS)
1541         {
1542             FreeControlMsg(control_msg_tmp);
1543         }
1544     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1545     {
1546         CmiAbort("Memory registor for large msg\n");
1547     }else 
1548     {
1549         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1550     }
1551     return status;
1552 #endif
1553 }
1554
1555 inline void LrtsPrepareEnvelope(char *msg, int size)
1556 {
1557     CmiSetMsgSize(msg, size);
1558     CMI_SET_CHECKSUM(msg, size);
1559 }
1560
1561 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1562 {
1563     gni_return_t        status  =   GNI_RC_SUCCESS;
1564     uint8_t tag;
1565     CONTROL_MSG         *control_msg_tmp;
1566     int                 oob = ( mode & OUT_OF_BAND);
1567     SMSG_QUEUE          *queue;
1568
1569     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1570 #if CMK_USE_OOB
1571     queue = oob? &smsg_oob_queue : &smsg_queue;
1572 #else
1573     queue = &smsg_queue;
1574 #endif
1575
1576     LrtsPrepareEnvelope(msg, size);
1577
1578 #if PRINT_SYH
1579     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1580 #endif 
1581 #if CMK_SMP 
1582     if(size <= SMSG_MAX_MSG)
1583         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1584     else if (size < BIG_MSG) {
1585         control_msg_tmp =  construct_control_msg(size, msg, 0);
1586         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1587     }
1588     else {
1589           CmiSetMsgSeq(msg, 0);
1590           control_msg_tmp =  construct_control_msg(size, msg, 1);
1591           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1592     }
1593 #else   //non-smp, smp(worker sending)
1594     if(size <= SMSG_MAX_MSG)
1595     {
1596         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1597             CmiFree(msg);
1598     }
1599     else if (size < BIG_MSG) {
1600         control_msg_tmp =  construct_control_msg(size, msg, 0);
1601         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1602     }
1603     else {
1604 #if     USE_LRTS_MEMPOOL
1605         CmiSetMsgSeq(msg, 0);
1606         control_msg_tmp =  construct_control_msg(size, msg, 1);
1607         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1608 #else
1609         control_msg_tmp =  construct_control_msg(size, msg, 0);
1610         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1611 #endif
1612     }
1613 #endif
1614     return 0;
1615 }
1616
1617 static void    PumpDatagramConnection();
1618 static      int         event_SetupConnect = 111;
1619 static      int         event_PumpSmsg = 222 ;
1620 static      int         event_PumpTransaction = 333;
1621 static      int         event_PumpRdmaTransaction = 444;
1622 static      int         event_SendBufferSmsg = 444;
1623 static      int         event_SendFmaRdmaMsg = 555;
1624 static      int         event_AdvanceCommunication = 666;
1625
1626 static void registerUserTraceEvents() {
1627 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1628     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1629     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1630     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1631     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1632     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1633     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1634     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1635 #endif
1636 }
1637
1638 static void ProcessDeadlock()
1639 {
1640     static CmiUInt8 *ptr = NULL;
1641     static CmiUInt8  last = 0, mysum, sum;
1642     static int count = 0;
1643     gni_return_t status;
1644     int i;
1645
1646 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1647 //sweep_mempool(CpvAccess(mempool));
1648     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1649     mysum = smsg_send_count + smsg_recv_count;
1650     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1651     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1652     GNI_RC_CHECK("PMI_Allgather", status);
1653     sum = 0;
1654     for (i=0; i<mysize; i++)  sum+= ptr[i];
1655     if (last == 0 || sum == last) 
1656         count++;
1657     else
1658         count = 0;
1659     last = sum;
1660     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1661     if (count == 2) { 
1662         /* detected twice, it is a real deadlock */
1663         if (myrank == 0)  {
1664             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1665             CmiAbort("Fatal> Deadlock detected.");
1666         }
1667
1668     }
1669     _detected_hang = 0;
1670 }
1671
1672 static void CheckProgress()
1673 {
1674     if (smsg_send_count == last_smsg_send_count &&
1675         smsg_recv_count == last_smsg_recv_count ) 
1676     {
1677         _detected_hang = 1;
1678 #if !CMK_SMP
1679         if (_detected_hang) ProcessDeadlock();
1680 #endif
1681
1682     }
1683     else {
1684         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1685         last_smsg_send_count = smsg_send_count;
1686         last_smsg_recv_count = smsg_recv_count;
1687         _detected_hang = 0;
1688     }
1689 }
1690
1691 static void set_limit()
1692 {
1693     //if (!user_set_flag && CmiMyRank() == 0) {
1694     if (CmiMyRank() == 0) {
1695         int mynode = CmiPhysicalNodeID(CmiMyPe());
1696         int numpes = CmiNumPesOnPhysicalNode(mynode);
1697         int numprocesses = numpes / CmiMyNodeSize();
1698         MAX_REG_MEM  = _totalmem / numprocesses;
1699         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1700         if (CmiMyPe() == 0)
1701            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1702         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1703         {
1704              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1705              CmiAbort("memory registration\n");
1706         }
1707     }
1708 }
1709
1710 void LrtsPostCommonInit(int everReturn)
1711 {
1712 #if CMK_DIRECT
1713     CmiDirectInit();
1714 #endif
1715 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1716     CpvInitialize(double, projTraceStart);
1717     /* only PE 0 needs to care about registration (to generate sts file). */
1718     //if (CmiMyPe() == 0) 
1719     {
1720         registerMachineUserEventsFunction(&registerUserTraceEvents);
1721     }
1722 #endif
1723
1724 #if CMK_SMP
1725     CmiIdleState *s=CmiNotifyGetState();
1726     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1727     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1728 #else
1729     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1730     if (useDynamicSMSG)
1731     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1732 #endif
1733
1734 #if ! LARGEPAGE
1735     if (_checkProgress)
1736 #if CMK_SMP
1737     if (CmiMyRank() == 0)
1738 #endif
1739     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1740 #endif
1741  
1742 #if !LARGEPAGE
1743     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1744 #endif
1745 }
1746
1747 /* this is called by worker thread */
1748 void LrtsPostNonLocal(){
1749 #if CMK_SMP_TRACE_COMMTHREAD
1750     double startT, endT;
1751 #endif
1752 #if MULTI_THREAD_SEND
1753     if(mysize == 1) return;
1754 #if CMK_SMP_TRACE_COMMTHREAD
1755     traceEndIdle();
1756 #endif
1757
1758 #if CMK_SMP_TRACE_COMMTHREAD
1759     startT = CmiWallTimer();
1760 #endif
1761
1762 #if CMK_WORKER_SINGLE_TASK
1763     if (CmiMyRank() % 6 == 0)
1764 #endif
1765     PumpNetworkSmsg();
1766
1767 #if CMK_WORKER_SINGLE_TASK
1768     if (CmiMyRank() % 6 == 1)
1769 #endif
1770     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1771
1772 #if CMK_WORKER_SINGLE_TASK
1773     if (CmiMyRank() % 6 == 2)
1774 #endif
1775     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1776
1777 #if REMOTE_EVENT
1778 #if CMK_WORKER_SINGLE_TASK
1779     if (CmiMyRank() % 6 == 3)
1780 #endif
1781     PumpRemoteTransactions();
1782 #endif
1783
1784 #if CMK_USE_OOB
1785     if (SendBufferMsg(&smsg_oob_queue) == 1)
1786 #endif
1787     {
1788 #if CMK_WORKER_SINGLE_TASK
1789     if (CmiMyRank() % 6 == 4)
1790 #endif
1791         SendBufferMsg(&smsg_queue);
1792     }
1793
1794 #if CMK_WORKER_SINGLE_TASK
1795     if (CmiMyRank() % 6 == 5)
1796 #endif
1797     SendRdmaMsg();
1798
1799 #if CMK_SMP_TRACE_COMMTHREAD
1800     endT = CmiWallTimer();
1801     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
1802 #endif
1803 #if CMK_SMP_TRACE_COMMTHREAD
1804     traceBeginIdle();
1805 #endif
1806 #endif
1807 }
1808
1809 /* useDynamicSMSG */
1810 static void    PumpDatagramConnection()
1811 {
1812     uint32_t          remote_address;
1813     uint32_t          remote_id;
1814     gni_return_t status;
1815     gni_post_state_t  post_state;
1816     uint64_t          datagram_id;
1817     int i;
1818
1819    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1820    {
1821        if (datagram_id >= mysize) {           /* bound endpoint */
1822            int pe = datagram_id - mysize;
1823            CMI_GNI_LOCK(global_gni_lock)
1824            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1825            CMI_GNI_UNLOCK(global_gni_lock)
1826            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1827            {
1828                CmiAssert(remote_id == pe);
1829                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1830                GNI_RC_CHECK("Dynamic SMSG Init", status);
1831 #if PRINT_SYH
1832                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1833 #endif
1834                CmiAssert(smsg_connected_flag[pe] == 1);
1835                smsg_connected_flag[pe] = 2;
1836            }
1837        }
1838        else {         /* unbound ep */
1839            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1840            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1841            {
1842                CmiAssert(remote_id<mysize);
1843                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1844                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1845                GNI_RC_CHECK("Dynamic SMSG Init", status);
1846 #if PRINT_SYH
1847                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1848 #endif
1849                smsg_connected_flag[remote_id] = 2;
1850
1851                alloc_smsg_attr(&send_smsg_attr);
1852                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1853                GNI_RC_CHECK("post unbound datagram", status);
1854            }
1855        }
1856    }
1857 }
1858
1859 /* pooling CQ to receive network message */
1860 static void PumpNetworkRdmaMsgs()
1861 {
1862     gni_cq_entry_t      event_data;
1863     gni_return_t        status;
1864
1865 }
1866
1867 inline 
1868 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
1869 {
1870     RDMA_REQUEST        *rdma_request_msg;
1871     MallocRdmaRequest(rdma_request_msg);
1872     rdma_request_msg->destNode = inst_id;
1873     rdma_request_msg->pd = pd;
1874 #if REMOTE_EVENT
1875     rdma_request_msg->ack_index = ack_index;
1876 #endif
1877 #if CMK_SMP
1878     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1879 #else
1880     if(sendRdmaBuf == 0)
1881     {
1882         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1883     }else{
1884         sendRdmaTail->next = rdma_request_msg;
1885         sendRdmaTail =  rdma_request_msg;
1886     }
1887 #endif
1888
1889 }
1890
1891 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1892
1893 static void PumpNetworkSmsg()
1894 {
1895     uint64_t            inst_id;
1896     int                 ret;
1897     gni_cq_entry_t      event_data;
1898     gni_return_t        status, status2;
1899     void                *header;
1900     uint8_t             msg_tag;
1901     int                 msg_nbytes;
1902     void                *msg_data;
1903     gni_mem_handle_t    msg_mem_hndl;
1904     gni_smsg_attr_t     *smsg_attr;
1905     gni_smsg_attr_t     *remote_smsg_attr;
1906     int                 init_flag;
1907     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1908     uint64_t            source_addr;
1909     SMSG_QUEUE         *queue = &smsg_queue;
1910 #if     CMK_DIRECT
1911     cmidirectMsg        *direct_msg;
1912 #endif
1913     while(1)
1914     {
1915         CMI_GNI_LOCK(smsg_rx_cq_lock)
1916         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1917         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
1918         if(status != GNI_RC_SUCCESS)
1919             break;
1920         inst_id = GNI_CQ_GET_INST_ID(event_data);
1921 #if REMOTE_EVENT
1922         inst_id = ACK_GET_RANK(inst_id);
1923 #endif
1924         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1925 #if PRINT_SYH
1926         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1927 #endif
1928         if (useDynamicSMSG) {
1929             /* subtle: smsg may come before connection is setup */
1930             while (smsg_connected_flag[inst_id] != 2) 
1931                PumpDatagramConnection();
1932         }
1933         msg_tag = GNI_SMSG_ANY_TAG;
1934         while(1) {
1935             CMI_GNI_LOCK(smsg_mailbox_lock)
1936             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1937             if (status != GNI_RC_SUCCESS)
1938             {
1939                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1940                 break;
1941             }
1942 #if PRINT_SYH
1943             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1944 #endif
1945             /* copy msg out and then put into queue (small message) */
1946             switch (msg_tag) {
1947             case SMALL_DATA_TAG:
1948             {
1949                 START_EVENT();
1950                 msg_nbytes = CmiGetMsgSize(header);
1951                 msg_data    = CmiAlloc(msg_nbytes);
1952                 memcpy(msg_data, (char*)header, msg_nbytes);
1953                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1954                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1955                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1956                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
1957                 handleOneRecvedMsg(msg_nbytes, msg_data);
1958                 break;
1959             }
1960             case LMSG_INIT_TAG:
1961             {
1962 #if MULTI_THREAD_SEND
1963                 MallocControlMsg(control_msg_tmp);
1964                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
1965                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1966                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1967                 getLargeMsgRequest(control_msg_tmp, inst_id);
1968                 FreeControlMsg(control_msg_tmp);
1969 #else
1970                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1971                 getLargeMsgRequest(header, inst_id);
1972                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1973 #endif
1974                 break;
1975             }
1976             case ACK_TAG:   //msg fit into mempool
1977             {
1978                 /* Get is done, release message . Now put is not used yet*/
1979                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1980                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1981                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1982 #if ! USE_LRTS_MEMPOOL
1983                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1984 #else
1985                 DecreaseMsgInSend(msg);
1986 #endif
1987                 if(NoMsgInSend(msg))
1988                     buffered_send_msg -= GetMempoolsize(msg);
1989                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1990                 CmiFree(msg);
1991                 break;
1992             }
1993             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1994             {
1995 #if MULTI_THREAD_SEND
1996                 MallocControlMsg(header_tmp);
1997                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
1998                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1999 #else
2000                 header_tmp = (CONTROL_MSG *) header;
2001 #endif
2002                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2003                 void *msg = (void*)(header_tmp->source_addr);
2004                 int cur_seq = CmiGetMsgSeq(msg);
2005                 int offset = ONE_SEG*(cur_seq+1);
2006                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2007                 buffered_send_msg -= header_tmp->length;
2008                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2009                 if (remain_size < 0) remain_size = 0;
2010                 CmiSetMsgSize(msg, remain_size);
2011                 if(remain_size <= 0) //transaction done
2012                 {
2013                     CmiFree(msg);
2014                 }else if (header_tmp->total_length > offset)
2015                 {
2016                     CmiSetMsgSeq(msg, cur_seq+1);
2017                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2018                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2019                     //send next seg
2020                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2021                          // pipelining
2022                     if (header_tmp->seq_id == 1) {
2023                       int i;
2024                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2025                         int seq = cur_seq+i+2;
2026                         CmiSetMsgSeq(msg, seq-1);
2027                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2028                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2029                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2030                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2031                       }
2032                     }
2033                 }
2034 #if MULTI_THREAD_SEND
2035                 FreeControlMsg(header_tmp);
2036 #else
2037                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2038 #endif
2039                 break;
2040             }
2041 #if CMK_PERSISTENT_COMM
2042             case PUT_DONE_TAG:  {   //persistent message
2043                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2044                 int size = ((CONTROL_MSG *) header)->length;
2045                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2046                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2047                 CmiReference(msg);
2048                 CMI_CHECK_CHECKSUM(msg, size);
2049                 handleOneRecvedMsg(size, msg); 
2050 #if PRINT_SYH
2051                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2052 #endif
2053                 break;
2054             }
2055 #endif
2056 #if CMK_DIRECT
2057             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2058                 //create a trigger message
2059                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2060                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2061                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2062                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2063                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2064                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2065                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2066                 break;
2067 #endif
2068             default: {
2069                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2070                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2071                 printf("weird tag problem\n");
2072                 CmiAbort("Unknown tag\n");
2073                      }
2074             }               // end switch
2075 #if PRINT_SYH
2076             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2077 #endif
2078             smsg_recv_count ++;
2079             msg_tag = GNI_SMSG_ANY_TAG;
2080         } //endwhile getNext
2081     }   //end while GetEvent
2082     if(status == GNI_RC_ERROR_RESOURCE)
2083     {
2084         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2085         GNI_RC_CHECK("Smsg_rx_cq full", status);
2086     }
2087 }
2088
2089 static void printDesc(gni_post_descriptor_t *pd)
2090 {
2091     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2092 }
2093
2094 #if CQWRITE
2095 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2096 {
2097     gni_post_descriptor_t *pd;
2098     gni_return_t        status = GNI_RC_SUCCESS;
2099     
2100     MallocPostDesc(pd);
2101     pd->type = GNI_POST_CQWRITE;
2102     pd->cq_mode = GNI_CQMODE_SILENT;
2103     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2104     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2105     pd->cqwrite_value = data;
2106     pd->remote_mem_hndl = mem_hndl;
2107     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2108     GNI_RC_CHECK("GNI_PostCqWrite", status);
2109 }
2110 #endif
2111
2112 // register memory for a message
2113 // return mem handle
2114 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2115 {
2116     gni_return_t status = GNI_RC_SUCCESS;
2117
2118     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2119
2120 #if CMK_PERSISTENT_COMM
2121         // persistent message is always registered
2122     if (!IsMemHndlZero(MEMHFIELD(msg))) {
2123         *memh = MEMHFIELD(msg);
2124         return GNI_RC_SUCCESS;
2125     }
2126 #endif
2127     if(seqno == 0)
2128     {
2129         if(IsMemHndlZero((GetMemHndl(msg))))
2130         {
2131             msg = (void*)(msg);
2132             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2133             if(status == GNI_RC_SUCCESS)
2134                 *memh = GetMemHndl(msg);
2135         }
2136         else {
2137             *memh = GetMemHndl(msg);
2138         }
2139     }
2140     else {
2141         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2142         status = registerMemory(msg, size, memh, NULL); 
2143     }
2144     return status;
2145 }
2146
2147 // for BIG_MSG called on receiver side for receiving control message
2148 // LMSG_INIT_TAG
2149 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2150 {
2151 #if     USE_LRTS_MEMPOOL
2152     CONTROL_MSG         *request_msg;
2153     gni_return_t        status = GNI_RC_SUCCESS;
2154     void                *msg_data;
2155     gni_post_descriptor_t *pd;
2156     gni_mem_handle_t    msg_mem_hndl;
2157     int source, size, transaction_size, offset = 0;
2158     size_t     register_size = 0;
2159
2160     // initial a get to transfer data from the sender side */
2161     request_msg = (CONTROL_MSG *) header;
2162     size = request_msg->total_length;
2163     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2164     MallocPostDesc(pd);
2165 #if CMK_WITH_STATS 
2166     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2167 #endif
2168     if(request_msg->seq_id < 2)   {
2169 #if CMK_SMP_TRACE_COMMTHREAD 
2170         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2171 #endif
2172         msg_data = CmiAlloc(size);
2173         CmiSetMsgSeq(msg_data, 0);
2174         _MEMCHECK(msg_data);
2175     }
2176     else {
2177         offset = ONE_SEG*(request_msg->seq_id-1);
2178         msg_data = (char*)request_msg->dest_addr + offset;
2179     }
2180    
2181     pd->cqwrite_value = request_msg->seq_id;
2182
2183     SetMemHndlZero(pd->local_mem_hndl);
2184     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2185     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2186     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2187         if(NoMsgInRecv( (void*)(msg_data)))
2188             register_size = GetMempoolsize((void*)(msg_data));
2189         else
2190             register_size = 0;
2191     }
2192
2193 #if 0
2194     if( request_msg->seq_id == 0)
2195     {
2196         pd->local_mem_hndl= GetMemHndl(msg_data);
2197         transaction_size = ALIGN64(size);
2198         if(IsMemHndlZero(pd->local_mem_hndl))
2199         {   
2200             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)), rdma_rx_cqh);
2201             if(status == GNI_RC_SUCCESS)
2202             {
2203                 pd->local_mem_hndl = GetMemHndl(msg_data);
2204             }
2205             else
2206             {
2207                 SetMemHndlZero(pd->local_mem_hndl);
2208             }
2209         }
2210         if(NoMsgInRecv( (void*)(msg_data)))
2211             register_size = GetMempoolsize((void*)(msg_data));
2212         else
2213             register_size = 0;
2214     }
2215     else{
2216         transaction_size = ALIGN64(request_msg->length);
2217         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl), NULL); 
2218         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2219         {
2220             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2221         }
2222     }
2223 #endif
2224     pd->first_operand = ALIGN64(size);                   //  total length
2225
2226     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2227         pd->type            = GNI_POST_FMA_GET;
2228     else
2229         pd->type            = GNI_POST_RDMA_GET;
2230     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2231     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2232     pd->length          = transaction_size;
2233     pd->local_addr      = (uint64_t) msg_data;
2234     pd->remote_addr     = request_msg->source_addr + offset;
2235     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2236     pd->src_cq_hndl     = rdma_tx_cqh;
2237     pd->rdma_mode       = 0;
2238     pd->amo_cmd         = 0;
2239
2240     //memory registration success
2241     if(status == GNI_RC_SUCCESS)
2242     {
2243         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2244         CMI_GNI_LOCK(lock)
2245 #if REMOTE_EVENT
2246         if( request_msg->seq_id == 0)
2247         {
2248             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2249             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2250             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2251         }
2252         else {
2253             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, myrank);
2254             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2255         }
2256 #endif
2257
2258 #if CMK_WITH_STATS
2259         RDMA_TRY_SEND()
2260 #endif
2261         if(pd->type == GNI_POST_RDMA_GET) 
2262         {
2263             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2264         }
2265         else
2266         {
2267             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2268         }
2269         CMI_GNI_UNLOCK(lock)
2270
2271         if(status == GNI_RC_SUCCESS )
2272         {
2273             if(pd->cqwrite_value == 0)
2274             {
2275 #if MACHINE_DEBUG_LOG
2276                 buffered_recv_msg += register_size;
2277                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2278 #endif
2279                 IncreaseMsgInRecv(msg_data);
2280 #if CMK_SMP_TRACE_COMMTHREAD 
2281                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2282 #endif
2283             }
2284 #if  CMK_WITH_STATS
2285                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2286                 RDMA_TRANS_INIT(pd->sync_flag_addr/1000000.0)
2287 #endif
2288         }
2289     }else
2290     {
2291         SetMemHndlZero((pd->local_mem_hndl));
2292     }
2293     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2294     {
2295 #if REMOTE_EVENT
2296         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2297 #else
2298         bufferRdmaMsg(inst_id, pd, -1); 
2299 #endif
2300     }else {
2301          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
2302         GNI_RC_CHECK("GetLargeAFter posting", status);
2303     }
2304 #else
2305     CONTROL_MSG         *request_msg;
2306     gni_return_t        status;
2307     void                *msg_data;
2308     gni_post_descriptor_t *pd;
2309     RDMA_REQUEST        *rdma_request_msg;
2310     gni_mem_handle_t    msg_mem_hndl;
2311     //int source;
2312     // initial a get to transfer data from the sender side */
2313     request_msg = (CONTROL_MSG *) header;
2314     msg_data = CmiAlloc(request_msg->length);
2315     _MEMCHECK(msg_data);
2316
2317     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2318
2319     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2320     {
2321         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2322     }
2323
2324     MallocPostDesc(pd);
2325     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2326         pd->type            = GNI_POST_FMA_GET;
2327     else
2328         pd->type            = GNI_POST_RDMA_GET;
2329     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2330     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2331     pd->length          = ALIGN64(request_msg->length);
2332     pd->local_addr      = (uint64_t) msg_data;
2333     pd->remote_addr     = request_msg->source_addr;
2334     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2335     pd->src_cq_hndl     = rdma_tx_cqh;
2336     pd->rdma_mode       = 0;
2337     pd->amo_cmd         = 0;
2338
2339     //memory registration successful
2340     if(status == GNI_RC_SUCCESS)
2341     {
2342         pd->local_mem_hndl  = msg_mem_hndl;
2343        
2344         if(pd->type == GNI_POST_RDMA_GET) 
2345         {
2346             CMI_GNI_LOCK(rdma_tx_cq_lock)
2347             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2348             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2349         }
2350         else
2351         {
2352             CMI_GNI_LOCK(default_tx_cq_lock)
2353             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2354             CMI_GNI_UNLOCK(default_tx_cq_lock)
2355         }
2356
2357     }else
2358     {
2359         SetMemHndlZero(pd->local_mem_hndl);
2360     }
2361     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2362     {
2363         MallocRdmaRequest(rdma_request_msg);
2364         rdma_request_msg->next = 0;
2365         rdma_request_msg->destNode = inst_id;
2366         rdma_request_msg->pd = pd;
2367         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2368     }else {
2369         GNI_RC_CHECK("AFter posting", status);
2370     }
2371 #endif
2372 }
2373
2374 #if CQWRITE
2375 static void PumpCqWriteTransactions()
2376 {
2377
2378     gni_cq_entry_t          ev;
2379     gni_return_t            status;
2380     void                    *msg;  
2381     int                     msg_size;
2382     while(1) {
2383         //CMI_GNI_LOCK(my_cq_lock) 
2384         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2385         //CMI_GNI_UNLOCK(my_cq_lock)
2386         if(status != GNI_RC_SUCCESS) break;
2387         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2388 #if CMK_PERSISTENT_COMM
2389 #if PRINT_SYH
2390         printf(" %d CQ write event %p\n", myrank, msg);
2391 #endif
2392         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2393 #if PRINT_SYH
2394             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2395 #endif
2396             CmiReference(msg);
2397             msg_size = CmiGetMsgSize(msg);
2398             CMI_CHECK_CHECKSUM(msg, msg_size);
2399             handleOneRecvedMsg(msg_size, msg); 
2400             continue;
2401         }
2402 #endif
2403         DecreaseMsgInSend(msg);
2404 #if ! USE_LRTS_MEMPOOL
2405        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2406 #else
2407         DecreaseMsgInSend(msg);
2408 #endif
2409         if(NoMsgInSend(msg))
2410             buffered_send_msg -= GetMempoolsize(msg);
2411         CmiFree(msg);
2412     };
2413     if(status == GNI_RC_ERROR_RESOURCE)
2414     {
2415         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2416     }
2417 }
2418 #endif
2419
2420 #if REMOTE_EVENT
2421 static void PumpRemoteTransactions()
2422 {
2423     gni_cq_entry_t          ev;
2424     gni_return_t            status;
2425     void                    *msg;   
2426     int                     slot;
2427
2428     while(1) {
2429         CMI_GNI_LOCK(global_gni_lock)
2430         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2431         CMI_GNI_UNLOCK(global_gni_lock)
2432         if(status != GNI_RC_SUCCESS) {
2433             break;
2434         }
2435
2436         slot = GNI_CQ_GET_INST_ID(ev);
2437         slot = ACK_GET_INDEX(slot);
2438         //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
2439
2440         //CMI_GNI_LOCK(ackpool_lock);
2441         msg = GetAckAddress(slot);
2442         //CMI_GNI_UNLOCK(ackpool_lock);
2443
2444         DecreaseMsgInSend(msg);
2445 #if ! USE_LRTS_MEMPOOL
2446        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2447 #else
2448         DecreaseMsgInSend(msg);
2449 #endif
2450         if(NoMsgInSend(msg))
2451             buffered_send_msg -= GetMempoolsize(msg);
2452         CmiFree(msg);
2453         AckPool_freeslot(slot);
2454     }
2455     if(status == GNI_RC_ERROR_RESOURCE)
2456     {
2457         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2458     }
2459 }
2460 #endif
2461
2462 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2463 {
2464     gni_cq_entry_t          ev;
2465     gni_return_t            status;
2466     uint64_t                type, inst_id;
2467     gni_post_descriptor_t   *tmp_pd;
2468     MSG_LIST                *ptr;
2469     CONTROL_MSG             *ack_msg_tmp;
2470     ACK_MSG                 *ack_msg;
2471     uint8_t                 msg_tag;
2472 #if CMK_DIRECT
2473     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2474 #endif
2475     SMSG_QUEUE         *queue = &smsg_queue;
2476
2477     while(1) {
2478         CMI_GNI_LOCK(my_cq_lock) 
2479         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2480         CMI_GNI_UNLOCK(my_cq_lock)
2481         if(status != GNI_RC_SUCCESS) break;
2482         
2483         type = GNI_CQ_GET_TYPE(ev);
2484         if (type == GNI_CQ_EVENT_TYPE_POST)
2485         {
2486             inst_id     = GNI_CQ_GET_INST_ID(ev);
2487 #if PRINT_SYH
2488             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2489 #endif
2490             CMI_GNI_LOCK(my_cq_lock)
2491             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2492             CMI_GNI_UNLOCK(my_cq_lock)
2493
2494             switch (tmp_pd->type) {
2495 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2496             case GNI_POST_RDMA_PUT:
2497 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2498                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2499 #endif
2500             case GNI_POST_FMA_PUT:
2501                 if(tmp_pd->amo_cmd == 1) {
2502 #if CMK_DIRECT
2503                     //sender ACK to receiver to trigger it is done
2504                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2505                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2506                     msg_tag = DIRECT_PUT_DONE_TAG;
2507 #endif
2508                 }
2509                 else {
2510                     CmiFree((void *)tmp_pd->local_addr);
2511 #if     !CQWRITE
2512                     MallocControlMsg(ack_msg_tmp);
2513                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2514                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2515                     ack_msg_tmp->length  = tmp_pd->length;
2516                     msg_tag = PUT_DONE_TAG;
2517 #else
2518                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2519                     FreePostDesc(tmp_pd);
2520                     continue;
2521 #endif
2522                 }
2523                 break;
2524 #endif
2525             case GNI_POST_RDMA_GET:
2526             case GNI_POST_FMA_GET:  {
2527 #if  ! USE_LRTS_MEMPOOL
2528                 MallocControlMsg(ack_msg_tmp);
2529                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2530                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2531                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2532                 msg_tag = ACK_TAG;  
2533 #else
2534 #if CMK_WITH_STATS
2535                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2536 #endif
2537                 int seq_id = tmp_pd->cqwrite_value;
2538                 if(seq_id > 0)      // BIG_MSG
2539                 {
2540                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2541                     MallocControlMsg(ack_msg_tmp);
2542                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2543                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2544                     ack_msg_tmp->seq_id = seq_id;
2545                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2546                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2547                     ack_msg_tmp->length = tmp_pd->length;
2548                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2549                     msg_tag = BIG_MSG_TAG; 
2550                 } 
2551                 else
2552                 {
2553                     msg_tag = ACK_TAG; 
2554 #if  !REMOTE_EVENT && !inst_id, CQWRITE
2555                     MallocAckMsg(ack_msg);
2556                     ack_msg->source_addr = tmp_pd->remote_addr;
2557 #endif
2558                 }
2559 #endif
2560                 break;
2561             }
2562             case  GNI_POST_CQWRITE:
2563                    FreePostDesc(tmp_pd);
2564                    continue;
2565             default:
2566                 CmiPrintf("type=%d\n", tmp_pd->type);
2567                 CmiAbort("PumpLocalTransactions: unknown type!");
2568             }      /* end of switch */
2569
2570 #if CMK_DIRECT
2571             if (tmp_pd->amo_cmd == 1) {
2572                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2573                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2574             }
2575             else
2576 #endif
2577             if (msg_tag == ACK_TAG) {
2578 #if !REMOTE_EVENT
2579 #if   !CQWRITE
2580                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2581                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2582 #else
2583                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2584 #endif
2585 #endif
2586             }
2587             else {
2588                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2589                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2590             }
2591 #if CMK_PERSISTENT_COMM
2592             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2593 #endif
2594             {
2595                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2596 #if PRINT_SYH
2597                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2598 #endif
2599                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2600                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2601
2602                     START_EVENT();
2603                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2604                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2605 #if MACHINE_DEBUG_LOG
2606                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2607                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2608                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2609 #endif
2610                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2611                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2612                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2613                 }else if(msg_tag == BIG_MSG_TAG){
2614                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2615                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2616                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2617                         START_EVENT();
2618 #if PRINT_SYH
2619                         printf("Pipeline msg done [%d]\n", myrank);
2620 #endif
2621 #if                 CMK_SMP_TRACE_COMMTHREAD
2622                         if( tmp_pd->cqwrite_value == 1)
2623                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2624 #endif
2625                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2626                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2627                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2628                     }
2629                 }
2630             }
2631             FreePostDesc(tmp_pd);
2632         }
2633     } //end while
2634     if(status == GNI_RC_ERROR_RESOURCE)
2635     {
2636         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2637         GNI_RC_CHECK("Smsg_tx_cq full", status);
2638     }
2639 }
2640
2641 static void  SendRdmaMsg()
2642 {
2643     gni_return_t            status = GNI_RC_SUCCESS;
2644     gni_mem_handle_t        msg_mem_hndl;
2645     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2646     RDMA_REQUEST            *pre = 0;
2647     uint64_t                register_size = 0;
2648     void                    *msg;
2649     int                     i;
2650 #if CMK_SMP
2651     int len = PCQueueLength(sendRdmaBuf);
2652     for (i=0; i<len; i++)
2653     {
2654         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2655         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2656         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2657         if (ptr == NULL) break;
2658 #else
2659     ptr = sendRdmaBuf;
2660     while (ptr!=0)
2661     {
2662 #endif 
2663         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2664         gni_post_descriptor_t *pd = ptr->pd;
2665         status = GNI_RC_SUCCESS;
2666         
2667         msg = (void*)(pd->local_addr);
2668         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2669         if(pd->cqwrite_value == 0) {
2670             if(NoMsgInRecv(msg))
2671                 register_size = GetMempoolsize((void*)(pd->local_addr));
2672             else
2673                 register_size = 0;
2674         }
2675         else
2676             register_size = 0;
2677 #if 0
2678         if(pd->cqwrite_value == 0)
2679         {
2680             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2681             {
2682                 msg = (void*)(pd->local_addr);
2683                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2684                 if(status == GNI_RC_SUCCESS)
2685                 {
2686                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2687                 }
2688             }else
2689             {
2690                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2691             }
2692             if(NoMsgInRecv( (void*)(pd->local_addr)))
2693                 register_size = GetMempoolsize((void*)(pd->local_addr));
2694             else
2695                 register_size = 0;
2696         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2697         {
2698             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl), NULL); 
2699         }
2700 #endif
2701         if(status == GNI_RC_SUCCESS)        //mem register good
2702         {
2703             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2704             CMI_GNI_LOCK(lock);
2705 #if REMOTE_EVENT
2706             if( pd->cqwrite_value == 0)
2707             {
2708                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2709                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2710                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2711             }
2712             else {
2713                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, myrank);
2714                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2715             }
2716 #endif
2717 #if CMK_WITH_STATS
2718             RDMA_TRY_SEND()
2719 #endif
2720             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2721             {
2722                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2723             }
2724             else
2725             {
2726                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2727             }
2728             CMI_GNI_UNLOCK(lock);
2729             
2730             if(status == GNI_RC_SUCCESS)    //post good
2731             {
2732 #if !CMK_SMP
2733                 tmp_ptr = ptr;
2734                 if(pre != 0) {
2735                     pre->next = ptr->next;
2736                 }
2737                 else {
2738                     sendRdmaBuf = ptr->next;
2739                 }
2740                 ptr = ptr->next;
2741                 FreeRdmaRequest(tmp_ptr);
2742 #endif
2743                 if(pd->cqwrite_value == 0)
2744                 {
2745 #if CMK_SMP_TRACE_COMMTHREAD 
2746                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2747 #endif
2748                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2749                 }
2750 #if  CMK_WITH_STATS
2751                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2752                 RDMA_TRANS_INIT(pd->sync_flag_addr/1000000.0)
2753 #endif
2754 #if MACHINE_DEBUG_LOG
2755                 buffered_recv_msg += register_size;
2756                 MACHSTATE(8, "GO request from buffered\n"); 
2757 #endif
2758 #if PRINT_SYH
2759                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
2760 #endif
2761             }else           // cannot post
2762             {
2763 #if CMK_SMP
2764                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2765 #else
2766                 pre = ptr;
2767                 ptr = ptr->next;
2768 #endif
2769 #if PRINT_SYH
2770                 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank, pd->cqwrite_value);
2771 #endif
2772                 break;
2773             }
2774         } else          //memory registration fails
2775         {
2776 #if CMK_SMP
2777             PCQueuePush(sendRdmaBuf, (char*)ptr);
2778 #else
2779             pre = ptr;
2780             ptr = ptr->next;
2781 #endif
2782         }
2783     } //end while
2784 #if ! CMK_SMP
2785     if(ptr == 0)
2786         sendRdmaTail = pre;
2787 #endif
2788 }
2789
2790 // return 1 if all messages are sent
2791 static int SendBufferMsg(SMSG_QUEUE *queue)
2792 {
2793     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2794     CONTROL_MSG         *control_msg_tmp;
2795     gni_return_t        status;
2796     int                 done = 1;
2797     uint64_t            register_size;
2798     void                *register_addr;
2799     int                 index_previous = -1;
2800 #if CMI_EXERT_SEND_CAP
2801     int                 sent_cnt = 0;
2802 #endif
2803
2804 #if CMK_SMP
2805     int          index = 0;
2806 #if ONE_SEND_QUEUE
2807     memset(destpe_avail, 0, mysize * sizeof(char));
2808     for (index=0; index<1; index++)
2809     {
2810         int i, len = PCQueueLength(queue->sendMsgBuf);
2811         for (i=0; i<len; i++) 
2812         {
2813             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2814             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2815             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2816             if(ptr == NULL) break;
2817             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2818                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2819                 continue;
2820             }
2821 #else
2822 #if SMP_LOCKS
2823     int nonempty = PCQueueLength(nonEmptyQueues);
2824     for(index =0; index<nonempty; index++)
2825     {
2826         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2827         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2828         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2829         if(current_list == NULL) break; 
2830         PCQueue current_queue= current_list-> sendSmsgBuf;
2831         CmiLock(current_list->lock);
2832         int i, len = PCQueueLength(current_queue);
2833         current_list->pushed = 0;
2834         CmiUnlock(current_list->lock);
2835 #else
2836     for(index =0; index<mysize; index++)
2837     {
2838         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2839         int i, len = PCQueueLength(current_queue);
2840 #endif
2841         for (i=0; i<len; i++) 
2842         {
2843             CMI_PCQUEUEPOP_LOCK(current_queue)
2844             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2845             CMI_PCQUEUEPOP_UNLOCK(current_queue)
2846             if (ptr == 0) break;
2847 #endif
2848 #else
2849     int index = queue->smsg_head_index;
2850     while(index != -1)
2851     {
2852         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2853         pre = 0;
2854         while(ptr != 0)
2855         {
2856 #endif
2857             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2858             status = GNI_RC_ERROR_RESOURCE;
2859             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2860                 /* connection not exists yet */
2861             }
2862             else
2863             switch(ptr->tag)
2864             {
2865             case SMALL_DATA_TAG:
2866                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2867                 if(status == GNI_RC_SUCCESS)
2868                 {
2869                     CmiFree(ptr->msg);
2870                 }
2871                 break;
2872             case LMSG_INIT_TAG:
2873                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2874                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
2875                 break;
2876             case ACK_TAG:
2877                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2878                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2879                 break;
2880             case BIG_MSG_TAG:
2881                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2882                 if(status == GNI_RC_SUCCESS)
2883                 {
2884                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2885                 }
2886                 break;
2887 #if CMK_DIRECT
2888             case DIRECT_PUT_DONE_TAG:
2889                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
2890                 if(status == GNI_RC_SUCCESS)
2891                 {
2892                     free((CMK_DIRECT_HEADER*)ptr->msg);
2893                 }
2894                 break;
2895
2896 #endif
2897             default:
2898                 printf("Weird tag\n");
2899                 CmiAbort("should not happen\n");
2900             }       // end switch
2901             if(status == GNI_RC_SUCCESS)
2902             {
2903 #if PRINT_SYH
2904                 buffered_smsg_counter--;
2905                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2906 #endif
2907 #if !CMK_SMP
2908                 tmp_ptr = ptr;
2909                 if(pre)
2910                 {
2911                     ptr = pre ->next = ptr->next;
2912                 }else
2913                 {
2914                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2915                 }
2916                 FreeMsgList(tmp_ptr);
2917 #else
2918                 FreeMsgList(ptr);
2919 #endif
2920 #if CMI_EXERT_SEND_CAP
2921                 sent_cnt++;
2922                 if(sent_cnt == SEND_CAP)
2923                     break;
2924 #endif
2925             }else {
2926 #if CMK_SMP
2927 #if ONE_SEND_QUEUE
2928                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2929 #else
2930                 PCQueuePush(current_queue, (char*)ptr);
2931 #endif
2932 #else
2933                 pre = ptr;
2934                 ptr=ptr->next;
2935 #endif
2936                 done = 0;
2937                 if(status == GNI_RC_ERROR_RESOURCE)
2938                 {
2939 #if CMK_SMP && ONE_SEND_QUEUE 
2940                     destpe_avail[ptr->destNode] = 1;
2941 #else
2942                     break;
2943 #endif
2944                 }
2945             } 
2946         } //end while
2947 #if !CMK_SMP
2948         if(ptr == 0)
2949             queue->smsg_msglist_index[index].tail = pre;
2950         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2951         {
2952             if(index_previous != -1)
2953                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2954             else
2955                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2956         }
2957         else {
2958             index_previous = index;
2959         }
2960         index = queue->smsg_msglist_index[index].next;
2961 #else
2962 #if !ONE_SEND_QUEUE && SMP_LOCKS
2963         CmiLock(current_list->lock);
2964         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2965         {
2966             current_list->pushed = 1;
2967             PCQueuePush(nonEmptyQueues, current_list);
2968         }
2969         CmiUnlock(current_list->lock); 
2970 #endif
2971 #endif
2972
2973 #if CMI_EXERT_SEND_CAP
2974         if(sent_cnt == SEND_CAP)
2975                 break;
2976 #endif
2977     }   // end pooling for all cores
2978     return done;
2979 }
2980
2981 static void ProcessDeadlock();
2982 void LrtsAdvanceCommunication(int whileidle)
2983 {
2984     static int count = 0;
2985     /*  Receive Msg first */
2986 #if CMK_SMP_TRACE_COMMTHREAD
2987     double startT, endT;
2988 #endif
2989     if (useDynamicSMSG && whileidle)
2990     {
2991 #if CMK_SMP_TRACE_COMMTHREAD
2992         startT = CmiWallTimer();
2993 #endif
2994         PumpDatagramConnection();
2995 #if CMK_SMP_TRACE_COMMTHREAD
2996         endT = CmiWallTimer();
2997         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
2998 #endif
2999     }
3000
3001 #if CMK_SMP_TRACE_COMMTHREAD
3002     startT = CmiWallTimer();
3003 #endif
3004     PumpNetworkSmsg();
3005     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3006 #if CMK_SMP_TRACE_COMMTHREAD
3007     endT = CmiWallTimer();
3008     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3009 #endif
3010
3011 #if CMK_SMP_TRACE_COMMTHREAD
3012     startT = CmiWallTimer();
3013 #endif
3014     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3015     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3016 #if CMK_SMP_TRACE_COMMTHREAD
3017     endT = CmiWallTimer();
3018     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3019 #endif
3020
3021 #if CMK_SMP_TRACE_COMMTHREAD
3022     startT = CmiWallTimer();
3023 #endif
3024     PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock);
3025
3026 #if CQWRITE
3027     PumpCqWriteTransactions();
3028 #endif
3029
3030 #if REMOTE_EVENT
3031     PumpRemoteTransactions();
3032 #endif
3033
3034     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3035 #if CMK_SMP_TRACE_COMMTHREAD
3036     endT = CmiWallTimer();
3037     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3038 #endif
3039  
3040     /* Send buffered Message */
3041 #if CMK_SMP_TRACE_COMMTHREAD
3042     startT = CmiWallTimer();
3043 #endif
3044 #if CMK_USE_OOB
3045     if (SendBufferMsg(&smsg_oob_queue) == 1)
3046 #endif
3047     {
3048     SendBufferMsg(&smsg_queue);
3049     }
3050     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3051 #if CMK_SMP_TRACE_COMMTHREAD
3052     endT = CmiWallTimer();
3053     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3054 #endif
3055
3056 #if CMK_SMP_TRACE_COMMTHREAD
3057     startT = CmiWallTimer();
3058 #endif
3059     SendRdmaMsg();
3060     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3061 #if CMK_SMP_TRACE_COMMTHREAD
3062     endT = CmiWallTimer();
3063     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3064 #endif
3065
3066 #if CMK_SMP && ! LARGEPAGE
3067     if (_detected_hang)  ProcessDeadlock();
3068 #endif
3069 }
3070
3071 /* useDynamicSMSG */
3072 static void _init_dynamic_smsg()
3073 {
3074     gni_return_t status;
3075     uint32_t     vmdh_index = -1;
3076     int i;
3077
3078     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3079     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3080     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3081     for(i=0; i<mysize; i++) {
3082         smsg_connected_flag[i] = 0;
3083         smsg_attr_vector_local[i] = NULL;
3084         smsg_attr_vector_remote[i] = NULL;
3085     }
3086     if(mysize <=512)
3087     {
3088         SMSG_MAX_MSG = 4096;
3089     }else if (mysize <= 4096)
3090     {
3091         SMSG_MAX_MSG = 4096/mysize * 1024;
3092     }else if (mysize <= 16384)
3093     {
3094         SMSG_MAX_MSG = 512;
3095     }else {
3096         SMSG_MAX_MSG = 256;
3097     }
3098
3099     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3100     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3101     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3102     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3103     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3104
3105     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3106     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3107     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3108     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3109     mailbox_list->offset = 0;
3110     mailbox_list->next = 0;
3111     
3112     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3113         mailbox_list->size, smsg_rx_cqh,
3114         GNI_MEM_READWRITE,   
3115         vmdh_index,
3116         &(mailbox_list->mem_hndl));
3117     GNI_RC_CHECK("MEMORY registration for smsg", status);
3118
3119     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3120     GNI_RC_CHECK("Unbound EP", status);
3121     
3122     alloc_smsg_attr(&send_smsg_attr);
3123
3124     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3125     GNI_RC_CHECK("post unbound datagram", status);
3126
3127       /* always pre-connect to proc 0 */
3128     //if (myrank != 0) connect_to(0);
3129 }
3130
3131 static void _init_static_smsg()
3132 {
3133     gni_smsg_attr_t      *smsg_attr;
3134     gni_smsg_attr_t      remote_smsg_attr;
3135     gni_smsg_attr_t      *smsg_attr_vec;
3136     gni_mem_handle_t     my_smsg_mdh_mailbox;
3137     int      ret, i;
3138     gni_return_t status;
3139     uint32_t              vmdh_index = -1;
3140     mdh_addr_t            base_infor;
3141     mdh_addr_t            *base_addr_vec;
3142     char *env;
3143
3144     if(mysize <=512)
3145     {
3146         SMSG_MAX_MSG = 1024;
3147     }else if (mysize <= 4096)
3148     {
3149         SMSG_MAX_MSG = 1024;
3150     }else if (mysize <= 16384)
3151     {
3152         SMSG_MAX_MSG = 512;
3153     }else {
3154         SMSG_MAX_MSG = 256;
3155     }
3156     
3157     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3158     if (env) SMSG_MAX_MSG = atoi(env);
3159     CmiAssert(SMSG_MAX_MSG > 0);
3160
3161     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3162     
3163     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3164     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3165     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3166     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3167     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3168     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3169     CmiAssert(ret == 0);
3170     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3171     
3172     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3173             smsg_memlen*(mysize), smsg_rx_cqh,
3174             GNI_MEM_READWRITE,   
3175             vmdh_index,
3176             &my_smsg_mdh_mailbox);
3177     register_memory_size += smsg_memlen*(mysize);
3178     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3179
3180     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3181     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3182
3183     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3184     base_infor.mdh =  my_smsg_mdh_mailbox;
3185     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3186
3187     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3188  
3189     for(i=0; i<mysize; i++)
3190     {
3191         if(i==myrank)
3192             continue;
3193         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3194         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3195         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3196         smsg_attr[i].mbox_offset = i*smsg_memlen;
3197         smsg_attr[i].buff_size = smsg_memlen;
3198         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3199         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3200     }
3201
3202     for(i=0; i<mysize; i++)
3203     {
3204         if (myrank == i) continue;
3205
3206         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3207         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3208         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3209         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3210         remote_smsg_attr.buff_size = smsg_memlen;
3211         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3212         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3213
3214         /* initialize the smsg channel */
3215         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3216         GNI_RC_CHECK("SMSG Init", status);
3217     } //end initialization
3218
3219     free(base_addr_vec);
3220     free(smsg_attr);
3221
3222     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3223     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3224
3225
3226 inline
3227 static void _init_send_queue(SMSG_QUEUE *queue)
3228 {
3229      int i;
3230 #if ONE_SEND_QUEUE
3231      queue->sendMsgBuf = PCQueueCreate();
3232      destpe_avail = (char*)malloc(mysize * sizeof(char));
3233 #else
3234      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3235 #if CMK_SMP && SMP_LOCKS
3236      nonEmptyQueues = PCQueueCreate();
3237 #endif
3238      for(i =0; i<mysize; i++)
3239      {
3240 #if CMK_SMP
3241          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3242 #if SMP_LOCKS
3243          queue->smsg_msglist_index[i].pushed = 0;
3244          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3245 #endif
3246 #else
3247          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3248          queue->smsg_msglist_index[i].next = -1;
3249          queue->smsg_head_index = -1;
3250 #endif
3251         
3252      }
3253 #endif
3254 }
3255
3256 inline
3257 static void _init_smsg()
3258 {
3259     if(mysize > 1) {
3260         if (useDynamicSMSG)
3261             _init_dynamic_smsg();
3262         else
3263             _init_static_smsg();
3264     }
3265
3266     _init_send_queue(&smsg_queue);
3267 #if CMK_USE_OOB
3268     _init_send_queue(&smsg_oob_queue);
3269 #endif
3270 }
3271
3272 static void _init_static_msgq()
3273 {
3274     gni_return_t status;
3275     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3276     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3277     msgq_attrs.smsg_q_sz = 1;
3278     msgq_attrs.rcv_pool_sz = 1;
3279     msgq_attrs.num_msgq_eps = 2;
3280     msgq_attrs.nloc_insts = 8;
3281     msgq_attrs.modes = 0;
3282     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3283
3284     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3285     GNI_RC_CHECK("MSGQ Init", status);
3286
3287
3288 }
3289
3290
3291 static CmiUInt8 total_mempool_size = 0;
3292 static CmiUInt8 total_mempool_calls = 0;
3293
3294 #if USE_LRTS_MEMPOOL
3295 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3296 {
3297     void *pool;
3298     int ret;
3299     gni_return_t status = GNI_RC_SUCCESS;
3300
3301     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3302     if (*size < default_size) *size = default_size;
3303 #if LARGEPAGE
3304     // round up to be multiple of _tlbpagesize
3305     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3306     *size = ALIGNHUGEPAGE(*size);
3307 #endif
3308     total_mempool_size += *size;
3309     total_mempool_calls += 1;
3310 #if   !LARGEPAGE
3311     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3312     {
3313         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3314         CmiAbort("alloc_mempool_block");
3315     }
3316 #endif
3317 #if LARGEPAGE
3318     pool = my_get_huge_pages(*size);
3319     ret = pool==NULL;
3320 #else
3321     ret = posix_memalign(&pool, ALIGNBUF, *size);
3322 #endif
3323     if (ret != 0) {
3324 #if CMK_SMP && STEAL_MEMPOOL
3325       pool = steal_mempool_block(size, mem_hndl);
3326       if (pool != NULL) return pool;
3327 #endif
3328       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3329       if (ret == ENOMEM)
3330         CmiAbort("alloc_mempool_block: out of memory.");
3331       else
3332         CmiAbort("alloc_mempool_block: posix_memalign failed");
3333     }
3334 #if LARGEPAGE
3335     CmiMemLock();
3336     register_count++;
3337     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3338     CmiMemUnlock();
3339     if(status != GNI_RC_SUCCESS) {
3340         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3341 sweep_mempool(CpvAccess(mempool));
3342     }
3343     GNI_RC_CHECK("MEMORY_REGISTER", status);
3344 #else
3345     SetMemHndlZero((*mem_hndl));
3346 #endif
3347     return pool;
3348 }
3349
3350 // ptr is a block head pointer
3351 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3352 {
3353     if(!(IsMemHndlZero(mem_hndl)))
3354     {
3355         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3356     }
3357 #if LARGEPAGE
3358     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3359 #else
3360     free(ptr);
3361 #endif
3362 }
3363 #endif
3364
3365 void LrtsPreCommonInit(int everReturn){
3366 #if USE_LRTS_MEMPOOL
3367     CpvInitialize(mempool_type*, mempool);
3368     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3369     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3370 #endif
3371 }
3372
3373 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3374 {
3375     register int            i;
3376     int                     rc;
3377     int                     device_id = 0;
3378     unsigned int            remote_addr;
3379     gni_cdm_handle_t        cdm_hndl;
3380     gni_return_t            status = GNI_RC_SUCCESS;
3381     uint32_t                vmdh_index = -1;
3382     uint8_t                 ptag;
3383     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3384     int                     first_spawned;
3385     int                     physicalID;
3386     char                   *env;
3387
3388     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3389     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3390     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3391    
3392     status = PMI_Init(&first_spawned);
3393     GNI_RC_CHECK("PMI_Init", status);
3394
3395     status = PMI_Get_size(&mysize);
3396     GNI_RC_CHECK("PMI_Getsize", status);
3397
3398     status = PMI_Get_rank(&myrank);
3399     GNI_RC_CHECK("PMI_getrank", status);
3400
3401     //physicalID = CmiPhysicalNodeID(myrank);
3402     
3403     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3404
3405     *myNodeID = myrank;
3406     *numNodes = mysize;
3407   
3408 #if MULTI_THREAD_SEND
3409     /* Currently, we only consider the case that comm. thread will only recv msgs */
3410     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3411 #endif
3412
3413     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3414     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3415     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3416
3417     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3418     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3419     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3420
3421     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3422     if (env) useDynamicSMSG = 1;
3423     if (!useDynamicSMSG)
3424       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3425     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3426     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3427     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3428     
3429     if(myrank == 0)
3430     {
3431         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3432         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3433     }
3434 #ifdef USE_ONESIDED
3435     onesided_init(NULL, &onesided_hnd);
3436
3437     // this is a GNI test, so use the libonesided bypass functionality
3438     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3439     local_addr = gniGetNicAddress();
3440 #else
3441     ptag = get_ptag();
3442     cookie = get_cookie();
3443 #if 0
3444     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3445 #endif
3446     //Create and attach to the communication  domain */
3447     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3448     GNI_RC_CHECK("GNI_CdmCreate", status);
3449     //* device id The device id is the minor number for the device
3450     //that is assigned to the device by the system when the device is created.
3451     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3452     //where X is the device number 0 default 
3453     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3454     GNI_RC_CHECK("GNI_CdmAttach", status);
3455     local_addr = get_gni_nic_address(0);
3456 #endif
3457     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3458     _MEMCHECK(MPID_UGNI_AllAddr);
3459     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3460     /* create the local completion queue */
3461     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3462     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3463     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3464     
3465     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3466     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3467     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3468
3469     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3470     GNI_RC_CHECK("Create CQ (rx)", status);
3471     
3472     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3473     GNI_RC_CHECK("Create Post CQ (rx)", status);
3474     
3475     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3476     //GNI_RC_CHECK("Create BTE CQ", status);
3477
3478     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3479     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));