added sendbuffer control
[charm.git] / src / arch / gemini_gni / machine.c
1
/** @file
 * Gemini GNI machine layer
 *
 * Authors: Yanhua Sun
 *          Gengbin Zheng
 * Date:    07-01-2011
 *
 * Flow control by the memory pool is configured with environment variables:
 *
 *   # CHARM_UGNI_MEMPOOL_MAX can be set to maximum_register_mem/number_of_processes
 *   # CHARM_UGNI_SEND_MAX can be set to half of CHARM_UGNI_MEMPOOL_MAX
 *   export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
 *   export CHARM_UGNI_MEMPOOL_MAX=20M
 *   export CHARM_UGNI_SEND_MAX=10M
 *
 *   # Limit on the total mempool size allocated; this prevents the mempool
 *   # from using too much memory.
 *   export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M
 *
 * Other environment variables:
 *
 *   export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable deadlock checking
 *   export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for the mempool
 *   export CHARM_UGNI_BIG_MSG_SIZE=4M          # size threshold for the big-message protocol
 *   export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # big-message pipeline length
 *   export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
 */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
/* ---- Compile-time feature switches ---- */

#define LARGEPAGE 0

/* SMP only: MULTI_THREAD_SEND and COMM_THREAD_SEND are defined as each
 * other's negation, so exactly one sending strategy is active. */
#if CMK_SMP
#define MULTI_THREAD_SEND 0
#define COMM_THREAD_SEND (!MULTI_THREAD_SEND)
#endif

#if MULTI_THREAD_SEND
#define CMK_WORKER_SINGLE_TASK 0
#endif

#define REMOTE_EVENT 1
#define CQWRITE 0

/* Optional caps on outstanding work; all disabled (0) here. Each enabled cap
 * brings in its own tunable counters below. */
#define CMI_EXERT_SEND_LARGE_CAP 0
#define CMI_EXERT_RECV_RDMA_CAP 0
#define CMI_SENDBUFFERSMSG_CAP 0
#define CMI_PUMPNETWORKSMSG_CAP 0

#if CMI_SENDBUFFERSMSG_CAP
int SendBufferMsg_cap = 10;
#endif

#if CMI_PUMPNETWORKSMSG_CAP
int PumpNetworkSmsg_cap = 10;
#endif

#if CMI_EXERT_SEND_LARGE_CAP
int SEND_large_cap = 100;
int SEND_large_pending = 0;
#endif

#if CMI_EXERT_RECV_RDMA_CAP
int RDMA_cap = 100;
int RDMA_pending = 0;
#endif

#define USE_LRTS_MEMPOOL 1

#define PRINT_SYH 0
92
/* ---- Tracing of the communication thread ---- */
#if !(CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD)
/* Comm-thread tracing not requested (or tracing disabled): force it off. */
#undef CMK_SMP_TRACE_COMMTHREAD
#define CMK_SMP_TRACE_COMMTHREAD 0
#else
#define TRACE_THRESHOLD     0.00001
#define CMI_MPI_TRACE_MOREDETAILED 0
#undef CMI_MPI_TRACE_USEREVENTS
#define CMI_MPI_TRACE_USEREVENTS 1
#endif

/* Communication-overhead tracing is hard-wired off here. */
#define CMK_TRACE_COMMOVERHEAD 0
#if !(CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD)
#undef CMK_TRACE_COMMOVERHEAD
#define CMK_TRACE_COMMOVERHEAD 0
#else
#undef CMI_MPI_TRACE_USEREVENTS
#define CMI_MPI_TRACE_USEREVENTS 1
#endif

/* START_EVENT()/END_EVENT(x): when user-event tracing is active, record the
 * start time and later emit it as user event x via traceUserBracketEvent;
 * otherwise both expand to nothing. */
#if !(CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM)
#define  START_EVENT()
#define  END_EVENT(x)
#else
CpvStaticDeclare(double, projTraceStart);
#define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
#define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
#endif
121
#if USE_LRTS_MEMPOOL

/* Byte-size units (long long). */
#define oneMB (1ll << 20)
#define oneGB (1ll << 30)

/* Memory-pool sizing, in bytes. */
static CmiInt8 _mempool_size = 8*oneMB;
static CmiInt8 _expand_mem = 4*oneMB;
static CmiInt8 _mempool_size_limit = 0;

/* Default 0.8 GB; the header documents CHARM_UGNI_MAX_MEMORY_ON_NODE as the
 * per-node mempool maximum. */
static CmiInt8 _totalmem = 0.8*oneGB;

/* Big-message threshold and segment size depend on the page-size mode. */
#if LARGEPAGE
static CmiInt8 BIG_MSG = 16*oneMB;
static CmiInt8 ONE_SEG = 4*oneMB;
#else
static CmiInt8 BIG_MSG = 4*oneMB;
static CmiInt8 ONE_SEG = 2*oneMB;
#endif

#if MULTI_THREAD_SEND
static int BIG_MSG_PIPELINE = 1;
#else
static int BIG_MSG_PIPELINE = 4;
#endif

/* Dynamic flow control: running totals checked against the limits below. */
static CmiInt8 buffered_send_msg = 0;
static CmiInt8 register_memory_size = 0;

#if LARGEPAGE
static CmiInt8 MAX_BUFF_SEND = 100000*oneMB;
static CmiInt8 MAX_REG_MEM = 200000*oneMB;
static CmiInt8 register_count = 0;
#elif CMK_SMP && COMM_THREAD_SEND
static CmiInt8 MAX_BUFF_SEND = 100*oneMB;
static CmiInt8 MAX_REG_MEM = 200*oneMB;
#else
static CmiInt8 MAX_BUFF_SEND = 16*oneMB;
static CmiInt8 MAX_REG_MEM = 25*oneMB;
#endif

#endif     /* end USE_LRTS_MEMPOOL */
167
/* Without MULTI_THREAD_SEND the GNI lock macros compile away and
 * CMI_GNI_TRYLOCK always yields 0; with it they map onto Converse locks
 * (the PCQUEUEPOP variants lock the queue's own `lock` member). */
#if !MULTI_THREAD_SEND
#define     CMI_GNI_LOCK(x)
#define     CMI_GNI_TRYLOCK(x)          (0)
#define     CMI_GNI_UNLOCK(x)
#define     CMI_PCQUEUEPOP_LOCK(Q)
#define     CMI_PCQUEUEPOP_UNLOCK(Q)
#else
#define     CMI_GNI_LOCK(x)             CmiLock(x);
#define     CMI_GNI_TRYLOCK(x)          CmiTryLock(x)
#define     CMI_GNI_UNLOCK(x)           CmiUnlock(x);
#define     CMI_PCQUEUEPOP_LOCK(Q)      CmiLock((Q)->lock);
#define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
#endif
181
182 static int _tlbpagesize = 4096;
183
184 //static int _smpd_count  = 0;
185
186 static int   user_set_flag  = 0;
187
188 static int _checkProgress = 1;             /* check deadlock */
189 static int _detected_hang = 0;
190
191 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
192
193 // dynamic SMSG
194 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
195
196 static int avg_smsg_connection = 32;
197 static int                 *smsg_connected_flag= 0;
198 static gni_smsg_attr_t     **smsg_attr_vector_local;
199 static gni_smsg_attr_t     **smsg_attr_vector_remote;
200 static gni_ep_handle_t     ep_hndl_unbound;
201 static gni_smsg_attr_t     send_smsg_attr;
202 static gni_smsg_attr_t     recv_smsg_attr;
203
204 typedef struct _dynamic_smsg_mailbox{
205    void     *mailbox_base;
206    int      size;
207    int      offset;
208    gni_mem_handle_t  mem_hndl;
209    struct      _dynamic_smsg_mailbox  *next;
210 }dynamic_smsg_mailbox_t;
211
212 static dynamic_smsg_mailbox_t  *mailbox_list;
213
214 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
215 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
216
217 #if PRINT_SYH
218 int         lrts_send_msg_id = 0;
219 int         lrts_local_done_msg = 0;
220 int         lrts_send_rdma_success = 0;
221 #endif
222
223 #include "machine.h"
224
225 #include "pcqueue.h"
226
227 #include "mempool.h"
228
229 #if CMK_PERSISTENT_COMM
230 #include "machine-persistent.h"
231 #endif
232
/* One-sided registration path, disabled (USE_ONESIDED is not defined).
 * NOTE(review): this path is known to be incomplete (no place to restore
 * omdh), and its macros take fewer arguments than the GNI variants below,
 * so enabling it also requires call-site changes. */
//#define  USE_ONESIDED 1
#ifdef USE_ONESIDED
#include "onesided.h"
onesided_hnd_t   onesided_hnd;
onesided_md_t    omdh;
#define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) \
    onesided_mem_register(handler, (uint64_t)(msg), size, 0, myomdh)

#define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) \
    onesided_mem_deregister(handler, myomdh)

#else
uint8_t   onesided_hnd, omdh;

/* MEMORY_REGISTER: register [msg, msg+size) with the NIC unless the running
 * total register_memory_size plus size would reach MAX_REG_MEM, in which
 * case status is set to GNI_RC_ERROR_NOMEM and GNI is not called. On
 * success register_memory_size grows by size. The completion queue passed
 * to GNI is cqh when REMOTE_EVENT or CQWRITE is enabled, NULL otherwise.
 * Expands to one statement (do/while(0)), so it is safe inside if/else;
 * `size` may be evaluated more than once. */
#if REMOTE_EVENT || CQWRITE
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
    do { \
        if (register_memory_size + (size) >= MAX_REG_MEM) { \
            (status) = GNI_RC_ERROR_NOMEM; \
        } else { \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), (cqh), GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
        } \
    } while (0)
#else
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    do { \
        if (register_memory_size + (size) >= MAX_REG_MEM) { \
            (status) = GNI_RC_ERROR_NOMEM; \
        } else { \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), NULL, GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
        } \
    } while (0)
#endif

/* MEMORY_DEREGISTER: deregister mem_hndl and subtract size from the running
 * total; aborts if GNI reports failure. */
#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
    do { \
        if (GNI_MemDeregister((nic_hndl), (mem_hndl)) == GNI_RC_SUCCESS) \
            register_memory_size -= (size); \
        else \
            CmiAbort("MEM_DEregister"); \
    } while (0)
#endif
266
/* Accessors for the mempool header that MEMPOOL_GetMempoolHeader locates
 * for a mempool-allocated buffer x (ALIGNBUF-aligned). Every expansion is
 * fully parenthesized so the macros compose safely inside larger
 * expressions (e.g. `2 * NoMsgInSend(x)`). */
#define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   NoMsgInSend(x)          (MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader((x),ALIGNBUF)) == 0)
#define   NoMsgInRecv(x)          (MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader((x),ALIGNBUF)) == 0)
#define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
#define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
#define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
#define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))

#define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
#define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)

/* Size / sequence fields of the extended message header. The argument is
 * parenthesized before the cast so `p + off` addresses bytes, not headers. */
#define CmiGetMsgSize(m)     (((CmiMsgHeaderExt*)(m))->size)
#define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)(m))->size)=(s))
#define CmiGetMsgSeq(m)      (((CmiMsgHeaderExt*)(m))->seq)
#define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)(m))->seq) = (s))

#define ALIGNBUF                64
291
/* =======Beginning of Definitions of Performance-Specific Macros =======*/
/* If SMSG is not used */

#define FMA_PER_CORE  1024
#define FMA_BUFFER_SIZE 1024

/* If SMSG is used */
static int  SMSG_MAX_MSG = 1024;
#define SMSG_MAX_CREDIT    72

#define MSGQ_MAXSIZE       2048

/* large message transfer with FMA or BTE */
#if ! REMOTE_EVENT
#define LRTS_GNI_RDMA_THRESHOLD  1024
#else
   /* remote events only work with RDMA */
#define LRTS_GNI_RDMA_THRESHOLD  0
#endif

#if CMK_SMP
static int  REMOTE_QUEUE_ENTRIES=163840;
static int LOCAL_QUEUE_ENTRIES=163840;
#else
static int  REMOTE_QUEUE_ENTRIES=20480;
static int LOCAL_QUEUE_ENTRIES=20480;
#endif

#define BIG_MSG_TAG             0x26
#define PUT_DONE_TAG            0x28
#define DIRECT_PUT_DONE_TAG     0x29
#define ACK_TAG                 0x30
/* SMSG is data message */
#define SMALL_DATA_TAG          0x31
/* SMSG is a control message to initialize a BTE */
#define LMSG_INIT_TAG           0x39

#define DEBUG
#ifdef GNI_RC_CHECK
#undef GNI_RC_CHECK
#endif
#ifdef DEBUG
/* Print a diagnostic and abort if a GNI call did not return GNI_RC_SUCCESS.
 * rc is evaluated exactly once, so it may be the GNI call itself. */
#define GNI_RC_CHECK(msg,rc) \
    do { \
        const int gni_rc_check_value = (int)(rc); \
        if (gni_rc_check_value != GNI_RC_SUCCESS) { \
            printf("[%d] %s; err=%s\n", CmiMyPe(), (msg), gni_err_str[gni_rc_check_value]); \
            fflush(stdout); \
            CmiAbort("GNI_RC_CHECK"); \
        } \
    } while (0)
#else
/* Still evaluate rc so a GNI call passed as the argument is not dropped. */
#define GNI_RC_CHECK(msg,rc)   ((void)(rc))
#endif

/* Round x up to a multiple of 64 / of the TLB page size, in size_t
 * arithmetic (avoids signed overflow for int arguments). */
