added OOB receiving
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on the total mempool size allocated; this prevents the mempool
18     # from using too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
/* ---- Compile-time feature switches for this machine layer ---- */

/* Huge-page support; also selects the larger BIG_MSG/ONE_SEG/MAX_* values. */
#define LARGEPAGE                  0

/* SMP builds: sends happen either on worker threads or on the comm thread. */
#if CMK_SMP
#define MULTI_THREAD_SEND          0
#define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
#endif

#if MULTI_THREAD_SEND
#define CMK_WORKER_SINGLE_TASK     1
#endif

/* RDMA completion notification: remote events (on) vs. CQ writes (off). */
#define REMOTE_EVENT               1
#define CQWRITE                    0

/* Optional caps; the counters behind each cap exist only when it is enabled. */
#define CMI_EXERT_SEND_LARGE_CAP   0
#if CMI_EXERT_SEND_LARGE_CAP
static int SEND_large_cap     = 10;
static int SEND_large_pending = 0;
#endif

#define CMI_EXERT_RECV_RDMA_CAP    0
#if CMI_EXERT_RECV_RDMA_CAP
static int RDMA_cap     = 10;
static int RDMA_pending = 0;
#endif

#define CMI_SENDBUFFERSMSG_CAP     0
#if CMI_SENDBUFFERSMSG_CAP
int SendBufferMsg_cap = 10;
#endif

#define CMI_PUMPNETWORKSMSG_CAP    0
#if CMI_PUMPNETWORKSMSG_CAP
int PumpNetworkSmsg_cap = 10;
#endif

#define USE_LRTS_MEMPOOL           1
#define PRINT_SYH                  0
92
/* ---- Tracing of the communication thread / communication overhead ---- */

#if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
#  define TRACE_THRESHOLD             0.00001
#  define CMI_MPI_TRACE_MOREDETAILED  0
#  undef  CMI_MPI_TRACE_USEREVENTS
#  define CMI_MPI_TRACE_USEREVENTS    1
#else
   /* comm-thread tracing not available: force the switch off */
#  undef  CMK_SMP_TRACE_COMMTHREAD
#  define CMK_SMP_TRACE_COMMTHREAD    0
#endif

/* Set to 1 to trace communication overhead (needs CMK_TRACE_ENABLED). */
#define CMK_TRACE_COMMOVERHEAD 0
#if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
#  undef  CMI_MPI_TRACE_USEREVENTS
#  define CMI_MPI_TRACE_USEREVENTS    1
#else
#  undef  CMK_TRACE_COMMOVERHEAD
#  define CMK_TRACE_COMMOVERHEAD      0
#endif

/* START_EVENT/END_EVENT bracket a traced user event (traceUserBracketEvent);
 * they compile to nothing unless user-event tracing is enabled outside of
 * Charm++-level tracing. */
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
CpvStaticDeclare(double, projTraceStart);
#  define START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
#  define END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
#  define EVENT_TIME()   CpvAccess(projTraceStart)
#else
#  define START_EVENT()
#  define END_EVENT(x)
#  define EVENT_TIME()   (0.0)
#endif
123
#if USE_LRTS_MEMPOOL

#define oneMB (1024ll*1024)
#define oneGB (1024ll*1024*1024)

/* Mempool sizing defaults: initial size, expansion step, total-size limit.
 * NOTE(review): presumably overridden by the CHARM_UGNI_MEMPOOL_* variables
 * described in the file header -- confirm at the initialization site. */
static CmiInt8 _mempool_size       = 8*oneMB;
static CmiInt8 _expand_mem         = 4*oneMB;
static CmiInt8 _mempool_size_limit = 0;

/* default memory budget (cf. CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G) */
static CmiInt8 _totalmem = 0.8*oneGB;

/* big-message threshold and segment size; larger with LARGEPAGE */
#if LARGEPAGE
static CmiInt8 BIG_MSG  = 16*oneMB;
static CmiInt8 ONE_SEG  =  4*oneMB;
#else
static CmiInt8 BIG_MSG  =  4*oneMB;
static CmiInt8 ONE_SEG  =  2*oneMB;
#endif

/* big-message pipeline length (cf. CHARM_UGNI_BIG_MSG_PIPELINE_LEN) */
#if MULTI_THREAD_SEND
static int BIG_MSG_PIPELINE = 1;
#else
static int BIG_MSG_PIPELINE = 4;
#endif

/* dynamic flow control; MEMORY_REGISTER refuses a registration that would
 * bring register_memory_size up to MAX_REG_MEM */
static CmiInt8 buffered_send_msg    = 0;
static CmiInt8 register_memory_size = 0;

#if LARGEPAGE
static CmiInt8 MAX_BUFF_SEND  = 100000*oneMB;
static CmiInt8 MAX_REG_MEM    = 200000*oneMB;
static CmiInt8 register_count = 0;
#elif CMK_SMP && COMM_THREAD_SEND
static CmiInt8 MAX_BUFF_SEND  = 100*oneMB;
static CmiInt8 MAX_REG_MEM    = 200*oneMB;
#else
static CmiInt8 MAX_BUFF_SEND  = 16*oneMB;
static CmiInt8 MAX_REG_MEM    = 25*oneMB;
#endif

#endif     /* end USE_LRTS_MEMPOOL */
169
/* Locking helpers: real locks only when MULTI_THREAD_SEND is enabled;
 * otherwise they compile away and CMI_GNI_TRYLOCK evaluates to 0. */
#if MULTI_THREAD_SEND
#  define CMI_GNI_LOCK(x)            CmiLock(x);
#  define CMI_GNI_TRYLOCK(x)         CmiTryLock(x)
#  define CMI_GNI_UNLOCK(x)          CmiUnlock(x);
#  define CMI_PCQUEUEPOP_LOCK(Q)     CmiLock((Q)->lock);
#  define CMI_PCQUEUEPOP_UNLOCK(Q)   CmiUnlock((Q)->lock);
#else
#  define CMI_GNI_LOCK(x)
#  define CMI_GNI_TRYLOCK(x)         (0)
#  define CMI_GNI_UNLOCK(x)
#  define CMI_PCQUEUEPOP_LOCK(Q)
#  define CMI_PCQUEUEPOP_UNLOCK(Q)
#endif
183
184 static int _tlbpagesize = 4096;
185
186 //static int _smpd_count  = 0;
187
188 static int   user_set_flag  = 0;
189
190 static int _checkProgress = 1;             /* check deadlock */
191 static int _detected_hang = 0;
192
193 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
194
195 // dynamic SMSG
196 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
197
198 static int avg_smsg_connection = 32;
199 static int                 *smsg_connected_flag= 0;
200 static gni_smsg_attr_t     **smsg_attr_vector_local;
201 static gni_smsg_attr_t     **smsg_attr_vector_remote;
202 static gni_ep_handle_t     ep_hndl_unbound;
203 static gni_smsg_attr_t     send_smsg_attr;
204 static gni_smsg_attr_t     recv_smsg_attr;
205
206 typedef struct _dynamic_smsg_mailbox{
207    void     *mailbox_base;
208    int      size;
209    int      offset;
210    gni_mem_handle_t  mem_hndl;
211    struct      _dynamic_smsg_mailbox  *next;
212 }dynamic_smsg_mailbox_t;
213
214 static dynamic_smsg_mailbox_t  *mailbox_list;
215
216 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
217 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
218
219 #if PRINT_SYH
220 int         lrts_send_msg_id = 0;
221 int         lrts_local_done_msg = 0;
222 int         lrts_send_rdma_success = 0;
223 #endif
224
225 #include "machine.h"
226
227 #include "pcqueue.h"
228
229 #include "mempool.h"
230
/* Post queued high-priority RDMA sends: only with persistent communication. */
#if CMK_PERSISTENT_COMM
#  include "machine-persistent.h"
#  define POST_HIGHPRIORITY_RDMA     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
#else
#  define POST_HIGHPRIORITY_RDMA
#endif

/* Poll the high-priority remote-event CQ: only with remote events and OOB. */
#if REMOTE_EVENT && CMK_USE_OOB
#  define PUMP_REMOTE_HIGHPRIORITY   STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh));
#else
#  define PUMP_REMOTE_HIGHPRIORITY
#endif
243
/*
 * Memory (de)registration wrappers with registered-memory accounting.
 *
 * MEMORY_REGISTER registers `size` bytes at `msg` and stores the GNI return
 * code in `status`.  If the registration would bring register_memory_size up
 * to MAX_REG_MEM, it is refused with status = GNI_RC_ERROR_NOMEM.  On success,
 * `size` is added to register_memory_size.  Arguments are parenthesized and
 * `size` is evaluated exactly once.  The expansion is a braced block, so it
 * works with or without a trailing semicolon, as the original did.
 *
 * MEMORY_DEREGISTER deregisters `mem_hndl` and subtracts `size` from
 * register_memory_size.  It aborts if deregistration fails.
 */
//#define  USE_ONESIDED 1
#ifdef USE_ONESIDED
/* onesided implementation is wrong, since no place to restore omdh.
 * NOTE(review): these variants take fewer parameters than the GNI variants
 * below, so existing call sites will not compile if USE_ONESIDED is enabled. */
#include "onesided.h"
onesided_hnd_t   onesided_hnd;
onesided_md_t    omdh;
#define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  onesided_mem_register(handler, (uint64_t)(msg), (size), 0, myomdh)

#define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)

#else
uint8_t   onesided_hnd, omdh;

#if REMOTE_EVENT || CQWRITE
/* registrations are bound to the caller-supplied completion queue `cqh` */
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    { \
        CmiInt8 mr_len_ = (CmiInt8)(size); \
        if (register_memory_size + mr_len_ >= MAX_REG_MEM) { \
            (status) = GNI_RC_ERROR_NOMEM; \
        } else { \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)mr_len_, (cqh), GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += mr_len_; \
        } \
    }
#else
/* no completion queue is attached (cqh is ignored) */
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    { \
        CmiInt8 mr_len_ = (CmiInt8)(size); \
        if (register_memory_size + mr_len_ >= MAX_REG_MEM) { \
            (status) = GNI_RC_ERROR_NOMEM; \
        } else { \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)mr_len_, NULL, GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += mr_len_; \
        } \
    }
#endif

#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
    do { if (GNI_MemDeregister((nic_hndl), (mem_hndl)) == GNI_RC_SUCCESS) \
             register_memory_size -= (size); \
         else CmiAbort("MEM_DEregister");  \
    } while (0)
#endif
277
/* Accessors that reach a mempool-allocated buffer's bookkeeping through its
 * mempool header (located with MEMPOOL_GetMempoolHeader at ALIGNBUF
 * alignment).  NoMsgInSend/NoMsgInRecv are fully parenthesized so that they
 * compose correctly inside larger expressions (e.g. with + or <). */
#define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   NoMsgInSend(x)          (MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0)
#define   NoMsgInRecv(x)          (MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0)
#define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
#define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
#define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
#define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))

/* same information read directly from a mempool block header */
#define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
#define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
295
/* Size/sequence fields of the extended Converse message header.  The
 * argument is parenthesized before the cast so that expressions such as
 * `buf + off` address the intended header. */
#define CmiGetMsgSize(m)     (((CmiMsgHeaderExt*)(m))->size)
#define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)(m))->size)=(s))
#define CmiGetMsgSeq(m)      (((CmiMsgHeaderExt*)(m))->seq)
#define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)(m))->seq) = (s))

/* alignment passed to MEMPOOL_GetMempoolHeader by the mempool accessor macros */
#define ALIGNBUF                64
302
/* =======Beginning of Definitions of Performance-Specific Macros =======*/

/* FMA limits, used when SMSG is not used */
#define FMA_PER_CORE            1024
#define FMA_BUFFER_SIZE         1024

/* SMSG limits, used when SMSG is used */
static int SMSG_MAX_MSG       = 1024;
#define SMSG_MAX_CREDIT         72

#define MSGQ_MAXSIZE            2048

/* Threshold for large-message transfer with FMA or BTE.  Remote events only
 * work with RDMA, so the threshold is 0 when REMOTE_EVENT is enabled. */
#if REMOTE_EVENT
#define LRTS_GNI_RDMA_THRESHOLD  0
#else
#define LRTS_GNI_RDMA_THRESHOLD  1024
#endif

#if CMK_SMP
static int REMOTE_QUEUE_ENTRIES = 163840;
static int LOCAL_QUEUE_ENTRIES  = 163840;
#else
static int REMOTE_QUEUE_ENTRIES = 20480;
static int LOCAL_QUEUE_ENTRIES  = 20480;
#endif

/* SMSG message tags */
#define BIG_MSG_TAG             0x26
#define PUT_DONE_TAG            0x28
#define DIRECT_PUT_DONE_TAG     0x29
#define ACK_TAG                 0x30
#define SMALL_DATA_TAG          0x31   /* SMSG is a data message */
#define LMSG_INIT_TAG           0x33   /* SMSG is a control message to initialize a BTE */
#define LMSG_OOB_INIT_TAG       0x35
340
#define DEBUG
#ifdef GNI_RC_CHECK
#undef GNI_RC_CHECK
#endif
#ifdef DEBUG
/* Print the GNI error string and abort when rc is not GNI_RC_SUCCESS.
 * rc is evaluated exactly once, so a GNI call may be passed directly
 * (the old form re-evaluated it to index gni_err_str on failure). */
#define GNI_RC_CHECK(msg,rc) do { \
        gni_return_t gni_rc_check_ = (rc); \
        if (gni_rc_check_ != GNI_RC_SUCCESS) { \
            printf("[%d] %s; err=%s\n", CmiMyPe(), (msg), gni_err_str[gni_rc_check_]); \
            fflush(stdout); \
            CmiAbort("GNI_RC_CHECK"); \
        } \
    } while(0)
#else
/* rc is still evaluated so that a GNI call passed in is not silently dropped */
#define GNI_RC_CHECK(msg,rc) do { (void)(rc); } while(0)
#endif