#define ALIGN64(x)         ((((size_t)(x)) + 63) & ~(size_t)63)
#define ALIGNHUGEPAGE(x)   ((((size_t)(x)) + (size_t)_tlbpagesize - 1) & ~((size_t)_tlbpagesize - 1))
342
343 static int useStaticMSGQ = 0;
344 static int useStaticFMA = 0;
345 static int mysize, myrank;
346 static gni_nic_handle_t   nic_hndl;
347
348 typedef struct {
349     gni_mem_handle_t mdh;
350     uint64_t addr;
351 } mdh_addr_t ;
352 // this is related to dynamic SMSG
353
354 typedef struct mdh_addr_list{
355     gni_mem_handle_t mdh;
356    void *addr;
357     struct mdh_addr_list *next;
358 }mdh_addr_list_t;
359
360 static unsigned int         smsg_memlen;
361 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
362 mdh_addr_t          setup_mem;
363 mdh_addr_t          *smsg_connection_vec = 0;
364 gni_mem_handle_t    smsg_connection_memhndl;
365 static int          smsg_expand_slots = 10;
366 static int          smsg_available_slot = 0;
367 static void         *smsg_mailbox_mempool = 0;
368 mdh_addr_list_t     *smsg_dynamic_list = 0;
369
370 static void             *smsg_mailbox_base;
371 gni_msgq_attr_t         msgq_attrs;
372 gni_msgq_handle_t       msgq_handle;
373 gni_msgq_ep_attr_t      msgq_ep_attrs;
374 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
375
376 /* =====Beginning of Declarations of Machine Specific Variables===== */
377 static int cookie;
378 static int modes = 0;
379 static gni_cq_handle_t       smsg_rx_cqh = NULL;
380 static gni_cq_handle_t       default_tx_cqh = NULL;
381 static gni_cq_handle_t       rdma_tx_cqh = NULL;
382 static gni_cq_handle_t       rdma_rx_cqh = NULL;
383 static gni_cq_handle_t       post_tx_cqh = NULL;
384 static gni_ep_handle_t       *ep_hndl_array;
385
386 static CmiNodeLock           *ep_lock_array;
387 static CmiNodeLock           default_tx_cq_lock; 
388 static CmiNodeLock           rdma_tx_cq_lock; 
389 static CmiNodeLock           global_gni_lock; 
390 static CmiNodeLock           rx_cq_lock;
391 static CmiNodeLock           smsg_mailbox_lock;
392 static CmiNodeLock           smsg_rx_cq_lock;
393 static CmiNodeLock           *mempool_lock;
394 //#define     CMK_WITH_STATS      1
395 typedef struct msg_list
396 {
397     uint32_t destNode;
398     uint32_t size;
399     void *msg;
400     uint8_t tag;
401 #if !CMK_SMP
402     struct msg_list *next;
403 #endif
404 #if CMK_WITH_STATS
405     double  creation_time;
406 #endif
407 }MSG_LIST;
408
409
410 typedef struct control_msg
411 {
412     uint64_t            source_addr;    /* address from the start of buffer  */
413     uint64_t            dest_addr;      /* address from the start of buffer */
414     int                 total_length;   /* total length */
415     int                 length;         /* length of this packet */
416 #if REMOTE_EVENT
417     int                 ack_index;      /* index from integer to address */
418 #endif
419     uint8_t             seq_id;         //big message   0 meaning single message
420     gni_mem_handle_t    source_mem_hndl;
421     struct control_msg *next;
422 } CONTROL_MSG;
423
424 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
425
426 typedef struct ack_msg
427 {
428     uint64_t            source_addr;    /* address from the start of buffer  */
429 #if ! USE_LRTS_MEMPOOL
430     gni_mem_handle_t    source_mem_hndl;
431     int                 length;          /* total length */
432 #endif
433     struct ack_msg     *next;
434 } ACK_MSG;
435
436 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
437
#if CMK_DIRECT
typedef struct {
    uint64_t    handler_addr;
} CMK_DIRECT_HEADER;

/* Converse message whose payload is the address of a CmiDirectUserHandle. */
typedef struct {
    char     core[CmiMsgHeaderSizeBytes];
    uint64_t handler;
} cmidirectMsg;

CpvDeclare(int, CmiHandleDirectIdx);

/* Handler registered by CmiDirectInit: run the user callback recorded in the
 * CmiDirectUserHandle whose address the message carries, then free the
 * message. */
void CmiHandleDirectMsg(cmidirectMsg* msg)
{
    CmiDirectUserHandle *userHandle = (CmiDirectUserHandle *)(msg->handler);

    userHandle->callbackFnPtr(userHandle->callbackData);
    CmiFree(msg);
}

/* Register CmiHandleDirectMsg and remember its handler index per PE. */
void CmiDirectInit()
{
    int handlerIdx;

    CpvInitialize(int, CmiHandleDirectIdx);
    handlerIdx = CmiRegisterHandler((CmiHandler) CmiHandleDirectMsg);
    CpvAccess(CmiHandleDirectIdx) = handlerIdx;
}

#endif
465 typedef struct  rmda_msg
466 {
467     int                   destNode;
468 #if REMOTE_EVENT
469     int                   ack_index;
470 #endif
471     gni_post_descriptor_t *pd;
472 #if !CMK_SMP
473     struct  rmda_msg      *next;
474 #endif
475 }RDMA_REQUEST;
476
477
478 #if CMK_SMP
479 #define SMP_LOCKS               0
480 #define ONE_SEND_QUEUE                  0
481 PCQueue sendRdmaBuf;
482 typedef struct  msg_list_index
483 {
484     PCQueue     sendSmsgBuf;
485     int         pushed;
486     CmiNodeLock   lock;
487 } MSG_LIST_INDEX;
488 char                *destpe_avail;
489 #if  !ONE_SEND_QUEUE && SMP_LOCKS
490     PCQueue     nonEmptyQueues;
491 #endif
492 #else         /* non-smp */
493
494 static RDMA_REQUEST        *sendRdmaBuf = 0;
495 static RDMA_REQUEST        *sendRdmaTail = 0;
496 typedef struct  msg_list_index
497 {
498     int         next;
499     MSG_LIST    *sendSmsgBuf;
500     MSG_LIST    *tail;
501 } MSG_LIST_INDEX;
502
503 #endif
504
505 // buffered send queue
506 #if ! ONE_SEND_QUEUE
507 typedef struct smsg_queue
508 {
509     MSG_LIST_INDEX   *smsg_msglist_index;
510     int               smsg_head_index;
511 } SMSG_QUEUE;
512 #else
513 typedef struct smsg_queue
514 {
515     PCQueue       sendMsgBuf;
516 }  SMSG_QUEUE;
517 #endif
518
519 SMSG_QUEUE                  smsg_queue;
520 #if CMK_USE_OOB
521 SMSG_QUEUE                  smsg_oob_queue;
522 #endif
523
524 #if CMK_SMP
525
526 #define FreeMsgList(d)   free(d);
527 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
528
529 #else
530
531 static MSG_LIST       *msglist_freelist=0;
532
533 #define FreeMsgList(d)  \
534   do { \
535   (d)->next = msglist_freelist;\
536   msglist_freelist = d; \
537   } while (0)
538
539 #define MallocMsgList(d) \
540   do {  \
541   d = msglist_freelist;\
542   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
543              _MEMCHECK(d);\
544   } else msglist_freelist = d->next; \
545   d->next =0;  \
546   } while (0)
547
548 #endif
549
550 #if CMK_SMP
551
552 #define FreeControlMsg(d)      free(d);
553 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
554
555 #else
556
557 static CONTROL_MSG    *control_freelist=0;
558
559 #define FreeControlMsg(d)       \
560   do { \
561   (d)->next = control_freelist;\
562   control_freelist = d; \
563   } while (0);
564
565 #define MallocControlMsg(d) \
566   do {  \
567   d = control_freelist;\
568   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
569              _MEMCHECK(d);\
570   } else control_freelist = d->next;  \
571   } while (0);
572
573 #endif
574
575 #if CMK_SMP
576
577 #define FreeAckMsg(d)      free(d);
578 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
579
580 #else
581
582 static ACK_MSG        *ack_freelist=0;
583
584 #define FreeAckMsg(d)       \
585   do { \
586   (d)->next = ack_freelist;\
587   ack_freelist = d; \
588   } while (0)
589
590 #define MallocAckMsg(d) \
591   do { \
592   d = ack_freelist;\
593   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
594              _MEMCHECK(d);\
595   } else ack_freelist = d->next; \
596   } while (0)
597
598 #endif
599
600
601 # if CMK_SMP
602 #define FreeRdmaRequest(d)       free(d);
603 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
604 #else
605
606 static RDMA_REQUEST         *rdma_freelist = NULL;
607
608 #define FreeRdmaRequest(d)       \
609   do {  \
610   (d)->next = rdma_freelist;\
611   rdma_freelist = d;    \
612   } while (0)
613
614 #define MallocRdmaRequest(d) \
615   do {   \
616   d = rdma_freelist;\
617   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
618              _MEMCHECK(d);\
619   } else rdma_freelist = d->next; \
620   d->next =0;   \
621   } while (0)
622 #endif
623
624 /* reuse gni_post_descriptor_t */
625 static gni_post_descriptor_t *post_freelist=0;
626
627 #if  !CMK_SMP
628 #define FreePostDesc(d)       \
629     (d)->next_descr = post_freelist;\
630     post_freelist = d;
631
632 #define MallocPostDesc(d) \
633   d = post_freelist;\
634   if (d==0) { \
635      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
636      d->next_descr = 0;\
637       _MEMCHECK(d);\
638   } else post_freelist = d->next_descr;
639 #else
640
641 #define FreePostDesc(d)     free(d);
642 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
643
644 #endif
645
646
647 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
648 static int      buffered_smsg_counter = 0;
649
650 /* SmsgSend return success but message sent is not confirmed by remote side */
651 static MSG_LIST *buffered_fma_head = 0;
652 static MSG_LIST *buffered_fma_tail = 0;
653
/* Bit helpers on an integer flag word `a`:
 *   IsFree(a, ind)   -> nonzero when bit `ind` of a is clear
 *   SET_BITS(a, ind) -> sets bit `ind` (assigns back to a)
 *   Reset(a, ind)    -> clears bit `ind` (assigns back to a)
 * Arguments are parenthesized so expressions such as `x | y` work, and the
 * mask is unsigned so bit 31 can be addressed without undefined behavior. */
#define IsFree(a,ind)   (!((a) & (1u << (ind))))
#define SET_BITS(a,ind) ((a) = ((a) | (1u << (ind))))
#define Reset(a,ind)    ((a) = ((a) & ~(1u << (ind))))
658
659 CpvDeclare(mempool_type*, mempool);
660
661 #if REMOTE_EVENT
662 /* ack pool for remote events */
663
664 static int  SHIFT   =           18;
665 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
666 #define RANK_MASK               ((1<<SHIFT) - 1)
667 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
668
669 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
670 #define GET_RANK(evt)           ((evt) & RANK_MASK)
671 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
672
673 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
674
675 #if CMK_SMP
676 #define INIT_SIZE                4096
677 #else
678 #define INIT_SIZE                1024
679 #endif
680
681 struct IndexStruct {
682 void *addr;
683 int next;
684 int type;     // 1: ACK   2: Persistent
685 };
686
687 typedef struct IndexPool {
688     struct IndexStruct   *indexes;
689     int                   size;
690     int                   freehead;
691     CmiNodeLock           lock;
692 } IndexPool;
693
694 static IndexPool  ackPool;
695 #if CMK_PERSISTENT_COMM
696 static IndexPool  persistPool;
697 #endif
698
699 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
700 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
701
702 static void IndexPool_init(IndexPool *pool)
703 {
704     int i;
705     if ((1<<SHIFT) < mysize) 
706         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
707     pool->size = INIT_SIZE;
708     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
709     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
710     for (i=0; i<pool->size-1; i++) {
711         pool->indexes[i].next = i+1;
712         pool->indexes[i].type = 0;
713     }
714     pool->indexes[i].next = -1;
715     pool->freehead = 0;
716 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
717     pool->lock  = CmiCreateLock();
718 #else
719     pool->lock  = 0;
720 #endif
721 }
722
723 static
724 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
725 {
726     int s, i;
727 #if MULTI_THREAD_SEND  
728     CmiLock(pool->lock);
729 #endif
730     s = pool->freehead;
731     if (s == -1) {
732         int newsize = pool->size * 2;
733         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
734         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
735         struct IndexStruct *old_ackpool = pool->indexes;
736         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
737         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
738         for (i=pool->size; i<newsize-1; i++) {
739             pool->indexes[i].next = i+1;
740             pool->indexes[i].type = 0;
741         }
742         pool->indexes[i].next = -1;
743         pool->indexes[i].type = 0;
744         pool->freehead = pool->size;
745         s = pool->size;
746         pool->size = newsize;
747         free(old_ackpool);
748     }
749     pool->freehead = pool->indexes[s].next;
750     pool->indexes[s].addr = addr;
751     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
752     pool->indexes[s].type = type;
753 #if MULTI_THREAD_SEND
754     CmiUnlock(pool->lock);
755 #endif
756     return s;
757 }
758
759 static
760 inline  void IndexPool_freeslot(IndexPool *pool, int s)
761 {
762     CmiAssert(s>=0 && s<pool->size);
763 #if MULTI_THREAD_SEND
764     CmiLock(pool->lock);
765 #endif
766     pool->indexes[s].next = pool->freehead;
767     pool->indexes[s].type = 0;
768     pool->freehead = s;
769 #if MULTI_THREAD_SEND
770     CmiUnlock(pool->lock);
771 #endif
772 }
773
774
775 #endif
776
/* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
/* Magic byte of a message header; usable as an lvalue. */
#define CMI_MAGIC(msg)                   (((CmiMsgHeaderBasic *)(msg))->magic)
#define CHARM_MAGIC_NUMBER               126

#if CMK_ERROR_CHECKING
extern unsigned char computeCheckSum(unsigned char *data, int len);
static int checksum_flag = 0;
/* When checksum_flag is set: zero the header's cksum field, then store the
 * checksum of the first len bytes of msg in it. Single statement. */
#define CMI_SET_CHECKSUM(msg, len)      \
        do { \
          if (checksum_flag) { \
            ((CmiMsgHeaderBasic *)(msg))->cksum = 0; \
            ((CmiMsgHeaderBasic *)(msg))->cksum = computeCheckSum((unsigned char*)(msg), (len)); \
          } \
        } while (0)
/* When checksum_flag is set: abort unless the checksum over the first len
 * bytes (stored cksum included) is 0. Single statement. */
#define CMI_CHECK_CHECKSUM(msg, len)    \
        do { \
          if (checksum_flag && computeCheckSum((unsigned char*)(msg), (len)) != 0) \
            CmiAbort("Fatal error: checksum doesn't agree!\n"); \
        } while (0)
#else
#define CMI_SET_CHECKSUM(msg, len)
#define CMI_CHECK_CHECKSUM(msg, len)
#endif
/* =====End of Definitions of Message-Corruption Related Macros=====*/
798
/* Statistics switches: print_stats enables statistics collection/output;
 * stats_off suspends counting while set. */
static int print_stats = 0;
static int stats_off = 0;

/* Resume statistics counting. */
void CmiTurnOnStats(void)
{
    stats_off = 0;
}

/* Suspend statistics counting. */
void CmiTurnOffStats(void)
{
    stats_off = 1;
}

/* Nonzero for the two PUT post types (FMA or RDMA). The argument is
 * parenthesized so conditional expressions are compared as a whole. */
#define IS_PUT(type)    ((type) == GNI_POST_FMA_PUT || (type) == GNI_POST_RDMA_PUT)
813
814 #if CMK_WITH_STATS
815 FILE *counterLog = NULL;
816 typedef struct comm_thread_stats
817 {
818     uint64_t  smsg_data_count;
819     uint64_t  lmsg_init_count;
820     uint64_t  ack_count;
821     uint64_t  big_msg_ack_count;
822     uint64_t  smsg_count;
823     uint64_t  direct_put_done_count;
824     uint64_t  put_done_count;
825     //times of calling SmsgSend
826     uint64_t  try_smsg_data_count;
827     uint64_t  try_lmsg_init_count;
828     uint64_t  try_ack_count;
829     uint64_t  try_big_msg_ack_count;
830     uint64_t  try_direct_put_done_count;
831     uint64_t  try_put_done_count;
832     uint64_t  try_smsg_count;
833     
834     double    max_time_in_send_buffered_smsg;
835     double    all_time_in_send_buffered_smsg;
836
837     uint64_t  rdma_get_count, rdma_put_count;
838     uint64_t  try_rdma_get_count, try_rdma_put_count;
839     double    max_time_from_control_to_rdma_init;
840     double    all_time_from_control_to_rdma_init;
841
842     double    max_time_from_rdma_init_to_rdma_done;
843     double    all_time_from_rdma_init_to_rdma_done;
844
845     int      count_in_PumpNetwork;
846     double   time_in_PumpNetwork;
847     double   max_time_in_PumpNetwork;
848     int      count_in_SendBufferMsg_smsg;
849     double   time_in_SendBufferMsg_smsg;
850     double   max_time_in_SendBufferMsg_smsg;
851     int      count_in_SendRdmaMsg;
852     double   time_in_SendRdmaMsg;
853     double   max_time_in_SendRdmaMsg;
854     int      count_in_PumpRemoteTransactions;
855     double   time_in_PumpRemoteTransactions;
856     double   max_time_in_PumpRemoteTransactions;
857     int      count_in_PumpLocalTransactions_rdma;
858     double   time_in_PumpLocalTransactions_rdma;
859     double   max_time_in_PumpLocalTransactions_rdma;
860     int      count_in_PumpDatagramConnection;
861     double   time_in_PumpDatagramConnection;
862     double   max_time_in_PumpDatagramConnection;
863 } Comm_Thread_Stats;
864
865 static Comm_Thread_Stats   comm_stats;
866
867 static char *counters_dirname = "counters";
868
869 static void init_comm_stats()
870 {
871   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
872   if (print_stats){
873       char ln[200];
874       int code = mkdir(counters_dirname, 00777); 
875       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
876       counterLog=fopen(ln,"w");
877       if (counterLog == NULL) CmiAbort("Counter files open failed");
878   }
879 }
880
/* Stats hooks; all are no-ops unless print_stats is set (and, except for
 * SMSG_CREATION, stats_off is clear). Arguments are parenthesized so that
 * expressions such as `now - start` are subtracted as a whole. */

/* Stamp message record x with its creation time. */
#define SMSG_CREATION( x ) if(print_stats) { (x)->creation_time = CmiWallTimer(); }