/* Round x up to a multiple of 64 bytes, or of the TLB page size. */
#define ALIGN64(x)       (size_t)((~63)&((x)+63))
#define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
354
355 static int useStaticMSGQ = 0;
356 static int useStaticFMA = 0;
357 static int mysize, myrank;
358 static gni_nic_handle_t   nic_hndl;
359
360 typedef struct {
361     gni_mem_handle_t mdh;
362     uint64_t addr;
363 } mdh_addr_t ;
364 // this is related to dynamic SMSG
365
366 typedef struct mdh_addr_list{
367     gni_mem_handle_t mdh;
368    void *addr;
369     struct mdh_addr_list *next;
370 }mdh_addr_list_t;
371
372 static unsigned int         smsg_memlen;
373 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
374 mdh_addr_t          setup_mem;
375 mdh_addr_t          *smsg_connection_vec = 0;
376 gni_mem_handle_t    smsg_connection_memhndl;
377 static int          smsg_expand_slots = 10;
378 static int          smsg_available_slot = 0;
379 static void         *smsg_mailbox_mempool = 0;
380 mdh_addr_list_t     *smsg_dynamic_list = 0;
381
382 static void             *smsg_mailbox_base;
383 gni_msgq_attr_t         msgq_attrs;
384 gni_msgq_handle_t       msgq_handle;
385 gni_msgq_ep_attr_t      msgq_ep_attrs;
386 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
387
388 /* =====Beginning of Declarations of Machine Specific Variables===== */
389 static int cookie;
390 static int modes = 0;
391 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
392 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
393 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
394 static gni_cq_handle_t       highprior_rdma_tx_cqh = NULL;      // rdma - local event
395 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
396 static gni_cq_handle_t       highpriority_rx_cqh = NULL;      // mempool - remote event
397 static gni_ep_handle_t       *ep_hndl_array;
398
399 static CmiNodeLock           *ep_lock_array;
400 static CmiNodeLock           default_tx_cq_lock; 
401 static CmiNodeLock           rdma_tx_cq_lock; 
402 static CmiNodeLock           global_gni_lock; 
403 static CmiNodeLock           rx_cq_lock;
404 static CmiNodeLock           smsg_mailbox_lock;
405 static CmiNodeLock           smsg_rx_cq_lock;
406 static CmiNodeLock           *mempool_lock;
407 //#define     CMK_WITH_STATS      1
408 typedef struct msg_list
409 {
410     uint32_t destNode;
411     uint32_t size;
412     void *msg;
413     uint8_t tag;
414 #if CMK_WITH_STATS
415     double  creation_time;
416 #endif
417 }MSG_LIST;
418
419
420 typedef struct control_msg
421 {
422     uint64_t            source_addr;    /* address from the start of buffer  */
423     uint64_t            dest_addr;      /* address from the start of buffer */
424     int                 total_length;   /* total length */
425     int                 length;         /* length of this packet */
426 #if REMOTE_EVENT
427     int                 ack_index;      /* index from integer to address */
428 #endif
429     uint8_t             seq_id;         //big message   0 meaning single message
430     gni_mem_handle_t    source_mem_hndl;
431     struct control_msg *next;
432 } CONTROL_MSG;
433
434 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
435
436 typedef struct ack_msg
437 {
438     uint64_t            source_addr;    /* address from the start of buffer  */
439 #if ! USE_LRTS_MEMPOOL
440     gni_mem_handle_t    source_mem_hndl;
441     int                 length;          /* total length */
442 #endif
443     struct ack_msg     *next;
444 } ACK_MSG;
445
446 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
447
#if CMK_DIRECT
typedef struct {
    uint64_t    handler_addr;
} CMK_DIRECT_HEADER;

/* Converse message whose payload is the address of a CmiDirectUserHandle. */
typedef struct {
    char     core[CmiMsgHeaderSizeBytes];
    uint64_t handler;
} cmidirectMsg;

/* Converse handler index of CmiHandleDirectMsg, set by CmiDirectInit. */
CpvDeclare(int, CmiHandleDirectIdx);

/* Handler for cmidirectMsg: run the user callback stored in the embedded
 * CmiDirectUserHandle, then free the message. */
void CmiHandleDirectMsg(cmidirectMsg* msg)
{
    CmiDirectUserHandle *userHandle = (CmiDirectUserHandle *)msg->handler;
    userHandle->callbackFnPtr(userHandle->callbackData);
    CmiFree(msg);
}

/* Register CmiHandleDirectMsg with Converse and record its handler index. */
void CmiDirectInit()
{
    CpvInitialize(int, CmiHandleDirectIdx);
    CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler((CmiHandler)CmiHandleDirectMsg);
}

#endif
475 typedef struct  rmda_msg
476 {
477     int                   destNode;
478 #if REMOTE_EVENT
479     int                   ack_index;
480 #endif
481     gni_post_descriptor_t *pd;
482 }RDMA_REQUEST;
483
484
485 #define SMP_LOCKS                       1
486 #define ONE_SEND_QUEUE                  0
487 typedef PCQueue BufferList;
488 typedef struct  msg_list_index
489 {
490     PCQueue       sendSmsgBuf;
491 #if  SMP_LOCKS
492     CmiNodeLock   lock;
493     int           pushed;
494     int           destpe;
495 #endif
496 } MSG_LIST_INDEX;
497 char                *destpe_avail;
498 PCQueue sendRdmaBuf;
499 PCQueue sendHighPriorBuf;
500 // buffered send queue
501 #if ! ONE_SEND_QUEUE
502 typedef struct smsg_queue
503 {
504     MSG_LIST_INDEX   *smsg_msglist_index;
505     int               smsg_head_index;
506 #if  SMP_LOCKS
507     PCQueue     nonEmptyQueues;
508 #endif
509 } SMSG_QUEUE;
510 #else
511 typedef struct smsg_queue
512 {
513     PCQueue       sendMsgBuf;
514 }  SMSG_QUEUE;
515 #endif
516
517 SMSG_QUEUE                  smsg_queue;
518 #if CMK_USE_OOB
519 SMSG_QUEUE                  smsg_oob_queue;
520 #define SEND_OOB_SMSG(x)            SendBufferMsg(&x, NULL);
521 #define PUMP_LOCAL_HIGHPRIORITY    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh,  rdma_tx_cq_lock)); 
522 #else
523 #define SEND_OOB_SMSG(x)            
524 #define PUMP_LOCAL_HIGHPRIORITY     
525 #endif
526
/* Typed heap allocate/free helpers for the message bookkeeping structs.
 * Every expansion already ends in ';'. */
#define GNI_MALLOC_TYPED_(d, T)   d = ((T*)malloc(sizeof(T)));

#define FreeMsgList(d)            free(d);
#define MallocMsgList(d)          GNI_MALLOC_TYPED_(d, MSG_LIST)

#define FreeControlMsg(d)         free(d);
#define MallocControlMsg(d)       GNI_MALLOC_TYPED_(d, CONTROL_MSG)

#define FreeAckMsg(d)             free(d);
#define MallocAckMsg(d)           GNI_MALLOC_TYPED_(d, ACK_MSG)

#define FreeRdmaRequest(d)        free(d);
#define MallocRdmaRequest(d)      GNI_MALLOC_TYPED_(d, RDMA_REQUEST)
538 /* reuse gni_post_descriptor_t */
539 static gni_post_descriptor_t *post_freelist=0;
540
541 #define FreePostDesc(d)     free(d);
542 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
543
544
545 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
546 static int      buffered_smsg_counter = 0;
547
548 /* SmsgSend return success but message sent is not confirmed by remote side */
549 static MSG_LIST *buffered_fma_head = 0;
550 static MSG_LIST *buffered_fma_tail = 0;
551
/* functions  */
/* Bit helpers on an integer bitmask.  IsFree(a,ind) is true when bit `ind` of
 * `a` is clear.  SET_BITS and Reset set and clear that bit in place.  Every
 * argument is parenthesized so that expressions such as `x | y` are masked
 * as a whole. */
#define IsFree(a,ind)   (!((a) & (1 << (ind))))
#define SET_BITS(a,ind) ((a) = ((a) | (1 << (ind))))
#define Reset(a,ind)    ((a) = ((a) & (~(1 << (ind)))))
556
557 CpvDeclare(mempool_type*, mempool);
558
559 #if CMK_PERSISTENT_COMM
560 CpvDeclare(mempool_type*, persistent_mempool);
561 #endif
562
563 #if REMOTE_EVENT
564 /* ack pool for remote events */
565
566 static int  SHIFT   =           18;
567 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
568 #define RANK_MASK               ((1<<SHIFT) - 1)
569 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
570
571 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
572 #define GET_RANK(evt)           ((evt) & RANK_MASK)
573 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
574
575 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
576
577 #if CMK_SMP
578 #define INIT_SIZE                4096
579 #else
580 #define INIT_SIZE                1024
581 #endif
582
583 struct IndexStruct {
584 void *addr;
585 int next;
586 int type;     // 1: ACK   2: Persistent
587 };
588
589 typedef struct IndexPool {
590     struct IndexStruct   *indexes;
591     int                   size;
592     int                   freehead;
593     CmiNodeLock           lock;
594 } IndexPool;
595
596 static IndexPool  ackPool;
597 #if CMK_PERSISTENT_COMM
598 static IndexPool  persistPool;
599 #endif
600
601 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
602 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
603
604 static void IndexPool_init(IndexPool *pool)
605 {
606     int i;
607     if ((1<<SHIFT) < mysize) 
608         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
609     pool->size = INIT_SIZE;
610     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
611     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
612     for (i=0; i<pool->size-1; i++) {
613         pool->indexes[i].next = i+1;
614         pool->indexes[i].type = 0;
615     }
616     pool->indexes[i].next = -1;
617     pool->freehead = 0;
618 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
619     pool->lock  = CmiCreateLock();
620 #else
621     pool->lock  = 0;
622 #endif
623 }
624
625 static
626 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
627 {
628     int s, i;
629 #if MULTI_THREAD_SEND  
630     CmiLock(pool->lock);
631 #endif
632     s = pool->freehead;
633     if (s == -1) {
634         int newsize = pool->size * 2;
635         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
636         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
637         struct IndexStruct *old_ackpool = pool->indexes;
638         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
639         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
640         for (i=pool->size; i<newsize-1; i++) {
641             pool->indexes[i].next = i+1;
642             pool->indexes[i].type = 0;
643         }
644         pool->indexes[i].next = -1;
645         pool->indexes[i].type = 0;
646         pool->freehead = pool->size;
647         s = pool->size;
648         pool->size = newsize;
649         free(old_ackpool);
650     }
651     pool->freehead = pool->indexes[s].next;
652     pool->indexes[s].addr = addr;
653     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
654     pool->indexes[s].type = type;
655 #if MULTI_THREAD_SEND
656     CmiUnlock(pool->lock);
657 #endif
658     return s;
659 }
660
661 static
662 inline  void IndexPool_freeslot(IndexPool *pool, int s)
663 {
664     CmiAssert(s>=0 && s<pool->size);
665 #if MULTI_THREAD_SEND
666     CmiLock(pool->lock);
667 #endif
668     pool->indexes[s].next = pool->freehead;
669     pool->indexes[s].type = 0;
670     pool->freehead = s;
671 #if MULTI_THREAD_SEND
672     CmiUnlock(pool->lock);
673 #endif
674 }
675
676
677 #endif
678
/* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
#define CMI_MAGIC(msg)                   (((CmiMsgHeaderBasic *)(msg))->magic)
#define CHARM_MAGIC_NUMBER               126

#if CMK_ERROR_CHECKING
extern unsigned char computeCheckSum(unsigned char *data, int len);
static int checksum_flag = 0;
/* When checksum_flag is set, zero the header's cksum field and then store
 * computeCheckSum of the message in it.  The check side expects the
 * checksum over the whole message to come out as 0. */
#define CMI_SET_CHECKSUM(msg, len)      \
        if (checksum_flag)  {   \
          ((CmiMsgHeaderBasic *)(msg))->cksum = 0;        \
          ((CmiMsgHeaderBasic *)(msg))->cksum = computeCheckSum((unsigned char*)(msg), (len));        \
        }
/* When checksum_flag is set, abort if the message checksum is not 0.  The
 * inner if is braced so a caller's `else` cannot bind to it. */
#define CMI_CHECK_CHECKSUM(msg, len)    \
        if (checksum_flag) {    \
          if (computeCheckSum((unsigned char*)(msg), (len)) != 0)   \
            CmiAbort("Fatal error: checksum doesn't agree!\n");     \
        }
#else
#define CMI_SET_CHECKSUM(msg, len)
#define CMI_CHECK_CHECKSUM(msg, len)
#endif
/* =====End of Definitions of Message-Corruption Related Macros=====*/
700
static int print_stats = 0;   /* statistics are recorded/printed only when set */
static int stats_off   = 0;   /* while set, the stats macros skip recording */

/* Resume recording communication statistics. */
void CmiTurnOnStats()
{
    stats_off = 0;
}

/* Suspend recording communication statistics. */
void CmiTurnOffStats()
{
    stats_off = 1;
}
713
/* True when the GNI post type is GNI_POST_FMA_PUT or GNI_POST_RDMA_PUT. */
#define IS_PUT(type)    ((type) == GNI_POST_FMA_PUT || (type) == GNI_POST_RDMA_PUT)
715
716 #if CMK_WITH_STATS
717 FILE *counterLog = NULL;
718 typedef struct comm_thread_stats
719 {
720     uint64_t  smsg_data_count;
721     uint64_t  lmsg_init_count;
722     uint64_t  ack_count;
723     uint64_t  big_msg_ack_count;
724     uint64_t  smsg_count;
725     uint64_t  direct_put_done_count;
726     uint64_t  put_done_count;
727     //times of calling SmsgSend
728     uint64_t  try_smsg_data_count;
729     uint64_t  try_lmsg_init_count;
730     uint64_t  try_ack_count;
731     uint64_t  try_big_msg_ack_count;
732     uint64_t  try_direct_put_done_count;
733     uint64_t  try_put_done_count;
734     uint64_t  try_smsg_count;
735     
736     double    max_time_in_send_buffered_smsg;
737     double    all_time_in_send_buffered_smsg;
738
739     uint64_t  rdma_get_count, rdma_put_count;
740     uint64_t  try_rdma_get_count, try_rdma_put_count;
741     double    max_time_from_control_to_rdma_init;
742     double    all_time_from_control_to_rdma_init;
743
744     double    max_time_from_rdma_init_to_rdma_done;
745     double    all_time_from_rdma_init_to_rdma_done;
746
747     int      count_in_PumpNetwork;
748     double   time_in_PumpNetwork;
749     double   max_time_in_PumpNetwork;
750     int      count_in_SendBufferMsg_smsg;
751     double   time_in_SendBufferMsg_smsg;
752     double   max_time_in_SendBufferMsg_smsg;
753     int      count_in_SendRdmaMsg;
754     double   time_in_SendRdmaMsg;
755     double   max_time_in_SendRdmaMsg;
756     int      count_in_PumpRemoteTransactions;
757     double   time_in_PumpRemoteTransactions;
758     double   max_time_in_PumpRemoteTransactions;
759     int      count_in_PumpLocalTransactions_rdma;
760     double   time_in_PumpLocalTransactions_rdma;
761     double   max_time_in_PumpLocalTransactions_rdma;
762     int      count_in_PumpDatagramConnection;
763     double   time_in_PumpDatagramConnection;
764     double   max_time_in_PumpDatagramConnection;
765 } Comm_Thread_Stats;
766
767 static Comm_Thread_Stats   comm_stats;
768
769 static char *counters_dirname = "counters";
770
771 static void init_comm_stats()
772 {
773   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
774   if (print_stats){
775       char ln[200];
776       int code = mkdir(counters_dirname, 00777); 
777       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
778       counterLog=fopen(ln,"w");
779       if (counterLog == NULL) CmiAbort("Counter files open failed");
780   }
781 }
782
/* ---- statistics recording helpers ----
 * Which macros record when:
 *   - SMSG_SENT_DONE, SMSG_TRY_SEND and the RDMA_* macros record only while
 *     print_stats is set and stats_off is clear;
 *   - SMSG_CREATION checks only print_stats;
 *   - the STATS_*_TIME wrappers always record.
 * Arguments are parenthesized.  Temporaries use names that cannot capture a
 * variable from the caller's argument expression; the old local `t` shadowed
 * any caller variable named t inside the timed expression. */