/* Count a sent SMSG by tag and fold its time-in-buffer into the totals. */
#define SMSG_SENT_DONE(creation_time, tag)  \
        if (print_stats && !stats_off) {   if( (tag) == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
            else  if( (tag) == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
            else  if( (tag) == ACK_TAG) comm_stats.ack_count++;  \
            else  if( (tag) == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
            else  if( (tag) == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
            else  if( (tag) == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
            comm_stats.smsg_count++; \
            double inbuff_time = CmiWallTimer() - (creation_time);   \
            if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
            comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
        }

/* Count an SMSG send attempt by tag. */
#define SMSG_TRY_SEND(tag)  \
        if (print_stats && !stats_off){   if( (tag) == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
            else  if( (tag) == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
            else  if( (tag) == ACK_TAG) comm_stats.try_ack_count++;  \
            else  if( (tag) == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
            else  if( (tag) == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
            else  if( (tag) == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
            comm_stats.try_smsg_count++; \
        }

/* Count an RDMA post attempt as PUT or GET. */
#define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT((type))?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}

/* Fold the time since RDMA initiation x into the init-to-done totals. */
#define  RDMA_TRANS_DONE(x)      \
         if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - (x) ; \
             if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
             comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
         }

/* Count an initiated RDMA as PUT or GET and fold the time since control
 * message x into the control-to-init totals. */
#define  RDMA_TRANS_INIT(type, x)      \
         if (print_stats && !stats_off) {   IS_PUT((type))?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
             double rdma_trans_time = CmiWallTimer() - (x) ; \
             if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
             comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
         }
920
/* Run statement/expression x and fold its wall-clock duration (CmiWallTimer
 * difference) into comm_stats.count_in_<name>, time_in_<name> and
 * max_time_in_<name>. The timer local uses an unusual name so it cannot
 * capture a caller variable (e.g. `t`) referenced inside x. Expands to a
 * braced block. */
#define STATS_TIME_SECTION_(x, name)   \
        { double stats_elapsed_ = CmiWallTimer(); \
          x;        \
          stats_elapsed_ = CmiWallTimer() - stats_elapsed_; \
          comm_stats.count_in_##name++; \
          comm_stats.time_in_##name += stats_elapsed_; \
          if (stats_elapsed_ > comm_stats.max_time_in_##name) \
              comm_stats.max_time_in_##name = stats_elapsed_; \
        }

#define STATS_PUMPNETWORK_TIME(x)                 STATS_TIME_SECTION_(x, PumpNetwork)
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)      STATS_TIME_SECTION_(x, PumpRemoteTransactions)
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)  STATS_TIME_SECTION_(x, PumpLocalTransactions_rdma)
#define STATS_SEND_SMSGS_TIME(x)                  STATS_TIME_SECTION_(x, SendBufferMsg_smsg)
#define STATS_SENDRDMAMSG_TIME(x)                 STATS_TIME_SECTION_(x, SendRdmaMsg)
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)      STATS_TIME_SECTION_(x, PumpDatagramConnection)
980
981 static void print_comm_stats()
982 {
983     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
984     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
985             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
986             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
987     
988     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
989             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
990             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
991
992     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
993     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
994     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
995
996
997     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
998     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
999     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
1000     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
1001     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
1002     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
1003     if (useDynamicSMSG)
1004     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
1005
1006     fclose(counterLog);
1007 }
1008
#else
/* Statistics not compiled in: each timing wrapper expands to its argument
   unchanged, so the wrapped statement still runs but is not measured. */
#define STATS_PUMPNETWORK_TIME(x)                  x
#define STATS_SEND_SMSGS_TIME(x)                   x
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
#define STATS_SENDRDMAMSG_TIME(x)                  x
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
#endif
1017
1018 static void
1019 allgather(void *in,void *out, int len)
1020 {
1021     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1022     int i,rc;
1023     int my_rank;
1024     char *tmp_buf,*out_ptr;
1025
1026     if(!already_called) {
1027
1028         rc = PMI_Get_size(&job_size);
1029         CmiAssert(rc == PMI_SUCCESS);
1030         rc = PMI_Get_rank(&my_rank);
1031         CmiAssert(rc == PMI_SUCCESS);
1032
1033         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1034         CmiAssert(ivec_ptr != NULL);
1035
1036         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1037         CmiAssert(rc == PMI_SUCCESS);
1038
1039         already_called = 1;
1040
1041     }
1042
1043     tmp_buf = (char *)malloc(job_size * len);
1044     CmiAssert(tmp_buf);
1045
1046     rc = PMI_Allgather(in,tmp_buf,len);
1047     CmiAssert(rc == PMI_SUCCESS);
1048
1049     out_ptr = out;
1050
1051     for(i=0;i<job_size;i++) {
1052
1053         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1054
1055     }
1056
1057     free(tmp_buf);
1058 }
1059
1060 static void
1061 allgather_2(void *in,void *out, int len)
1062 {
1063     //PMI_Allgather is out of order
1064     int i,rc, extend_len;
1065     int  rank_index;
1066     char *out_ptr, *out_ref;
1067     char *in2;
1068
1069     extend_len = sizeof(int) + len;
1070     in2 = (char*)malloc(extend_len);
1071
1072     memcpy(in2, &myrank, sizeof(int));
1073     memcpy(in2+sizeof(int), in, len);
1074
1075     out_ptr = (char*)malloc(mysize*extend_len);
1076
1077     rc = PMI_Allgather(in2, out_ptr, extend_len);
1078     GNI_RC_CHECK("allgather", rc);
1079
1080     out_ref = out;
1081
1082     for(i=0;i<mysize;i++) {
1083         //rank index 
1084         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1085         //copy to the rank index slot
1086         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1087     }
1088
1089     free(out_ptr);
1090     free(in2);
1091
1092 }
1093
1094 static unsigned int get_gni_nic_address(int device_id)
1095 {
1096     unsigned int address, cpu_id;
1097     gni_return_t status;
1098     int i, alps_dev_id=-1,alps_address=-1;
1099     char *token, *p_ptr;
1100
1101     p_ptr = getenv("PMI_GNI_DEV_ID");
1102     if (!p_ptr) {
1103         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1104        
1105         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1106     } else {
1107         while ((token = strtok(p_ptr,":")) != NULL) {
1108             alps_dev_id = atoi(token);
1109             if (alps_dev_id == device_id) {
1110                 break;
1111             }
1112             p_ptr = NULL;
1113         }
1114         CmiAssert(alps_dev_id != -1);
1115         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1116         CmiAssert(p_ptr != NULL);
1117         i = 0;
1118         while ((token = strtok(p_ptr,":")) != NULL) {
1119             if (i == alps_dev_id) {
1120                 alps_address = atoi(token);
1121                 break;
1122             }
1123             p_ptr = NULL;
1124             ++i;
1125         }
1126         CmiAssert(alps_address != -1);
1127         address = alps_address;
1128     }
1129     return address;
1130 }
1131
/* Return this job's GNI protection tag: the first ':'-separated field of
 * PMI_GNI_PTAG, truncated to uint8_t.
 * The field is parsed in place with strtoul.  The previous strtok call wrote
 * '\0' into the environment string, and atoi(NULL) was reached when the
 * variable was empty.
 * Aborts if the variable is missing or has no numeric value.
 */
static uint8_t get_ptag(void)
{
    const char *p = getenv("PMI_GNI_PTAG");
    char *end;
    unsigned long ptag;

    if (p == NULL)
        CmiAbort("get_ptag: PMI_GNI_PTAG is not set");
    while (*p == ':')            /* skip leading separators, as strtok did */
        p++;
    ptag = strtoul(p, &end, 10);
    if (end == p)
        CmiAbort("get_ptag: PMI_GNI_PTAG has no numeric value");
    return (uint8_t)ptag;
}
1144
/* Return this job's GNI cookie: the first ':'-separated field of
 * PMI_GNI_COOKIE.
 * The value is parsed with strtoul instead of strtok+atoi, for two reasons:
 *  - strtok wrote '\0' into the environment string;
 *  - atoi has undefined behavior for values above INT_MAX, which a 32-bit
 *    cookie can exceed.
 * Aborts if the variable is missing or has no numeric value.
 */
static uint32_t get_cookie(void)
{
    const char *p = getenv("PMI_GNI_COOKIE");
    char *end;
    unsigned long cookie;

    if (p == NULL)
        CmiAbort("get_cookie: PMI_GNI_COOKIE is not set");
    while (*p == ':')            /* skip leading separators, as strtok did */
        p++;
    cookie = strtoul(p, &end, 10);
    if (end == p)
        CmiAbort("get_cookie: PMI_GNI_COOKIE has no numeric value");
    return (uint32_t)cookie;
}
1157
1158 #if LARGEPAGE
1159
1160 /* directly mmap memory from hugetlbfs for large pages */
1161
1162 #include <sys/stat.h>
1163 #include <fcntl.h>
1164 #include <sys/mman.h>
1165 #include <hugetlbfs.h>
1166
1167 // size must be _tlbpagesize aligned
1168 void *my_get_huge_pages(size_t size)
1169 {
1170     char filename[512];
1171     int fd;
1172     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1173     void *ptr = NULL;
1174
1175     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1176     fd = open(filename, O_RDWR | O_CREAT, mode);
1177     if (fd == -1) {
1178         CmiAbort("my_get_huge_pages: open filed");
1179     }
1180     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1181     if (ptr == MAP_FAILED) ptr = NULL;
1182 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1183     close(fd);
1184     unlink(filename);
1185     return ptr;
1186 }
1187
void my_free_huge_pages(void *ptr, int size)
{
    /* Unmap a region obtained from my_get_huge_pages(); abort if munmap fails. */
    if (munmap(ptr, size) == -1)
        CmiAbort("munmap failed in my_free_huge_pages");
}
1194
1195 #endif
1196
1197 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1198 /* TODO: add any that are related */
1199 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1200
1201
1202 #include "machine-lrts.h"
1203 #include "machine-common-core.c"
1204
/* Network progress function, used to poll the network for messages.
   On some implementations it flushes receive buffers. */
#if CMK_MACHINE_PROGRESS_DEFINED
void CmiMachineProgressImpl() {
    /* Intentionally empty: this machine layer does no polling here. */
}
#endif
1211
1212 static int SendBufferMsg(SMSG_QUEUE *queue);
1213 static void SendRdmaMsg();
1214 static void PumpNetworkSmsg();
1215 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1216 #if CQWRITE
1217 static void PumpCqWriteTransactions();
1218 #endif
1219 #if REMOTE_EVENT
1220 static void PumpRemoteTransactions();
1221 #endif
1222
#if MACHINE_DEBUG_LOG
/* Debug-logging state, compiled only when MACHINE_DEBUG_LOG is set.
   buffered_recv_msg is referenced by MACHSTATE trace calls (e.g. in
   registerFromMempool). */
FILE *debugLog = NULL;
static CmiInt8 buffered_recv_msg = 0;
int         lrts_smsg_success = 0;
int         lrts_received_msg = 0;
#endif
1229
1230 static void sweep_mempool(mempool_type *mptr)
1231 {
1232     int n = 0;
1233     block_header *current = &(mptr->block_head);
1234
1235     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1236     while( current!= NULL) {
1237         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1238         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1239     }
1240     printf("[n %d] sweep_mempool slot END.\n", myrank);
1241 }
1242
/* Deregister one idle block of mempool `mptr`, starting the search at *from.
 * Walks the block chain (block_next is a byte offset from mptr) to the first
 * block that is idle:
 *   - no messages in flight (msgs_in_send + msgs_in_recv not > 0), and
 *   - a non-zero memory handle (i.e. currently registered).
 * That block is deregistered with MEMORY_DEREGISTER and its handle zeroed.
 * *from is set to the block found (or NULL), so a caller can resume the walk
 * from there.
 * Returns GNI_RC_ERROR_RESOURCE if no idle registered block remains,
 * otherwise GNI_RC_SUCCESS.
 * NOTE(review): presumably MEMORY_DEREGISTER also lowers register_memory_size
 * (registerFromMempool's loop depends on it) — confirm in its definition.
 */
inline
static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
{
    block_header *current = *from;

    /* The loop below was once wrapped in the commented-out outer loop; as
       written, only a single block is deregistered per call. */
    //while(register_memory_size>= MAX_REG_MEM)
    //{
        while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
            current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;

        *from = current;
        if(current == NULL) return GNI_RC_ERROR_RESOURCE;
        MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
        SetMemHndlZero(GetMemHndlFromBlockHeader(current));
    //}
    return GNI_RC_SUCCESS;
}
1260
/* Register [blockaddr, blockaddr+size) with the NIC via MEMORY_REGISTER,
 * storing the handle in *memhndl (cqh is passed through to the macro).
 * Room is made by deregistering idle blocks of `mptr` (deregisterMemory):
 *  1. While register_memory_size >= MAX_REG_MEM, deregister one idle block.
 *     If the limit is still reached, return that (failing) status.
 *  2. Attempt registration.  GNI_RC_INVALID_PARAM and
 *     GNI_RC_PERMISSION_ERROR are handed to GNI_RC_CHECK.  Any other failure
 *     deregisters one more idle block and retries.
 * Returns GNI_RC_SUCCESS, or deregisterMemory's error status when mptr has
 * no idle registered block left.
 */
inline 
static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
{
    gni_return_t status = GNI_RC_SUCCESS;
    //int size = GetMempoolsize(msg);
    //void *blockaddr = GetMempoolBlockPtr(msg);
    //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
   
    /* Shared by both loops, so each deregistration resumes the walk over
       mptr's block chain where the previous one stopped. */
    block_header *current = &(mptr->block_head);
    while(register_memory_size>= MAX_REG_MEM)
    {
        status = deregisterMemory(mptr, &current);
        if (status != GNI_RC_SUCCESS) break;
    }
    if(register_memory_size>= MAX_REG_MEM) return status;

    MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
    while(1)
    {
        MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
        if(status == GNI_RC_SUCCESS)
        {
            break;
        }
        else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
        {
            /* not fixable by freeing registrations */
            GNI_RC_CHECK("registerFromMempool", status);
        }
        else
        {
            status = deregisterMemory(mptr, &current);
            if (status != GNI_RC_SUCCESS) break;
        }
    }; 
    return status;
}
1297
1298 inline 
1299 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1300 {
1301     static int rank = -1;
1302     int i;
1303     gni_return_t status;
1304     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1305     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1306     mempool_type *mptr;
1307
1308     status = registerFromMempool(mptr1, msg, size, t, cqh);
1309     if (status == GNI_RC_SUCCESS) return status;
1310 #if CMK_SMP 
1311     for (i=0; i<CmiMyNodeSize()+1; i++) {
1312       rank = (rank+1)%(CmiMyNodeSize()+1);
1313       mptr = CpvAccessOther(mempool, rank);
1314       if (mptr == mptr1) continue;
1315       status = registerFromMempool(mptr, msg, size, t, cqh);
1316       if (status == GNI_RC_SUCCESS) return status;
1317     }
1318 #endif
1319     return  GNI_RC_ERROR_RESOURCE;
1320 }
1321
/* Queue msg (size bytes, SMSG tag) for destNode so it can be sent later.
 * A MSG_LIST entry wrapping it is allocated.
 *  - non-SMP: appended to the destination's linked list (sendSmsgBuf..tail).
 *    A destination whose list was empty is linked onto the front of the
 *    smsg_head_index list of destinations.
 *  - SMP with ONE_SEND_QUEUE: pushed on the single queue->sendMsgBuf.
 *  - SMP otherwise: pushed on the destination's own PCQueue.  With SMP_LOCKS
 *    this happens under the destination lock, and the destination entry is
 *    also pushed on nonEmptyQueues whenever its `pushed` flag is 0.
 */
inline
static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
{
    MSG_LIST        *msg_tmp;
    MallocMsgList(msg_tmp);
    msg_tmp->destNode = destNode;
    msg_tmp->size   = size;
    msg_tmp->msg    = msg;
    msg_tmp->tag    = tag;
#if CMK_WITH_STATS
    SMSG_CREATION(msg_tmp)
#endif
#if !CMK_SMP
    if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
        queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
        queue->smsg_head_index = destNode;
        queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
    }else
    {
        queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
        queue->smsg_msglist_index[destNode].tail = msg_tmp;
    }
#else
#if ONE_SEND_QUEUE
    PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
#else
#if SMP_LOCKS
    CmiLock(queue->smsg_msglist_index[destNode].lock);
    /* NOTE(review): `pushed` is tested but never set to 1 here.  Unless the
       consumer of nonEmptyQueues sets it, the same destination can be pushed
       onto nonEmptyQueues repeatedly — confirm. */
    if(queue->smsg_msglist_index[destNode].pushed == 0)
    {
        PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
    }
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
    CmiUnlock(queue->smsg_msglist_index[destNode].lock);
#else
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
#endif
#endif
#endif
#if PRINT_SYH
    buffered_smsg_counter++;
#endif
}
1365
1366 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1367 {
1368     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1369 }
1370
1371 inline
1372 static void setup_smsg_connection(int destNode)
1373 {
1374     mdh_addr_list_t  *new_entry = 0;
1375     gni_post_descriptor_t *pd;
1376     gni_smsg_attr_t      *smsg_attr;
1377     gni_return_t status = GNI_RC_NOT_DONE;
1378     RDMA_REQUEST        *rdma_request_msg;
1379     
1380     if(smsg_available_slot == smsg_expand_slots)
1381     {
1382         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1383         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1384         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1385
1386         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1387             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1388             GNI_MEM_READWRITE,   
1389             -1,
1390             &(new_entry->mdh));
1391         smsg_available_slot = 0; 
1392         new_entry->next = smsg_dynamic_list;
1393         smsg_dynamic_list = new_entry;
1394     }
1395     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1396     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1397     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1398     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1399     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1400     smsg_attr->buff_size = smsg_memlen;
1401     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1402     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1403     smsg_local_attr_vec[destNode] = smsg_attr;
1404     smsg_available_slot++;
1405     MallocPostDesc(pd);
1406     pd->type            = GNI_POST_FMA_PUT;
1407     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1408     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1409     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1410     pd->length          = sizeof(gni_smsg_attr_t);
1411     pd->local_addr      = (uint64_t) smsg_attr;
1412     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1413     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1414     pd->src_cq_hndl     = rdma_tx_cqh;
1415     pd->rdma_mode       = 0;
1416     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1417     print_smsg_attr(smsg_attr);
1418     if(status == GNI_RC_ERROR_RESOURCE )
1419     {
1420         MallocRdmaRequest(rdma_request_msg);
1421         rdma_request_msg->destNode = destNode;
1422         rdma_request_msg->pd = pd;
1423         /* buffer this request */
1424     }
1425 #if PRINT_SYH
1426     if(status != GNI_RC_SUCCESS)
1427        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1428     else
1429         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1430 #endif
1431 }
1432
1433 /* useDynamicSMSG */
1434 inline 
1435 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1436 {
1437     gni_return_t status = GNI_RC_NOT_DONE;
1438
1439     if(mailbox_list->offset == mailbox_list->size)
1440     {
1441         dynamic_smsg_mailbox_t *new_mailbox_entry;
1442         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1443         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1444         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1445         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1446         new_mailbox_entry->offset = 0;
1447         
1448         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1449             new_mailbox_entry->size, smsg_rx_cqh,
1450             GNI_MEM_READWRITE,   
1451             -1,
1452             &(new_mailbox_entry->mem_hndl));
1453
1454         GNI_RC_CHECK("register", status);
1455         new_mailbox_entry->next = mailbox_list;
1456         mailbox_list = new_mailbox_entry;
1457     }
1458     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1459     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1460     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1461     local_smsg_attr->mbox_offset = mailbox_list->offset;
1462     mailbox_list->offset += smsg_memlen;
1463     local_smsg_attr->buff_size = smsg_memlen;
1464     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1465     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1466 }
1467
/* useDynamicSMSG */
/* Start a dynamic SMSG connection to destNode.
 * A local mailbox is allocated (alloc_smsg_attr) and its attribute block is
 * posted to destNode with GNI_EpPostDataWId, using id destNode+mysize.
 * smsg_attr_vector_remote[destNode] is passed as the receive buffer for
 * destNode's attributes (presumably filled when the datagram exchange
 * completes).
 * Returns:
 *  - 1, after setting smsg_connected_flag[destNode] to 1 (connection
 *    pending), when the post succeeds;
 *  - 0, after undoing the allocations and the mailbox slot, when the post
 *    returns GNI_RC_ERROR_RESOURCE.
 * Any other status is passed to GNI_RC_CHECK.
 * Requires that destNode is not already connected or connecting.
 */
inline 
static int connect_to(int destNode)
{
    gni_return_t status = GNI_RC_NOT_DONE;
    CmiAssert(smsg_connected_flag[destNode] == 0);
    CmiAssert (smsg_attr_vector_local[destNode] == NULL);
    smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
    alloc_smsg_attr(smsg_attr_vector_local[destNode]);
    smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
    
    CMI_GNI_LOCK(global_gni_lock)
    status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
    CMI_GNI_UNLOCK(global_gni_lock)
    if (status == GNI_RC_ERROR_RESOURCE) {
      /* possibly destNode is making connection at the same time */
      free(smsg_attr_vector_local[destNode]);
      smsg_attr_vector_local[destNode] = NULL;
      free(smsg_attr_vector_remote[destNode]);
      smsg_attr_vector_remote[destNode] = NULL;
      /* return the mailbox slot just taken by alloc_smsg_attr */
      mailbox_list->offset -= smsg_memlen;
#if PRINT_SYH
    printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
#endif
      return 0;
    }
    GNI_RC_CHECK("GNI_Post", status);
    smsg_connected_flag[destNode] = 1;
#if PRINT_SYH
    printf("[%d] send connect_to request to %d done\n", myrank, destNode);
#endif
    return 1;
}
1501
/* Try to send one SMSG message (msg, size bytes, tag) to destNode.
 *   queue  : send queue that holds the message if it cannot go now
 *   inbuff : 1 if msg is being re-sent from that queue (never re-buffered),
 *            0 for a fresh message (buffered on failure)
 *   ptr    : queue entry of a re-sent message.  Only used, under
 *            CMK_WITH_STATS, for its creation_time; NULL for fresh messages.
 * With dynamic SMSG, an unconnected destination triggers connect_to(), and
 * the message is buffered with GNI_RC_NOT_DONE returned.
 * Otherwise the send is attempted only when nothing is already buffered for
 * destNode, or when inbuff is set.  This is presumably to keep messages to a
 * destination in order; the check is compiled out with SMP + ONE_SEND_QUEUE.
 * Returns:
 *   GNI_RC_SUCCESS         - sent;
 *   GNI_RC_NOT_DONE        - connection still pending;
 *   GNI_RC_ERROR_RESOURCE  - not sent (buffered when inbuff == 0).
 */
inline 
static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
{
    /* NOTE(review): remote_address, remote_id, smsg_attr, pd and post_state
       are unused in this function. */
    unsigned int          remote_address;
    uint32_t              remote_id;
    gni_return_t          status = GNI_RC_ERROR_RESOURCE;
    gni_smsg_attr_t       *smsg_attr;
    gni_post_descriptor_t *pd;
    gni_post_state_t      post_state;
    char                  *real_data; 

    if (useDynamicSMSG) {
        switch (smsg_connected_flag[destNode]) {
        case 0: 
            connect_to(destNode);         /* continue to case 1 */
        case 1:                           /* pending connection, do nothing */
            status = GNI_RC_NOT_DONE;
            if(inbuff ==0)
                buffer_small_msgs(queue, msg, size, destNode, tag);
            return status;
        }
    }
#if CMK_SMP
#if ! ONE_SEND_QUEUE
    if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
#endif
    {
#else
    if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
    {
#endif
        //CMI_GNI_LOCK(smsg_mailbox_lock)
        CMI_GNI_LOCK(default_tx_cq_lock)
#if CMK_SMP_TRACE_COMMTHREAD
        int oldpe = -1;
        int oldeventid = -1;
        if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
        { 
            START_EVENT();
            if ( tag == SMALL_DATA_TAG)
                real_data = (char*)msg; 
            else 
                real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
            TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
            TRACE_COMM_SET_COMM_MSGID(real_data);
        }
#endif
#if REMOTE_EVENT
        /* First send of a whole (seq_id 0) large-message control: reserve an
           ack slot in ackPool keyed by the source buffer. */
        if (tag == LMSG_INIT_TAG) {
            CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
            if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
                control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
        }
        // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
#endif
#if     CMK_WITH_STATS
        SMSG_TRY_SEND(tag)
#endif
#if CMK_WITH_STATS
    double              creation_time;
    if (ptr == NULL)
        creation_time = CmiWallTimer();
    else
        creation_time = ptr->creation_time;
#endif

    status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
#if CMK_SMP_TRACE_COMMTHREAD
        if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
#endif
        CMI_GNI_UNLOCK(default_tx_cq_lock)
        //CMI_GNI_UNLOCK(smsg_mailbox_lock)
        if(status == GNI_RC_SUCCESS)
        {
#if     CMK_WITH_STATS
            SMSG_SENT_DONE(creation_time,tag) 
#endif
#if CMK_SMP_TRACE_COMMTHREAD
            if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
            { 
                TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
            }
#endif
        }else
            status = GNI_RC_ERROR_RESOURCE;   /* collapse every failure into "retry later" */
    }
    /* Also reached when the send was skipped because older messages are
       queued for destNode; status is still GNI_RC_ERROR_RESOURCE then. */
    if(status != GNI_RC_SUCCESS && inbuff ==0)
        buffer_small_msgs(queue, msg, size, destNode, tag);
    return status;
}
1592
1593 inline 
1594 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1595 {
1596     /* construct a control message and send */
1597     CONTROL_MSG         *control_msg_tmp;
1598     MallocControlMsg(control_msg_tmp);
1599     control_msg_tmp->source_addr = (uint64_t)msg;
1600     control_msg_tmp->seq_id    = seqno;
1601     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1602 #if REMOTE_EVENT
1603     control_msg_tmp->ack_index    =  -1;
1604 #endif
1605 #if     USE_LRTS_MEMPOOL
1606     if(size < BIG_MSG)
1607     {
1608         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1609     }
1610     else
1611     {
1612         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1613         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1614         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1615     }
1616 #else
1617     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1618 #endif
1619     return control_msg_tmp;
1620 }
1621
/* When nonzero, send_large_messages() stalls a fresh (inbuff == 0) send whose
   source memory is not yet registered.  It calls LrtsAdvanceCommunication(0)
   until buffered_send_msg leaves room under MAX_BUFF_SEND, instead of queuing
   the control message straight away.  0 disables this. */
#define BLOCKING_SEND_CONTROL    0
1623
1624 // Large message, send control to receiver, receiver register memory and do a GET, 
1625 // return 1 - send no success
1626 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1627 {
1628     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1629     uint32_t            vmdh_index  = -1;
1630     int                 size;
1631     int                 offset = 0;
1632     uint64_t            source_addr;
1633     int                 register_size; 
1634     void                *msg;
1635
1636     size    =   control_msg_tmp->total_length;
1637     source_addr = control_msg_tmp->source_addr;
1638     register_size = control_msg_tmp->length;
1639
1640 #if  USE_LRTS_MEMPOOL
1641     if( control_msg_tmp->seq_id == 0 ){
1642 #if BLOCKING_SEND_CONTROL
1643         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1644             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1645                 LrtsAdvanceCommunication(0);
1646         }
1647 #endif
1648         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1649         {
1650             msg = (void*)source_addr;
1651             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1652             {
1653                 if(!inbuff)
1654                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1655                 return GNI_RC_ERROR_NOMEM;
1656             }
1657             //register the corresponding mempool
1658             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1659             if(status == GNI_RC_SUCCESS)
1660             {
1661                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1662             }
1663         }else
1664         {
1665             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1666             status = GNI_RC_SUCCESS;
1667         }
1668         if(NoMsgInSend(source_addr))
1669             register_size = GetMempoolsize((void*)(source_addr));
1670         else
1671             register_size = 0;
1672     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1673     {
1674         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1675         source_addr += offset;
1676         size = control_msg_tmp->length;
1677 #if BLOCKING_SEND_CONTROL
1678         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1679             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1680                 LrtsAdvanceCommunication(0);
1681         }
1682 #endif
1683         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1684             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1685             {
1686                 if(!inbuff)
1687                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1688                 return GNI_RC_ERROR_NOMEM;
1689             }
1690             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1691             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1692         }
1693         else
1694         {
1695             status = GNI_RC_SUCCESS;
1696         }
1697         register_size = 0;  
1698     }
1699
1700 #if CMI_EXERT_SEND_LARGE_CAP
1701     if(SEND_large_pending >= SEND_large_cap)
1702     {
1703         status = GNI_RC_ERROR_NOMEM;
1704     }
1705 #endif
1706  
1707     if(status == GNI_RC_SUCCESS)
1708     {
1709        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1710         if(status == GNI_RC_SUCCESS)
1711         {
1712 #if CMI_EXERT_SEND_LARGE_CAP
1713             SEND_large_pending++;
1714 #endif
1715             buffered_send_msg += register_size;
1716             if(control_msg_tmp->seq_id == 0)
1717             {
1718                 IncreaseMsgInSend(source_addr);
1719             }
1720             FreeControlMsg(control_msg_tmp);
1721             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1722         }else
1723             status = GNI_RC_ERROR_RESOURCE;
1724
1725     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1726     {
1727         CmiAbort("Memory registor for large msg\n");
1728     }else 
1729     {
1730         status = GNI_RC_ERROR_NOMEM; 
1731         if(!inbuff)
1732             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1733     }
1734     return status;
1735 #else
1736     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1737     if(status == GNI_RC_SUCCESS)
1738     {
1739         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1740         if(status == GNI_RC_SUCCESS)
1741         {
1742             FreeControlMsg(control_msg_tmp);
1743         }
1744     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1745     {
1746         CmiAbort("Memory registor for large msg\n");
1747     }else 
1748     {
1749         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1750     }
1751     return status;
1752 #endif
1753 }
1754
/*
 * Finalize the envelope of an outgoing converse message of `size` bytes
 * before it is handed to the network (called at the top of LrtsSendFunc).
 *
 * Stores the size in the message header with CmiSetMsgSize, then applies
 * CMI_SET_CHECKSUM. NOTE(review): CMI_SET_CHECKSUM presumably computes a
 * checksum and is compiled out unless checksumming is enabled; confirm
 * against its definition. It may rely on the size being set first, so keep
 * this order.
 */
inline void LrtsPrepareEnvelope(char *msg, int size)
{
    CmiSetMsgSize(msg, size);
    CMI_SET_CHECKSUM(msg, size);
}
1760
1761 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1762 {
1763     gni_return_t        status  =   GNI_RC_SUCCESS;
1764     uint8_t tag;
1765     CONTROL_MSG         *control_msg_tmp;
1766     int                 oob = ( mode & OUT_OF_BAND);
1767     SMSG_QUEUE          *queue;
1768
1769     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1770 #if CMK_USE_OOB
1771     queue = oob? &smsg_oob_queue : &smsg_queue;
1772 #else
1773     queue = &smsg_queue;
1774 #endif
1775
1776     LrtsPrepareEnvelope(msg, size);
1777
1778 #if PRINT_SYH
1779     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1780 #endif 
1781 #if CMK_SMP 
1782     if(size <= SMSG_MAX_MSG)
1783         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1784     else if (size < BIG_MSG) {
1785         control_msg_tmp =  construct_control_msg(size, msg, 0);
1786         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1787     }
1788     else {
1789           CmiSetMsgSeq(msg, 0);
1790           control_msg_tmp =  construct_control_msg(size, msg, 1);
1791           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1792     }
1793 #else   //non-smp, smp(worker sending)
1794     if(size <= SMSG_MAX_MSG)
1795     {
1796         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1797             CmiFree(msg);
1798     }
1799     else if (size < BIG_MSG) {
1800         control_msg_tmp =  construct_control_msg(size, msg, 0);
1801         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1802     }
1803     else {
1804 #if     USE_LRTS_MEMPOOL
1805         CmiSetMsgSeq(msg, 0);
1806         control_msg_tmp =  construct_control_msg(size, msg, 1);
1807         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1808 #else
1809         control_msg_tmp =  construct_control_msg(size, msg, 0);
1810         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1811 #endif
1812     }
1813 #endif
1814     return 0;
1815 }
1816
/*
 * Send `msg` (`len` bytes) synchronously to each of the `npes` PEs listed in
 * `pes`. The caller keeps ownership of `msg`.
 *
 * With CMK_BROADCAST_USE_CMIREFERENCE, remote destinations share the buffer:
 * the function takes a reference (CmiReference) and hands it to
 * CmiSyncSendAndFree, presumably to avoid a copy. The local PE always gets a
 * plain CmiSyncSend.
 */
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
    int k;

    for (k = 0; k < npes; k++) {
        int dest = pes[k];
#if CMK_BROADCAST_USE_CMIREFERENCE
        if (dest != CmiMyPe()) {
            CmiReference(msg);
            CmiSyncSendAndFree(dest, len, msg);
            continue;
        }
#endif
        CmiSyncSend(dest, len, msg);
    }
}
1835
1836 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1837 {
1838   /* A better asynchronous implementation may be wanted, but at least it works */
1839   CmiSyncListSendFn(npes, pes, len, msg);
1840   return (CmiCommHandle) 0;
1841 }
1842
/*
 * Send `msg` (`len` bytes) to the `npes` PEs in `pes` and take ownership of
 * it: the message is freed, or handed off with a free-on-send, by the time
 * this returns.
 *
 * - One destination: the message itself goes out with CmiSyncSendAndFree.
 * - CMK_PERSISTENT_COMM with persistent handles active and len above
 *   PERSIST_MIN_SIZE: remote PEs each get a reference plus
 *   CmiSyncSendAndFree, the local PE a CmiSyncSend, and the caller's own
 *   reference is dropped at the end.
 * - Otherwise: either the reference-counting list send followed by a free,
 *   or copying sends to all but the last PE with the last one consuming the
 *   message.
 */
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
    if (npes == 1) {
        CmiSyncSendAndFree(pes[0], len, msg);
        return;
    }

#if CMK_PERSISTENT_COMM
    if (CpvAccess(phs) && len > PERSIST_MIN_SIZE) {
        int k;
        for (k = 0; k < npes; k++) {
            int dest = pes[k];
            if (dest != CmiMyPe()) {
                CmiReference(msg);
                CmiSyncSendAndFree(dest, len, msg);
            } else {
                CmiSyncSend(dest, len, msg);
            }
        }
        CmiFree(msg);
        return;
    }
#endif

#if CMK_BROADCAST_USE_CMIREFERENCE
    CmiSyncListSendFn(npes, pes, len, msg);
    CmiFree(msg);
#else
    {
        int k;
        if (npes <= 0) {
            /* nobody to send to: just release the message */
            CmiFree(msg);
            return;
        }
        for (k = 0; k + 1 < npes; k++)
            CmiSyncSend(pes[k], len, msg);
        CmiSyncSendAndFree(pes[npes - 1], len, msg);
    }
#endif
}
1879
static void    PumpDatagramConnection();

/*
 * Fallback Projections user-event IDs for machine-layer activities.
 * registerUserTraceEvents() overwrites them with IDs from
 * traceRegisterUserEvent() when machine-layer user-event tracing is compiled
 * in. Every activity needs its own ID, otherwise two activities become
 * indistinguishable in a trace. event_SendBufferSmsg previously reused 444,
 * which already belongs to event_PumpRdmaTransaction.
 */
static      int         event_SetupConnect = 111;
static      int         event_PumpSmsg = 222 ;
static      int         event_PumpTransaction = 333;
static      int         event_PumpRdmaTransaction = 444;
static      int         event_SendBufferSmsg = 777;
static      int         event_SendFmaRdmaMsg = 555;
static      int         event_AdvanceCommunication = 666;
1888
/*
 * Register the machine-layer Projections user events and store the returned
 * IDs in the event_* variables. This only does something when MPI-style user
 * event tracing is enabled outside of Charm++ tracing. Registration order is
 * kept identical because IDs may be handed out in registration order.
 */
static void registerUserTraceEvents() {
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    static const struct {
        int        *slot;
        const char *label;
    } user_events[] = {
        { &event_SetupConnect,         "setting up connections" },
        { &event_PumpSmsg,             "Pump network small msgs" },
        { &event_PumpTransaction,      "Pump FMA local transaction" },
        { &event_PumpRdmaTransaction,  "Pump RDMA local transaction" },
        { &event_SendBufferSmsg,       "Sending buffered small msgs" },
        { &event_SendFmaRdmaMsg,       "Sending buffered fma/rdma transactions" },
        { &event_AdvanceCommunication, "Worker thread in sending/receiving" },
    };
    size_t k;

    for (k = 0; k < sizeof user_events / sizeof user_events[0]; k++)
        *user_events[k].slot = traceRegisterUserEvent(user_events[k].label, -1);
#endif
}
1900
1901 static void ProcessDeadlock()
1902 {
1903     static CmiUInt8 *ptr = NULL;
1904     static CmiUInt8  last = 0, mysum, sum;
1905     static int count = 0;
1906     gni_return_t status;
1907     int i;
1908
1909 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1910 //sweep_mempool(CpvAccess(mempool));
1911     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1912     mysum = smsg_send_count + smsg_recv_count;
1913     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1914     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1915     GNI_RC_CHECK("PMI_Allgather", status);
1916     sum = 0;
1917     for (i=0; i<mysize; i++)  sum+= ptr[i];
1918     if (last == 0 || sum == last) 
1919         count++;
1920     else
1921         count = 0;
1922     last = sum;
1923     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1924     if (count == 2) { 
1925         /* detected twice, it is a real deadlock */
1926         if (myrank == 0)  {
1927             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1928             CmiAbort("Fatal> Deadlock detected.");
1929         }
1930
1931     }
1932     _detected_hang = 0;
1933 }
1934
1935 static void CheckProgress()
1936 {
1937     if (smsg_send_count == last_smsg_send_count &&
1938         smsg_recv_count == last_smsg_recv_count ) 
1939     {
1940         _detected_hang = 1;
1941 #if !CMK_SMP
1942         if (_detected_hang) ProcessDeadlock();
1943 #endif
1944
1945     }
1946     else {
1947         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1948         last_smsg_send_count = smsg_send_count;
1949         last_smsg_recv_count = smsg_recv_count;
1950         _detected_hang = 0;
1951     }
1952 }
1953
1954 static void set_limit()
1955 {
1956     //if (!user_set_flag && CmiMyRank() == 0) {
1957     if (CmiMyRank() == 0) {
1958         int mynode = CmiPhysicalNodeID(CmiMyPe());
1959         int numpes = CmiNumPesOnPhysicalNode(mynode);
1960         int numprocesses = numpes / CmiMyNodeSize();
1961         MAX_REG_MEM  = _totalmem / numprocesses;
1962         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1963         if (CmiMyPe() == 0)
1964            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1965         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1966         {
1967              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1968              CmiAbort("memory registration\n");
1969         }
1970     }
1971 }
1972
1973 void LrtsPostCommonInit(int everReturn)
1974 {
1975 #if CMK_DIRECT
1976     CmiDirectInit();
1977 #endif
1978 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1979     CpvInitialize(double, projTraceStart);
1980     /* only PE 0 needs to care about registration (to generate sts file). */
1981     //if (CmiMyPe() == 0) 
1982     {
1983         registerMachineUserEventsFunction(&registerUserTraceEvents);
1984     }
1985 #endif
1986
1987 #if CMK_SMP
1988     CmiIdleState *s=CmiNotifyGetState();
1989     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1990     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1991 #else
1992     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1993     if (useDynamicSMSG)
1994     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1995 #endif
1996
1997 #if ! LARGEPAGE
1998     if (_checkProgress)
1999 #if CMK_SMP
2000     if (CmiMyRank() == 0)
2001 #endif
2002     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
2003 #endif
2004  
2005 #if !LARGEPAGE
2006     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2007 #endif
2008 }
2009
/* this is called by worker thread */
/*
 * Worker-thread progress hook. It only does work when MULTI_THREAD_SEND is
 * enabled. That macro is defined to 0 for SMP builds at the top of this file,
 * so by default the body compiles to nothing.
 *
 * When enabled, the worker advances the network itself. It pumps incoming
 * SMSGs, local transactions on the default TX CQ and the RDMA TX CQ, remote
 * events (REMOTE_EVENT), buffered SMSG sends, and buffered RDMA requests.
 * With CMK_WORKER_SINGLE_TASK each of these six tasks runs only on the ranks
 * whose CmiMyRank() % 6 selects it. Nothing is done when mysize == 1
 * (presumably a single-process job).
 *
 * NOTE(review): startT/endT are declared whenever CMK_SMP_TRACE_COMMTHREAD is
 * set but are only used under MULTI_THREAD_SEND, which can produce an
 * unused-variable warning.
 */
void LrtsPostNonLocal(){
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif
#if MULTI_THREAD_SEND
    if(mysize == 1) return;
#if CMK_SMP_TRACE_COMMTHREAD
    traceEndIdle();
#endif

#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif

    /* task 0: incoming short messages */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 0)
#endif
    PumpNetworkSmsg();

    /* task 1: completions on the default TX CQ */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 1)
#endif
    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);

    /* task 2: completions on the RDMA TX CQ */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 2)
#endif
    PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);

    /* task 3: remote completion events */
#if REMOTE_EVENT
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 3)
#endif
    PumpRemoteTransactions();
#endif

    /* task 4: buffered SMSG sends. With CMK_USE_OOB the out-of-band queue is
       flushed first and the normal queue is attempted only when that call
       returns 1 (presumably "OOB queue drained"; confirm in SendBufferMsg). */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 4)
#endif
#if CMK_USE_OOB
    if (SendBufferMsg(&smsg_oob_queue) == 1)
#endif
    {
        SendBufferMsg(&smsg_queue);
    }

    /* task 5: buffered RDMA requests */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 5)
#endif
    SendRdmaMsg();

#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
#endif
#if CMK_SMP_TRACE_COMMTHREAD
    traceBeginIdle();
#endif
#endif
}
2071
2072 /* useDynamicSMSG */
2073 static void    PumpDatagramConnection()
2074 {
2075     uint32_t          remote_address;
2076     uint32_t          remote_id;
2077     gni_return_t status;
2078     gni_post_state_t  post_state;
2079     uint64_t          datagram_id;
2080     int i;
2081
2082    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2083    {
2084        if (datagram_id >= mysize) {           /* bound endpoint */
2085            int pe = datagram_id - mysize;
2086            CMI_GNI_LOCK(global_gni_lock)
2087            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2088            CMI_GNI_UNLOCK(global_gni_lock)
2089            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2090            {
2091                CmiAssert(remote_id == pe);
2092                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2093                GNI_RC_CHECK("Dynamic SMSG Init", status);
2094 #if PRINT_SYH
2095                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2096 #endif
2097                CmiAssert(smsg_connected_flag[pe] == 1);
2098                smsg_connected_flag[pe] = 2;
2099            }
2100        }
2101        else {         /* unbound ep */
2102            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2103            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2104            {
2105                CmiAssert(remote_id<mysize);
2106                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2107                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2108                GNI_RC_CHECK("Dynamic SMSG Init", status);
2109 #if PRINT_SYH
2110                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2111 #endif
2112                smsg_connected_flag[remote_id] = 2;
2113
2114                alloc_smsg_attr(&send_smsg_attr);
2115                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2116                GNI_RC_CHECK("post unbound datagram", status);
2117            }
2118        }
2119    }
2120 }
2121
/* polling CQ to receive network message */
/*
 * Placeholder for polling RDMA receive events. It is currently an empty stub;
 * the unused event/status locals it used to declare have been dropped.
 */