#define SMSG_CREATION( x ) if(print_stats) { (x)->creation_time = CmiWallTimer(); }

#define SMSG_SENT_DONE(creation_time, tag)  \
        if (print_stats && !stats_off) {   \
            if( (tag) == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
            else  if( (tag) == LMSG_INIT_TAG || (tag) == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++;  \
            else  if( (tag) == ACK_TAG) comm_stats.ack_count++;  \
            else  if( (tag) == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
            else  if( (tag) == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
            else  if( (tag) == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
            comm_stats.smsg_count++; \
            double stats_inbuff_dt_ = CmiWallTimer() - (creation_time);   \
            if(stats_inbuff_dt_ > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg = stats_inbuff_dt_; \
            comm_stats.all_time_in_send_buffered_smsg += stats_inbuff_dt_;  \
        }

#define SMSG_TRY_SEND(tag)  \
        if (print_stats && !stats_off){   \
            if( (tag) == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
            else  if( (tag) == LMSG_INIT_TAG || (tag) == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
            else  if( (tag) == ACK_TAG) comm_stats.try_ack_count++;  \
            else  if( (tag) == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
            else  if( (tag) == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
            else  if( (tag) == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
            comm_stats.try_smsg_count++; \
        }

#define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}

#define  RDMA_TRANS_DONE(x)      \
         if (print_stats && !stats_off) {  double stats_rdma_dt_ = CmiWallTimer() - (x); \
             if(stats_rdma_dt_ > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = stats_rdma_dt_; \
             comm_stats.all_time_from_rdma_init_to_rdma_done += stats_rdma_dt_; \
         }

#define  RDMA_TRANS_INIT(type, x)      \
         if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
             double stats_rdma_dt_ = CmiWallTimer() - (x); \
             if(stats_rdma_dt_ > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = stats_rdma_dt_; \
             comm_stats.all_time_from_control_to_rdma_init += stats_rdma_dt_; \
         }

/* Run x, then add its wall-clock duration to comm_stats.<tot>, bump
 * comm_stats.<cnt>, and raise comm_stats.<mx> if this run was the longest. */
#define STATS_TIMED_(x, cnt, tot, mx)   \
        { double stats_t0_ = CmiWallTimer(); \
          x;        \
          stats_t0_ = CmiWallTimer() - stats_t0_;          \
          comm_stats.cnt++;        \
          comm_stats.tot += stats_t0_;   \
          if (stats_t0_ > comm_stats.mx)      \
              comm_stats.mx = stats_t0_;    \
        }

#define STATS_PUMPNETWORK_TIME(x)   \
        STATS_TIMED_(x, count_in_PumpNetwork, time_in_PumpNetwork, max_time_in_PumpNetwork)

#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
        STATS_TIMED_(x, count_in_PumpRemoteTransactions, time_in_PumpRemoteTransactions, max_time_in_PumpRemoteTransactions)

#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
        STATS_TIMED_(x, count_in_PumpLocalTransactions_rdma, time_in_PumpLocalTransactions_rdma, max_time_in_PumpLocalTransactions_rdma)

#define STATS_SEND_SMSGS_TIME(x)   \
        STATS_TIMED_(x, count_in_SendBufferMsg_smsg, time_in_SendBufferMsg_smsg, max_time_in_SendBufferMsg_smsg)

#define STATS_SENDRDMAMSG_TIME(x)   \
        STATS_TIMED_(x, count_in_SendRdmaMsg, time_in_SendRdmaMsg, max_time_in_SendRdmaMsg)

#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
        STATS_TIMED_(x, count_in_PumpDatagramConnection, time_in_PumpDatagramConnection, max_time_in_PumpDatagramConnection)
882
883 static void print_comm_stats()
884 {
885     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
886     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
887             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
888             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
889     
890     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
891             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
892             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
893
894     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
895     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
896     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
897
898
899     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
900     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
901     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
902     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
903     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
904     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
905     if (useDynamicSMSG)
906     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
907
908     fclose(counterLog);
909 }
910
911 #else
/* Statistics disabled: each timing wrapper expands to its statement unchanged. */
#define STATS_PUMPNETWORK_TIME(x)                   x
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)        x
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)    x
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)        x
#define STATS_SEND_SMSGS_TIME(x)                    x
#define STATS_SENDRDMAMSG_TIME(x)                   x
918 #endif
919
920 static void
921 allgather(void *in,void *out, int len)
922 {
923     static int *ivec_ptr=NULL,already_called=0,job_size=0;
924     int i,rc;
925     int my_rank;
926     char *tmp_buf,*out_ptr;
927
928     if(!already_called) {
929
930         rc = PMI_Get_size(&job_size);
931         CmiAssert(rc == PMI_SUCCESS);
932         rc = PMI_Get_rank(&my_rank);
933         CmiAssert(rc == PMI_SUCCESS);
934
935         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
936         CmiAssert(ivec_ptr != NULL);
937
938         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
939         CmiAssert(rc == PMI_SUCCESS);
940
941         already_called = 1;
942
943     }
944
945     tmp_buf = (char *)malloc(job_size * len);
946     CmiAssert(tmp_buf);
947
948     rc = PMI_Allgather(in,tmp_buf,len);
949     CmiAssert(rc == PMI_SUCCESS);
950
951     out_ptr = out;
952
953     for(i=0;i<job_size;i++) {
954
955         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
956
957     }
958
959     free(tmp_buf);
960 }
961
962 static void
963 allgather_2(void *in,void *out, int len)
964 {
965     //PMI_Allgather is out of order
966     int i,rc, extend_len;
967     int  rank_index;
968     char *out_ptr, *out_ref;
969     char *in2;
970
971     extend_len = sizeof(int) + len;
972     in2 = (char*)malloc(extend_len);
973
974     memcpy(in2, &myrank, sizeof(int));
975     memcpy(in2+sizeof(int), in, len);
976
977     out_ptr = (char*)malloc(mysize*extend_len);
978
979     rc = PMI_Allgather(in2, out_ptr, extend_len);
980     GNI_RC_CHECK("allgather", rc);
981
982     out_ref = out;
983
984     for(i=0;i<mysize;i++) {
985         //rank index 
986         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
987         //copy to the rank index slot
988         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
989     }
990
991     free(out_ptr);
992     free(in2);
993
994 }
995
996 static unsigned int get_gni_nic_address(int device_id)
997 {
998     unsigned int address, cpu_id;
999     gni_return_t status;
1000     int i, alps_dev_id=-1,alps_address=-1;
1001     char *token, *p_ptr;
1002
1003     p_ptr = getenv("PMI_GNI_DEV_ID");
1004     if (!p_ptr) {
1005         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1006        
1007         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1008     } else {
1009         while ((token = strtok(p_ptr,":")) != NULL) {
1010             alps_dev_id = atoi(token);
1011             if (alps_dev_id == device_id) {
1012                 break;
1013             }
1014             p_ptr = NULL;
1015         }
1016         CmiAssert(alps_dev_id != -1);
1017         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1018         CmiAssert(p_ptr != NULL);
1019         i = 0;
1020         while ((token = strtok(p_ptr,":")) != NULL) {
1021             if (i == alps_dev_id) {
1022                 alps_address = atoi(token);
1023                 break;
1024             }
1025             p_ptr = NULL;
1026             ++i;
1027         }
1028         CmiAssert(alps_address != -1);
1029         address = alps_address;
1030     }
1031     return address;
1032 }
1033
/* Return this job's GNI protection tag: the first ':'-separated field of
 * PMI_GNI_PTAG, truncated to 8 bits.
 *
 * The field is parsed in place with strtoul(), which stops at the ':'.
 * Tokenizing with strtok() would write a NUL into the string returned by
 * getenv(), which the program must not modify (C11 7.22.4.6).  Aborts if the
 * variable is unset or its first field is not a number. */
static uint8_t get_ptag(void)
{
    const char    *p_ptr;
    char          *end;
    unsigned long  ptag;

    p_ptr = getenv("PMI_GNI_PTAG");
    if (p_ptr == NULL)
        CmiAbort("get_ptag: PMI_GNI_PTAG is not set");
    p_ptr += strspn(p_ptr, ":");        /* skip leading separators, as strtok() did */
    ptag = strtoul(p_ptr, &end, 10);
    if (end == p_ptr)                   /* empty or non-numeric first field */
        CmiAbort("get_ptag: PMI_GNI_PTAG has no numeric field");
    return (uint8_t)ptag;
}
1046
/* Return this job's GNI cookie: the first ':'-separated field of
 * PMI_GNI_COOKIE as a 32-bit unsigned value.
 *
 * The field is parsed in place with strtoul(), which stops at the ':'.
 * strtok() would modify getenv()'s storage (forbidden by C11 7.22.4.6), and
 * atoi() overflows for cookies above INT_MAX, which strtoul() represents
 * exactly.  Aborts if the variable is unset or its first field is not a
 * number. */
static uint32_t get_cookie(void)
{
    const char    *p_ptr;
    char          *end;
    unsigned long  cookie;

    p_ptr = getenv("PMI_GNI_COOKIE");
    if (p_ptr == NULL)
        CmiAbort("get_cookie: PMI_GNI_COOKIE is not set");
    p_ptr += strspn(p_ptr, ":");        /* skip leading separators, as strtok() did */
    cookie = strtoul(p_ptr, &end, 10);
    if (end == p_ptr)                   /* empty or non-numeric first field */
        CmiAbort("get_cookie: PMI_GNI_COOKIE has no numeric field");
    return (uint32_t)cookie;
}
1059
1060 #if LARGEPAGE
1061
1062 /* directly mmap memory from hugetlbfs for large pages */
1063
1064 #include <sys/stat.h>
1065 #include <fcntl.h>
1066 #include <sys/mman.h>
1067 #include <hugetlbfs.h>
1068
1069 // size must be _tlbpagesize aligned
1070 void *my_get_huge_pages(size_t size)
1071 {
1072     char filename[512];
1073     int fd;
1074     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1075     void *ptr = NULL;
1076
1077     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1078     fd = open(filename, O_RDWR | O_CREAT, mode);
1079     if (fd == -1) {
1080         CmiAbort("my_get_huge_pages: open filed");
1081     }
1082     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1083     if (ptr == MAP_FAILED) ptr = NULL;
1084 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1085     close(fd);
1086     unlink(filename);
1087     return ptr;
1088 }
1089
/* Release a region obtained from my_get_huge_pages(); aborts if munmap() fails. */
void my_free_huge_pages(void *ptr, int size)
{
    if (munmap(ptr, size) == -1)
        CmiAbort("munmap failed in my_free_huge_pages");
}
1096
1097 #endif
1098
1099 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1100 /* TODO: add any that are related */
1101 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1102
1103
1104 #include "machine-lrts.h"
1105 #include "machine-common-core.c"
1106
/* The network progress function polls the network for incoming messages;
   on some implementations this also flushes the receive buffers. */
1109 #if CMK_MACHINE_PROGRESS_DEFINED
void CmiMachineProgressImpl()
{
    /* Intentionally empty: this machine layer performs no work here. */
}
1112 #endif
1113
1114 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
1115 static void SendRdmaMsg(PCQueue );
1116 static void PumpNetworkSmsg();
1117 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1118 #if CQWRITE
1119 static void PumpCqWriteTransactions();
1120 #endif
1121 #if REMOTE_EVENT
1122 static void PumpRemoteTransactions(gni_cq_handle_t);
1123 #endif
1124
#if MACHINE_DEBUG_LOG
/* Debug-only state, compiled in only when MACHINE_DEBUG_LOG is enabled. */
FILE           *debugLog          = NULL;  /* debug log stream, initially NULL */
static CmiInt8  buffered_recv_msg = 0;     /* NOTE(review): presumably bytes of buffered receives -- confirm */
int             lrts_smsg_success = 0;
int             lrts_received_msg = 0;
#endif
1131
1132 static void sweep_mempool(mempool_type *mptr)
1133 {
1134     int n = 0;
1135     block_header *current = &(mptr->block_head);
1136
1137     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1138     while( current!= NULL) {
1139         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1140         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1141     }
1142     printf("[n %d] sweep_mempool slot END.\n", myrank);
1143 }
1144
1145 inline
1146 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1147 {
1148     block_header *current = *from;
1149
1150     //while(register_memory_size>= MAX_REG_MEM)
1151     //{
1152         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1153             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1154
1155         *from = current;
1156         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1157         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1158         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1159     //}
1160     return GNI_RC_SUCCESS;
1161 }
1162
1163 inline 
1164 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1165 {
1166     gni_return_t status = GNI_RC_SUCCESS;
1167     //int size = GetMempoolsize(msg);
1168     //void *blockaddr = GetMempoolBlockPtr(msg);
1169     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1170    
1171     block_header *current = &(mptr->block_head);
1172     while(register_memory_size>= MAX_REG_MEM)
1173     {
1174         status = deregisterMemory(mptr, &current);
1175         if (status != GNI_RC_SUCCESS) break;
1176     }
1177     if(register_memory_size>= MAX_REG_MEM) return status;
1178
1179     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1180     while(1)
1181     {
1182         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1183         if(status == GNI_RC_SUCCESS)
1184         {
1185             break;
1186         }
1187         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1188         {
1189             GNI_RC_CHECK("registerFromMempool", status);
1190         }
1191         else
1192         {
1193             status = deregisterMemory(mptr, &current);
1194             if (status != GNI_RC_SUCCESS) break;
1195         }
1196     }; 
1197     return status;
1198 }
1199
1200 inline 
1201 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1202 {
1203     static int rank = -1;
1204     int i;
1205     gni_return_t status;
1206     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1207     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1208     mempool_type *mptr;
1209
1210     status = registerFromMempool(mptr1, msg, size, t, cqh);
1211     if (status == GNI_RC_SUCCESS) return status;
1212 #if CMK_SMP 
1213     for (i=0; i<CmiMyNodeSize()+1; i++) {
1214       rank = (rank+1)%(CmiMyNodeSize()+1);
1215       mptr = CpvAccessOther(mempool, rank);
1216       if (mptr == mptr1) continue;
1217       status = registerFromMempool(mptr, msg, size, t, cqh);
1218       if (status == GNI_RC_SUCCESS) return status;
1219     }
1220 #endif
1221     return  GNI_RC_ERROR_RESOURCE;
1222 }
1223
/* Defer a send: record (destNode, size, msg, tag) in a newly allocated
 * MSG_LIST entry and push it onto `queue`.  Only the msg pointer is stored;
 * the message is not copied.
 *   ONE_SEND_QUEUE : entry goes onto the single queue->sendMsgBuf.
 *   otherwise      : entry goes onto destNode's own sendSmsgBuf; with
 *                    SMP_LOCKS this happens under destNode's lock, and the
 *                    per-destination list is also pushed onto
 *                    queue->nonEmptyQueues when its `pushed` flag is 0.
 */
inline
static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
{
    MSG_LIST        *msg_tmp;
    MallocMsgList(msg_tmp);
    msg_tmp->destNode = destNode;
    msg_tmp->size   = size;
    msg_tmp->msg    = msg;
    msg_tmp->tag    = tag;
#if CMK_WITH_STATS
    SMSG_CREATION(msg_tmp)
#endif

#if ONE_SEND_QUEUE
    PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
#else
#if SMP_LOCKS
    CmiLock(queue->smsg_msglist_index[destNode].lock);
    /* NOTE(review): `pushed` is tested but never set here.  Presumably the
     * consumer of nonEmptyQueues maintains it -- confirm.  Otherwise this
     * entry is pushed onto nonEmptyQueues once per buffered message. */
    if(queue->smsg_msglist_index[destNode].pushed == 0)
    {
        PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
    }
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
    CmiUnlock(queue->smsg_msglist_index[destNode].lock);
#else
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
#endif
#endif

#if PRINT_SYH
    buffered_smsg_counter++;
#endif
}
1257
1258 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1259 {
1260     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1261 }
1262
1263 inline
1264 static void setup_smsg_connection(int destNode)
1265 {
1266     mdh_addr_list_t  *new_entry = 0;
1267     gni_post_descriptor_t *pd;
1268     gni_smsg_attr_t      *smsg_attr;
1269     gni_return_t status = GNI_RC_NOT_DONE;
1270     RDMA_REQUEST        *rdma_request_msg;
1271     
1272     if(smsg_available_slot == smsg_expand_slots)
1273     {
1274         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1275         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1276         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1277
1278         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1279             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1280             GNI_MEM_READWRITE,   
1281             -1,
1282             &(new_entry->mdh));
1283         smsg_available_slot = 0; 
1284         new_entry->next = smsg_dynamic_list;
1285         smsg_dynamic_list = new_entry;
1286     }
1287     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1288     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1289     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1290     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1291     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1292     smsg_attr->buff_size = smsg_memlen;
1293     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1294     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1295     smsg_local_attr_vec[destNode] = smsg_attr;
1296     smsg_available_slot++;
1297     MallocPostDesc(pd);
1298     pd->type            = GNI_POST_FMA_PUT;
1299     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1300     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1301     pd->length          = sizeof(gni_smsg_attr_t);
1302     pd->local_addr      = (uint64_t) smsg_attr;
1303     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1304     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1305     pd->src_cq_hndl     = 0;
1306
1307     pd->rdma_mode       = 0;
1308     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1309     print_smsg_attr(smsg_attr);
1310     if(status == GNI_RC_ERROR_RESOURCE )
1311     {
1312         MallocRdmaRequest(rdma_request_msg);
1313         rdma_request_msg->destNode = destNode;
1314         rdma_request_msg->pd = pd;
1315         /* buffer this request */
1316     }
1317 #if PRINT_SYH
1318     if(status != GNI_RC_SUCCESS)
1319        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1320     else
1321         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1322 #endif
1323 }
1324
1325 /* useDynamicSMSG */
1326 inline 
1327 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1328 {
1329     gni_return_t status = GNI_RC_NOT_DONE;
1330
1331     if(mailbox_list->offset == mailbox_list->size)
1332     {
1333         dynamic_smsg_mailbox_t *new_mailbox_entry;
1334         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1335         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1336         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1337         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1338         new_mailbox_entry->offset = 0;
1339         
1340         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1341             new_mailbox_entry->size, smsg_rx_cqh,
1342             GNI_MEM_READWRITE,   
1343             -1,
1344             &(new_mailbox_entry->mem_hndl));
1345
1346         GNI_RC_CHECK("register", status);
1347         new_mailbox_entry->next = mailbox_list;
1348         mailbox_list = new_mailbox_entry;
1349     }
1350     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1351     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1352     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1353     local_smsg_attr->mbox_offset = mailbox_list->offset;
1354     mailbox_list->offset += smsg_memlen;
1355     local_smsg_attr->buff_size = smsg_memlen;
1356     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1357     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1358 }
1359
1360 /* useDynamicSMSG */
1361 inline 
1362 static int connect_to(int destNode)
1363 {
1364     gni_return_t status = GNI_RC_NOT_DONE;
1365     CmiAssert(smsg_connected_flag[destNode] == 0);
1366     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1367     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1368     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1369     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1370     
1371     CMI_GNI_LOCK(global_gni_lock)
1372     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1373     CMI_GNI_UNLOCK(global_gni_lock)
1374     if (status == GNI_RC_ERROR_RESOURCE) {
1375       /* possibly destNode is making connection at the same time */
1376       free(smsg_attr_vector_local[destNode]);
1377       smsg_attr_vector_local[destNode] = NULL;
1378       free(smsg_attr_vector_remote[destNode]);
1379       smsg_attr_vector_remote[destNode] = NULL;
1380       mailbox_list->offset -= smsg_memlen;
1381 #if PRINT_SYH
1382     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1383 #endif
1384       return 0;
1385     }
1386     GNI_RC_CHECK("GNI_Post", status);
1387     smsg_connected_flag[destNode] = 1;
1388 #if PRINT_SYH
1389     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1390 #endif
1391     return 1;
1392 }
1393
1394 inline 
1395 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1396 {
1397     unsigned int          remote_address;
1398     uint32_t              remote_id;
1399     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1400     gni_smsg_attr_t       *smsg_attr;
1401     gni_post_descriptor_t *pd;
1402     gni_post_state_t      post_state;
1403     char                  *real_data; 
1404
1405     if (useDynamicSMSG) {
1406         switch (smsg_connected_flag[destNode]) {
1407         case 0: 
1408             connect_to(destNode);         /* continue to case 1 */
1409         case 1:                           /* pending connection, do nothing */
1410             status = GNI_RC_NOT_DONE;
1411             if(inbuff ==0)
1412                 buffer_small_msgs(queue, msg, size, destNode, tag);
1413             return status;
1414         }
1415     }
1416 #if ! ONE_SEND_QUEUE
1417     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1418 #endif
1419     {
1420         //CMI_GNI_LOCK(smsg_mailbox_lock)
1421         CMI_GNI_LOCK(default_tx_cq_lock)
1422 #if CMK_SMP_TRACE_COMMTHREAD
1423         int oldpe = -1;
1424         int oldeventid = -1;
1425         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
1426         { 
1427             START_EVENT();
1428             if ( tag == SMALL_DATA_TAG)
1429                 real_data = (char*)msg; 
1430             else 
1431                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1432             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1433             TRACE_COMM_SET_COMM_MSGID(real_data);
1434         }
1435 #endif
1436 #if REMOTE_EVENT
1437         if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) {
1438             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1439             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1440                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1441         }
1442         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1443 #endif
1444 #if     CMK_WITH_STATS
1445         SMSG_TRY_SEND(tag)
1446 #endif
1447 #if CMK_WITH_STATS
1448     double              creation_time;
1449     if (ptr == NULL)
1450         creation_time = CmiWallTimer();
1451     else
1452         creation_time = ptr->creation_time;
1453 #endif
1454
1455     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1456 #if CMK_SMP_TRACE_COMMTHREAD
1457         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1458 #endif
1459         CMI_GNI_UNLOCK(default_tx_cq_lock)
1460         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1461         if(status == GNI_RC_SUCCESS)
1462         {
1463 #if     CMK_WITH_STATS
1464             SMSG_SENT_DONE(creation_time,tag) 
1465 #endif
1466 #if CMK_SMP_TRACE_COMMTHREAD
1467             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
1468             { 
1469                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1470             }
1471 #endif
1472         }else
1473             status = GNI_RC_ERROR_RESOURCE;
1474     }
1475     if(status != GNI_RC_SUCCESS && inbuff ==0)
1476         buffer_small_msgs(queue, msg, size, destNode, tag);
1477     return status;
1478 }
1479
1480 inline 
1481 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1482 {
1483     /* construct a control message and send */
1484     CONTROL_MSG         *control_msg_tmp;
1485     MallocControlMsg(control_msg_tmp);
1486     control_msg_tmp->source_addr = (uint64_t)msg;
1487     control_msg_tmp->seq_id    = seqno;
1488     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1489 #if REMOTE_EVENT
1490     control_msg_tmp->ack_index    =  -1;
1491 #endif
1492 #if     USE_LRTS_MEMPOOL
1493     if(size < BIG_MSG)
1494     {
1495         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1496     }
1497     else
1498     {
1499         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1500         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1501         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1502     }
1503 #else
1504     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1505 #endif
1506     return control_msg_tmp;
1507 }
1508
/* When 1, send_large_messages() spins in LrtsAdvanceCommunication(0) for
 * messages not yet buffered (inbuff == 0) while the MAX_BUFF_SEND budget is
 * exhausted, instead of buffering the control message right away. */
#define BLOCKING_SEND_CONTROL 0
1510
1511 // Large message, send control to receiver, receiver register memory and do a GET, 
1512 // return 1 - send no success
1513 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
1514 {
1515     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1516     uint32_t            vmdh_index  = -1;
1517     int                 size;
1518     int                 offset = 0;
1519     uint64_t            source_addr;
1520     int                 register_size; 
1521     void                *msg;
1522
1523     size    =   control_msg_tmp->total_length;
1524     source_addr = control_msg_tmp->source_addr;
1525     register_size = control_msg_tmp->length;
1526
1527 #if  USE_LRTS_MEMPOOL
1528     if( control_msg_tmp->seq_id == 0 ){
1529 #if BLOCKING_SEND_CONTROL
1530         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1531             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1532                 LrtsAdvanceCommunication(0);
1533         }
1534 #endif
1535         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1536         {
1537             msg = (void*)source_addr;
1538             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1539             {
1540                 if(!inbuff)
1541                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1542                 return GNI_RC_ERROR_NOMEM;
1543             }
1544             //register the corresponding mempool
1545             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1546             if(status == GNI_RC_SUCCESS)
1547             {
1548                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1549             }
1550         }else
1551         {
1552             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1553             status = GNI_RC_SUCCESS;
1554         }
1555         if(NoMsgInSend(source_addr))
1556             register_size = GetMempoolsize((void*)(source_addr));
1557         else
1558             register_size = 0;
1559     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1560     {
1561         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1562         source_addr += offset;
1563         size = control_msg_tmp->length;
1564 #if BLOCKING_SEND_CONTROL
1565         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1566             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1567                 LrtsAdvanceCommunication(0);
1568         }
1569 #endif
1570         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1571             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1572             {
1573                 if(!inbuff)
1574                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1575                 return GNI_RC_ERROR_NOMEM;
1576             }
1577             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1578             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1579         }
1580         else
1581         {
1582             status = GNI_RC_SUCCESS;
1583         }
1584         register_size = 0;  
1585     }
1586
1587 #if CMI_EXERT_SEND_LARGE_CAP
1588     if(SEND_large_pending >= SEND_large_cap)
1589     {
1590         status = GNI_RC_ERROR_NOMEM;
1591     }
1592 #endif
1593  
1594     if(status == GNI_RC_SUCCESS)
1595     {
1596        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr); 
1597         if(status == GNI_RC_SUCCESS)
1598         {
1599 #if CMI_EXERT_SEND_LARGE_CAP
1600             SEND_large_pending++;
1601 #endif
1602             buffered_send_msg += register_size;
1603             if(control_msg_tmp->seq_id == 0)
1604             {
1605                 IncreaseMsgInSend(source_addr);
1606             }
1607             FreeControlMsg(control_msg_tmp);
1608             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag); 
1609         }else
1610             status = GNI_RC_ERROR_RESOURCE;
1611
1612     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1613     {
1614         CmiAbort("Memory registor for large msg\n");
1615     }else 
1616     {
1617         status = GNI_RC_ERROR_NOMEM; 
1618         if(!inbuff)
1619             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1620     }
1621     return status;
1622 #else
1623     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1624     if(status == GNI_RC_SUCCESS)
1625     {
1626         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL);  
1627         if(status == GNI_RC_SUCCESS)
1628         {
1629             FreeControlMsg(control_msg_tmp);
1630         }
1631     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1632     {
1633         CmiAbort("Memory registor for large msg\n");
1634     }else 
1635     {
1636         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1637     }
1638     return status;
1639 #endif
1640 }
1641
/*
 * Finalise the envelope of an outgoing message before it is handed to the
 * network: record the message size in its header, then apply
 * CMI_SET_CHECKSUM over the same `size` bytes.  The order of the two steps
 * is kept as-is.
 */
inline void LrtsPrepareEnvelope(char *msg, int size)
{
    CmiSetMsgSize(msg, size);       /* stamp total length into the header */
    CMI_SET_CHECKSUM(msg, size);    /* then the checksum macro */
}
1647
1648 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1649 {
1650     gni_return_t        status  =   GNI_RC_SUCCESS;
1651     uint8_t tag;
1652     CONTROL_MSG         *control_msg_tmp;
1653     int                 oob = ( mode & OUT_OF_BAND);
1654     SMSG_QUEUE          *queue;
1655
1656     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1657 #if CMK_USE_OOB
1658     queue = oob? &smsg_oob_queue : &smsg_queue;
1659     tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
1660 #else
1661     queue = &smsg_queue;
1662     tag = LMSG_INIT_TAG;
1663 #endif
1664
1665     LrtsPrepareEnvelope(msg, size);
1666
1667 #if PRINT_SYH
1668     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1669 #endif 
1670
1671 #if CMK_SMP 
1672     if(size <= SMSG_MAX_MSG)
1673         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1674     else if (size < BIG_MSG) {
1675         control_msg_tmp =  construct_control_msg(size, msg, 0);
1676         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1677     }
1678     else {
1679         CmiSetMsgSeq(msg, 0);
1680         control_msg_tmp =  construct_control_msg(size, msg, 1);
1681         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1682     }
1683 #else   //non-smp, smp(worker sending)
1684     if(size <= SMSG_MAX_MSG)
1685     {
1686         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1687             CmiFree(msg);
1688     }
1689     else if (size < BIG_MSG) {
1690         control_msg_tmp =  construct_control_msg(size, msg, 0);
1691         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1692     }
1693     else {
1694 #if     USE_LRTS_MEMPOOL
1695         CmiSetMsgSeq(msg, 0);
1696         control_msg_tmp =  construct_control_msg(size, msg, 1);
1697         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1698 #else
1699         control_msg_tmp =  construct_control_msg(size, msg, 0);
1700         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1701 #endif
1702     }
1703 #endif
1704     return 0;
1705 }
1706
/*
 * Send a copy of `msg` (len bytes) to each of the `npes` PEs in `pes`.
 * The caller keeps ownership of `msg`.
 * With CMK_BROADCAST_USE_CMIREFERENCE, remote destinations share the buffer
 * by taking a reference before a send-and-free. Only the local PE gets a
 * plain CmiSyncSend.
 */
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
    int k;

    for (k = 0; k < npes; k++) {
#if CMK_BROADCAST_USE_CMIREFERENCE
        if (pes[k] != CmiMyPe()) {
            CmiReference(msg);
            CmiSyncSendAndFree(pes[k], len, msg);
            continue;
        }
#endif
        CmiSyncSend(pes[k], len, msg);
    }
}
1725
1726 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1727 {
1728   /* A better asynchronous implementation may be wanted, but at least it works */
1729   CmiSyncListSendFn(npes, pes, len, msg);
1730   return (CmiCommHandle) 0;
1731 }
1732
/*
 * Send `msg` (len bytes) to the `npes` PEs in `pes` and take ownership of
 * it: by the time this returns, the caller's reference has been consumed.
 * - A single destination gets the buffer directly (send-and-free).
 * - With CMK_PERSISTENT_COMM, if persistent handles exist (CpvAccess(phs))
 *   and len exceeds PERSIST_MIN_SIZE, remote PEs share the buffer by
 *   reference count. The local PE gets a copy, and the caller's reference
 *   is dropped at the end.
 * - Otherwise the buffer is either shared via CmiSyncListSendFn
 *   (CMK_BROADCAST_USE_CMIREFERENCE), or copied to every destination but the
 *   last, which receives the original.
 */
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
    if (npes == 1) {
        CmiSyncSendAndFree(pes[0], len, msg);
        return;
    }

#if CMK_PERSISTENT_COMM
    if (CpvAccess(phs) && len > PERSIST_MIN_SIZE) {
        int k;
        for (k = 0; k < npes; k++) {
            if (pes[k] != CmiMyPe()) {
                CmiReference(msg);
                CmiSyncSendAndFree(pes[k], len, msg);
            } else {
                CmiSyncSend(pes[k], len, msg);
            }
        }
        CmiFree(msg);
        return;
    }
#endif

#if CMK_BROADCAST_USE_CMIREFERENCE
    CmiSyncListSendFn(npes, pes, len, msg);
    CmiFree(msg);
#else
    int k;
    if (npes <= 0) {
        /* nobody to send to: just release the buffer */
        CmiFree(msg);
        return;
    }
    for (k = 0; k < npes - 1; k++)
        CmiSyncSend(pes[k], len, msg);
    CmiSyncSendAndFree(pes[npes - 1], len, msg);
#endif
}
1769
/* Forward declaration: referenced by LrtsPostCommonInit and
   PumpNetworkSmsg before its definition. */
static void PumpDatagramConnection();

/* Trace user-event ids. These are placeholder values, replaced by
   registerUserTraceEvents() when machine user-event tracing is compiled in. */
static int event_SetupConnect          = 111;
static int event_PumpSmsg              = 222;
static int event_PumpTransaction       = 333;
static int event_PumpRdmaTransaction   = 444;
static int event_SendBufferSmsg        = 484;
static int event_SendFmaRdmaMsg        = 555;
static int event_AdvanceCommunication  = 666;
1778
/*
 * Register the machine layer's user trace events with the tracer and store
 * the returned ids in the event_* globals. This only happens when
 * machine-level user-event tracing is compiled in; otherwise it is a no-op.
 * Events are registered in a fixed order, the same order as the table.
 */
static void registerUserTraceEvents()
{
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    struct { int *id; char *label; } table[] = {
        { &event_SetupConnect,         "setting up connections" },
        { &event_PumpSmsg,             "Pump network small msgs" },
        { &event_PumpTransaction,      "Pump FMA/RDMA local transaction" },
        { &event_PumpRdmaTransaction,  "Pump RDMA remote event" },
        { &event_SendBufferSmsg,       "Sending buffered small msgs" },
        { &event_SendFmaRdmaMsg,       "Sending buffered fma/rdma transactions" },
        { &event_AdvanceCommunication, "Worker thread in sending/receiving" },
    };
    size_t k;

    for (k = 0; k < sizeof(table) / sizeof(table[0]); k++)
        *table[k].id = traceRegisterUserEvent(table[k].label, -1);
#endif
}
1790
1791 static void ProcessDeadlock()
1792 {
1793     static CmiUInt8 *ptr = NULL;
1794     static CmiUInt8  last = 0, mysum, sum;
1795     static int count = 0;
1796     gni_return_t status;
1797     int i;
1798
1799 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1800 //sweep_mempool(CpvAccess(mempool));
1801     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1802     mysum = smsg_send_count + smsg_recv_count;
1803     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1804     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1805     GNI_RC_CHECK("PMI_Allgather", status);
1806     sum = 0;
1807     for (i=0; i<mysize; i++)  sum+= ptr[i];
1808     if (last == 0 || sum == last) 
1809         count++;
1810     else
1811         count = 0;
1812     last = sum;
1813     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1814     if (count == 2) { 
1815         /* detected twice, it is a real deadlock */
1816         if (myrank == 0)  {
1817             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1818             CmiAbort("Fatal> Deadlock detected.");
1819         }
1820
1821     }
1822     _detected_hang = 0;
1823 }
1824
1825 static void CheckProgress()
1826 {
1827     if (smsg_send_count == last_smsg_send_count &&
1828         smsg_recv_count == last_smsg_recv_count ) 
1829     {
1830         _detected_hang = 1;
1831 #if !CMK_SMP
1832         if (_detected_hang) ProcessDeadlock();
1833 #endif
1834
1835     }
1836     else {
1837         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1838         last_smsg_send_count = smsg_send_count;
1839         last_smsg_recv_count = smsg_recv_count;
1840         _detected_hang = 0;
1841     }
1842 }
1843
1844 static void set_limit()
1845 {
1846     //if (!user_set_flag && CmiMyRank() == 0) {
1847     if (CmiMyRank() == 0) {
1848         int mynode = CmiPhysicalNodeID(CmiMyPe());
1849         int numpes = CmiNumPesOnPhysicalNode(mynode);
1850         int numprocesses = numpes / CmiMyNodeSize();
1851         MAX_REG_MEM  = _totalmem / numprocesses;
1852         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1853         if (CmiMyPe() == 0)
1854            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1855         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1856         {
1857              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1858              CmiAbort("memory registration\n");
1859         }
1860     }
1861 }
1862
1863 void LrtsPostCommonInit(int everReturn)
1864 {
1865 #if CMK_DIRECT
1866     CmiDirectInit();
1867 #endif
1868 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1869     CpvInitialize(double, projTraceStart);
1870     /* only PE 0 needs to care about registration (to generate sts file). */
1871     //if (CmiMyPe() == 0) 
1872     {
1873         registerMachineUserEventsFunction(&registerUserTraceEvents);
1874     }
1875 #endif
1876
1877 #if CMK_SMP
1878     CmiIdleState *s=CmiNotifyGetState();
1879     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1880     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1881 #else
1882     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1883     if (useDynamicSMSG)
1884     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1885 #endif
1886
1887 #if ! LARGEPAGE
1888     if (_checkProgress)
1889 #if CMK_SMP
1890     if (CmiMyRank() == 0)
1891 #endif
1892     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1893 #endif
1894  
1895 #if !LARGEPAGE
1896     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1897 #endif
1898 }
1899
/*
 * Progress hook called by worker threads.
 *
 * It only does work when MULTI_THREAD_SEND is enabled. If
 * CMK_WORKER_SINGLE_TASK is also set, only workers with rank % 5 == 1
 * participate. A participating worker:
 * - leaves the idle trace state;
 * - runs the PUMP_REMOTE_HIGHPRIORITY and POST_HIGHPRIORITY_RDMA steps
 *   (macros defined outside this view; presumably they service the
 *   high-priority RDMA path);
 * - re-enters idle tracing.
 * In all other configurations the function is a no-op.
 */
void LrtsPostNonLocal(){
#if MULTI_THREAD_SEND
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 5 == 1)
#endif
    {
        traceEndIdle();
        PUMP_REMOTE_HIGHPRIORITY
        POST_HIGHPRIORITY_RDMA
        traceBeginIdle();
    }
#endif
}
1979
1980 /* useDynamicSMSG */
1981 static void    PumpDatagramConnection()
1982 {
1983     uint32_t          remote_address;
1984     uint32_t          remote_id;
1985     gni_return_t status;
1986     gni_post_state_t  post_state;
1987     uint64_t          datagram_id;
1988     int i;
1989
1990    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1991    {
1992        if (datagram_id >= mysize) {           /* bound endpoint */
1993            int pe = datagram_id - mysize;
1994            CMI_GNI_LOCK(global_gni_lock)
1995            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1996            CMI_GNI_UNLOCK(global_gni_lock)
1997            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1998            {
1999                CmiAssert(remote_id == pe);
2000                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2001                GNI_RC_CHECK("Dynamic SMSG Init", status);
2002 #if PRINT_SYH
2003                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2004 #endif
2005                CmiAssert(smsg_connected_flag[pe] == 1);
2006                smsg_connected_flag[pe] = 2;
2007            }
2008        }
2009        else {         /* unbound ep */
2010            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2011            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2012            {
2013                CmiAssert(remote_id<mysize);
2014                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2015                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2016                GNI_RC_CHECK("Dynamic SMSG Init", status);
2017 #if PRINT_SYH
2018                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2019 #endif
2020                smsg_connected_flag[remote_id] = 2;
2021
2022                alloc_smsg_attr(&send_smsg_attr);
2023                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2024                GNI_RC_CHECK("post unbound datagram", status);
2025            }
2026        }
2027    }
2028 }
2029
/*
 * Intended to poll a completion queue for incoming RDMA messages.
 * Currently not implemented: the body is empty.
 */