static void PumpNetworkRdmaMsgs()
{
}
2129
2130 inline 
2131 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
2132 {
2133     RDMA_REQUEST        *rdma_request_msg;
2134     MallocRdmaRequest(rdma_request_msg);
2135     rdma_request_msg->destNode = inst_id;
2136     rdma_request_msg->pd = pd;
2137 #if REMOTE_EVENT
2138     rdma_request_msg->ack_index = ack_index;
2139 #endif
2140 #if CMK_SMP
2141     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2142 #else
2143     if(sendRdmaBuf == 0)
2144     {
2145         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2146     }else{
2147         sendRdmaTail->next = rdma_request_msg;
2148         sendRdmaTail =  rdma_request_msg;
2149     }
2150 #endif
2151
2152 }
2153
/* Forward declaration: defined further down, used by the LMSG_INIT_TAG case. */
static void getLargeMsgRequest(void* header, uint64_t inst_id);

/*
 * Drain the SMSG receive completion queue (smsg_rx_cqh) and dispatch every
 * short message waiting in the mailbox of each endpoint that raised an event.
 *
 * For each CQ event, the sending rank is taken from the event's inst_id
 * (translated with GET_RANK when REMOTE_EVENT is on). With dynamic SMSG the
 * code first spins on PumpDatagramConnection() until that peer is fully
 * connected (smsg_connected_flag == 2). All messages from that endpoint are
 * then pulled with GNI_SmsgGetNextWTag and handled by tag:
 *   SMALL_DATA_TAG      - copied into a CmiAlloc'd buffer and delivered.
 *   LMSG_INIT_TAG       - control message of a large send; passed to
 *                         getLargeMsgRequest() to start the GET.
 *   ACK_TAG             - (only without REMOTE_EVENT/CQWRITE) sender-side
 *                         completion: update send accounting, free the message.
 *   BIG_MSG_TAG         - one segment of a big message completed: deregister
 *                         it, then free the message or post further segments.
 *   PUT_DONE_TAG        - persistent-message arrival (CMK_PERSISTENT_COMM).
 *   DIRECT_PUT_DONE_TAG - CmiDirect completion trigger (CMK_DIRECT).
 * Any other tag aborts.
 *
 * Locking: smsg_mailbox_lock is taken before GNI_SmsgGetNextWTag, and every
 * switch case releases it itself.
 *
 * With CMI_PUMPNETWORKSMSG_CAP, handled messages are counted against
 * PumpNetworkSmsg_cap. The cap is only checked between CQ events, so the
 * current endpoint's mailbox is always drained completely.
 *
 * NOTE(review): ret, status2, msg_mem_hndl, smsg_attr, remote_smsg_attr,
 * init_flag and source_addr are declared but not used in this function.
 */
static void PumpNetworkSmsg()
{
    uint64_t            inst_id;
    int                 ret;
    gni_cq_entry_t      event_data;
    gni_return_t        status, status2;
    void                *header;
    uint8_t             msg_tag;
    int                 msg_nbytes;
    void                *msg_data;
    gni_mem_handle_t    msg_mem_hndl;
    gni_smsg_attr_t     *smsg_attr;
    gni_smsg_attr_t     *remote_smsg_attr;
    int                 init_flag;
    CONTROL_MSG         *control_msg_tmp, *header_tmp;
    uint64_t            source_addr;
    SMSG_QUEUE         *queue = &smsg_queue;
#if   CMK_DIRECT
    cmidirectMsg        *direct_msg;
#endif
#if CMI_PUMPNETWORKSMSG_CAP 
    int                  recv_cnt = 0;
    while(recv_cnt< PumpNetworkSmsg_cap) {
#else
    while(1) {
#endif
        CMI_GNI_LOCK(smsg_rx_cq_lock)
        status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
        CMI_GNI_UNLOCK(smsg_rx_cq_lock)
        if(status != GNI_RC_SUCCESS)
            break;
        inst_id = GNI_CQ_GET_INST_ID(event_data);
#if REMOTE_EVENT
        inst_id = GET_RANK(inst_id);      /* important */
#endif
        // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
#if PRINT_SYH
        printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
#endif
        if (useDynamicSMSG) {
            /* subtle: smsg may come before connection is setup */
            while (smsg_connected_flag[inst_id] != 2) 
               PumpDatagramConnection();
        }
        msg_tag = GNI_SMSG_ANY_TAG;
        /* Drain this endpoint's mailbox. The lock is taken here and released
           inside each switch case (or just below when the mailbox is empty). */
        while(1) {
            CMI_GNI_LOCK(smsg_mailbox_lock)
            status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
            if (status != GNI_RC_SUCCESS)
            {
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                break;
            }
#if         CMI_PUMPNETWORKSMSG_CAP
            recv_cnt++; 
#endif
#if PRINT_SYH
            printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
#endif
            /* copy msg out and then put into queue (small message) */
            switch (msg_tag) {
            case SMALL_DATA_TAG:
            {
                START_EVENT();
                msg_nbytes = CmiGetMsgSize(header);
                msg_data    = CmiAlloc(msg_nbytes);
                memcpy(msg_data, (char*)header, msg_nbytes);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
                CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
                handleOneRecvedMsg(msg_nbytes, msg_data);
                break;
            }
            case LMSG_INIT_TAG:
            {
                /* With MULTI_THREAD_SEND the control message is copied out and
                   the mailbox slot released before use. Otherwise the header
                   is read in place and released only afterwards. */
#if MULTI_THREAD_SEND
                MallocControlMsg(control_msg_tmp);
                memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                getLargeMsgRequest(control_msg_tmp, inst_id);
                FreeControlMsg(control_msg_tmp);
#else
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                getLargeMsgRequest(header, inst_id);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
#endif
                break;
            }
#if !REMOTE_EVENT && !CQWRITE
            case ACK_TAG:   //msg fit into mempool
            {
                /* Get is done, release message . Now put is not used yet*/
                void *msg = (void*)(((ACK_MSG *)header)->source_addr);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
#if ! USE_LRTS_MEMPOOL
                MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
#else
                DecreaseMsgInSend(msg);
#endif
                /* the mempool block stops counting against the send budget
                   once no message in it is in flight */
                if(NoMsgInSend(msg))
                    buffered_send_msg -= GetMempoolsize(msg);
                MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
                CmiFree(msg);
#if CMI_EXERT_SEND_LARGE_CAP
                SEND_large_pending--;
#endif
                break;
            }
#endif
            case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
            {
#if MULTI_THREAD_SEND
                MallocControlMsg(header_tmp);
                memcpy(header_tmp, header, CONTROL_MSG_SIZE);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
#else
                header_tmp = (CONTROL_MSG *) header;
#endif
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
#if CMI_EXERT_SEND_LARGE_CAP
                    SEND_large_pending--;
#endif
                void *msg = (void*)(header_tmp->source_addr);
                int cur_seq = CmiGetMsgSeq(msg);
                int offset = ONE_SEG*(cur_seq+1);
                MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
                buffered_send_msg -= header_tmp->length;
                /* The message's size field is reused as the count of bytes
                   whose segments have not completed yet. */
                int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
                if (remain_size < 0) remain_size = 0;
                CmiSetMsgSize(msg, remain_size);
                if(remain_size <= 0) //transaction done
                {
                    CmiFree(msg);
                }else if (header_tmp->total_length > offset)
                {
                    CmiSetMsgSeq(msg, cur_seq+1);
                    control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
                    control_msg_tmp->dest_addr = header_tmp->dest_addr;
                    //send next seg
                    send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
                         // pipelining
                    /* after the first segment, post up to BIG_MSG_PIPELINE-1
                       additional segments ahead, stopping at the end */
                    if (header_tmp->seq_id == 1) {
                      int i;
                      for (i=1; i<BIG_MSG_PIPELINE; i++) {
                        int seq = cur_seq+i+2;
                        CmiSetMsgSeq(msg, seq-1);
                        control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
                        control_msg_tmp->dest_addr = header_tmp->dest_addr;
                        send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
                        if (header_tmp->total_length <= ONE_SEG*seq) break;
                      }
                    }
                }
#if MULTI_THREAD_SEND
                FreeControlMsg(header_tmp);
#else
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
#endif
                break;
            }
#if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
            case PUT_DONE_TAG:  {   //persistent message
                void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
                int size = ((CONTROL_MSG *) header)->length;
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                CmiReference(msg);
                CMI_CHECK_CHECKSUM(msg, size);
                handleOneRecvedMsg(size, msg); 
#if PRINT_SYH
                printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
#endif
                break;
            }
#endif
#if CMK_DIRECT
            case DIRECT_PUT_DONE_TAG:  //cmi direct 
                //create a trigger message
                direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
                direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
                CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
                //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
                break;
#endif
            default:
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                printf("weird tag problem\n");
                CmiAbort("Unknown tag\n");
            }               // end switch
#if PRINT_SYH
            printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
#endif
            smsg_recv_count ++;
            msg_tag = GNI_SMSG_ANY_TAG;
        } //endwhile GNI_SmsgGetNextWTag
    }   //end while GetEvent
    /* GNI_RC_ERROR_RESOURCE here means the RX CQ overflowed ("Smsg_rx_cq full") */
    if(status == GNI_RC_ERROR_RESOURCE)
    {
        printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
        GNI_RC_CHECK("Smsg_rx_cq full", status);
    }
}
2365
2366 static void printDesc(gni_post_descriptor_t *pd)
2367 {
2368     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2369 }
2370
#if CQWRITE
/*
 * Post a CQ write to destNode that deposits `data` into the remote completion
 * queue associated with `mem_hndl`. Local completion is silent
 * (GNI_CQMODE_SILENT), and a failed post is reported through GNI_RC_CHECK.
 * NOTE(review): with silent mode no local event is generated for `desc`, so
 * it is not visibly released here; confirm the descriptor's lifetime.
 */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_post_descriptor_t *desc;
    gni_return_t           rc;

    MallocPostDesc(desc);
    desc->type            = GNI_POST_CQWRITE;
    desc->cq_mode         = GNI_CQMODE_SILENT;
    desc->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    desc->cqwrite_value   = data;
    desc->remote_mem_hndl = mem_hndl;

    rc = GNI_PostCqWrite(ep_hndl_array[destNode], desc);
    GNI_RC_CHECK("GNI_PostCqWrite", rc);
}
#endif
2388
2389 // register memory for a message
2390 // return mem handle
2391 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2392 {
2393     gni_return_t status = GNI_RC_SUCCESS;
2394
2395     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2396
2397 #if CMK_PERSISTENT_COMM
2398       // persistent message is always registered
2399       // BIG_MSG small pieces do not have malloc chunk header
2400     if (IS_PERSISTENT_MEMORY(msg)) {
2401         *memh = GetMemHndl(msg);
2402         return GNI_RC_SUCCESS;
2403     }
2404 #endif
2405     if(seqno == 0 
2406 #if CMK_PERSISTENT_COMM
2407          || seqno == PERSIST_SEQ
2408 #endif
2409       )
2410     {
2411         if(IsMemHndlZero((GetMemHndl(msg))))
2412         {
2413             msg = (void*)(msg);
2414             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2415             if(status == GNI_RC_SUCCESS)
2416                 *memh = GetMemHndl(msg);
2417         }
2418         else {
2419             *memh = GetMemHndl(msg);
2420         }
2421     }
2422     else {
2423         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2424         status = registerMemory(msg, size, memh, NULL); 
2425     }
2426     return status;
2427 }
2428
2429 // for BIG_MSG called on receiver side for receiving control message
2430 // LMSG_INIT_TAG
2431 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2432 {
2433 #if     USE_LRTS_MEMPOOL
2434     CONTROL_MSG         *request_msg;
2435     gni_return_t        status = GNI_RC_SUCCESS;
2436     void                *msg_data;
2437     gni_post_descriptor_t *pd;
2438     gni_mem_handle_t    msg_mem_hndl;
2439     int                 size, transaction_size, offset = 0;
2440     size_t              register_size = 0;
2441
2442     // initial a get to transfer data from the sender side */
2443     request_msg = (CONTROL_MSG *) header;
2444     size = request_msg->total_length;
2445     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2446     MallocPostDesc(pd);
2447 #if CMK_WITH_STATS 
2448     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2449 #endif
2450     if(request_msg->seq_id < 2)   {
2451 #if CMK_SMP_TRACE_COMMTHREAD 
2452         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2453 #endif
2454         msg_data = CmiAlloc(size);
2455         CmiSetMsgSeq(msg_data, 0);
2456         _MEMCHECK(msg_data);
2457     }
2458     else {
2459         offset = ONE_SEG*(request_msg->seq_id-1);
2460         msg_data = (char*)request_msg->dest_addr + offset;
2461     }
2462    
2463     pd->cqwrite_value = request_msg->seq_id;
2464
2465     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2466     SetMemHndlZero(pd->local_mem_hndl);
2467     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2468     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2469         if(NoMsgInRecv( (void*)(msg_data)))
2470             register_size = GetMempoolsize((void*)(msg_data));
2471     }
2472
2473     pd->first_operand = ALIGN64(size);                   //  total length
2474
2475     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2476         pd->type            = GNI_POST_FMA_GET;
2477     else
2478         pd->type            = GNI_POST_RDMA_GET;
2479     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2480     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2481     pd->length          = transaction_size;
2482     pd->local_addr      = (uint64_t) msg_data;
2483     pd->remote_addr     = request_msg->source_addr + offset;
2484     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2485     pd->src_cq_hndl     = rdma_tx_cqh;
2486     pd->rdma_mode       = 0;
2487     pd->amo_cmd         = 0;
2488 #if CMI_EXERT_RECV_RDMA_CAP
2489     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2490 #endif
2491     //memory registration success
2492     if(status == GNI_RC_SUCCESS )
2493     {
2494         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2495         CMI_GNI_LOCK(lock)
2496 #if REMOTE_EVENT
2497         if( request_msg->seq_id == 0)
2498         {
2499             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2500             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2501             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2502         }
2503 #endif
2504
2505 #if CMK_WITH_STATS
2506         RDMA_TRY_SEND(pd->type)
2507 #endif
2508         if(pd->type == GNI_POST_RDMA_GET) 
2509         {
2510             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2511         }
2512         else
2513         {
2514             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2515         }
2516         CMI_GNI_UNLOCK(lock)
2517
2518         if(status == GNI_RC_SUCCESS )
2519         {
2520 #if CMI_EXERT_RECV_RDMA_CAP
2521             RDMA_pending++;
2522 #endif
2523             if(pd->cqwrite_value == 0)
2524             {
2525 #if MACHINE_DEBUG_LOG
2526                 buffered_recv_msg += register_size;
2527                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2528 #endif
2529                 IncreaseMsgInRecv(msg_data);
2530 #if CMK_SMP_TRACE_COMMTHREAD 
2531                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2532 #endif
2533             }
2534 #if  CMK_WITH_STATS
2535             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2536             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2537 #endif
2538         }
2539     }else
2540     {
2541         SetMemHndlZero((pd->local_mem_hndl));
2542     }
2543     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2544     {
2545 #if REMOTE_EVENT
2546         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2547 #else
2548         bufferRdmaMsg(inst_id, pd, -1); 
2549 #endif
2550     }else if (status != GNI_RC_SUCCESS) {
2551         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2552         GNI_RC_CHECK("GetLargeAFter posting", status);
2553     }
2554 #else
2555     CONTROL_MSG         *request_msg;
2556     gni_return_t        status;
2557     void                *msg_data;
2558     gni_post_descriptor_t *pd;
2559     RDMA_REQUEST        *rdma_request_msg;
2560     gni_mem_handle_t    msg_mem_hndl;
2561     //int source;
2562     // initial a get to transfer data from the sender side */
2563     request_msg = (CONTROL_MSG *) header;
2564     msg_data = CmiAlloc(request_msg->length);
2565     _MEMCHECK(msg_data);
2566
2567     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2568
2569     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2570     {
2571         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2572     }
2573
2574     MallocPostDesc(pd);
2575     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2576         pd->type            = GNI_POST_FMA_GET;
2577     else
2578         pd->type            = GNI_POST_RDMA_GET;
2579     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2580     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2581     pd->length          = ALIGN64(request_msg->length);
2582     pd->local_addr      = (uint64_t) msg_data;
2583     pd->remote_addr     = request_msg->source_addr;
2584     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2585     pd->src_cq_hndl     = rdma_tx_cqh;
2586     pd->rdma_mode       = 0;
2587     pd->amo_cmd         = 0;
2588
2589     //memory registration successful
2590     if(status == GNI_RC_SUCCESS)
2591     {
2592         pd->local_mem_hndl  = msg_mem_hndl;
2593        
2594         if(pd->type == GNI_POST_RDMA_GET) 
2595         {
2596             CMI_GNI_LOCK(rdma_tx_cq_lock)
2597             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2598             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2599         }
2600         else
2601         {
2602             CMI_GNI_LOCK(default_tx_cq_lock)
2603             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2604             CMI_GNI_UNLOCK(default_tx_cq_lock)
2605         }
2606
2607     }else
2608     {
2609         SetMemHndlZero(pd->local_mem_hndl);
2610     }
2611     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2612     {
2613         MallocRdmaRequest(rdma_request_msg);
2614         rdma_request_msg->next = 0;
2615         rdma_request_msg->destNode = inst_id;
2616         rdma_request_msg->pd = pd;
2617         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2618     }else {
2619         GNI_RC_CHECK("AFter posting", status);
2620     }
2621 #endif
2622 }
2623
#if CQWRITE
/*
 * Drain CQ-write notifications from rdma_rx_cqh.
 * The data word of every event carries a message address in its low 48 bits.
 *  - CMK_PERSISTENT_COMM: when that message has a non-zero memory handle
 *    (MEMHFIELD) it is treated as a persistent receive: it is referenced,
 *    checksummed and handed to handleOneRecvedMsg.
 *  - Otherwise the message is released on the send side: its mempool send
 *    count is decremented (USE_LRTS_MEMPOOL only), GetMempoolsize(msg) is
 *    subtracted from buffered_send_msg once NoMsgInSend reports no pending
 *    sends, and the message is freed.
 * Draining stops at the first non-success GNI_CqGetEvent; a CQ overrun
 * (GNI_RC_ERROR_RESOURCE) is reported through GNI_RC_CHECK.
 */
static void PumpCqWriteTransactions()
{
    gni_cq_entry_t          cq_event;
    gni_return_t            rc;
    void                    *buf;
    int                     buf_size;

    for (;;) {
        rc = GNI_CqGetEvent(rdma_rx_cqh, &cq_event);
        if (rc != GNI_RC_SUCCESS)
            break;

        /* only the low 48 bits of the event data hold the address */
        buf = (void*) (GNI_CQ_GET_DATA(cq_event) & 0xFFFFFFFFFFFFL);

#if CMK_PERSISTENT_COMM
#if PRINT_SYH
        printf(" %d CQ write event %p\n", myrank, buf);
#endif
        if (!IsMemHndlZero(MEMHFIELD(buf))) {
#if PRINT_SYH
            printf(" %d Persistent CQ write event %p\n", myrank, buf);
#endif
            CmiReference(buf);
            buf_size = CmiGetMsgSize(buf);
            CMI_CHECK_CHECKSUM(buf, buf_size);
            handleOneRecvedMsg(buf_size, buf);
            continue;
        }
#endif

#if USE_LRTS_MEMPOOL
        DecreaseMsgInSend(buf);
#endif
        if (NoMsgInSend(buf))
            buffered_send_msg -= GetMempoolsize(buf);
        CmiFree(buf);
    }

    if (rc == GNI_RC_ERROR_RESOURCE)
    {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", rc);
    }
}
#endif
2668
#if REMOTE_EVENT
/*
 * Drain remote-event completions from rdma_rx_cqh. The CQ is polled while
 * holding rdma_tx_cq_lock.
 * The instance id of each event is split by GET_TYPE / GET_INDEX into an
 * event type and a pool slot:
 *   type 0 (ACK)        - releases the message recorded in ackPool[slot]:
 *                         mempool send count decremented (USE_LRTS_MEMPOOL),
 *                         buffered_send_msg adjusted when no sends remain,
 *                         message freed, slot returned to ackPool.
 *   type 1 (PERSISTENT) - only with CMK_PERSISTENT_COMM: delivers the
 *                         message in the destination buffer recorded in
 *                         persistPool[slot] via handleOneRecvedMsg.
 *   anything else       - aborts.
 * Draining stops at the first non-success GNI_CqGetEvent; a CQ overrun
 * (GNI_RC_ERROR_RESOURCE) is reported through GNI_RC_CHECK.
 */