static void PumpNetworkRdmaMsgs()
{
}
2037
2038 inline 
2039 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2040 {
2041     RDMA_REQUEST        *rdma_request_msg;
2042     MallocRdmaRequest(rdma_request_msg);
2043     rdma_request_msg->destNode = inst_id;
2044     rdma_request_msg->pd = pd;
2045 #if REMOTE_EVENT
2046     rdma_request_msg->ack_index = ack_index;
2047 #endif
2048     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2049 }
2050
2051 static void getLargeMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2052
/*
 * Drain the SMSG receive completion queue (smsg_rx_cqh).
 *
 * Each successful CQ event names a peer: the event's instance id, reduced
 * to a rank with GET_RANK when REMOTE_EVENT is on. Every pending mailbox
 * message from that peer is then read with GNI_SmsgGetNextWTag and
 * dispatched by tag:
 *   SMALL_DATA_TAG       Copy into a CmiAlloc'd buffer, release the
 *                        mailbox slot, and pass to handleOneRecvedMsg().
 *   LMSG_INIT_TAG /      Control message announcing a large message.
 *   LMSG_OOB_INIT_TAG    Passed to getLargeMsgRequest() with sendRdmaBuf
 *                        (normal) or sendHighPriorBuf (OOB) as the
 *                        buffering queue.
 *   ACK_TAG              Only if !REMOTE_EVENT && !CQWRITE. The peer
 *                        finished its GET of our message: update send
 *                        accounting and free the message.
 *   BIG_MSG_TAG          One segment of a big message was fetched:
 *                        deregister it. Free the message once no bytes
 *                        remain; otherwise post the next segment(s).
 *   PUT_DONE_TAG         Only for CMK_PERSISTENT_COMM && !REMOTE_EVENT &&
 *                        !CQWRITE. Deliver the persistent message.
 *   DIRECT_PUT_DONE_TAG  CMK_DIRECT only. Push a trigger message to the
 *                        target rank.
 *   anything else        Abort.
 *
 * The outer loop ends when GNI_CqGetEvent stops returning success. With
 * CMI_PUMPNETWORKSMSG_CAP it also ends once recv_cnt reaches
 * PumpNetworkSmsg_cap; the cap is only checked between CQ events, not
 * inside the per-peer drain. If the CQ reports GNI_RC_ERROR_RESOURCE, a
 * hint to enlarge the receive queue is printed and the status is passed to
 * GNI_RC_CHECK.
 *
 * Locking: smsg_mailbox_lock is taken before GNI_SmsgGetNextWTag, and every
 * branch unlocks it exactly once. For LMSG_* and BIG_MSG_TAG headers:
 * - under MULTI_THREAD_SEND the header is copied and the slot released
 *   while the lock is held;
 * - otherwise the header is used in place and GNI_SmsgRelease is called
 *   after the lock has been dropped.
 */
static void PumpNetworkSmsg()
{
    uint64_t            inst_id;
    gni_cq_entry_t      event_data;
    gni_return_t        status;
    void                *header;
    uint8_t             msg_tag;
    int                 msg_nbytes;
    void                *msg_data;
    /* NOTE(review): msg_mem_hndl, smsg_attr, remote_smsg_attr, init_flag and
       source_addr are never used in this function. */
    gni_mem_handle_t    msg_mem_hndl;
    gni_smsg_attr_t     *smsg_attr;
    gni_smsg_attr_t     *remote_smsg_attr;
    int                 init_flag;
    CONTROL_MSG         *control_msg_tmp, *header_tmp;
    uint64_t            source_addr;
    /* queue used below to send follow-on BIG_MSG segments */
    SMSG_QUEUE         *queue = &smsg_queue;
    PCQueue             tmp_queue;
#if  CMK_DIRECT
    cmidirectMsg        *direct_msg;
#endif
#if CMI_PUMPNETWORKSMSG_CAP 
    int                  recv_cnt = 0;
    while(recv_cnt< PumpNetworkSmsg_cap) {
#else
    while(1) {
#endif
        CMI_GNI_LOCK(smsg_rx_cq_lock)
        status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
        CMI_GNI_UNLOCK(smsg_rx_cq_lock)
        if(status != GNI_RC_SUCCESS) break;

        inst_id = GNI_CQ_GET_INST_ID(event_data);
#if REMOTE_EVENT
        /* presumably the instance id carries more than the rank when remote
           events are enabled; GET_RANK extracts the sender rank */
        inst_id = GET_RANK(inst_id);      /* important */
#endif
        // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
#if PRINT_SYH
        printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
#endif
        if (useDynamicSMSG) {
            /* subtle: smsg may come before connection is setup */
            /* spin on the datagram pump until this peer is fully connected (2) */
            while (smsg_connected_flag[inst_id] != 2) 
               PumpDatagramConnection();
        }
        msg_tag = GNI_SMSG_ANY_TAG;
        /* drain every pending mailbox message from this peer */
        while(1) {
            CMI_GNI_LOCK(smsg_mailbox_lock)
            status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
            if (status != GNI_RC_SUCCESS)
            {
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                break;
            }
#if         CMI_PUMPNETWORKSMSG_CAP
            recv_cnt++; 
#endif
#if PRINT_SYH
            printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
#endif
            /* copy msg out and then put into queue (small message) */
            switch (msg_tag) {
            case SMALL_DATA_TAG:
            {
                START_EVENT();
                msg_nbytes = CmiGetMsgSize(header);
                msg_data    = CmiAlloc(msg_nbytes);
                memcpy(msg_data, (char*)header, msg_nbytes);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                TRACE_COMM_CREATION(EVENT_TIME(), msg_data);
                CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
                handleOneRecvedMsg(msg_nbytes, msg_data);
                break;
            }
            case LMSG_INIT_TAG:
            case LMSG_OOB_INIT_TAG:
            {
                /* OOB requests are buffered on the high-priority queue */
                tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf; 
#if MULTI_THREAD_SEND
                /* copy the header so the mailbox slot can be released under the lock */
                MallocControlMsg(control_msg_tmp);
                memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
                FreeControlMsg(control_msg_tmp);
#else
                /* header is read in place; the slot is released afterwards */
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
#endif
                break;
            }
#if !REMOTE_EVENT && !CQWRITE
            case ACK_TAG:   //msg fit into mempool
            {
                /* Get is done, release message . Now put is not used yet*/
                void *msg = (void*)(((ACK_MSG *)header)->source_addr);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
#if ! USE_LRTS_MEMPOOL
                MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
#else
                DecreaseMsgInSend(msg);
#endif
                if(NoMsgInSend(msg))
                    buffered_send_msg -= GetMempoolsize(msg);
                MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
                CmiFree(msg);
#if CMI_EXERT_SEND_LARGE_CAP
                SEND_large_pending--;
#endif
                break;
            }
#endif
            case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
            {
#if MULTI_THREAD_SEND
                MallocControlMsg(header_tmp);
                memcpy(header_tmp, header, CONTROL_MSG_SIZE);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
#else
                header_tmp = (CONTROL_MSG *) header;
#endif
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
#if CMI_EXERT_SEND_LARGE_CAP
                SEND_large_pending--;
#endif
                void *msg = (void*)(header_tmp->source_addr);
                /* the source message's seq field holds the last posted segment index */
                int cur_seq = CmiGetMsgSeq(msg);
                /* byte offset at which the next segment would start */
                int offset = ONE_SEG*(cur_seq+1);
                MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
                buffered_send_msg -= header_tmp->length;
                /* the source message's size field is reused as "bytes not yet acknowledged" */
                int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
                if (remain_size < 0) remain_size = 0;
                CmiSetMsgSize(msg, remain_size);
                if(remain_size <= 0) //transaction done
                {
                    CmiFree(msg);
                }else if (header_tmp->total_length > offset)
                {
                    CmiSetMsgSeq(msg, cur_seq+1);
                    control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
                    control_msg_tmp->dest_addr = header_tmp->dest_addr;
                    //send next seg
                    /* NOTE(review): follow-on segments always go on smsg_queue with
                       LMSG_INIT_TAG, even if the first segment was sent OOB. Confirm
                       this is intended. */
                    send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
                         // pipelining
                    /* after the first segment's ack, also post up to
                       BIG_MSG_PIPELINE-1 further segments so several are in flight */
                    if (header_tmp->seq_id == 1) {
                      int i;
                      for (i=1; i<BIG_MSG_PIPELINE; i++) {
                        int seq = cur_seq+i+2;
                        CmiSetMsgSeq(msg, seq-1);
                        control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
                        control_msg_tmp->dest_addr = header_tmp->dest_addr;
                        send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
                        if (header_tmp->total_length <= ONE_SEG*seq) break;
                      }
                    }
                }
#if MULTI_THREAD_SEND
                FreeControlMsg(header_tmp);
#else
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
#endif
                break;
            }
#if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
            case PUT_DONE_TAG:  {   //persistent message
                void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
                int size = ((CONTROL_MSG *) header)->length;
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                CmiReference(msg);
                CMI_CHECK_CHECKSUM(msg, size);
                handleOneRecvedMsg(size, msg); 
#if PRINT_SYH
                printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
#endif
                break;
            }
#endif
#if CMK_DIRECT
            case DIRECT_PUT_DONE_TAG:  //cmi direct 
                //create a trigger message
                direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
                direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
                CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
                //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
                break;
#endif
            default:
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                printf("weird tag problem\n");
                CmiAbort("Unknown tag\n");
            }               // end switch
#if PRINT_SYH
            printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
#endif
            smsg_recv_count ++;
            msg_tag = GNI_SMSG_ANY_TAG;
        } //endwhile GNI_SmsgGetNextWTag
    }   //end while GetEvent
    if(status == GNI_RC_ERROR_RESOURCE)
    {
        printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
        GNI_RC_CHECK("Smsg_rx_cq full", status);
    }
}
2264
2265 static void printDesc(gni_post_descriptor_t *pd)
2266 {
2267     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2268 }
2269
#if CQWRITE
/*
 * Post a CQ write carrying `data` to `destNode`, addressed through the
 * remote memory handle `mem_hndl`. The post uses silent CQ mode and
 * performance delivery. A failed post is reported through GNI_RC_CHECK.
 *
 * NOTE(review): the descriptor from MallocPostDesc is not freed here, and
 * GNI_CQMODE_SILENT requests no completion event. Confirm who releases it.
 */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_post_descriptor_t *desc;
    gni_return_t           rc;

    MallocPostDesc(desc);
    desc->type            = GNI_POST_CQWRITE;
    desc->cq_mode         = GNI_CQMODE_SILENT;
    desc->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    desc->cqwrite_value   = data;
    desc->remote_mem_hndl = mem_hndl;

    rc = GNI_PostCqWrite(ep_hndl_array[destNode], desc);
    GNI_RC_CHECK("GNI_PostCqWrite", rc);
}
#endif
2287
2288 // register memory for a message
2289 // return mem handle
2290 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2291 {
2292     gni_return_t status = GNI_RC_SUCCESS;
2293
2294     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2295
2296 #if CMK_PERSISTENT_COMM
2297       // persistent message is always registered
2298       // BIG_MSG small pieces do not have malloc chunk header
2299     if (IS_PERSISTENT_MEMORY(msg)) {
2300         *memh = GetMemHndl(msg);
2301         return GNI_RC_SUCCESS;
2302     }
2303 #endif
2304     if(seqno == 0 
2305 #if CMK_PERSISTENT_COMM
2306          || seqno == PERSIST_SEQ
2307 #endif
2308       )
2309     {
2310         if(IsMemHndlZero((GetMemHndl(msg))))
2311         {
2312             msg = (void*)(msg);
2313             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2314             if(status == GNI_RC_SUCCESS)
2315                 *memh = GetMemHndl(msg);
2316         }
2317         else {
2318             *memh = GetMemHndl(msg);
2319         }
2320     }
2321     else {
2322         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2323         status = registerMemory(msg, size, memh, NULL); 
2324     }
2325     return status;
2326 }
2327
2328 // for BIG_MSG called on receiver side for receiving control message
2329 // LMSG_INIT_TAG
2330 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2331 {
2332 #if     USE_LRTS_MEMPOOL
2333     CONTROL_MSG         *request_msg;
2334     gni_return_t        status = GNI_RC_SUCCESS;
2335     void                *msg_data;
2336     gni_post_descriptor_t *pd;
2337     gni_mem_handle_t    msg_mem_hndl;
2338     int                 size, transaction_size, offset = 0;
2339     size_t              register_size = 0;
2340
2341     // initial a get to transfer data from the sender side */
2342     request_msg = (CONTROL_MSG *) header;
2343     size = request_msg->total_length;
2344     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2345     MallocPostDesc(pd);
2346 #if CMK_WITH_STATS 
2347     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2348 #endif
2349     if(request_msg->seq_id < 2)   {
2350 #if CMK_SMP_TRACE_COMMTHREAD 
2351         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2352 #endif
2353         msg_data = CmiAlloc(size);
2354         CmiSetMsgSeq(msg_data, 0);
2355         _MEMCHECK(msg_data);
2356     }
2357     else {
2358         offset = ONE_SEG*(request_msg->seq_id-1);
2359         msg_data = (char*)request_msg->dest_addr + offset;
2360     }
2361    
2362     pd->cqwrite_value = request_msg->seq_id;
2363
2364     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2365     SetMemHndlZero(pd->local_mem_hndl);
2366     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2367     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2368         if(NoMsgInRecv( (void*)(msg_data)))
2369             register_size = GetMempoolsize((void*)(msg_data));
2370     }
2371
2372     pd->first_operand = ALIGN64(size);                   //  total length
2373
2374     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2375         pd->type            = GNI_POST_FMA_GET;
2376     else
2377         pd->type            = GNI_POST_RDMA_GET;
2378     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2379     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2380     pd->length          = transaction_size;
2381     pd->local_addr      = (uint64_t) msg_data;
2382     pd->remote_addr     = request_msg->source_addr + offset;
2383     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2384
2385     if (tag == LMSG_OOB_INIT_TAG) 
2386         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2387     else
2388     {
2389 #if MULTI_THREAD_SEND
2390         pd->src_cq_hndl     = rdma_tx_cqh;
2391 #else
2392         pd->src_cq_hndl     = 0;
2393 #endif
2394     }
2395
2396     pd->rdma_mode       = 0;
2397     pd->amo_cmd         = 0;
2398 #if CMI_EXERT_RECV_RDMA_CAP
2399     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2400 #endif
2401     //memory registration success
2402     if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
2403     {
2404         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2405         CMI_GNI_LOCK(lock)
2406 #if REMOTE_EVENT
2407         if( request_msg->seq_id == 0)
2408         {
2409             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2410             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2411             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2412         }
2413 #endif
2414
2415 #if CMK_WITH_STATS
2416         RDMA_TRY_SEND(pd->type)
2417 #endif
2418         if(pd->type == GNI_POST_RDMA_GET) 
2419         {
2420             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2421         }
2422         else
2423         {
2424             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2425         }
2426         CMI_GNI_UNLOCK(lock)
2427
2428         if(status == GNI_RC_SUCCESS )
2429         {
2430 #if CMI_EXERT_RECV_RDMA_CAP
2431             RDMA_pending++;
2432 #endif
2433             if(pd->cqwrite_value == 0)
2434             {
2435 #if MACHINE_DEBUG_LOG
2436                 buffered_recv_msg += register_size;
2437                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2438 #endif
2439                 IncreaseMsgInRecv(msg_data);
2440 #if CMK_SMP_TRACE_COMMTHREAD 
2441                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2442 #endif
2443             }
2444 #if  CMK_WITH_STATS
2445             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2446             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2447 #endif
2448         }
2449     }else
2450     {
2451         SetMemHndlZero((pd->local_mem_hndl));
2452     }
2453         if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
2454     {
2455 #if REMOTE_EVENT
2456         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2457 #else
2458         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2459 #endif
2460     }else if (status != GNI_RC_SUCCESS) {
2461         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2462         GNI_RC_CHECK("GetLargeAFter posting", status);
2463     }
2464 #else
2465     CONTROL_MSG         *request_msg;
2466     gni_return_t        status;
2467     void                *msg_data;
2468     gni_post_descriptor_t *pd;
2469     RDMA_REQUEST        *rdma_request_msg;
2470     gni_mem_handle_t    msg_mem_hndl;
2471     //int source;
2472     // initial a get to transfer data from the sender side */
2473     request_msg = (CONTROL_MSG *) header;
2474     msg_data = CmiAlloc(request_msg->length);
2475     _MEMCHECK(msg_data);
2476
2477     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2478
2479     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2480     {
2481         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2482     }
2483
2484     MallocPostDesc(pd);
2485     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2486         pd->type            = GNI_POST_FMA_GET;
2487     else
2488         pd->type            = GNI_POST_RDMA_GET;
2489     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2490     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2491     pd->length          = ALIGN64(request_msg->length);
2492     pd->local_addr      = (uint64_t) msg_data;
2493     pd->remote_addr     = request_msg->source_addr;
2494     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2495     if (tag == LMSG_OOB_INIT_TAG) 
2496         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2497     else
2498     {
2499 #if MULTI_THREAD_SEND
2500         pd->src_cq_hndl     = rdma_tx_cqh;
2501 #else
2502         pd->src_cq_hndl     = 0;
2503 #endif
2504     }
2505     pd->rdma_mode       = 0;
2506     pd->amo_cmd         = 0;
2507
2508     //memory registration successful
2509     if(status == GNI_RC_SUCCESS)
2510     {
2511         pd->local_mem_hndl  = msg_mem_hndl;
2512        
2513         if(pd->type == GNI_POST_RDMA_GET) 
2514         {
2515             CMI_GNI_LOCK(rdma_tx_cq_lock)
2516             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2517             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2518         }
2519         else
2520         {
2521             CMI_GNI_LOCK(default_tx_cq_lock)
2522             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2523             CMI_GNI_UNLOCK(default_tx_cq_lock)
2524         }
2525
2526     }else
2527     {
2528         SetMemHndlZero(pd->local_mem_hndl);
2529     }
2530     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2531     {
2532         MallocRdmaRequest(rdma_request_msg);
2533         rdma_request_msg->next = 0;
2534         rdma_request_msg->destNode = inst_id;
2535         rdma_request_msg->pd = pd;
2536         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2537     }else {
2538         GNI_RC_CHECK("AFter posting", status);
2539     }
2540 #endif
2541 }
2542
#if CQWRITE
/* Drain rdma_rx_cqh of CQ-write completion events.
 *
 * The low 48 bits of each event's data word are taken as the address of a
 * message.  With CMK_PERSISTENT_COMM, a message whose memory handle
 * (MEMHFIELD) is non-zero is a persistent receive: it is referenced,
 * checksummed and delivered through handleOneRecvedMsg.  Any other message
 * is released: in USE_LRTS_MEMPOOL builds its in-send count is decremented,
 * buffered_send_msg is reduced once no sends remain on its mempool block,
 * and the message is freed (presumably a send buffer the peer is done with).
 *
 * The loop ends at the first GNI_CqGetEvent that is not GNI_RC_SUCCESS;
 * GNI_RC_ERROR_RESOURCE (CQ overrun) is reported as fatal.
 */