static void PumpRemoteTransactions()
{
    gni_cq_entry_t          event;
    gni_return_t            rc;
    void                    *buf;
    int                     inst_id, slot_index, ev_type, size;

    for (;;) {
        CMI_GNI_LOCK(rdma_tx_cq_lock)
        rc = GNI_CqGetEvent(rdma_rx_cqh, &event);
        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
        if (rc != GNI_RC_SUCCESS)
            break;

        inst_id    = GNI_CQ_GET_INST_ID(event);
        slot_index = GET_INDEX(inst_id);
        ev_type    = GET_TYPE(inst_id);

        if (ev_type == 0) {                     /* ACK */
            CmiAssert(slot_index>=0 && slot_index<ackPool.size);
            CMI_GNI_LOCK(ackPool.lock);
            CmiAssert(GetIndexType(ackPool, slot_index) == 1);
            buf = GetIndexAddress(ackPool, slot_index);
            CMI_GNI_UNLOCK(ackPool.lock);
#if PRINT_SYH
            printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(buf), slot_index, ev_type);
#endif
#if USE_LRTS_MEMPOOL
            DecreaseMsgInSend(buf);
#endif
            if (NoMsgInSend(buf))
                buffered_send_msg -= GetMempoolsize(buf);
            CmiFree(buf);
            IndexPool_freeslot(&ackPool, slot_index);
#if CMI_EXERT_SEND_LARGE_CAP
            SEND_large_pending--;
#endif
        }
#if CMK_PERSISTENT_COMM
        else if (ev_type == 1) {                /* PERSISTENT */
            CmiLock(persistPool.lock);
            CmiAssert(GetIndexType(persistPool, slot_index) == 2);
            PersistentReceivesTable *slot = GetIndexAddress(persistPool, slot_index);
            CmiUnlock(persistPool.lock);
            START_EVENT();
            buf = slot->destBuf[0].destAddress;
            size = CmiGetMsgSize(buf);
            CmiReference(buf);
            CMI_CHECK_CHECKSUM(buf, size);
            TRACE_COMM_CREATION(CpvAccess(projTraceStart), buf);
            handleOneRecvedMsg(size, buf);
        }
#endif
        else {
            fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, ev_type);
            CmiAbort("PumpRemoteTransactions: unknown type");
        }
    }

    if (rc == GNI_RC_ERROR_RESOURCE)
    {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", rc);
    }
}
#endif
2739
2740 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2741 {
2742     gni_cq_entry_t          ev;
2743     gni_return_t            status;
2744     uint64_t                type, inst_id;
2745     gni_post_descriptor_t   *tmp_pd;
2746     MSG_LIST                *ptr;
2747     CONTROL_MSG             *ack_msg_tmp;
2748     ACK_MSG                 *ack_msg;
2749     uint8_t                 msg_tag;
2750 #if CMK_DIRECT
2751     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2752 #endif
2753     SMSG_QUEUE         *queue = &smsg_queue;
2754
2755     while(1) {
2756         CMI_GNI_LOCK(my_cq_lock) 
2757         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2758         CMI_GNI_UNLOCK(my_cq_lock)
2759         if(status != GNI_RC_SUCCESS) break;
2760         
2761         type = GNI_CQ_GET_TYPE(ev);
2762         if (type == GNI_CQ_EVENT_TYPE_POST)
2763         {
2764
2765 #if CMI_EXERT_RECV_RDMA_CAP
2766             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2767             RDMA_pending--;
2768 #endif
2769             inst_id     = GNI_CQ_GET_INST_ID(ev);
2770 #if PRINT_SYH
2771             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2772 #endif
2773             CMI_GNI_LOCK(my_cq_lock)
2774             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2775             CMI_GNI_UNLOCK(my_cq_lock)
2776
2777             switch (tmp_pd->type) {
2778 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2779             case GNI_POST_RDMA_PUT:
2780 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2781                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2782 #endif
2783             case GNI_POST_FMA_PUT:
2784                 if(tmp_pd->amo_cmd == 1) {
2785 #if CMK_DIRECT
2786                     //sender ACK to receiver to trigger it is done
2787                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2788                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2789                     msg_tag = DIRECT_PUT_DONE_TAG;
2790 #endif
2791                 }
2792                 else {
2793                     CmiFree((void *)tmp_pd->local_addr);
2794 #if REMOTE_EVENT
2795                     FreePostDesc(tmp_pd);
2796                     continue;
2797 #elif CQWRITE
2798                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2799                     FreePostDesc(tmp_pd);
2800                     continue;
2801 #else
2802                     MallocControlMsg(ack_msg_tmp);
2803                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2804                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2805                     ack_msg_tmp->length  = tmp_pd->length;
2806                     msg_tag = PUT_DONE_TAG;
2807 #endif
2808                 }
2809                 break;
2810 #endif
2811             case GNI_POST_RDMA_GET:
2812             case GNI_POST_FMA_GET:  {
2813 #if  ! USE_LRTS_MEMPOOL
2814                 MallocControlMsg(ack_msg_tmp);
2815                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2816                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2817                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2818                 msg_tag = ACK_TAG;  
2819 #else
2820 #if CMK_WITH_STATS
2821                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2822 #endif
2823                 int seq_id = tmp_pd->cqwrite_value;
2824                 if(seq_id > 0)      // BIG_MSG
2825                 {
2826                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2827                     MallocControlMsg(ack_msg_tmp);
2828                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2829                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2830                     ack_msg_tmp->seq_id = seq_id;
2831                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2832                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2833                     ack_msg_tmp->length = tmp_pd->length;
2834                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2835                     msg_tag = BIG_MSG_TAG; 
2836                 } 
2837                 else
2838                 {
2839                     msg_tag = ACK_TAG; 
2840 #if  !REMOTE_EVENT && !CQWRITE
2841                     MallocAckMsg(ack_msg);
2842                     ack_msg->source_addr = tmp_pd->remote_addr;
2843 #endif
2844                 }
2845 #endif
2846                 break;
2847             }
2848             case  GNI_POST_CQWRITE:
2849                    FreePostDesc(tmp_pd);
2850                    continue;
2851             default:
2852                 CmiPrintf("type=%d\n", tmp_pd->type);
2853                 CmiAbort("PumpLocalTransactions: unknown type!");
2854             }      /* end of switch */
2855
2856 #if CMK_DIRECT
2857             if (tmp_pd->amo_cmd == 1) {
2858                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2859                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2860             }
2861             else
2862 #endif
2863             if (msg_tag == ACK_TAG) {
2864 #if !REMOTE_EVENT
2865 #if   !CQWRITE
2866                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2867                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2868 #else
2869                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2870 #endif
2871 #endif
2872             }
2873             else {
2874                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2875                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2876             }
2877 #if CMK_PERSISTENT_COMM
2878             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2879 #endif
2880             {
2881                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2882 #if PRINT_SYH
2883                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2884 #endif
2885                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2886                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2887
2888                     START_EVENT();
2889                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2890                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2891 #if MACHINE_DEBUG_LOG
2892                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2893                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2894                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2895 #endif
2896                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2897                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2898                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2899                 }else if(msg_tag == BIG_MSG_TAG){
2900                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2901                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2902                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2903                         START_EVENT();
2904 #if PRINT_SYH
2905                         printf("Pipeline msg done [%d]\n", myrank);
2906 #endif
2907 #if                 CMK_SMP_TRACE_COMMTHREAD
2908                         if( tmp_pd->cqwrite_value == 1)
2909                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2910 #endif
2911                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2912                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2913                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2914                     }
2915                 }
2916             }
2917             FreePostDesc(tmp_pd);
2918         }
2919     } //end while
2920     if(status == GNI_RC_ERROR_RESOURCE)
2921     {
2922         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2923         GNI_RC_CHECK("Smsg_tx_cq full", status);
2924     }
2925 }
2926
2927 static void  SendRdmaMsg()
2928 {
2929     gni_return_t            status = GNI_RC_SUCCESS;
2930     gni_mem_handle_t        msg_mem_hndl;
2931     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2932     RDMA_REQUEST            *pre = 0;
2933     uint64_t                register_size = 0;
2934     void                    *msg;
2935     int                     i;
2936 #if CMK_SMP
2937     int len = PCQueueLength(sendRdmaBuf);
2938     for (i=0; i<len; i++)
2939     {
2940 #if CMI_EXERT_RECV_RDMA_CAP
2941         if( RDMA_pending >= RDMA_cap) break;
2942 #endif
2943         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2944         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2945         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2946         if (ptr == NULL) break;
2947 #else
2948     ptr = sendRdmaBuf;
2949     while (ptr!=0 )
2950     {
2951 #if CMI_EXERT_RECV_RDMA_CAP
2952          if( RDMA_pending >= RDMA_cap) break;
2953 #endif
2954 #endif 
2955         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2956         gni_post_descriptor_t *pd = ptr->pd;
2957         
2958         msg = (void*)(pd->local_addr);
2959         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2960         register_size = 0;
2961         if(pd->cqwrite_value == 0) {
2962             if(NoMsgInRecv(msg))
2963                 register_size = GetMempoolsize(msg);
2964         }
2965
2966         if(status == GNI_RC_SUCCESS)        //mem register good
2967         {
2968             int destNode = ptr->destNode;
2969             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2970             CMI_GNI_LOCK(lock);
2971 #if REMOTE_EVENT
2972             if( pd->cqwrite_value == 0) {
2973                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2974                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
2975                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2976             }
2977 #if CMK_PERSISTENT_COMM
2978             else if (pd->cqwrite_value == PERSIST_SEQ) {
2979                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2980                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
2981                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2982             }
2983 #endif
2984 #endif
2985 #if CMK_WITH_STATS
2986             RDMA_TRY_SEND(pd->type)
2987 #endif
2988 #if CMK_SMP_TRACE_COMMTHREAD
2989 //            int oldpe = -1;
2990 //            int oldeventid = -1;
2991 //            START_EVENT();
2992 //            if(IS_PUT(pd->type))
2993 //            { 
2994 //                TRACE_COMM_GET_MSGID((void*)pd->local_addr, &oldpe, &oldeventid);
2995 //                TRACE_COMM_SET_COMM_MSGID((void*)pd->local_addr);
2996 //            }
2997             if(IS_PUT(pd->type))
2998             {
2999                  START_EVENT();
3000                  TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
3001             }
3002 #endif
3003
3004             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
3005             {
3006                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3007             }
3008             else
3009             {
3010                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3011             }
3012             CMI_GNI_UNLOCK(lock);
3013             
3014 #if CMK_SMP_TRACE_COMMTHREAD
3015 //            if(IS_PUT(pd->type))
3016 //            { 
3017 //                if (oldpe != -1)  TRACE_COMM_SET_MSGID((void*)pd->local_addr, oldpe, oldeventid);
3018 //            }
3019 #endif
3020             if(status == GNI_RC_SUCCESS)    //post good
3021             {
3022 #if CMI_EXERT_RECV_RDMA_CAP
3023                 RDMA_pending ++;
3024 #endif
3025 #if !CMK_SMP
3026                 tmp_ptr = ptr;
3027                 if(pre != 0) {
3028                     pre->next = ptr->next;
3029                 }
3030                 else {
3031                     sendRdmaBuf = ptr->next;
3032                 }
3033                 ptr = ptr->next;
3034                 FreeRdmaRequest(tmp_ptr);
3035 #endif
3036                 if(pd->cqwrite_value == 0)
3037                 {
3038 #if CMK_SMP_TRACE_COMMTHREAD 
3039                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3040 //                    if(IS_PUT(pd->type))
3041 //                    { 
3042 //                        TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
3043 //                    }
3044 #endif
3045                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3046                 }
3047 #if  CMK_WITH_STATS
3048                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3049                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3050 #endif
3051 #if MACHINE_DEBUG_LOG
3052                 buffered_recv_msg += register_size;
3053                 MACHSTATE(8, "GO request from buffered\n"); 
3054 #endif
3055 #if PRINT_SYH
3056                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
3057 #endif
3058             }else           // cannot post
3059             {
3060 #if CMK_SMP
3061                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3062 #else
3063                 pre = ptr;
3064                 ptr = ptr->next;
3065 #endif
3066 #if PRINT_SYH
3067                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3068 #endif
3069                 break;
3070             }
3071         } else          //memory registration fails
3072         {
3073 #if CMK_SMP
3074             PCQueuePush(sendRdmaBuf, (char*)ptr);
3075 #else
3076             pre = ptr;
3077             ptr = ptr->next;
3078 #endif
3079         }
3080     } //end while
3081 #if ! CMK_SMP
3082     if(ptr == 0)
3083         sendRdmaTail = pre;
3084 #endif
3085 }
3086
3087 // return 1 if all messages are sent
3088 static int SendBufferMsg(SMSG_QUEUE *queue)
3089 {
3090     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3091     CONTROL_MSG         *control_msg_tmp;
3092     gni_return_t        status;
3093     int                 done = 1;
3094     uint64_t            register_size;
3095     void                *register_addr;
3096     int                 index_previous = -1;
3097 #if     CMI_SENDBUFFERSMSG_CAP
3098     int                 sent_length = 0;
3099 #endif
3100 #if CMK_SMP
3101     int          index = 0;
3102 #if ONE_SEND_QUEUE
3103     memset(destpe_avail, 0, mysize * sizeof(char));
3104     for (index=0; index<1; index++)
3105     {
3106         int i, len = PCQueueLength(queue->sendMsgBuf);
3107         for (i=0; i<len; i++) 
3108         {
3109             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3110             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3111             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3112             if(ptr == NULL) break;
3113             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3114                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3115                 continue;
3116             }
3117 #else
3118 #if SMP_LOCKS
3119     int nonempty = PCQueueLength(nonEmptyQueues);
3120 #if CMI_SENDBUFFERSMSG_CAP
3121     for(index =0; index<nonempty ; index++) {
3122         if ( sent_length < SendBufferMsg_cap) { done = 0; return done;}
3123 #else
3124     for(index =0; index<nonempty; index++) {
3125 #endif
3126         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
3127         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
3128         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3129         if(current_list == NULL) break; 
3130         PCQueue current_queue= current_list-> sendSmsgBuf;
3131         CmiLock(current_list->lock);
3132         int i, len = PCQueueLength(current_queue);
3133         current_list->pushed = 0;
3134         CmiUnlock(current_list->lock);
3135 #else
3136 #if CMI_SENDBUFFERSMSG_CAP
3137     for(index =0; index<mysize && sent_length < SendBufferMsg_cap; index++) {
3138 #else
3139     for(index =0; index<mysize; index++) {
3140 #endif
3141         //if (index == myrank) continue;
3142         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3143         int i, len = PCQueueLength(current_queue);
3144 #endif
3145         for (i=0; i<len; i++)  {
3146             CMI_PCQUEUEPOP_LOCK(current_queue)
3147             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3148             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3149             if (ptr == 0) break;
3150 #endif
3151 #else
3152     int index = queue->smsg_head_index;
3153     while(index != -1)
3154     {
3155         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3156         pre = 0;
3157         while(ptr != 0)
3158         {
3159 #endif
3160             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3161             status = GNI_RC_ERROR_RESOURCE;
3162             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
3163                 /* connection not exists yet */
3164 #if CMK_SMP
3165                   /* non-smp case, connect is issued in send_smsg_message */
3166                 if (smsg_connected_flag[index] == 0)
3167                     connect_to(ptr->destNode); 
3168 #endif
3169             }
3170             else
3171             switch(ptr->tag)
3172             {
3173             case SMALL_DATA_TAG:
3174                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3175                 if(status == GNI_RC_SUCCESS)
3176                 {
3177                     CmiFree(ptr->msg);
3178                 }
3179                 break;
3180             case LMSG_INIT_TAG:
3181                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3182                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
3183                 break;
3184 #if !REMOTE_EVENT && !CQWRITE
3185             case ACK_TAG:
3186                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3187                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3188                 break;
3189 #endif
3190             case BIG_MSG_TAG:
3191                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3192                 if(status == GNI_RC_SUCCESS)
3193                 {
3194                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3195                 }
3196                 break;
3197 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3198             case PUT_DONE_TAG:
3199                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3200                 if(status == GNI_RC_SUCCESS)
3201                 {
3202                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3203                 }
3204                 break;
3205 #endif
3206 #if CMK_DIRECT
3207             case DIRECT_PUT_DONE_TAG:
3208                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3209                 if(status == GNI_RC_SUCCESS)
3210                 {
3211                     free((CMK_DIRECT_HEADER*)ptr->msg);
3212                 }
3213                 break;
3214
3215 #endif
3216             default:
3217                 printf("Weird tag\n");
3218                 CmiAbort("should not happen\n");
3219             }       // end switch
3220 #if CMI_SENDBUFFERSMSG_CAP
3221             sent_length++;
3222 #endif
3223             if(status == GNI_RC_SUCCESS)
3224             {
3225 #if PRINT_SYH
3226                 buffered_smsg_counter--;
3227                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3228 #endif
3229 #if !CMK_SMP
3230                 tmp_ptr = ptr;
3231                 if(pre)
3232                 {
3233                     ptr = pre ->next = ptr->next;
3234                 }else
3235                 {
3236                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3237                 }
3238                 FreeMsgList(tmp_ptr);
3239 #else
3240                 FreeMsgList(ptr);
3241 #endif
3242             }else {
3243 #if CMK_SMP
3244 #if ONE_SEND_QUEUE
3245                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3246 #else
3247                 PCQueuePush(current_queue, (char*)ptr);
3248 #endif
3249 #else
3250                 pre = ptr;
3251                 ptr=ptr->next;
3252 #endif
3253                 done = 0;
3254                 if(status == GNI_RC_ERROR_RESOURCE)
3255                 {
3256 #if CMK_SMP && ONE_SEND_QUEUE 
3257                     destpe_avail[ptr->destNode] = 1;
3258 #else
3259                     break;
3260 #endif
3261                 }
3262             } 
3263         } //end while
3264 #if !CMK_SMP
3265         if(ptr == 0)
3266             queue->smsg_msglist_index[index].tail = pre;
3267         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3268         {
3269             if(index_previous != -1)
3270                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3271             else
3272                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3273         }
3274         else {
3275             index_previous = index;
3276         }
3277         index = queue->smsg_msglist_index[index].next;
3278 #else
3279 #if !ONE_SEND_QUEUE && SMP_LOCKS
3280         CmiLock(current_list->lock);
3281         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3282         {
3283             current_list->pushed = 1;
3284             PCQueuePush(nonEmptyQueues, (char*)current_list);
3285         }
3286         CmiUnlock(current_list->lock); 
3287 #endif
3288 #endif
3289
3290     }   // end pooling for all cores
3291     return done;
3292 }
3293
3294 static void ProcessDeadlock();
3295 void LrtsAdvanceCommunication(int whileidle)
3296 {
3297     static int count = 0;
3298     /*  Receive Msg first */
3299 #if CMK_SMP_TRACE_COMMTHREAD
3300     double startT, endT;
3301 #endif
3302     if (useDynamicSMSG && whileidle)
3303     {
3304 #if CMK_SMP_TRACE_COMMTHREAD
3305         startT = CmiWallTimer();
3306 #endif
3307         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3308 #if CMK_SMP_TRACE_COMMTHREAD
3309         endT = CmiWallTimer();
3310         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3311 #endif
3312     }
3313
3314 #if CMK_SMP_TRACE_COMMTHREAD
3315     startT = CmiWallTimer();
3316 #endif
3317     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3318     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3319 #if CMK_SMP_TRACE_COMMTHREAD
3320     endT = CmiWallTimer();
3321     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3322 #endif
3323
3324 #if CMK_SMP_TRACE_COMMTHREAD
3325     startT = CmiWallTimer();
3326 #endif
3327     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3328     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3329 #if CMK_SMP_TRACE_COMMTHREAD
3330     endT = CmiWallTimer();
3331     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3332 #endif
3333
3334 #if CMK_SMP_TRACE_COMMTHREAD
3335     startT = CmiWallTimer();
3336 #endif
3337     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3338
3339 #if CQWRITE
3340     PumpCqWriteTransactions();
3341 #endif
3342
3343 #if REMOTE_EVENT
3344     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
3345 #endif
3346
3347     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3348 #if CMK_SMP_TRACE_COMMTHREAD
3349     endT = CmiWallTimer();
3350     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3351 #endif
3352  
3353 #if CMK_SMP_TRACE_COMMTHREAD
3354     startT = CmiWallTimer();
3355 #endif
3356     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3357     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3358 #if CMK_SMP_TRACE_COMMTHREAD
3359     endT = CmiWallTimer();
3360     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3361 #endif
3362
3363     /* Send buffered Message */
3364 #if CMK_SMP_TRACE_COMMTHREAD
3365     startT = CmiWallTimer();
3366 #endif
3367 #if CMK_USE_OOB
3368     if (SendBufferMsg(&smsg_oob_queue) == 1)
3369 #endif
3370     {
3371         STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
3372     }
3373     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3374 #if CMK_SMP_TRACE_COMMTHREAD
3375     endT = CmiWallTimer();
3376     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3377 #endif
3378
3379 #if CMK_SMP && ! LARGEPAGE
3380     if (_detected_hang)  ProcessDeadlock();
3381 #endif
3382 }
3383
3384 static void set_smsg_max()
3385 {
3386     char *env;
3387
3388     if(mysize <=512)
3389     {
3390         SMSG_MAX_MSG = 1024;
3391     }else if (mysize <= 4096)
3392     {
3393         SMSG_MAX_MSG = 1024;
3394     }else if (mysize <= 16384)
3395     {
3396         SMSG_MAX_MSG = 512;
3397     }else {
3398         SMSG_MAX_MSG = 256;
3399     }
3400
3401     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3402     if (env) SMSG_MAX_MSG = atoi(env);
3403     CmiAssert(SMSG_MAX_MSG > 0);
3404 }    
3405
3406 /* useDynamicSMSG */
3407 static void _init_dynamic_smsg()
3408 {
3409     gni_return_t status;
3410     uint32_t     vmdh_index = -1;
3411     int i;
3412
3413     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3414     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3415     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3416     for(i=0; i<mysize; i++) {
3417         smsg_connected_flag[i] = 0;
3418         smsg_attr_vector_local[i] = NULL;
3419         smsg_attr_vector_remote[i] = NULL;
3420     }
3421
3422     set_smsg_max();
3423
3424     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3425     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3426     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3427     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3428     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3429
3430     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3431     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3432     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3433     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3434     mailbox_list->offset = 0;
3435     mailbox_list->next = 0;
3436     
3437     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3438         mailbox_list->size, smsg_rx_cqh,
3439         GNI_MEM_READWRITE,   
3440         vmdh_index,
3441         &(mailbox_list->mem_hndl));
3442     GNI_RC_CHECK("MEMORY registration for smsg", status);
3443
3444     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3445     GNI_RC_CHECK("Unbound EP", status);
3446     
3447     alloc_smsg_attr(&send_smsg_attr);
3448
3449     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3450     GNI_RC_CHECK("post unbound datagram", status);
3451
3452       /* always pre-connect to proc 0 */
3453     //if (myrank != 0) connect_to(0);
3454
3455     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3456     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3457 }
3458
3459 static void _init_static_smsg()
3460 {
3461     gni_smsg_attr_t      *smsg_attr;
3462     gni_smsg_attr_t      remote_smsg_attr;
3463     gni_smsg_attr_t      *smsg_attr_vec;
3464     gni_mem_handle_t     my_smsg_mdh_mailbox;
3465     int      ret, i;
3466    &nb