static void PumpCqWriteTransactions()
{
    gni_cq_entry_t          event;
    gni_return_t            status;

    while ((status = GNI_CqGetEvent(rdma_rx_cqh, &event)) == GNI_RC_SUCCESS)
    {
        void *msg = (void*) (GNI_CQ_GET_DATA(event) & 0xFFFFFFFFFFFFL);

#if CMK_PERSISTENT_COMM
#if PRINT_SYH
        printf(" %d CQ write event %p\n", myrank, msg);
#endif
        if (!IsMemHndlZero(MEMHFIELD(msg))) {
            int msg_size;
#if PRINT_SYH
            printf(" %d Persistent CQ write event %p\n", myrank, msg);
#endif
            CmiReference(msg);
            msg_size = CmiGetMsgSize(msg);
            CMI_CHECK_CHECKSUM(msg, msg_size);
            handleOneRecvedMsg(msg_size, msg);
            continue;
        }
#endif

        /* Non-persistent message: drop one in-send reference and free it. */
#if USE_LRTS_MEMPOOL
        DecreaseMsgInSend(msg);
#endif
        if (NoMsgInSend(msg))
            buffered_send_msg -= GetMempoolsize(msg);
        CmiFree(msg);
    }

    if (status == GNI_RC_ERROR_RESOURCE)
    {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
    }
}
#endif
2587
#if REMOTE_EVENT
/* Drain the remote-event completion queue `rx_cqh`.
 *
 * The instance id of each event is split with GET_INDEX / GET_TYPE:
 *   type 0 (ACK): the message stored in ackPool slot `index` is released
 *       (in-send count decremented in USE_LRTS_MEMPOOL builds,
 *       buffered_send_msg adjusted when no sends remain, CmiFree) and the
 *       slot is returned with IndexPool_freeslot.
 *   type 1 (PERSISTENT, CMK_PERSISTENT_COMM only): the first destination
 *       buffer of persistPool slot `index` is referenced, checksummed and
 *       delivered through handleOneRecvedMsg.
 * Any other type aborts.
 *
 * GNI_CqGetEvent is called under global_gni_lock.  The loop ends at the
 * first non-success result; GNI_RC_ERROR_RESOURCE (CQ overrun) is fatal.
 */
static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
{
    gni_cq_entry_t          event;
    gni_return_t            status;
    void                    *msg;
    int                     event_inst, index, kind;

    for (;;) {
        CMI_GNI_LOCK(global_gni_lock)
        status = GNI_CqGetEvent(rx_cqh, &event);
        CMI_GNI_UNLOCK(global_gni_lock)
        if (status != GNI_RC_SUCCESS)
            break;

        event_inst = GNI_CQ_GET_INST_ID(event);
        index      = GET_INDEX(event_inst);
        kind       = GET_TYPE(event_inst);

        if (kind == 0) {                /* ACK */
            CmiAssert(index>=0 && index<ackPool.size);
            CMI_GNI_LOCK(ackPool.lock);
            CmiAssert(GetIndexType(ackPool, index) == 1);
            msg = GetIndexAddress(ackPool, index);
            CMI_GNI_UNLOCK(ackPool.lock);
#if PRINT_SYH
            printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, kind);
#endif
#if USE_LRTS_MEMPOOL
            DecreaseMsgInSend(msg);
#endif
            if (NoMsgInSend(msg))
                buffered_send_msg -= GetMempoolsize(msg);
            CmiFree(msg);
            IndexPool_freeslot(&ackPool, index);
#if CMI_EXERT_SEND_LARGE_CAP
            SEND_large_pending--;
#endif
        }
#if CMK_PERSISTENT_COMM
        else if (kind == 1) {           /* PERSISTENT */
            CmiLock(persistPool.lock);
            CmiAssert(GetIndexType(persistPool, index) == 2);
            PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
            CmiUnlock(persistPool.lock);
            START_EVENT();
            msg = slot->destBuf[0].destAddress;
            int size = CmiGetMsgSize(msg);
            CmiReference(msg);
            CMI_CHECK_CHECKSUM(msg, size);
            TRACE_COMM_CREATION(EVENT_TIME(), msg);
            handleOneRecvedMsg(size, msg);
        }
#endif
        else {
            fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, kind);
            CmiAbort("PumpRemoteTransactions: unknown type");
        }
    }

    if (status == GNI_RC_ERROR_RESOURCE)
    {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
    }
}
#endif
2658
2659 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2660 {
2661     gni_cq_entry_t          ev;
2662     gni_return_t            status;
2663     uint64_t                type, inst_id;
2664     gni_post_descriptor_t   *tmp_pd;
2665     MSG_LIST                *ptr;
2666     CONTROL_MSG             *ack_msg_tmp;
2667     ACK_MSG                 *ack_msg;
2668     uint8_t                 msg_tag;
2669 #if CMK_DIRECT
2670     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2671 #endif
2672     SMSG_QUEUE         *queue = &smsg_queue;
2673
2674     while(1) {
2675         CMI_GNI_LOCK(my_cq_lock) 
2676         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2677         CMI_GNI_UNLOCK(my_cq_lock)
2678         if(status != GNI_RC_SUCCESS) break;
2679         
2680         type = GNI_CQ_GET_TYPE(ev);
2681         if (type == GNI_CQ_EVENT_TYPE_POST)
2682         {
2683
2684 #if CMI_EXERT_RECV_RDMA_CAP
2685             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2686             RDMA_pending--;
2687 #endif
2688             inst_id     = GNI_CQ_GET_INST_ID(ev);
2689 #if PRINT_SYH
2690             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2691 #endif
2692             CMI_GNI_LOCK(my_cq_lock)
2693             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2694             CMI_GNI_UNLOCK(my_cq_lock)
2695
2696             switch (tmp_pd->type) {
2697 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2698             case GNI_POST_RDMA_PUT:
2699 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2700                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2701 #endif
2702             case GNI_POST_FMA_PUT:
2703                 if(tmp_pd->amo_cmd == 1) {
2704 #if CMK_DIRECT
2705                     //sender ACK to receiver to trigger it is done
2706                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2707                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2708                     msg_tag = DIRECT_PUT_DONE_TAG;
2709 #endif
2710                 }
2711                 else {
2712                     CmiFree((void *)tmp_pd->local_addr);
2713 #if REMOTE_EVENT
2714                     FreePostDesc(tmp_pd);
2715                     continue;
2716 #elif CQWRITE
2717                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2718                     FreePostDesc(tmp_pd);
2719                     continue;
2720 #else
2721                     MallocControlMsg(ack_msg_tmp);
2722                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2723                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2724                     ack_msg_tmp->length  = tmp_pd->length;
2725                     msg_tag = PUT_DONE_TAG;
2726 #endif
2727                 }
2728                 break;
2729 #endif
2730             case GNI_POST_RDMA_GET:
2731             case GNI_POST_FMA_GET:  {
2732 #if  ! USE_LRTS_MEMPOOL
2733                 MallocControlMsg(ack_msg_tmp);
2734                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2735                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2736                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2737                 msg_tag = ACK_TAG;  
2738 #else
2739 #if CMK_WITH_STATS
2740                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2741 #endif
2742                 int seq_id = tmp_pd->cqwrite_value;
2743                 if(seq_id > 0)      // BIG_MSG
2744                 {
2745                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2746                     MallocControlMsg(ack_msg_tmp);
2747                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2748                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2749                     ack_msg_tmp->seq_id = seq_id;
2750                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2751                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2752                     ack_msg_tmp->length = tmp_pd->length;
2753                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2754                     msg_tag = BIG_MSG_TAG; 
2755                 } 
2756                 else
2757                 {
2758                     msg_tag = ACK_TAG; 
2759 #if  !REMOTE_EVENT && !CQWRITE
2760                     MallocAckMsg(ack_msg);
2761                     ack_msg->source_addr = tmp_pd->remote_addr;
2762 #endif
2763                 }
2764 #endif
2765                 break;
2766             }
2767             case  GNI_POST_CQWRITE:
2768                    FreePostDesc(tmp_pd);
2769                    continue;
2770             default:
2771                 CmiPrintf("type=%d\n", tmp_pd->type);
2772                 CmiAbort("PumpLocalTransactions: unknown type!");
2773             }      /* end of switch */
2774
2775 #if CMK_DIRECT
2776             if (tmp_pd->amo_cmd == 1) {
2777                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2778                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2779             }
2780             else
2781 #endif
2782             if (msg_tag == ACK_TAG) {
2783 #if !REMOTE_EVENT
2784 #if   !CQWRITE
2785                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2786                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2787 #else
2788                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2789 #endif
2790 #endif
2791             }
2792             else {
2793                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2794                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2795             }
2796 #if CMK_PERSISTENT_COMM
2797             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2798 #endif
2799             {
2800                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2801 #if PRINT_SYH
2802                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2803 #endif
2804                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2805                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2806
2807                     START_EVENT();
2808                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2809                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2810 #if MACHINE_DEBUG_LOG
2811                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2812                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2813                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2814 #endif
2815                     TRACE_COMM_CREATION(EVENT_TIME(), (void*)tmp_pd->local_addr);
2816                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2817                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2818                 }else if(msg_tag == BIG_MSG_TAG){
2819                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2820                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2821                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2822                         START_EVENT();
2823 #if PRINT_SYH
2824                         printf("Pipeline msg done [%d]\n", myrank);
2825 #endif
2826 #if     CMK_SMP_TRACE_COMMTHREAD
2827                         if( tmp_pd->cqwrite_value == 1)
2828                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2829 #endif
2830                         TRACE_COMM_CREATION(EVENT_TIME(), msg);
2831                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2832                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2833                     }
2834                 }
2835             }
2836             FreePostDesc(tmp_pd);
2837         }
2838     } //end while
2839     if(status == GNI_RC_ERROR_RESOURCE)
2840     {
2841         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2842         GNI_RC_CHECK("Smsg_tx_cq full", status);
2843     }
2844 }
2845
2846 static void  SendRdmaMsg( BufferList sendqueue)
2847 {
2848     gni_return_t            status = GNI_RC_SUCCESS;
2849     gni_mem_handle_t        msg_mem_hndl;
2850     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2851     RDMA_REQUEST            *pre = 0;
2852     uint64_t                register_size = 0;
2853     void                    *msg;
2854     int                     i;
2855
2856     int len = PCQueueLength(sendqueue);
2857     for (i=0; i<len; i++)
2858     {
2859 #if CMI_EXERT_RECV_RDMA_CAP
2860         if( RDMA_pending >= RDMA_cap) break;
2861 #endif
2862         CMI_PCQUEUEPOP_LOCK( sendqueue)
2863         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
2864         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
2865         if (ptr == NULL) break;
2866         
2867         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2868         gni_post_descriptor_t *pd = ptr->pd;
2869         
2870         msg = (void*)(pd->local_addr);
2871         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2872         register_size = 0;
2873         if(pd->cqwrite_value == 0) {
2874             if(NoMsgInRecv(msg))
2875                 register_size = GetMempoolsize(msg);
2876         }
2877
2878         if(status == GNI_RC_SUCCESS)        //mem register good
2879         {
2880             int destNode = ptr->destNode;
2881             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2882             CMI_GNI_LOCK(lock);
2883 #if REMOTE_EVENT
2884             if( pd->cqwrite_value == 0) {
2885                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2886                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
2887                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2888             }
2889 #if CMK_PERSISTENT_COMM
2890             else if (pd->cqwrite_value == PERSIST_SEQ) {
2891                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2892                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
2893                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2894             }
2895 #endif
2896 #endif
2897 #if CMK_WITH_STATS
2898             RDMA_TRY_SEND(pd->type)
2899 #endif
2900 #if CMK_SMP_TRACE_COMMTHREAD
2901             if(IS_PUT(pd->type))
2902             {
2903                  START_EVENT();
2904                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
2905             }
2906 #endif
2907
2908             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2909             {
2910                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
2911             }
2912             else
2913             {
2914                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
2915             }
2916             CMI_GNI_UNLOCK(lock);
2917             
2918             if(status == GNI_RC_SUCCESS)    //post good
2919             {
2920 #if CMI_EXERT_RECV_RDMA_CAP
2921                 RDMA_pending ++;
2922 #endif
2923                 if(pd->cqwrite_value == 0)
2924                 {
2925 #if CMK_SMP_TRACE_COMMTHREAD 
2926                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2927 #endif
2928                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2929                 }
2930 #if  CMK_WITH_STATS
2931                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2932                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2933 #endif
2934 #if MACHINE_DEBUG_LOG
2935                 buffered_recv_msg += register_size;
2936                 MACHSTATE(8, "GO request from buffered\n"); 
2937 #endif
2938 #if PRINT_SYH
2939                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
2940 #endif
2941             }else           // cannot post
2942             {
2943                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2944 #if PRINT_SYH
2945                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
2946 #endif
2947                 break;
2948             }
2949         } else          //memory registration fails
2950         {
2951             PCQueuePush(sendqueue, (char*)ptr);
2952         }
2953     } //end while
2954 }
2955
2956 static 
2957 inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
2958 {
2959     CONTROL_MSG         *control_msg_tmp;
2960     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
2961
2962     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2963     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
2964             /* connection not exists yet */
2965 #if CMK_SMP
2966             /* non-smp case, connect is issued in send_smsg_message */
2967         if (smsg_connected_flag[ptr->destNode] == 0)
2968             connect_to(ptr->destNode); 
2969 #endif
2970     }
2971     else
2972     switch(ptr->tag)
2973     {
2974     case SMALL_DATA_TAG:
2975         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2976         if(status == GNI_RC_SUCCESS)
2977         {
2978             CmiFree(ptr->msg);
2979         }
2980         break;
2981     case LMSG_INIT_TAG:
2982     case LMSG_OOB_INIT_TAG:
2983         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2984         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
2985         break;
2986 #if !REMOTE_EVENT && !CQWRITE
2987     case ACK_TAG:
2988         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2989         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2990         break;
2991 #endif
2992     case BIG_MSG_TAG:
2993         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2994         if(status == GNI_RC_SUCCESS)
2995         {
2996             FreeControlMsg((CONTROL_MSG*)ptr->msg);
2997         }
2998         break;
2999 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3000     case PUT_DONE_TAG:
3001         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3002         if(status == GNI_RC_SUCCESS)
3003         {
3004             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3005         }
3006         break;
3007 #endif
3008 #if CMK_DIRECT
3009     case DIRECT_PUT_DONE_TAG:
3010         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3011         if(status == GNI_RC_SUCCESS)
3012         {
3013             free((CMK_DIRECT_HEADER*)ptr->msg);
3014         }
3015         break;
3016 #endif
3017     default:
3018         printf("Weird tag\n");
3019         CmiAbort("should not happen\n");
3020     }       // end switch
3021     return status;
3022 }
3023
// SendBufferMsg: retry SMSG messages that were buffered because an earlier
// send could not complete.
// Returns 1 if no send attempted in this pass failed, and 0 if some message
// had to be requeued (or, with CMI_SENDBUFFERSMSG_CAP, the per-call send cap
// was reached).  Messages that are merely skipped -- their destination
// already failed with GNI_RC_ERROR_RESOURCE in this pass, or prio_queue
// still holds traffic for it -- do not clear the flag.

#if ONE_SEND_QUEUE

/* Single-queue variant: all buffered messages live in queue->sendMsgBuf, and
 * each message present on entry is popped once.  After a
 * GNI_RC_ERROR_RESOURCE for a destination, later messages to that
 * destination are requeued without another attempt.  This is tracked in
 * destpe_avail (1 = do not try), which is cleared at the start of every call.
 * NOTE(review): this variant ignores prio_queue, and it counts sent_length
 * without ever comparing it to SendBufferMsg_cap. */
static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
{
    MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
    CONTROL_MSG         *control_msg_tmp;
    gni_return_t        status;
    int                 done = 1;
    uint64_t            register_size;
    void                *register_addr;
    int                 index_previous = -1;
#if     CMI_SENDBUFFERSMSG_CAP
    int                 sent_length = 0;
#endif
    int          index = 0;
    memset(destpe_avail, 0, mysize * sizeof(char));
    // NOTE(review): this outer loop runs exactly once.
    for (index=0; index<1; index++)
    {
        int i, len = PCQueueLength(queue->sendMsgBuf);
        for (i=0; i<len; i++) 
        {
            CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
            ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
            CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
            if(ptr == NULL) break;
            if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
                PCQueuePush(queue->sendMsgBuf, (char*)ptr);
                continue;
            }
            status = _sendOneBufferedSmsg(queue, ptr);
#if CMI_SENDBUFFERSMSG_CAP
            sent_length++;
#endif
            if(status == GNI_RC_SUCCESS)
            {
#if PRINT_SYH
                buffered_smsg_counter--;
                printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
#endif
                FreeMsgList(ptr);
            }else {
                // Requeue at the tail; on resource exhaustion skip this
                // destination for the rest of the pass.
                PCQueuePush(queue->sendMsgBuf, (char*)ptr);
                done = 0;
                if(status == GNI_RC_ERROR_RESOURCE)
                {
                    destpe_avail[ptr->destNode] = 1;
                }
            } 
        } //end while
    }   // end pooling for all cores
    return done;
}

#else   /*  ! ONE_SEND_QUEUE  */

/* Per-destination variant: queue->smsg_msglist_index[pe].sendSmsgBuf holds
 * the messages buffered for destination pe.
 * - A destination is skipped for this pass when prio_queue (if non-NULL)
 *   still has messages queued for it, presumably so that urgent traffic to
 *   that destination goes out first.
 * - Within one destination, each message present on entry is tried once.
 *   Sending to that destination stops at the first GNI_RC_ERROR_RESOURCE.
 *
 * SMP_LOCKS: only the lists queued on queue->nonEmptyQueues are visited.  A
 * list's `pushed` flag, guarded by its lock, records whether it is currently
 * on nonEmptyQueues.  A list that still holds messages after the pass is
 * re-added.
 * Otherwise: all mysize destinations are visited round-robin, and the static
 * `index` keeps the position across calls. */
static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
{
    MSG_LIST            *ptr;
    gni_return_t        status;
    int                 done = 1;
#if     CMI_SENDBUFFERSMSG_CAP
    int                 sent_length = 0;
#endif
    int idx;
#if SMP_LOCKS
    // NOTE(review): in this branch `index` is advanced but never read.
    int          index = -1;
    int nonempty = PCQueueLength(queue->nonEmptyQueues);
    for(idx =0; idx<nonempty; idx++) 
    {
        index++;  if (index >= nonempty) index = 0;
#if CMI_SENDBUFFERSMSG_CAP
        if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
#endif
        CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
        MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
        CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
        if(current_list == NULL) break; 
        // Urgent traffic pending for this destination: put the list back
        // (its `pushed` flag stays set) and move on.
        if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
            PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
            continue;
        }
        PCQueue current_queue= current_list->sendSmsgBuf;
        CmiLock(current_list->lock);
        int i, len = PCQueueLength(current_queue);
        // The list is now off nonEmptyQueues; it is re-added below if needed.
        current_list->pushed = 0;
        CmiUnlock(current_list->lock);
#else      /* ! SMP_LOCKS */
    static int          index = -1;
    for(idx =0; idx<mysize; idx++) 
    {
        index++;  if (index == mysize) index = 0;
#if CMI_SENDBUFFERSMSG_CAP
        if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
#endif
        if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
        //if (index == myrank) continue;
        PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
        int i, len = PCQueueLength(current_queue);
#endif
        for (i=0; i<len; i++)  {
            CMI_PCQUEUEPOP_LOCK(current_queue)
            ptr = (MSG_LIST*)PCQueuePop(current_queue);
            CMI_PCQUEUEPOP_UNLOCK(current_queue)
            if (ptr == 0) break;

            status = _sendOneBufferedSmsg(queue, ptr);
#if CMI_SENDBUFFERSMSG_CAP
            sent_length++;
#endif
            if(status == GNI_RC_SUCCESS)
            {
#if PRINT_SYH
                buffered_smsg_counter--;
                printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
#endif
                FreeMsgList(ptr);
            }else {
                // Requeue at the tail; stop on this destination if the
                // network is out of resources, otherwise try the next message.
                PCQueuePush(current_queue, (char*)ptr);
                done = 0;
                if(status == GNI_RC_ERROR_RESOURCE)
                {
                    break;
                }
            } 
        } //end for i
#if SMP_LOCKS
        // Re-advertise the list if messages remain and nobody re-added it.
        CmiLock(current_list->lock);
        if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
        {
            current_list->pushed = 1;
            PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
        }
        CmiUnlock(current_list->lock); 
#endif
    }   // end pooling for all cores
    return done;
}

#endif
3165
static void ProcessDeadlock();
/*
 * One pass of the uGNI communication progress engine.
 *
 * Phases, in order:
 *   1. (useDynamicSMSG && whileidle only) PumpDatagramConnection()
 *   2. PumpNetworkSmsg()                  - receive small messages
 *   3. SendBufferMsg(&smsg_queue, ...)    - send buffered SMSG messages
 *                                           (also given &smsg_oob_queue
 *                                           when CMK_USE_OOB)
 *   4. PumpLocalTransactions(...)         - default TX CQ, and the RDMA TX
 *                                           CQ as well with MULTI_THREAD_SEND
 *   5. PumpCqWriteTransactions() (CQWRITE) and/or
 *      PumpRemoteTransactions(rdma_rx_cqh) (REMOTE_EVENT)
 *   6. SendRdmaMsg(sendRdmaBuf)
 *   7. ProcessDeadlock() if a hang was detected (CMK_SMP && !LARGEPAGE)
 *
 * Between the phases the same four macros are expanded:
 *   SEND_OOB_SMSG(smsg_oob_queue), PUMP_REMOTE_HIGHPRIORITY,
 *   PUMP_LOCAL_HIGHPRIORITY, POST_HIGHPRIORITY_RDMA.
 * Their definitions are not visible here.
 * NOTE(review): presumably they service out-of-band / high-priority traffic
 * frequently so it is not delayed by a long phase -- confirm against the
 * macro definitions.
 *
 * With CMK_SMP_TRACE_COMMTHREAD, each timed phase is measured with
 * CmiWallTimer(). It is recorded via traceUserBracketEvent() when it took
 * at least TRACE_THRESHOLD.
 *
 * whileidle: nonzero when the caller is idle.
 * In this function it only gates the datagram connection pump.
 */
void LrtsAdvanceCommunication(int whileidle)
{
    /* NOTE(review): 'count' is not referenced anywhere in this function. */
    static int count = 0;
    /*  Receive Msg first */
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif
    /* Progress dynamic SMSG connection setup only when idle. */
    if (useDynamicSMSG && whileidle)
    {
#if CMK_SMP_TRACE_COMMTHREAD
        startT = CmiWallTimer();
#endif
        STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
#if CMK_SMP_TRACE_COMMTHREAD
        endT = CmiWallTimer();
        if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
#endif
    }

    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA
    // Receiving small messages and persistent
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
#endif

    /* Send buffered SMSG messages; the OOB queue is passed only when CMK_USE_OOB. */
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
#if CMK_USE_OOB
    STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
#else
    STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
#endif
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
#endif

    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA

    //Pump Get messages or PUT messages
    /* Default TX queue always; the separate RDMA TX queue only exists with MULTI_THREAD_SEND. */
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
#if MULTI_THREAD_SEND
    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
#endif
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
#endif
    
    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA
    //Pump Remote event
    /* Which pump runs is chosen at compile time by CQWRITE / REMOTE_EVENT. */
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
#if CQWRITE
    PumpCqWriteTransactions();
#endif
#if REMOTE_EVENT
    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
#endif
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
#endif

    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA

    /* Issue pending RDMA operations queued in sendRdmaBuf. */
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
#endif

    /* _detected_hang is set elsewhere; the handler is only compiled in SMP, non-LARGEPAGE builds. */
#if CMK_SMP && ! LARGEPAGE
    if (_detected_hang)  ProcessDeadlock();
#endif
}
3269
3270 static void set_smsg_max()
3271 {
3272     char *env;
3273
3274     if(mysize <=512)
3275     {
3276         SMSG_MAX_MSG = 1024;
3277     }else if (mysize <= 4096)
3278     {
3279         SMSG_MAX_MSG = 1024;
3280     }else if (mysize <= 16384)
3281     {
3282         SMSG_MAX_MSG = 512;
3283     }else {
3284         SMSG_MAX_MSG = 256;
3285     }
3286
3287     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3288     if (env) SMSG_MAX_MSG = atoi(env);
3289     CmiAssert(SMSG_MAX_MSG > 0);
3290 }    
3291
3292 /* useDynamicSMSG */
3293 static void _init_dynamic_smsg()
3294 {
3295     gni_return_t status;
3296     uint32_t     vmdh_index = -1;
3297     int i;
3298
3299     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3300     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3301     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3302     for(i=0; i<mysize; i++) {
3303         smsg_connected_flag[i] = 0;
3304         smsg_attr_vector_local[i] = NULL;
3305         smsg_attr_vector_remote[i] = NULL;
3306     }
3307
3308     set_smsg_max();
3309
3310     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3311     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3312     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3313     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3314     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3315
3316     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3317     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3318     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3319     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3320     mailbox_list->offset = 0;
3321     mailbox_list->next = 0;
3322     
3323     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3324         mailbox_list->size, smsg_rx_cqh,
3325         GNI_MEM_READWRITE,   
3326         vmdh_index,
3327         &(mailbox_list->mem_hndl));
3328     GNI_RC_CHECK("MEMORY registration for smsg", status);
3329
3330     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3331     GNI_RC_CHECK("Unbound EP", status);
3332     
3333     alloc_smsg_attr(&send_smsg_attr);
3334
3335     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3336     GNI_RC_CHECK("post unbound datagram", status);
3337
3338       /* always pre-connect to proc 0 */
3339     //if (myrank != 0) connect_to(0);
3340
3341     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3342     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3343 }
3344
3345 static void _init_static_smsg()
3346 {
3347     gni_smsg_attr_t      *smsg_attr;
3348     gni_smsg_attr_t      remote_smsg_attr;
3349     gni_smsg_attr_t      *smsg_attr_vec;
3350     gni_mem_handle_t     my_smsg_mdh_mailbox;
3351     int      ret, i;
3352     gni_return_t status;
3353     uint32_t              vmdh_index = -1;
3354     mdh_addr_t            base_infor;
3355     mdh_addr_t            *base_addr_vec;
3356
3357     set_smsg_max();
3358     
3359     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3360     
3361     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3362     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3363     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3364     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3365     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3366     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3367     CmiAssert(ret == 0);
3368     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3369     
3370     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3371             smsg_memlen*(mysize), smsg_rx_cqh,
3372             GNI_MEM_READWRITE,   
3373             vmdh_index,
3374             &my_smsg_mdh_mailbox);
3375     register_memory_size += smsg_memlen*(mysize);
3376     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3377
3378     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3379     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3380
3381     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3382     base_infor.mdh =  my_smsg_mdh_mailbox;
3383     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3384
3385     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3386  
3387     for(i=0; i<mysize; i++)
3388     {
3389         if(i==myrank)
3390             continue;
3391         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3392         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3393         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3394         smsg_attr[i].mbox_offset = i*smsg_memlen;
3395         smsg_attr[i].buff_size = smsg_memlen;
3396         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3397         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3398     }
3399
3400     for(i=0; i<mysize; i++)
3401     {
3402         if (myrank == i) continue;
3403
3404         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3405         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3406         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3407         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3408         remote_smsg_attr.buff_size = smsg_memlen;
3409         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3410         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3411
3412         /* initialize the smsg channel */
3413         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3414         GNI_RC_CHECK("SMSG Init", status);
3415     } //end initialization
3416
3417     free(base_addr_vec);
3418     free(smsg_attr);
3419
3420     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3421     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3422
3423
3424 inline
3425 static void _init_send_queue(SMSG_QUEUE *queue)
3426 {
3427      int i;
3428 #if ONE_SEND_QUEUE
3429      queue->sendMsgBuf = PCQueueCreate();
3430      destpe_avail = (char*)malloc(mysize * sizeof(char));
3431 #else
3432      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3433 #if SMP_LOCKS
3434      queue->nonEmptyQueues = PCQueueCreate();
3435 #endif
3436      for(i =0; i<mysize; i++)
3437      {
3438          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3439 #if SMP_LOCKS
3440          queue->smsg_msglist_index[i].pushed = 0;
3441          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3442          queue->smsg_msglist_index[i].destpe = i;
3443 #endif
3444      }
3445 #endif
3446 }
3447
3448 inline
3449 static void _init_smsg()
3450 {
3451     if(mysize > 1) {
3452         if (useDynamicSMSG)
3453             _init_dynamic_smsg();
3454         else
3455             _init_static_smsg();
3456     }
3457
3458     _init_send_queue(&smsg_queue);
3459 #if CMK_USE_OOB
3460     _init_send_queue(&smsg_oob_queue);
3461 #endif
3462 }