fix uGNI machine.c send_smsg_message function call error
[charm.git] / src / arch / gni / machine.c
1
2 /** @file
3  * GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on the total mempool size allocated; this prevents the mempool
18     # from using too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
/* Extra definitions used only when CmiDirect support is compiled in. */
#if CMK_DIRECT
#define DIRECT_SEQ 0xFFFFFFE
#include "cmidirect.h"
#endif

/* Large pages are the default; building with REGULARPAGE selects normal pages. */
#if !REGULARPAGE
#define     LARGEPAGE              1
#else
#define     LARGEPAGE              0
#endif

/* In SMP builds exactly one of MULTI_THREAD_SEND / COMM_THREAD_SEND is true;
 * sending from worker threads is off by default. */
#if CMK_SMP
#define MULTI_THREAD_SEND          0
#define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
#endif

#if MULTI_THREAD_SEND
#define CMK_WORKER_SINGLE_TASK     1
#endif

/* Completion notification mode: remote events on, CQ write off. */
#define REMOTE_EVENT               1
#define CQWRITE                    0

/* Caps on pending large sends / RDMA receives (see SEND_large_cap and
 * RDMA_cap); 0 disables them. */
#define CMI_EXERT_SEND_LARGE_CAP   0
#define CMI_EXERT_RECV_RDMA_CAP    0
71
72
/* Optional per-call work caps for the progress-engine routines.
 * Each cap variable exists only when its own enabling switch is nonzero. */
#define CMI_SENDBUFFERSMSG_CAP            0
#define CMI_PUMPNETWORKSMSG_CAP           0
#define CMI_PUMPREMOTETRANSACTIONS_CAP    0
#define CMI_PUMPLOCALTRANSACTIONS_CAP     0

#if CMI_SENDBUFFERSMSG_CAP
int     SendBufferMsg_cap  = 20;
#endif

#if CMI_PUMPNETWORKSMSG_CAP
int     PumpNetworkSmsg_cap = 20;
#endif

#if CMI_PUMPREMOTETRANSACTIONS_CAP
int     PumpRemoteTransactions_cap = 20;
#endif

/* Keyed on the local-transactions switch, so enabling only that cap still
 * declares the variable. */
#if CMI_PUMPLOCALTRANSACTIONS_CAP
int     PumpLocalTransactions_cap = 15;
#endif

#if CMI_EXERT_SEND_LARGE_CAP
static int SEND_large_cap = 20;
static int SEND_large_pending = 0;
#endif

#if CMI_EXERT_RECV_RDMA_CAP
static int   RDMA_cap =   10;
static int   RDMA_pending = 0;
#endif
103
/* Allocate message buffers from the LRTS memory pool. */
#define USE_LRTS_MEMPOOL                  1

/* Extra send/receive bookkeeping counters (debug only). */
#define PRINT_SYH                         0

/* Communication-thread tracing: only honoured when tracing is compiled in;
 * otherwise the switch is forced to 0. */
#if !(CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD)
#undef CMK_SMP_TRACE_COMMTHREAD
#define CMK_SMP_TRACE_COMMTHREAD 0
#else
#define TRACE_THRESHOLD     0.00001
#define CMI_MPI_TRACE_MOREDETAILED 0
#undef CMI_MPI_TRACE_USEREVENTS
#define CMI_MPI_TRACE_USEREVENTS 1
#endif

/* Communication-overhead tracing (off). */
#define CMK_TRACE_COMMOVERHEAD 0
#if !(CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD)
#undef CMK_TRACE_COMMOVERHEAD
#define CMK_TRACE_COMMOVERHEAD 0
#else
#undef CMI_MPI_TRACE_USEREVENTS
#define CMI_MPI_TRACE_USEREVENTS 1
#endif

/* User-event bracketing helpers; they expand to nothing (EVENT_TIME to 0.0)
 * unless user-event tracing is active outside of Charm++ tracing. */
#if !(CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM)
#define  START_EVENT()
#define  END_EVENT(x)
#define  EVENT_TIME()   (0.0)
#else
CpvStaticDeclare(double, projTraceStart);
#define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
#define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
#define  EVENT_TIME()   CpvAccess(projTraceStart)
#endif
138
#if USE_LRTS_MEMPOOL

#define oneMB (1024ll*1024)
#define oneGB (1024ll*1024*1024)

/* Mempool geometry: initial size, growth step, and total-size limit
 * (0 here; NOTE(review): presumably set from CHARM_UGNI_MEMPOOL_SIZE_LIMIT). */
static CmiInt8 _mempool_size       = 8*oneMB;
static CmiInt8 _expand_mem         = 4*oneMB;
static CmiInt8 _mempool_size_limit = 0;

/* Memory budget for mempools on a node (cf. CHARM_UGNI_MAX_MEMORY_ON_NODE). */
static CmiInt8 _totalmem = 0.8*oneGB;

/* Big-message threshold and segment size (cf. CHARM_UGNI_BIG_MSG_SIZE). */
#if !LARGEPAGE
static CmiInt8 BIG_MSG  =  4*oneMB;
static CmiInt8 ONE_SEG  =  2*oneMB;
#else
static CmiInt8 BIG_MSG  =  16*oneMB;
static CmiInt8 ONE_SEG  =  4*oneMB;
#endif

/* Big-message pipeline depth (cf. CHARM_UGNI_BIG_MSG_PIPELINE_LEN). */
#if !MULTI_THREAD_SEND
static int BIG_MSG_PIPELINE = 4;
#else
static int BIG_MSG_PIPELINE = 1;
#endif

/* dynamic flow control: running totals */
static CmiInt8 buffered_send_msg = 0;
static CmiInt8 register_memory_size = 0;

/* Flow-control ceilings for buffered sends and registered memory. With large
 * pages they are set very high (hundreds of GB). */
#if LARGEPAGE
static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
static CmiInt8  register_count = 0;
#elif CMK_SMP && COMM_THREAD_SEND
static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
static CmiInt8  MAX_REG_MEM    =  200*oneMB;
#else
static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
static CmiInt8  MAX_REG_MEM    =  25*oneMB;
#endif

#endif     /* end USE_LRTS_MEMPOOL */
184
/* Locking helpers around uGNI calls and queue pops. They are real locks only
 * when several threads may send (MULTI_THREAD_SEND). Otherwise they expand to
 * nothing, and CMI_GNI_TRYLOCK evaluates to 0. */
#if !MULTI_THREAD_SEND
#define     CMI_GNI_LOCK(x)
#define     CMI_GNI_TRYLOCK(x)         (0)
#define     CMI_GNI_UNLOCK(x)
#define     CMI_PCQUEUEPOP_LOCK(Q)
#define     CMI_PCQUEUEPOP_UNLOCK(Q)
#else
#define     CMI_GNI_LOCK(x)            CmiLock(x);
#define     CMI_GNI_TRYLOCK(x)         CmiTryLock(x)
#define     CMI_GNI_UNLOCK(x)          CmiUnlock(x);
#define     CMI_PCQUEUEPOP_LOCK(Q)     CmiLock((Q)->lock);
#define     CMI_PCQUEUEPOP_UNLOCK(Q)   CmiUnlock((Q)->lock);
#endif
198
199 static int _tlbpagesize = 4096;
200
201 //static int _smpd_count  = 0;
202
203 static int   user_set_flag  = 0;
204
205 static int _checkProgress = 1;             /* check deadlock */
206 static int _detected_hang = 0;
207
208 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
209
210 // dynamic SMSG
211 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
212
213 static int avg_smsg_connection = 32;
214 static int                 *smsg_connected_flag= 0;
215 static gni_smsg_attr_t     **smsg_attr_vector_local;
216 static gni_smsg_attr_t     **smsg_attr_vector_remote;
217 static gni_ep_handle_t     ep_hndl_unbound;
218 static gni_smsg_attr_t     send_smsg_attr;
219 static gni_smsg_attr_t     recv_smsg_attr;
220
221 typedef struct _dynamic_smsg_mailbox{
222    void     *mailbox_base;
223    int      size;
224    int      offset;
225    gni_mem_handle_t  mem_hndl;
226    struct      _dynamic_smsg_mailbox  *next;
227 }dynamic_smsg_mailbox_t;
228
229 static dynamic_smsg_mailbox_t  *mailbox_list;
230
231 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
232 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
233
234 #if PRINT_SYH
235 int         lrts_send_msg_id = 0;
236 int         lrts_local_done_msg = 0;
237 int         lrts_send_rdma_success = 0;
238 #endif
239
240 #include "machine.h"
241
242 #include "pcqueue.h"
243
244 #include "mempool.h"
245
/* High-priority RDMA posting exists only with persistent communication;
 * persistent messages use PUT unless GET-based mode is selected. */
#if !CMK_PERSISTENT_COMM
#define  POST_HIGHPRIORITY_RDMA
#else
#define PERSISTENT_GET_BASE 0
#if !PERSISTENT_GET_BASE
#define CMK_PERSISTENT_COMM_PUT 1
#endif
#include "machine-persistent.h"
#define  POST_HIGHPRIORITY_RDMA    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
#endif

/* Pump the high-priority remote-event CQ only when something can post to it. */
#if !(REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM_PUT))
#define  PUMP_REMOTE_HIGHPRIORITY
#else
#define  PUMP_REMOTE_HIGHPRIORITY    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
#endif
262
//#define  USE_ONESIDED 1
#ifdef USE_ONESIDED
/* NOTE(review): dead path (USE_ONESIDED is never defined). Its macro arity
 * does not match the 8/5-argument forms below, and the original note says it
 * is wrong. */
//onesided implementation is wrong, since no place to restore omdh
#include "onesided.h"
onesided_hnd_t   onesided_hnd;
onesided_md_t    omdh;
#define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 

#define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)

#else
uint8_t   onesided_hnd, omdh;

/*
 * MEMORY_REGISTER: register `size` bytes at `msg` with the NIC.
 *  - If register_memory_size + size would reach MAX_REG_MEM, status is set to
 *    GNI_RC_ERROR_NOMEM and uGNI is not called.
 *  - Otherwise status receives GNI_MemRegister's result; on GNI_RC_SUCCESS the
 *    size is added to register_memory_size.
 * With REMOTE_EVENT or CQWRITE the registration is bound to `cqh`; otherwise
 * NULL is passed. `handler` and `myomdh` are unused here.
 * Every argument is parenthesized so expression arguments (e.g. `c ? a : b`)
 * are evaluated as a unit.
 */
#if REMOTE_EVENT || CQWRITE
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    if (register_memory_size + (size) >= MAX_REG_MEM) { \
        (status) = GNI_RC_ERROR_NOMEM; \
    } else { \
        (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), (cqh), GNI_MEM_READWRITE, -1, (mem_hndl)); \
        if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
    }
#else
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    if (register_memory_size + (size) >= MAX_REG_MEM) { \
        (status) = GNI_RC_ERROR_NOMEM; \
    } else { \
        (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), NULL, GNI_MEM_READWRITE, -1, (mem_hndl)); \
        if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
    }
#endif

/* MEMORY_DEREGISTER: release a registration and subtract `size` from
 * register_memory_size; aborts if uGNI reports failure. */
#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
    do { if (GNI_MemDeregister((nic_hndl), (mem_hndl)) == GNI_RC_SUCCESS) \
             register_memory_size -= (size); \
         else CmiAbort("MEM_DEregister");  \
    } while (0)
#endif
296
/* Accessors for the mempool metadata of buffer x, found through
 * MEMPOOL_GetMempoolHeader(x, ALIGNBUF). Arguments and comparison results are
 * parenthesized so the macros compose safely inside larger expressions. */
#define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader((x),ALIGNBUF))
#define   NoMsgInSend(x)          (MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader((x),ALIGNBUF)) == 0)
#define   NoMsgInRecv(x)          (MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader((x),ALIGNBUF)) == 0)
#define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
/* A memory handle whose two qwords are both zero is treated as "not registered". */
#define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
#define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
#define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))

#define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
#define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)

/* Size / sequence fields of the extended Converse message header. `m` is
 * parenthesized so pointer arithmetic happens before the cast. */
#define CmiGetMsgSize(m)     (((CmiMsgHeaderExt*)(m))->size)
#define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)(m))->size)=(s))
#define CmiGetMsgSeq(m)      (((CmiMsgHeaderExt*)(m))->seq)
#define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)(m))->seq) = (s))

#define ALIGNBUF                64
321
/* =======Beginning of Definitions of Performance-Specific Macros =======*/

/* FMA settings used when SMSG is not used. */
#define FMA_PER_CORE        1024
#define FMA_BUFFER_SIZE     1024

/* SMSG settings. */
static int  SMSG_MAX_MSG = 1024;
#define SMSG_MAX_CREDIT    72

#define MSGQ_MAXSIZE       2048

/* Size threshold for large-message transfer with FMA or BTE. Remote events
 * only work with RDMA, so the threshold drops to 0 when they are enabled. */
#if REMOTE_EVENT
#define LRTS_GNI_RDMA_THRESHOLD  0
#else
#define LRTS_GNI_RDMA_THRESHOLD  1024
#endif

/* Completion-queue entry counts. */
#if !CMK_SMP
static int  REMOTE_QUEUE_ENTRIES=20480;
static int LOCAL_QUEUE_ENTRIES=20480;
#else
static int  REMOTE_QUEUE_ENTRIES=163840;
static int LOCAL_QUEUE_ENTRIES=163840;
#endif

/* SMSG message tags. */
#define BIG_MSG_TAG                 0x26
#define PUT_DONE_TAG                0x28
#define DIRECT_PUT_DONE_TAG         0x29
#define ACK_TAG                     0x30
#define SMALL_DATA_TAG              0x31    /* SMSG carries data */
#define LMSG_INIT_TAG               0x33    /* SMSG is a control message to initialize a BTE */
#define LMSG_PERSISTENT_INIT_TAG    0x34
#define LMSG_OOB_INIT_TAG           0x35
360
#define DEBUG
#ifdef GNI_RC_CHECK
#undef GNI_RC_CHECK
#endif
/*
 * GNI_RC_CHECK(msg, rc): if rc is not GNI_RC_SUCCESS, print "[pe] msg; err=<name>"
 * and abort. rc is evaluated exactly once, so passing a uGNI call directly
 * does not re-run it while formatting the error. Without DEBUG the
 * expression is still evaluated, and only the check is dropped.
 */
#ifdef DEBUG
#define GNI_RC_CHECK(msg,rc) do { \
        int gni_rc_check_val_ = (int)(rc); \
        if (gni_rc_check_val_ != GNI_RC_SUCCESS) { \
            printf("[%d] %s; err=%s\n", CmiMyPe(), (msg), gni_err_str[gni_rc_check_val_]); \
            fflush(stdout); \
            CmiAbort("GNI_RC_CHECK"); \
        } \
    } while(0)
#else
#define GNI_RC_CHECK(msg,rc) do { (void)(rc); } while(0)
#endif

/* Round x up to a multiple of 64 bytes / of _tlbpagesize. */
#define ALIGN64(x)         ((size_t)((~63)&((x)+63)))
//#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
#define ALIGNHUGEPAGE(x)   ((size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1)))
374
375 static int useStaticMSGQ = 0;
376 static int useStaticFMA = 0;
377 static int mysize, myrank;
378 static gni_nic_handle_t   nic_hndl;
379
380 typedef struct {
381     gni_mem_handle_t mdh;
382     uint64_t addr;
383 } mdh_addr_t ;
384 // this is related to dynamic SMSG
385
386 typedef struct mdh_addr_list{
387     gni_mem_handle_t mdh;
388    void *addr;
389     struct mdh_addr_list *next;
390 }mdh_addr_list_t;
391
392 static unsigned int         smsg_memlen;
393 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
394 mdh_addr_t          setup_mem;
395 mdh_addr_t          *smsg_connection_vec = 0;
396 gni_mem_handle_t    smsg_connection_memhndl;
397 static int          smsg_expand_slots = 10;
398 static int          smsg_available_slot = 0;
399 static void         *smsg_mailbox_mempool = 0;
400 mdh_addr_list_t     *smsg_dynamic_list = 0;
401
402 static void             *smsg_mailbox_base;
403 gni_msgq_attr_t         msgq_attrs;
404 gni_msgq_handle_t       msgq_handle;
405 gni_msgq_ep_attr_t      msgq_ep_attrs;
406 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
407
408 /* =====Beginning of Declarations of Machine Specific Variables===== */
409 static int cookie;
410 static int modes = 0;
411 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
412 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
413 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
414 static gni_cq_handle_t       highprior_rdma_tx_cqh = NULL;      // rdma - local event
415 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
416 static gni_cq_handle_t       highpriority_rx_cqh = NULL;      // mempool - remote event
417 static gni_ep_handle_t       *ep_hndl_array;
418
419 static CmiNodeLock           *ep_lock_array;
420 static CmiNodeLock           default_tx_cq_lock; 
421 static CmiNodeLock           rdma_tx_cq_lock; 
422 static CmiNodeLock           global_gni_lock; 
423 static CmiNodeLock           rx_cq_lock;
424 static CmiNodeLock           smsg_mailbox_lock;
425 static CmiNodeLock           smsg_rx_cq_lock;
426 static CmiNodeLock           *mempool_lock;
427 //#define     CMK_WITH_STATS      1
428 typedef struct msg_list
429 {
430     uint32_t destNode;
431     uint32_t size;
432     void *msg;
433     uint8_t tag;
434 #if CMK_WITH_STATS
435     double  creation_time;
436 #endif
437 }MSG_LIST;
438
439
440 typedef struct control_msg
441 {
442     uint64_t            source_addr;    /* address from the start of buffer  */
443     uint64_t            dest_addr;      /* address from the start of buffer */
444     int                 total_length;   /* total length */
445     int                 length;         /* length of this packet */
446 #if REMOTE_EVENT
447     int                 ack_index;      /* index from integer to address */
448 #endif
449     int     seq_id;         //big message   0 meaning single message
450     gni_mem_handle_t    source_mem_hndl;
451 #if PERSISTENT_GET_BASE
452     gni_mem_handle_t    dest_mem_hndl;
453 #endif
454     struct control_msg  *next;
455 } CONTROL_MSG;
456
457 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
458
459 typedef struct ack_msg
460 {
461     uint64_t            source_addr;    /* address from the start of buffer  */
462 #if ! USE_LRTS_MEMPOOL
463     gni_mem_handle_t    source_mem_hndl;
464     int                 length;          /* total length */
465 #endif
466     struct ack_msg     *next;
467 } ACK_MSG;
468
469 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
470
#if CMK_DIRECT
typedef struct {
    uint64_t    handler_addr;
} CMK_DIRECT_HEADER;

/* Converse message: a standard header followed by the address of a
 * CmiDirectUserHandle stored as an integer. */
typedef struct {
    char     core[CmiMsgHeaderSizeBytes];
    uint64_t handler;
} cmidirectMsg;

//SYH
CpvDeclare(int, CmiHandleDirectIdx);

/* Converse handler for cmidirectMsg: invoke the user callback named by
 * msg->handler with its callbackData, then free the message. */
void CmiHandleDirectMsg(cmidirectMsg* msg)
{
    CmiDirectUserHandle *user_handle = (CmiDirectUserHandle *)(msg->handler);

    user_handle->callbackFnPtr(user_handle->callbackData);
    CmiFree(msg);
}

/* Register CmiHandleDirectMsg and keep its handler index in CmiHandleDirectIdx. */
void CmiDirectInit()
{
    CpvInitialize(int, CmiHandleDirectIdx);
    CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler((CmiHandler)CmiHandleDirectMsg);
}

#endif
498 typedef struct  rmda_msg
499 {
500     int                   destNode;
501 #if REMOTE_EVENT
502     int                   ack_index;
503 #endif
504     gni_post_descriptor_t *pd;
505 }RDMA_REQUEST;
506
507
508 #if CMK_SMP
509 #define SMP_LOCKS                       1
510 #else
511 #define SMP_LOCKS                       0
512 #endif
513 #define ONE_SEND_QUEUE                  0
514
515 typedef PCQueue BufferList;
516 typedef struct  msg_list_index
517 {
518     PCQueue       sendSmsgBuf;
519 #if  SMP_LOCKS
520     CmiNodeLock   lock;
521     int           pushed;
522     int           destpe;
523 #endif
524 } MSG_LIST_INDEX;
525 char                *destpe_avail;
526 PCQueue sendRdmaBuf;
527 PCQueue sendHighPriorBuf;
528 // buffered send queue
529 #if ! ONE_SEND_QUEUE
530 typedef struct smsg_queue
531 {
532     MSG_LIST_INDEX   *smsg_msglist_index;
533     int               smsg_head_index;
534 #if  SMP_LOCKS
535     PCQueue     nonEmptyQueues;
536 #endif
537 } SMSG_QUEUE;
538 #else
539 typedef struct smsg_queue
540 {
541     PCQueue       sendMsgBuf;
542 }  SMSG_QUEUE;
543 #endif
544
545 SMSG_QUEUE                  smsg_queue;
/* Out-of-band SMSG queue and high-priority local pump (only with CMK_USE_OOB). */
#if !CMK_USE_OOB
#define SEND_OOB_SMSG(x)
#define PUMP_LOCAL_HIGHPRIORITY
#else
SMSG_QUEUE                  smsg_oob_queue;
#define SEND_OOB_SMSG(x)            SendBufferMsg(&x, NULL);
#define PUMP_LOCAL_HIGHPRIORITY     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh,  rdma_tx_cq_lock));
#endif

/* Heap allocation helpers for the message records. Each expands to a
 * complete statement (trailing ';' included); no NULL check is made. */
#define MallocMsgList(d)        d = (MSG_LIST *)malloc(sizeof(MSG_LIST));
#define FreeMsgList(d)          free(d);

#define MallocControlMsg(d)     d = (CONTROL_MSG *)malloc(sizeof(CONTROL_MSG));
#define FreeControlMsg(d)       free(d);

#define MallocAckMsg(d)         d = (ACK_MSG *)malloc(sizeof(ACK_MSG));
#define FreeAckMsg(d)           free(d);

#define MallocRdmaRequest(d)    d = (RDMA_REQUEST *)malloc(sizeof(RDMA_REQUEST));
#define FreeRdmaRequest(d)      free(d);
566 /* reuse gni_post_descriptor_t */
567 static gni_post_descriptor_t *post_freelist=0;
568
569 #define FreePostDesc(d)     free(d);
570 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
571
572
573 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
574 static int      buffered_smsg_counter = 0;
575
576 /* SmsgSend return success but message sent is not confirmed by remote side */
577 static MSG_LIST *buffered_fma_head = 0;
578 static MSG_LIST *buffered_fma_tail = 0;
579
/* Bit helpers on an integer mask `a`:
 *   IsFree(a, ind)   - true when bit `ind` is clear
 *   SET_BITS(a, ind) - set bit `ind` in place
 *   Reset(a, ind)    - clear bit `ind` in place
 * Arguments are parenthesized so compound expressions such as `x | y` work. */
#define IsFree(a,ind)   (!((a) & (1 << (ind))))
#define SET_BITS(a,ind) ((a) = ((a) | (1 << (ind))))
#define Reset(a,ind)    ((a) = ((a) & ~(1 << (ind))))
584
585 CpvDeclare(mempool_type*, mempool);
586
587 #if CMK_PERSISTENT_COMM_PUT
588 CpvDeclare(mempool_type*, persistent_mempool);
589 #endif
590
591 #if CMK_SMSGS_FREE_AFTER_EVENT
592 /* 
593   SMSGS pool 
594   the pool is to buffer sending smsgs until it can be free'ed .
595 */
596
597 #if CMK_SMP
598 #define SMSG_INIT_SIZE                4096
599 #else
600 #define SMSG_INIT_SIZE                1024
601 #endif
602
603 struct SmsgsStruct {
604 void *addr;
605 int next;
606 int type;               /* 1: charm   2: control msg */
607 };
608
609 typedef struct SmsgsPool {
610     struct SmsgsStruct   *indexes;
611     int                   size;
612     int                   freehead;
613     CmiNodeLock           lock;
614 } SmsgsPool;
615
616 static SmsgsPool  smsgsPool;
617
618 #define  GetSmsgsIndexType(pool, s)             (pool.indexes[s].type)
619 #define  GetSmsgsIndexAddress(pool, s)          (pool.indexes[s].addr)
620
621 static void SmsgsPool_init(SmsgsPool *pool)
622 {
623     int i;
624     pool->size = SMSG_INIT_SIZE;
625     pool->indexes = (struct SmsgsStruct *)malloc(pool->size*sizeof(struct SmsgsStruct));
626     for (i=0; i<pool->size-1; i++) {
627         pool->indexes[i].next = i+1;
628     }
629     pool->indexes[i].next = -1;
630     pool->freehead = 0;
631 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM_PUT
632     pool->lock  = CmiCreateLock();
633 #else
634     pool->lock  = 0;
635 #endif
636 }
637
638 static int SmsgsPool_getslot(SmsgsPool *pool, void *addr, int type)
639 {
640     int s, i;
641 #if MULTI_THREAD_SEND  
642     CmiLock(pool->lock);
643 #endif
644     s = pool->freehead;
645     if (s == -1) {
646         int newsize = pool->size * 2;
647         //printf("[%d] SmsgsPool_getslot %p expand to: %d\n", myrank, pool, newsize);
648         struct SmsgsStruct *old_pool = pool->indexes;
649         pool->indexes = (struct SmsgsStruct *)malloc(newsize*sizeof(struct SmsgsStruct));
650         memcpy(pool->indexes, old_pool, pool->size*sizeof(struct SmsgsStruct));
651         for (i=pool->size; i<newsize-1; i++) {
652             pool->indexes[i].next = i+1;
653         }
654         pool->indexes[i].next = -1;
655         pool->freehead = pool->size;
656         s = pool->size;
657         pool->size = newsize;
658         free(old_pool);
659     }
660     pool->freehead = pool->indexes[s].next;
661     pool->indexes[s].addr = addr;
662     pool->indexes[s].type = type;
663 #if MULTI_THREAD_SEND
664     CmiUnlock(pool->lock);
665 #endif
666     return s;
667 }
668
669 static void SmsgsPool_freeslot(SmsgsPool *pool, int s)
670 {
671     CmiAssert(s>=0 && s<pool->size);
672 #if MULTI_THREAD_SEND
673     CmiLock(pool->lock);
674 #endif
675     pool->indexes[s].next = pool->freehead;
676     pool->freehead = s;
677 #if MULTI_THREAD_SEND
678     CmiUnlock(pool->lock);
679 #endif
680 }
681
682 #endif  /* CMK_SMSGS_FREE_AFTER_EVENT */
683
684
685 #if REMOTE_EVENT
686 /* ack pool for remote events */
687
688 static int  SHIFT   =           18;
689 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
690 #define RANK_MASK               ((1<<SHIFT) - 1)
691 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
692
693 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
694 #define GET_RANK(evt)           ((evt) & RANK_MASK)
695 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
696
697 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
698 #define DIRECT_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
699
700 #if CMK_SMP
701 #define INIT_SIZE                4096
702 #else
703 #define INIT_SIZE                1024
704 #endif
705
706 struct IndexStruct {
707 void *addr;
708 int next;
709 int type;     // 1: ACK   2: Persistent
710 };
711
712 typedef struct IndexPool {
713     struct IndexStruct   *indexes;
714     int                   size;
715     int                   freehead;
716     CmiNodeLock           lock;
717 } IndexPool;
718
719 static IndexPool  ackPool;
720 #if CMK_PERSISTENT_COMM_PUT
721 static IndexPool  persistPool;
722 #else
723 #define persistPool ackPool 
724 #endif
725
726 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
727 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
728
729 static void IndexPool_init(IndexPool *pool)
730 {
731     int i;
732     if ((1<<SHIFT) < mysize) 
733         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
734     pool->size = INIT_SIZE;
735     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
736     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
737     for (i=0; i<pool->size-1; i++) {
738         pool->indexes[i].next = i+1;
739         pool->indexes[i].type = 0;
740     }
741     pool->indexes[i].next = -1;
742     pool->indexes[i].type = 0;
743     pool->freehead = 0;
744 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM_PUT
745     pool->lock  = CmiCreateLock();
746 #else
747     pool->lock  = 0;
748 #endif
749 }
750
751 static int IndexPool_getslot(IndexPool *pool, void *addr, int type)
752 {
753     int s, i;
754 #if MULTI_THREAD_SEND  
755     CmiLock(pool->lock);
756 #endif
757     CmiAssert(type == 1 || type == 2);
758     s = pool->freehead;
759     if (s == -1) {
760         int newsize = pool->size * 2;
761         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
762         if (newsize > (1<<(32-SHIFT-1))) {
763             static int warned = 0;
764             if (!warned)
765               printf("[%d] Warning: IndexPool_getslot %p overflow when expanding to: %d\n", myrank, pool, newsize);
766             warned = 1;
767             return -1;
768             CmiAbort("IndexPool for remote events overflows, try compile Charm++ with remote event disabled.");
769         }
770         struct IndexStruct *old_ackpool = pool->indexes;
771         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
772         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
773         for (i=pool->size; i<newsize-1; i++) {
774             pool->indexes[i].next = i+1;
775             pool->indexes[i].type = 0;
776         }
777         pool->indexes[i].next = -1;
778         pool->indexes[i].type = 0;
779         pool->freehead = pool->size;
780         s = pool->size;
781         pool->size = newsize;
782         free(old_ackpool);
783     }
784     pool->freehead = pool->indexes[s].next;
785     pool->indexes[s].addr = addr;
786     CmiAssert(pool->indexes[s].type == 0);
787     pool->indexes[s].type = type;
788 #if MULTI_THREAD_SEND
789     CmiUnlock(pool->lock);
790 #endif
791     return s;
792 }
793
794 static void IndexPool_freeslot(IndexPool *pool, int s)
795 {
796     CmiAssert(s>=0 && s<pool->size);
797 #if MULTI_THREAD_SEND
798     CmiLock(pool->lock);
799 #endif
800     pool->indexes[s].next = pool->freehead;
801     pool->indexes[s].type = 0;
802     pool->freehead = s;
803 #if MULTI_THREAD_SEND
804     CmiUnlock(pool->lock);
805 #endif
806 }
807
808
809 #endif   /* REMOTE_EVENT */
810
/* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
/* magic field of the basic Converse header. `msg` is parenthesized so pointer
 * arithmetic in the argument happens before the cast. */
#define CMI_MAGIC(msg)                   (((CmiMsgHeaderBasic *)(msg))->magic)
#define CHARM_MAGIC_NUMBER               126

#if CMK_ERROR_CHECKING
extern unsigned char computeCheckSum(unsigned char *data, int len);
static int checksum_flag = 0;
/* When checksum_flag is set, store a checksum of the first len bytes of msg in
 * its header (cksum is zeroed before computing it). */
#define CMI_SET_CHECKSUM(msg, len)      \
        if (checksum_flag)  {   \
          ((CmiMsgHeaderBasic *)(msg))->cksum = 0;        \
          ((CmiMsgHeaderBasic *)(msg))->cksum = computeCheckSum((unsigned char*)(msg), (len));        \
        }
/* When checksum_flag is set, abort if msg's checksum does not come out to 0. */
#define CMI_CHECK_CHECKSUM(msg, len)    \
        if (checksum_flag)      \
          if (computeCheckSum((unsigned char*)(msg), (len)) != 0)   \
            CmiAbort("Fatal error: checksum doesn't agree!\n");
#else
#define CMI_SET_CHECKSUM(msg, len)
#define CMI_CHECK_CHECKSUM(msg, len)
#endif
/* =====End of Definitions of Message-Corruption Related Macros=====*/
832
/* Statistics switches: print_stats enables collection, stats_off pauses it
 * (both are tested by the stats macros). */
static int print_stats = 0;
static int stats_off = 0;

/* Resume collecting statistics. */
void CmiTurnOnStats()
{
    stats_off = 0;
}

/* Pause collecting statistics. */
void CmiTurnOffStats()
{
    stats_off = 1;
}

/* True when a post type is an FMA or RDMA PUT. */
#define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
847
848 #if CMK_WITH_STATS
/* Per-rank statistics output file (opened by init_comm_stats). */
FILE *counterLog = NULL;

/* Communication statistics, updated by the SMSG_* / RDMA_* / STATS_* macros. */
typedef struct comm_thread_stats
{
    /* SMSGs sent, by tag, and in total */
    uint64_t  smsg_data_count;
    uint64_t  lmsg_init_count;
    uint64_t  ack_count;
    uint64_t  big_msg_ack_count;
    uint64_t  smsg_count;
    uint64_t  direct_put_done_count;
    uint64_t  put_done_count;
    /* times of calling SmsgSend, by tag, and in total */
    uint64_t  try_smsg_data_count;
    uint64_t  try_lmsg_init_count;
    uint64_t  try_ack_count;
    uint64_t  try_big_msg_ack_count;
    uint64_t  try_direct_put_done_count;
    uint64_t  try_put_done_count;
    uint64_t  try_smsg_count;

    /* time spent buffered before the SMSG went out */
    double    max_time_in_send_buffered_smsg;
    double    all_time_in_send_buffered_smsg;

    /* RDMA operations and their timings */
    uint64_t  rdma_get_count;
    uint64_t  rdma_put_count;
    uint64_t  try_rdma_get_count;
    uint64_t  try_rdma_put_count;
    double    max_time_from_control_to_rdma_init;
    double    all_time_from_control_to_rdma_init;

    double    max_time_from_rdma_init_to_rdma_done;
    double    all_time_from_rdma_init_to_rdma_done;

    /* per-routine call counts, total time and longest call */
    int       count_in_PumpNetwork;
    double    time_in_PumpNetwork;
    double    max_time_in_PumpNetwork;
    int       count_in_SendBufferMsg_smsg;
    double    time_in_SendBufferMsg_smsg;
    double    max_time_in_SendBufferMsg_smsg;
    int       count_in_SendRdmaMsg;
    double    time_in_SendRdmaMsg;
    double    max_time_in_SendRdmaMsg;
    int       count_in_PumpRemoteTransactions;
    double    time_in_PumpRemoteTransactions;
    double    max_time_in_PumpRemoteTransactions;
    int       count_in_PumpLocalTransactions_rdma;
    double    time_in_PumpLocalTransactions_rdma;
    double    max_time_in_PumpLocalTransactions_rdma;
    int       count_in_PumpDatagramConnection;
    double    time_in_PumpDatagramConnection;
    double    max_time_in_PumpDatagramConnection;
} Comm_Thread_Stats;

static Comm_Thread_Stats   comm_stats;

/* Directory that holds the per-rank statistics files. */
static char *counters_dirname = "counters";
902
903 static void init_comm_stats()
904 {
905   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
906   if (print_stats){
907       char ln[200];
908       int code = mkdir(counters_dirname, 00777); 
909       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
910       counterLog=fopen(ln,"w");
911       if (counterLog == NULL) CmiAbort("Counter files open failed");
912   }
913 }
914
/* SMSG statistics hooks.  They expand to bare `if` statements rather than
 * do { } while (0) because existing call sites invoke them without a trailing
 * semicolon, so never use one as the body of an unbraced if/else.
 * Arguments are parenthesized so expression arguments bind correctly. */

/* Stamp a buffered MSG_LIST entry with its queueing time (print_stats only). */
#define SMSG_CREATION( x ) if(print_stats) { (x)->creation_time = CmiWallTimer(); }

/* Count a successful SMSG send by tag (tags not listed only bump smsg_count)
 * and fold the time elapsed since creation_time into the buffered-time
 * total and maximum.  Active only when print_stats is set and stats_off is clear. */
#define SMSG_SENT_DONE(creation_time, tag)  \
        if (print_stats && !stats_off) {   if( (tag) == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
            else  if( (tag) == LMSG_INIT_TAG || (tag) == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++;  \
            else  if( (tag) == ACK_TAG) comm_stats.ack_count++;  \
            else  if( (tag) == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
            else  if( (tag) == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
            else  if( (tag) == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
            comm_stats.smsg_count++; \
            double inbuff_time = CmiWallTimer() - (creation_time);   \
            if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
            comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
        }

/* Count an attempted SMSG send by tag (tags not listed only bump try_smsg_count). */
#define SMSG_TRY_SEND(tag)  \
        if (print_stats && !stats_off){   if( (tag) == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
            else  if( (tag) == LMSG_INIT_TAG || (tag) == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
            else  if( (tag) == ACK_TAG) comm_stats.try_ack_count++;  \
            else  if( (tag) == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
            else  if( (tag) == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
            else  if( (tag) == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
            comm_stats.try_smsg_count++; \
        }
939
/* RDMA statistics hooks (active only when print_stats is set and stats_off is
 * clear).  Like the SMSG hooks they expand to bare `if` statements.  The
 * timestamp argument x is parenthesized so that an expression argument such
 * as `a - b` is subtracted as a whole. */

/* Count an attempted RDMA transaction as a PUT or a GET according to IS_PUT(type). */
#define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}

/* Fold the time from x to now into the "rdma init -> rdma done" total and maximum. */
#define  RDMA_TRANS_DONE(x)      \
         if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - (x) ; \
             if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
             comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
         }

/* Count an issued RDMA transaction (PUT or GET) and fold the time from x to
 * now into the "control message -> rdma init" total and maximum. */
#define  RDMA_TRANS_INIT(type, x)      \
         if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
             double rdma_trans_time = CmiWallTimer() - (x) ; \
             if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
             comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
         }
954
/* Timed-section helper shared by the STATS_*_TIME wrappers below.
 *
 * It runs the wrapped statement(s), measures their wall-clock duration with
 * CmiWallTimer() and accumulates it into comm_stats.count_in_<name>,
 * comm_stats.time_in_<name> and comm_stats.max_time_in_<name>.  The body is
 * taken as variadic arguments, so a wrapped statement whose expansion contains
 * top-level commas is passed through unchanged.  The expansion is a plain
 * brace block. */
#define COMM_STATS_TIME_SECTION(name, ...)              \
        { double t = CmiWallTimer();                    \
          __VA_ARGS__;                                  \
          t = CmiWallTimer() - t;                       \
          comm_stats.count_in_##name ++;                \
          comm_stats.time_in_##name += t;               \
          if (t>comm_stats.max_time_in_##name)          \
              comm_stats.max_time_in_##name = t;        \
        }

#define STATS_PUMPNETWORK_TIME(x)                COMM_STATS_TIME_SECTION(PumpNetwork, x)
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)     COMM_STATS_TIME_SECTION(PumpRemoteTransactions, x)
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x) COMM_STATS_TIME_SECTION(PumpLocalTransactions_rdma, x)
#define STATS_SEND_SMSGS_TIME(x)                 COMM_STATS_TIME_SECTION(SendBufferMsg_smsg, x)
#define STATS_SENDRDMAMSG_TIME(x)                COMM_STATS_TIME_SECTION(SendRdmaMsg, x)
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)     COMM_STATS_TIME_SECTION(PumpDatagramConnection, x)
1014
1015 static void print_comm_stats()
1016 {
1017     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
1018     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
1019             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
1020             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
1021     
1022     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
1023             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
1024             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
1025
1026     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
1027     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
1028     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
1029
1030
1031     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
1032     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
1033     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
1034     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
1035     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
1036     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
1037     if (useDynamicSMSG)
1038     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
1039
1040     fclose(counterLog);
1041 }
1042
1043 #else
/* Statistics disabled (CMK_WITH_STATS off): each timing wrapper simply
 * expands to the wrapped statement, with no timing or counting. */
#define STATS_PUMPNETWORK_TIME(x)                  x
#define STATS_SEND_SMSGS_TIME(x)                   x
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
#define STATS_SENDRDMAMSG_TIME(x)                  x
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
1050 #endif
1051
1052 static void
1053 allgather(void *in,void *out, int len)
1054 {
1055     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1056     int i,rc;
1057     int my_rank;
1058     char *tmp_buf,*out_ptr;
1059
1060     if(!already_called) {
1061
1062         rc = PMI_Get_size(&job_size);
1063         CmiAssert(rc == PMI_SUCCESS);
1064         rc = PMI_Get_rank(&my_rank);
1065         CmiAssert(rc == PMI_SUCCESS);
1066
1067         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1068         CmiAssert(ivec_ptr != NULL);
1069
1070         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1071         CmiAssert(rc == PMI_SUCCESS);
1072
1073         already_called = 1;
1074
1075     }
1076
1077     tmp_buf = (char *)malloc(job_size * len);
1078     CmiAssert(tmp_buf);
1079
1080     rc = PMI_Allgather(in,tmp_buf,len);
1081     CmiAssert(rc == PMI_SUCCESS);
1082
1083     out_ptr = out;
1084
1085     for(i=0;i<job_size;i++) {
1086
1087         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1088
1089     }
1090
1091     free(tmp_buf);
1092 }
1093
1094 static void
1095 allgather_2(void *in,void *out, int len)
1096 {
1097     //PMI_Allgather is out of order
1098     int i,rc, extend_len;
1099     int  rank_index;
1100     char *out_ptr, *out_ref;
1101     char *in2;
1102
1103     extend_len = sizeof(int) + len;
1104     in2 = (char*)malloc(extend_len);
1105
1106     memcpy(in2, &myrank, sizeof(int));
1107     memcpy(in2+sizeof(int), in, len);
1108
1109     out_ptr = (char*)malloc(mysize*extend_len);
1110
1111     rc = PMI_Allgather(in2, out_ptr, extend_len);
1112     GNI_RC_CHECK("allgather", rc);
1113
1114     out_ref = out;
1115
1116     for(i=0;i<mysize;i++) {
1117         //rank index 
1118         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1119         //copy to the rank index slot
1120         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1121     }
1122
1123     free(out_ptr);
1124     free(in2);
1125
1126 }
1127
1128 static unsigned int get_gni_nic_address(int device_id)
1129 {
1130     unsigned int address, cpu_id;
1131     gni_return_t status;
1132     int i, alps_dev_id=-1,alps_address=-1;
1133     char *token, *p_ptr;
1134
1135     p_ptr = getenv("PMI_GNI_DEV_ID");
1136     if (!p_ptr) {
1137         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1138        
1139         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1140     } else {
1141         while ((token = strtok(p_ptr,":")) != NULL) {
1142             alps_dev_id = atoi(token);
1143             if (alps_dev_id == device_id) {
1144                 break;
1145             }
1146             p_ptr = NULL;
1147         }
1148         CmiAssert(alps_dev_id != -1);
1149         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1150         CmiAssert(p_ptr != NULL);
1151         i = 0;
1152         while ((token = strtok(p_ptr,":")) != NULL) {
1153             if (i == alps_dev_id) {
1154                 alps_address = atoi(token);
1155                 break;
1156             }
1157             p_ptr = NULL;
1158             ++i;
1159         }
1160         CmiAssert(alps_address != -1);
1161         address = alps_address;
1162     }
1163     return address;
1164 }
1165
/* Return the protection tag exported in PMI_GNI_PTAG: its first ':'-separated
 * field, converted to uint8_t.  Aborts if the variable is not set.
 *
 * The string is parsed in place and not modified (strtok would write into
 * getenv()'s storage, which the C standard forbids).  Leading separators are
 * skipped, as strtok did, and an empty list yields 0 instead of atoi(NULL). */
static uint8_t get_ptag(void)
{
    const char *p_ptr = getenv("PMI_GNI_PTAG");

    if (p_ptr == NULL)
        CmiAbort("get_ptag: PMI_GNI_PTAG is not set");
    while (*p_ptr == ':')
        p_ptr++;
    return (uint8_t)strtoul(p_ptr, NULL, 10);
}
1178
/* Return the job cookie exported in PMI_GNI_COOKIE: its first ':'-separated
 * field as a uint32_t.  Aborts if the variable is not set.
 *
 * The string is parsed in place and not modified (strtok would write into
 * getenv()'s storage, which the C standard forbids).  strtoul is used instead
 * of atoi so that cookies above INT_MAX do not overflow a signed int. */
static uint32_t get_cookie(void)
{
    const char *p_ptr = getenv("PMI_GNI_COOKIE");

    if (p_ptr == NULL)
        CmiAbort("get_cookie: PMI_GNI_COOKIE is not set");
    while (*p_ptr == ':')
        p_ptr++;
    return (uint32_t)strtoul(p_ptr, NULL, 10);
}
1191
1192 #if LARGEPAGE
1193
1194 /* directly mmap memory from hugetlbfs for large pages */
1195
1196 #include <sys/stat.h>
1197 #include <fcntl.h>
1198 #include <sys/mman.h>
1199 #include <hugetlbfs.h>
1200
1201 // size must be _tlbpagesize aligned
1202 void *my_get_huge_pages(size_t size)
1203 {
1204     char filename[512];
1205     int fd;
1206     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1207     void *ptr = NULL;
1208
1209     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1210     fd = open(filename, O_RDWR | O_CREAT, mode);
1211     if (fd == -1) {
1212         CmiAbort("my_get_huge_pages: open filed");
1213     }
1214     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1215     if (ptr == MAP_FAILED) ptr = NULL;
1216 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1217     close(fd);
1218     unlink(filename);
1219     return ptr;
1220 }
1221
/* Unmap a region obtained from my_get_huge_pages(); aborts if munmap() fails. */
void my_free_huge_pages(void *ptr, int size)
{
    if (munmap(ptr, size) == -1)
        CmiAbort("munmap failed in my_free_huge_pages");
}
1228
1229 #endif
1230
1231 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1232 /* TODO: add any that are related */
1233 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1234
1235
1236 #include "machine-lrts.h"
1237 #include "machine-common-core.c"
1238
1239
/* Forward declarations of static send/progress routines whose definitions
 * appear elsewhere in this file.  The CQ-write and remote-event pumps exist
 * only when CQWRITE / REMOTE_EVENT are enabled. */
static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
static void SendRdmaMsg(PCQueue );
static void PumpNetworkSmsg();
static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
#if CQWRITE
static void PumpCqWriteTransactions();
#endif
#if REMOTE_EVENT
static void PumpRemoteTransactions(gni_cq_handle_t);
#endif
1250
#if MACHINE_DEBUG_LOG
/* Extra counters that exist only in MACHINE_DEBUG_LOG builds
 * (e.g. buffered_recv_msg is referenced by the MACHSTATE3 trace in
 * registerFromMempool). */
static CmiInt8 buffered_recv_msg = 0;
int         lrts_smsg_success = 0;
int         lrts_received_msg = 0;
#endif
1256
1257 static void sweep_mempool(mempool_type *mptr)
1258 {
1259     int n = 0;
1260     block_header *current = &(mptr->block_head);
1261
1262     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1263     while( current!= NULL) {
1264         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1265         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1266     }
1267     printf("[n %d] sweep_mempool slot END.\n", myrank);
1268 }
1269
1270
1271 static INLINE_KEYWORD  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1272 {
1273     block_header *current = *from;
1274
1275     //while(register_memory_size>= MAX_REG_MEM)
1276     //{
1277         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1278             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1279
1280         *from = current;
1281         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1282         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1283         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1284     //}
1285     return GNI_RC_SUCCESS;
1286 }
1287
1288 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1289 {
1290     gni_return_t status = GNI_RC_SUCCESS;
1291     //int size = GetMempoolsize(msg);
1292     //void *blockaddr = GetMempoolBlockPtr(msg);
1293     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1294    
1295     block_header *current = &(mptr->block_head);
1296     while(register_memory_size>= MAX_REG_MEM)
1297     {
1298         status = deregisterMemory(mptr, &current);
1299         if (status != GNI_RC_SUCCESS) break;
1300     }
1301     if(register_memory_size>= MAX_REG_MEM) return status;
1302
1303     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1304     while(1)
1305     {
1306         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1307         if(status == GNI_RC_SUCCESS)
1308         {
1309             break;
1310         }
1311         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1312         {
1313             GNI_RC_CHECK("registerFromMempool", status);
1314         }
1315         else
1316         {
1317             status = deregisterMemory(mptr, &current);
1318             if (status != GNI_RC_SUCCESS) break;
1319         }
1320     }; 
1321     return status;
1322 }
1323
1324 INLINE_KEYWORD
1325 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1326 {
1327     static int rank = -1;
1328     int i;
1329     gni_return_t status;
1330     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1331     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1332     mempool_type *mptr;
1333
1334     status = registerFromMempool(mptr1, msg, size, t, cqh);
1335     if (status == GNI_RC_SUCCESS) return status;
1336 #if CMK_SMP 
1337     for (i=0; i<CmiMyNodeSize()+1; i++) {
1338       rank = (rank+1)%(CmiMyNodeSize()+1);
1339       mptr = CpvAccessOther(mempool, rank);
1340       if (mptr == mptr1) continue;
1341       status = registerFromMempool(mptr, msg, size, t, cqh);
1342       if (status == GNI_RC_SUCCESS) return status;
1343     }
1344 #endif
1345     return  GNI_RC_ERROR_RESOURCE;
1346 }
1347
/* Queue a message that could not be sent immediately.
 *
 * Allocates a MSG_LIST entry recording destNode, size, msg and tag (plus its
 * creation time when CMK_WITH_STATS is on) and pushes it onto:
 *   - queue->sendMsgBuf when ONE_SEND_QUEUE is set, otherwise
 *   - the per-destination sendSmsgBuf of queue->smsg_msglist_index[destNode].
 * With SMP_LOCKS the per-destination lock is held while pushing, and the
 * destination entry is also pushed onto queue->nonEmptyQueues when its
 * `pushed` flag is 0.
 * NOTE(review): `pushed` is not set here; presumably the consumer of
 * nonEmptyQueues maintains it — confirm, otherwise an entry can be pushed
 * onto nonEmptyQueues repeatedly. */
INLINE_KEYWORD
static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
{
    MSG_LIST        *msg_tmp;
    MallocMsgList(msg_tmp);
    msg_tmp->destNode = destNode;
    msg_tmp->size   = size;
    msg_tmp->msg    = msg;
    msg_tmp->tag    = tag;
#if CMK_WITH_STATS
    SMSG_CREATION(msg_tmp)
#endif

#if ONE_SEND_QUEUE
    PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
#else
#if SMP_LOCKS
    CmiLock(queue->smsg_msglist_index[destNode].lock);
    if(queue->smsg_msglist_index[destNode].pushed == 0)
    {
        PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
    }
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
    CmiUnlock(queue->smsg_msglist_index[destNode].lock);
#else
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
#endif
#endif

#if PRINT_SYH
    buffered_smsg_counter++;
#endif
}
1381
1382 INLINE_KEYWORD static void print_smsg_attr(gni_smsg_attr_t     *a)
1383 {
1384     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1385 }
/* Disabled (#if 0) earlier implementation of dynamic SMSG connection setup
 * that pushed the local mailbox attributes to the peer with an FMA PUT.  It
 * appears to be superseded by alloc_smsg_attr()/connect_to(), which exchange
 * attributes with GNI_EpPostDataWId.  It is kept for reference only and is
 * not compiled. */
#if 0
INLINE_KEYWORD
static void setup_smsg_connection(int destNode)
{
    mdh_addr_list_t  *new_entry = 0;
    gni_post_descriptor_t *pd;
    gni_smsg_attr_t      *smsg_attr;
    gni_return_t status = GNI_RC_NOT_DONE;
    RDMA_REQUEST        *rdma_request_msg;
    
    /* current mailbox chunk exhausted: allocate and register a new 64-byte aligned one */
    if(smsg_available_slot == smsg_expand_slots)
    {
        new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
        new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
        memset(new_entry->addr, 0, smsg_memlen*smsg_expand_slots);

        status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
            smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
            GNI_MEM_READWRITE,   
            -1,
            &(new_entry->mdh));
        smsg_available_slot = 0; 
        new_entry->next = smsg_dynamic_list;
        smsg_dynamic_list = new_entry;
    }
    smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
    smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
    smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
    smsg_attr->msg_maxsize = SMSG_MAX_MSG;
    smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
    smsg_attr->buff_size = smsg_memlen;
    smsg_attr->msg_buffer = smsg_dynamic_list->addr;
    smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
    smsg_local_attr_vec[destNode] = smsg_attr;
    smsg_available_slot++;
    /* write our attribute into slot [myrank] of the peer's registered attribute table */
    MallocPostDesc(pd);
    pd->type            = GNI_POST_FMA_PUT;
    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
    pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    pd->length          = sizeof(gni_smsg_attr_t);
    pd->local_addr      = (uint64_t) smsg_attr;
    pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
    pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
    pd->src_cq_hndl     = 0;

    pd->rdma_mode       = 0;
    status = GNI_PostFma(ep_hndl_array[destNode],  pd);
    print_smsg_attr(smsg_attr);
    if(status == GNI_RC_ERROR_RESOURCE )
    {
        /* NOTE(review): the request is allocated but never queued, so it would leak; left as-is since this code is disabled. */
        MallocRdmaRequest(rdma_request_msg);
        rdma_request_msg->destNode = destNode;
        rdma_request_msg->pd = pd;
        /* buffer this request */
    }
#if PRINT_SYH
    if(status != GNI_RC_SUCCESS)
       printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
    else
        printf("[%d=%d]OK send post FMA \n", myrank, destNode);
#endif
}
#endif
1449 /* useDynamicSMSG */
1450 INLINE_KEYWORD
1451 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1452 {
1453     gni_return_t status = GNI_RC_NOT_DONE;
1454
1455     if(mailbox_list->offset == mailbox_list->size)
1456     {
1457         dynamic_smsg_mailbox_t *new_mailbox_entry;
1458         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1459         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1460         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1461         memset(new_mailbox_entry->mailbox_base, 0, new_mailbox_entry->size);
1462         new_mailbox_entry->offset = 0;
1463         
1464         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1465             new_mailbox_entry->size, smsg_rx_cqh,
1466             GNI_MEM_READWRITE,   
1467             -1,
1468             &(new_mailbox_entry->mem_hndl));
1469
1470         GNI_RC_CHECK("register", status);
1471         new_mailbox_entry->next = mailbox_list;
1472         mailbox_list = new_mailbox_entry;
1473     }
1474     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1475     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1476     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1477     local_smsg_attr->mbox_offset = mailbox_list->offset;
1478     mailbox_list->offset += smsg_memlen;
1479     local_smsg_attr->buff_size = smsg_memlen;
1480     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1481     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1482 }
1483
1484 /* useDynamicSMSG */
1485 INLINE_KEYWORD
1486 static int connect_to(int destNode)
1487 {
1488     gni_return_t status = GNI_RC_NOT_DONE;
1489     CmiAssert(smsg_connected_flag[destNode] == 0);
1490     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1491     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1492     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1493     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1494     
1495     CMI_GNI_LOCK(global_gni_lock)
1496     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1497     CMI_GNI_UNLOCK(global_gni_lock)
1498     if (status == GNI_RC_ERROR_RESOURCE) {
1499       /* possibly destNode is making connection at the same time */
1500       free(smsg_attr_vector_local[destNode]);
1501       smsg_attr_vector_local[destNode] = NULL;
1502       free(smsg_attr_vector_remote[destNode]);
1503       smsg_attr_vector_remote[destNode] = NULL;
1504       mailbox_list->offset -= smsg_memlen;
1505 #if PRINT_SYH
1506     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1507 #endif
1508       return 0;
1509     }
1510     GNI_RC_CHECK("GNI_Post", status);
1511     smsg_connected_flag[destNode] = 1;
1512 #if PRINT_SYH
1513     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1514 #endif
1515     return 1;
1516 }
1517
1518 INLINE_KEYWORD
1519 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr, int ischarm )
1520 {
1521     unsigned int          remote_address;
1522     uint32_t              remote_id;
1523     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1524     gni_smsg_attr_t       *smsg_attr;
1525     gni_post_descriptor_t *pd;
1526     gni_post_state_t      post_state;
1527     char                  *real_data; 
1528     int                   msgid = 0;
1529
1530     if (useDynamicSMSG) {
1531         switch (smsg_connected_flag[destNode]) {
1532         case 0: 
1533             connect_to(destNode);         /* continue to case 1 */
1534         case 1:                           /* pending connection, do nothing */
1535             status = GNI_RC_NOT_DONE;
1536             if(inbuff ==0)
1537                 buffer_small_msgs(queue, msg, size, destNode, tag);
1538             return status;
1539         }
1540     }
1541 #if ! ONE_SEND_QUEUE
1542     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1543 #endif
1544     {
1545         //CMI_GNI_LOCK(smsg_mailbox_lock)
1546         CMI_GNI_LOCK(default_tx_cq_lock)
1547 #if CMK_SMP_TRACE_COMMTHREAD
1548         int oldpe = -1;
1549         int oldeventid = -1;
1550         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG)
1551         { 
1552             START_EVENT();
1553             if ( tag == SMALL_DATA_TAG)
1554                 real_data = (char*)msg; 
1555             else 
1556                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1557             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1558             TRACE_COMM_SET_COMM_MSGID(real_data);
1559         }
1560 #endif
1561 #if REMOTE_EVENT
1562         if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG) {
1563             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1564             if (control_msg_tmp->seq_id <= 0 && control_msg_tmp->ack_index == -1)
1565             {
1566                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1567                 if (control_msg_tmp->ack_index == -1) {    /* table overflow */
1568                     status = GNI_RC_NOT_DONE;
1569                     if (inbuff ==0)
1570                         buffer_small_msgs(queue, msg, size, destNode, tag);
1571                     return status;
1572                 }
1573             }
1574         }
1575 #endif
1576 #if     CMK_WITH_STATS
1577         SMSG_TRY_SEND(tag)
1578 #endif
1579 #if CMK_WITH_STATS
1580         double              creation_time;
1581         if (ptr == NULL)
1582             creation_time = CmiWallTimer();
1583         else
1584             creation_time = ptr->creation_time;
1585 #endif
1586
1587 #if CMK_SMSGS_FREE_AFTER_EVENT
1588         msgid = SmsgsPool_getslot(&smsgsPool, msg, ischarm?1:2);
1589 #endif
1590         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, msgid, tag);
1591 #if CMK_SMP_TRACE_COMMTHREAD
1592         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1593 #endif
1594         CMI_GNI_UNLOCK(default_tx_cq_lock)
1595         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1596         if(status == GNI_RC_SUCCESS)
1597         {
1598 #if     CMK_WITH_STATS
1599             SMSG_SENT_DONE(creation_time,tag) 
1600 #endif
1601 #if CMK_SMP_TRACE_COMMTHREAD
1602             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG )
1603             { 
1604                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1605             }
1606 #endif
1607         }else {
1608             status = GNI_RC_ERROR_RESOURCE;
1609 #if CMK_SMSGS_FREE_AFTER_EVENT
1610             SmsgsPool_freeslot(&smsgsPool, msgid);
1611 #endif
1612         }
1613     }
1614     if(status != GNI_RC_SUCCESS && inbuff ==0)
1615         buffer_small_msgs(queue, msg, size, destNode, tag);
1616     return status;
1617 }
1618
1619 INLINE_KEYWORD
1620 static CONTROL_MSG* construct_control_msg(int size, char *msg, int seqno)
1621 {
1622     /* construct a control message and send */
1623     CONTROL_MSG         *control_msg_tmp;
1624     MallocControlMsg(control_msg_tmp);
1625     control_msg_tmp->source_addr = (uint64_t)msg;
1626     control_msg_tmp->seq_id    = seqno;
1627     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1628 #if REMOTE_EVENT
1629     control_msg_tmp->ack_index    =  -1;
1630 #endif
1631 #if     USE_LRTS_MEMPOOL
1632     if(size < BIG_MSG)
1633     {
1634         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1635     }
1636     else
1637     {
1638         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1639         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1640         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1641     }
1642 #else
1643     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1644 #endif
1645     return control_msg_tmp;
1646 }
1647
/* When set to 1, send_large_messages() spins on LrtsAdvanceCommunication(0)
 * while an unregistered source buffer would push buffered_send_msg to
 * MAX_BUFF_SEND or beyond.  This runs before the non-blocking check that
 * otherwise buffers the control message and returns GNI_RC_ERROR_NOMEM.
 * Disabled (0). */
#define BLOCKING_SEND_CONTROL    0
1649
1650 // Large message, send control to receiver, receiver register memory and do a GET, 
1651 // return 1 - send no success
1652 INLINE_KEYWORD static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
1653 {
1654     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1655     uint32_t            vmdh_index  = -1;
1656     int                 size;
1657     int                 offset = 0;
1658     uint64_t            source_addr;
1659     int                 register_size; 
1660     void                *msg;
1661
1662     size    =   control_msg_tmp->total_length;
1663     source_addr = control_msg_tmp->source_addr;
1664     register_size = control_msg_tmp->length;
1665
1666 #if  USE_LRTS_MEMPOOL
1667     if( control_msg_tmp->seq_id <=0 ){
1668 #if BLOCKING_SEND_CONTROL
1669         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1670             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1671                 LrtsAdvanceCommunication(0);
1672         }
1673 #endif
1674         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1675         {
1676             msg = (void*)source_addr;
1677             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1678             {
1679                 if(!inbuff)
1680                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1681                 return GNI_RC_ERROR_NOMEM;
1682             }
1683             //register the corresponding mempool
1684             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1685             if(status == GNI_RC_SUCCESS)
1686             {
1687                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1688             }
1689         }else
1690         {
1691             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1692             status = GNI_RC_SUCCESS;
1693         }
1694         if(NoMsgInSend(source_addr))
1695             register_size = GetMempoolsize((void*)(source_addr));
1696         else
1697             register_size = 0;
1698     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1699     {
1700         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1701         source_addr += offset;
1702         size = control_msg_tmp->length;
1703 #if BLOCKING_SEND_CONTROL
1704         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1705             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1706                 LrtsAdvanceCommunication(0);
1707         }
1708 #endif
1709         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1710             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1711             {
1712                 if(!inbuff)
1713                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1714                 return GNI_RC_ERROR_NOMEM;
1715             }
1716             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1717             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1718         }
1719         else
1720         {
1721             status = GNI_RC_SUCCESS;
1722         }
1723         register_size = 0;  
1724     }
1725
1726 #if CMI_EXERT_SEND_LARGE_CAP
1727     if(SEND_large_pending >= SEND_large_cap)
1728     {
1729         status = GNI_RC_ERROR_NOMEM;
1730     }
1731 #endif
1732  
1733     if(status == GNI_RC_SUCCESS)
1734     {
1735        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr, 0); 
1736         if(status == GNI_RC_SUCCESS)
1737         {
1738 #if CMI_EXERT_SEND_LARGE_CAP
1739             SEND_large_pending++;
1740 #endif
1741             buffered_send_msg += register_size;
1742             if(control_msg_tmp->seq_id == 0)
1743             {
1744                 IncreaseMsgInSend(source_addr);
1745             }
1746 #if ! CMK_SMSGS_FREE_AFTER_EVENT
1747             FreeControlMsg(control_msg_tmp);
1748 #endif
1749             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag); 
1750         }else
1751             status = GNI_RC_ERROR_RESOURCE;
1752
1753     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1754     {
1755         CmiAbort("Memory registor for large msg\n");
1756     }else 
1757     {
1758         status = GNI_RC_ERROR_NOMEM; 
1759         if(!inbuff)
1760             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1761     }
1762     return status;
1763 #else
1764     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1765     if(status == GNI_RC_SUCCESS)
1766     {
1767         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL, 0);
1768 #if ! CMK_SMSGS_FREE_AFTER_EVENT
1769         if(status == GNI_RC_SUCCESS)
1770         {
1771             FreeControlMsg(control_msg_tmp);
1772         }
1773 #endif
1774     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1775     {
1776         CmiAbort("Memory registor for large msg\n");
1777     }else 
1778     {
1779         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1780     }
1781     return status;
1782 #endif
1783 }
1784
/* LRTS idle hooks (presumably invoked by the common machine layer when the
 * scheduler becomes / stays / is notified idle).  The uGNI layer takes no
 * action in any of them. */
INLINE_KEYWORD void LrtsBeginIdle() {}

INLINE_KEYWORD void LrtsStillIdle() {}

INLINE_KEYWORD void LrtsNotifyIdle() {}
1790
/* Stamp the Converse header of msg before it is sent: record its total size,
 * then apply CMI_SET_CHECKSUM.  The size is written first; presumably the
 * checksum covers the header, so keep this order (CMI_SET_CHECKSUM is likely
 * a no-op unless checksumming is compiled in - confirm). */
INLINE_KEYWORD void LrtsPrepareEnvelope(char *msg, int size)
{
    CmiSetMsgSize(msg, size);
    CMI_SET_CHECKSUM(msg, size);
}
1796
1797 CmiCommHandle LrtsSendFunc(int destNode, int destPE, int size, char *msg, int mode)
1798 {
1799     gni_return_t        status  =   GNI_RC_SUCCESS;
1800     uint8_t tag;
1801     CONTROL_MSG         *control_msg_tmp;
1802     int                 oob = ( mode & OUT_OF_BAND);
1803     SMSG_QUEUE          *queue;
1804
1805     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1806 #if CMK_USE_OOB
1807     queue = oob? &smsg_oob_queue : &smsg_queue;
1808     tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
1809 #else
1810     queue = &smsg_queue;
1811     tag = LMSG_INIT_TAG;
1812 #endif
1813
1814     LrtsPrepareEnvelope(msg, size);
1815
1816 #if PRINT_SYH
1817     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1818 #endif 
1819
1820 #if CMK_SMP 
1821     if(size <= SMSG_MAX_MSG)
1822         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1823     else if (size < BIG_MSG) {
1824         control_msg_tmp =  construct_control_msg(size, msg, 0);
1825         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1826     }
1827     else {
1828         CmiSetMsgSeq(msg, 0);
1829         control_msg_tmp =  construct_control_msg(size, msg, 1);
1830         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1831     }
1832 #else   //non-smp, smp(worker sending)
1833     if(size <= SMSG_MAX_MSG)
1834     {
1835         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL, 1))
1836         {  
1837 #if !CMK_SMSGS_FREE_AFTER_EVENT
1838             CmiFree(msg); 
1839 #endif
1840         }
1841     }
1842     else if (size < BIG_MSG) {
1843         control_msg_tmp =  construct_control_msg(size, msg, 0);
1844         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1845     }
1846     else {
1847 #if     USE_LRTS_MEMPOOL
1848         CmiSetMsgSeq(msg, 0);
1849         control_msg_tmp =  construct_control_msg(size, msg, 1);
1850         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1851 #else
1852         control_msg_tmp =  construct_control_msg(size, msg, 0);
1853         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1854 #endif
1855     }
1856 #endif
1857     return 0;
1858 }
1859
/* Disabled list-send implementations, kept for reference only.  As the note
 * below says, they do not differ from the common machine-layer code. */
#if 0
// this is no different from the common code
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
  int i;
#if CMK_BROADCAST_USE_CMIREFERENCE
  for(i=0;i<npes;i++) {
    if (pes[i] == CmiMyPe())
      CmiSyncSend(pes[i], len, msg);
    else {
      CmiReference(msg);
      CmiSyncSendAndFree(pes[i], len, msg);
    }
  }
#else
  for(i=0;i<npes;i++) {
    CmiSyncSend(pes[i], len, msg);
  }
#endif
}

CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
{
  /* A better asynchronous implementation may be wanted, but at least it works */
  CmiSyncListSendFn(npes, pes, len, msg);
  return (CmiCommHandle) 0;
}

void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
  if (npes == 1) {
      CmiSyncSendAndFree(pes[0], len, msg);
      return;
  }
#if CMK_PERSISTENT_COMM
  if (CpvAccess(phs) && len > PERSIST_MIN_SIZE 
#if CMK_SMP
            && IS_PERSISTENT_MEMORY(msg)
#endif
     ){
      int i;
      for(i=0;i<npes;i++) {
        if (pes[i] == CmiMyPe())
          CmiSyncSend(pes[i], len, msg);
        else {
          CmiReference(msg);
          CmiSyncSendAndFree(pes[i], len, msg);
        }
      }
      CmiFree(msg);
      return;
  }
#endif
  
#if CMK_BROADCAST_USE_CMIREFERENCE
  CmiSyncListSendFn(npes, pes, len, msg);
  CmiFree(msg);
#else
  int i;
  for(i=0;i<npes-1;i++) {
    CmiSyncSend(pes[i], len, msg);
  }
  if (npes>0)
    CmiSyncSendAndFree(pes[npes-1], len, msg);
  else 
    CmiFree(msg);
#endif
}
#endif
1929
/* Forward declaration: PumpDatagramConnection is defined further down but is
 * referenced (e.g. by LrtsPostCommonInit) before its definition. */
static void    PumpDatagramConnection();
/* User-trace event IDs used when bracketing machine-layer activity.  The
 * initial values are used as-is unless registerUserTraceEvents() replaces
 * them with IDs from traceRegisterUserEvent (only when user-event tracing is
 * compiled in). */
static      int         event_SetupConnect = 111;
static      int         event_PumpSmsg = 222 ;
static      int         event_PumpTransaction = 333;
static      int         event_PumpRdmaTransaction = 444;
static      int         event_SendBufferSmsg = 484;
static      int         event_SendFmaRdmaMsg = 555;
static      int         event_AdvanceCommunication = 666;
1938
/* Replace the placeholder event_* IDs with IDs obtained from the tracing
 * framework.  Only active when MPI-style user-event tracing is compiled in
 * and tracing is not handled inside Charm; otherwise this does nothing.
 * Registration happens in the same order as the table below. */
static void registerUserTraceEvents() {
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    static const struct {
        int        *id;
        const char *label;
    } table[] = {
        { &event_SetupConnect,         "setting up connections" },
        { &event_PumpSmsg,             "Pump network small msgs" },
        { &event_PumpTransaction,      "Pump FMA/RDMA local transaction" },
        { &event_PumpRdmaTransaction,  "Pump RDMA remote event" },
        { &event_SendBufferSmsg,       "Sending buffered small msgs" },
        { &event_SendFmaRdmaMsg,       "Sending buffered fma/rdma transactions" },
        { &event_AdvanceCommunication, "Worker thread in sending/receiving" },
    };
    size_t k;

    for (k = 0; k < sizeof(table) / sizeof(table[0]); k++)
        *table[k].id = traceRegisterUserEvent(table[k].label, -1);
#endif
}
1950
1951 static void ProcessDeadlock()
1952 {
1953     static CmiUInt8 *ptr = NULL;
1954     static CmiUInt8  last = 0, mysum, sum;
1955     static int count = 0;
1956     gni_return_t status;
1957     int i;
1958
1959 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1960 //sweep_mempool(CpvAccess(mempool));
1961     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1962     mysum = smsg_send_count + smsg_recv_count;
1963     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1964     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1965     GNI_RC_CHECK("PMI_Allgather", status);
1966     sum = 0;
1967     for (i=0; i<mysize; i++)  sum+= ptr[i];
1968     if (last == 0 || sum == last) 
1969         count++;
1970     else
1971         count = 0;
1972     last = sum;
1973     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1974     if (count == 2) { 
1975         /* detected twice, it is a real deadlock */
1976         if (myrank == 0)  {
1977             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1978             CmiAbort("Fatal> Deadlock detected.");
1979         }
1980
1981     }
1982     _detected_hang = 0;
1983 }
1984
1985 static void CheckProgress()
1986 {
1987     if (smsg_send_count == last_smsg_send_count &&
1988         smsg_recv_count == last_smsg_recv_count ) 
1989     {
1990         _detected_hang = 1;
1991 #if !CMK_SMP
1992         if (_detected_hang) ProcessDeadlock();
1993 #endif
1994
1995     }
1996     else {
1997         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1998         last_smsg_send_count = smsg_send_count;
1999         last_smsg_recv_count = smsg_recv_count;
2000         _detected_hang = 0;
2001     }
2002 }
2003
2004 static void set_limit()
2005 {
2006     //if (!user_set_flag && CmiMyRank() == 0) {
2007     if (CmiMyRank() == 0) {
2008         int mynode = CmiPhysicalNodeID(CmiMyPe());
2009         int numpes = CmiNumPesOnPhysicalNode(mynode);
2010         int numprocesses = numpes / CmiMyNodeSize();
2011         MAX_REG_MEM  = _totalmem / numprocesses;
2012         MAX_BUFF_SEND = MAX_REG_MEM / 2;
2013         if (CmiMyPe() == 0)
2014            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
2015         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
2016         {
2017              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
2018              CmiAbort("memory registration\n");
2019         }
2020     }
2021 }
2022
2023 void LrtsPostCommonInit(int everReturn)
2024 {
2025 #if CMK_DIRECT
2026     CmiDirectInit();
2027 #endif
2028 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
2029     CpvInitialize(double, projTraceStart);
2030     /* only PE 0 needs to care about registration (to generate sts file). */
2031     //if (CmiMyPe() == 0) 
2032     {
2033         registerMachineUserEventsFunction(&registerUserTraceEvents);
2034     }
2035 #endif
2036
2037 #if !CMK_SMP
2038     if (useDynamicSMSG)
2039     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
2040 #endif
2041
2042 #if ! LARGEPAGE
2043     if (_checkProgress)
2044 #if CMK_SMP
2045     if (CmiMyRank() == 0)
2046 #endif
2047     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
2048 #endif
2049  
2050 #if !LARGEPAGE
2051     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2052 #endif
2053 }
2054
/* Called by worker threads.  With MULTI_THREAD_SEND, and only when more than
 * one process exists, the workers whose rank satisfies rank % 6 == 3 drive
 * network progress via CmiMachineProgressImpl(), bracketed as a user trace
 * event when comm-thread tracing is enabled.  Otherwise this does nothing. */
void LrtsPostNonLocal()
{
#if MULTI_THREAD_SEND
    if (mysize != 1 && CmiMyRank() % 6 == 3) {
#if CMK_SMP_TRACE_COMMTHREAD
        double t_begin, t_end;

        traceEndIdle();
        t_begin = CmiWallTimer();
#endif
        CmiMachineProgressImpl();
#if CMK_SMP_TRACE_COMMTHREAD
        t_end = CmiWallTimer();
        traceUserBracketEvent(event_AdvanceCommunication, t_begin, t_end);
        traceBeginIdle();
#endif
    }
#endif
}
2085
/* Network progress: poll the network for incoming messages and push pending
   high-priority work (this also flushes receive buffers on some
   implementations).  The body only does work in non-SMP builds or with
   MULTI_THREAD_SEND; in other SMP builds it is empty. */
#if CMK_MACHINE_PROGRESS_DEFINED
void CmiMachineProgressImpl() {
#if ! CMK_SMP || MULTI_THREAD_SEND

    /* receive small messages, then service out-of-band sends and
       high-priority remote/local completions and RDMA posts (the upper-case
       names are macros defined elsewhere) */
    STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA

    /* Disabled: full progress sequence, optionally split so that each worker
       rank (rank % 6) performs a single step under CMK_WORKER_SINGLE_TASK. */
#if 0
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 0)
#endif
    PumpNetworkSmsg();

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 1)
#endif
    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 2)
#endif
    PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);

#if REMOTE_EVENT
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 3)
#endif
    PumpRemoteTransactions(rdma_rx_cqh);         // rdma_rx_cqh
#endif

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 4)
#endif
    {
#if CMK_USE_OOB
    SendBufferMsg(&smsg_oob_queue, NULL);
    SendBufferMsg(&smsg_queue, &smsg_oob_queue);
#else
    SendBufferMsg(&smsg_queue, NULL);
#endif
    }

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 5)
#endif
#if CMK_SMP
    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
#else
    STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
#endif

#endif
#endif
}
#endif
2146
2147
/* useDynamicSMSG: complete pending datagram exchanges that set up SMSG
 * connections on demand.
 *
 * GNI_PostDataProbeById reports each completed datagram id:
 *   id >= mysize : a datagram on the bound endpoint of PE (id - mysize),
 *                  presumably posted by this process to request a connection;
 *                  finish GNI_SmsgInit with the stored attributes for that PE
 *                  and move its smsg_connected_flag from 1 to 2.
 *   id <  mysize : the unbound endpoint received a request from remote_id;
 *                  initialise the SMSG channel to that PE, set its flag to 2,
 *                  then allocate a fresh local mailbox attribute and re-post
 *                  the unbound datagram for the next requester.
 * Returns once no further completed datagram is reported.
 * NOTE(review): only the bound-endpoint test is done under global_gni_lock.
 */
static void    PumpDatagramConnection()
{
    uint32_t          remote_address;
    uint32_t          remote_id;
    gni_return_t status;
    gni_post_state_t  post_state;
    uint64_t          datagram_id;
    int i;

   while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
   {
       if (datagram_id >= mysize) {           /* bound endpoint */
           int pe = datagram_id - mysize;
           CMI_GNI_LOCK(global_gni_lock)
           status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
           CMI_GNI_UNLOCK(global_gni_lock)
           if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
           {
               CmiAssert(remote_id == pe);
               status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
               GNI_RC_CHECK("Dynamic SMSG Init", status);
#if PRINT_SYH
               printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
#endif
               CmiAssert(smsg_connected_flag[pe] == 1);
               smsg_connected_flag[pe] = 2;
           }
       }
       else {         /* unbound ep */
           status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
           if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
           {
               CmiAssert(remote_id<mysize);
               CmiAssert(smsg_connected_flag[remote_id] <= 0);
               status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
               GNI_RC_CHECK("Dynamic SMSG Init", status);
#if PRINT_SYH
               printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
#endif
               smsg_connected_flag[remote_id] = 2;

               /* new mailbox attribute for the next incoming request */
               alloc_smsg_attr(&send_smsg_attr);
               status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
               GNI_RC_CHECK("post unbound datagram", status);
           }
       }
   }
}
2197
/* Placeholder for polling the RDMA completion queue for incoming network
 * messages.  It currently does nothing. */
static void PumpNetworkRdmaMsgs()
{
}
2205
2206 INLINE_KEYWORD
2207 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2208 {
2209     RDMA_REQUEST        *rdma_request_msg;
2210     MallocRdmaRequest(rdma_request_msg);
2211     rdma_request_msg->destNode = inst_id;
2212     rdma_request_msg->pd = pd;
2213 #if REMOTE_EVENT
2214     rdma_request_msg->ack_index = ack_index;
2215 #endif
2216     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2217 }
2218
/* Handlers for incoming large-message and persistent-message control
 * messages; declared here so PumpNetworkSmsg can call them before their
 * definitions. */
static void getLargeMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
static void getPersistentMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2221 static void PRINT_CONTROL(void *header)
2222 {
2223     CONTROL_MSG *control_msg = (CONTROL_MSG *) header;
2224
2225     printf(" length=%d , seq_id = %d, addr = %lld:%lld:%lld  \n", control_msg->length, control_msg->seq_id, control_msg->source_addr, (control_msg->source_mem_hndl).qword1, (control_msg->source_mem_hndl).qword2 );
2226 #if PERSISTENT_GET_BASE
2227     printf(" memhdl = %lld:%lld:%lld \n", control_msg->dest_addr, (control_msg->dest_mem_hndl).qword1, (control_msg->dest_mem_hndl).qword2);
2228 #endif
2229 }
/*
 * Drain the SMSG receive CQ (smsg_rx_cqh) and process every mailbox message
 * that has arrived.
 *
 * For each CQ event, the sender's endpoint mailbox is polled with
 * GNI_SmsgGetNextWTag until it is empty, dispatching on the message tag:
 *   SMALL_DATA_TAG           copy the payload into a CmiAlloc'd buffer and
 *                            hand it to handleOneRecvedMsg
 *   LMSG_PERSISTENT_INIT_TAG getPersistentMsgRequest (sendRdmaBuf)
 *   LMSG_INIT_TAG / LMSG_OOB_INIT_TAG
 *                            large-message control message: passed to
 *                            getLargeMsgRequest with sendRdmaBuf or
 *                            sendHighPriorBuf respectively
 *   ACK_TAG                  (only !REMOTE_EVENT && !CQWRITE) receiver's GET
 *                            is done: release send accounting, free the msg
 *   BIG_MSG_TAG              a BIG_MSG segment was fetched: deregister it and
 *                            send control messages for the next segment(s)
 *   PUT_DONE_TAG / DIRECT_PUT_DONE_TAG
 *                            persistent-put / CmiDirect completions
 * Unknown tags abort.
 *
 * smsg_mailbox_lock is taken around GNI_SmsgGetNextWTag.  Several cases
 * unlock before GNI_SmsgRelease and keep reading `header` afterwards.
 * NOTE(review): presumably safe because a given mailbox is drained by one
 * thread at a time - confirm.
 *
 * With CMI_PUMPNETWORKSMSG_CAP, no further CQ events are polled once
 * recv_cnt reaches PumpNetworkSmsg_cap (the mailbox being drained is still
 * emptied).
 */
static void PumpNetworkSmsg()
{
    uint64_t            inst_id;
    gni_cq_entry_t      event_data;
    gni_return_t        status;
    void                *header;
    uint8_t             msg_tag;
    int                 msg_nbytes;
    void                *msg_data;
    gni_mem_handle_t    msg_mem_hndl;
    gni_smsg_attr_t     *smsg_attr;
    gni_smsg_attr_t     *remote_smsg_attr;
    int                 init_flag;
    CONTROL_MSG         *control_msg_tmp, *header_tmp;
    uint64_t            source_addr;
    SMSG_QUEUE         *queue = &smsg_queue;
    PCQueue             tmp_queue;
#if  CMK_DIRECT
    cmidirectMsg        *direct_msg;
#endif
#if CMI_PUMPNETWORKSMSG_CAP 
    int                  recv_cnt = 0;
    while(recv_cnt< PumpNetworkSmsg_cap) {
#else
    while(1) {
#endif
        CMI_GNI_LOCK(smsg_rx_cq_lock)
        status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
        CMI_GNI_UNLOCK(smsg_rx_cq_lock)
        if(status != GNI_RC_SUCCESS) break;

        inst_id = GNI_CQ_GET_INST_ID(event_data);
#if REMOTE_EVENT
        inst_id = GET_RANK(inst_id);      /* important: with REMOTE_EVENT the inst_id must be mapped back to the sender rank */
#endif
        // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
#if PRINT_SYH
        printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
#endif
        if (useDynamicSMSG) {
            /* subtle: smsg may come before connection is setup */
            while (smsg_connected_flag[inst_id] != 2) 
               PumpDatagramConnection();
        }
        msg_tag = GNI_SMSG_ANY_TAG;
        /* drain every message currently in this sender's mailbox */
        while(1) {
            CMI_GNI_LOCK(smsg_mailbox_lock)
            status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
            if (status != GNI_RC_SUCCESS)
            {
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                break;
            }
#if         CMI_PUMPNETWORKSMSG_CAP
            recv_cnt++; 
#endif
#if PRINT_SYH
            printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
#endif
            /* copy msg out and then put into queue (small message) */
            switch (msg_tag) {
            case SMALL_DATA_TAG:
            {
                msg_nbytes = CmiGetMsgSize(header);
                CmiAssert(msg_nbytes > 0);
                msg_data    = CmiAlloc(msg_nbytes);
                memcpy(msg_data, (char*)header, msg_nbytes);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
                handleOneRecvedMsg(msg_nbytes, msg_data);
                break;
            }
            case LMSG_PERSISTENT_INIT_TAG:
            {   CMI_GNI_UNLOCK(smsg_mailbox_lock)
                getPersistentMsgRequest(header, inst_id, msg_tag, sendRdmaBuf);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                break;
            }
            case LMSG_INIT_TAG:
            case LMSG_OOB_INIT_TAG:
            {
                /* OOB control messages go to the high-priority RDMA queue */
                tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf; 
#if MULTI_THREAD_SEND
                /* copy the header so the mailbox slot can be released under the lock */
                MallocControlMsg(control_msg_tmp);
                memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
                FreeControlMsg(control_msg_tmp);
#else
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
#endif
                break;
            }
#if !REMOTE_EVENT && !CQWRITE
            case ACK_TAG:   //msg fit into mempool
            {
                /* Get is done, release message . Now put is not used yet*/
                void *msg = (void*)(((ACK_MSG *)header)->source_addr);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
#if ! USE_LRTS_MEMPOOL
                MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
#else
                DecreaseMsgInSend(msg);
#endif
                /* last in-flight message of this mempool block: uncharge the block */
                if(NoMsgInSend(msg))
                    buffered_send_msg -= GetMempoolsize(msg);
                MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
                CmiFree(msg);
#if CMI_EXERT_SEND_LARGE_CAP
                SEND_large_pending--;
#endif
                break;
            }
#endif
            case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
            {
#if MULTI_THREAD_SEND
                MallocControlMsg(header_tmp);
                memcpy(header_tmp, header, CONTROL_MSG_SIZE);
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
#else
                header_tmp = (CONTROL_MSG *) header;
#endif
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
#if CMI_EXERT_SEND_LARGE_CAP
                SEND_large_pending--;
#endif
                void *msg = (void*)(header_tmp->source_addr);
                int cur_seq = CmiGetMsgSeq(msg);
                int offset = ONE_SEG*(cur_seq+1);
                MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
                buffered_send_msg -= header_tmp->length;
                /* the message's size field tracks the bytes not yet transferred */
                int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
                if (remain_size < 0) remain_size = 0;
                CmiSetMsgSize(msg, remain_size);
                if(remain_size <= 0) //transaction done
                {
                    CmiFree(msg);
                }else if (header_tmp->total_length > offset)
                {
                    CmiSetMsgSeq(msg, cur_seq+1);
                    control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
                    control_msg_tmp->dest_addr = header_tmp->dest_addr;
                    //send next seg
                    send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
                    // pipelining: after segment 1 completes, also issue up to
                    // BIG_MSG_PIPELINE-1 further segments ahead of time
                    if (header_tmp->seq_id == 1) {
                      int i;
                      for (i=1; i<BIG_MSG_PIPELINE; i++) {
                        int seq = cur_seq+i+2;
                        CmiSetMsgSeq(msg, seq-1);
                        control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
                        control_msg_tmp->dest_addr = header_tmp->dest_addr;
                        send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
                        if (header_tmp->total_length <= ONE_SEG*seq) break;
                      }
                    }
                }
#if MULTI_THREAD_SEND
                FreeControlMsg(header_tmp);
#else
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
#endif
                break;
            }
#if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE
            case PUT_DONE_TAG:  {   //persistent message
                void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
                int size = ((CONTROL_MSG *) header)->length;
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                CmiReference(msg);
                CMI_CHECK_CHECKSUM(msg, size);
                handleOneRecvedMsg(size, msg); 
#if PRINT_SYH
                printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
#endif
                break;
            }
#endif
#if CMK_DIRECT
            case DIRECT_PUT_DONE_TAG:  //cmi direct 
                //create a trigger message
                direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
                direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
                CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
                //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
                break;
#endif
            default:
                GNI_SmsgRelease(ep_hndl_array[inst_id]);
                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                printf("weird tag problem %d \n", msg_tag);
                CmiAbort("Unknown tag\n");
            }               // end switch
#if PRINT_SYH
            printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
#endif
            smsg_recv_count ++;
            msg_tag = GNI_SMSG_ANY_TAG;
        } //endwhile GNI_SmsgGetNextWTag
    }   //end while GetEvent
    /* GNI_CqGetEvent reports GNI_RC_ERROR_RESOURCE when the receive CQ overflowed */
    if(status == GNI_RC_ERROR_RESOURCE)
    {
        printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
        GNI_RC_CHECK("Smsg_rx_cq full", status);
    }
}
2446
2447 static void printDesc(gni_post_descriptor_t *pd)
2448 {
2449     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2450 }
2451
#if CQWRITE
/* Post a silent CQ write carrying `data` to destNode, targeting the remote
 * memory handle mem_hndl; aborts via GNI_RC_CHECK if the post fails.
 * NOTE(review): the descriptor is never freed here, and with
 * GNI_CQMODE_SILENT no local completion event appears to be generated to
 * free it later - confirm whether this leaks. */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_post_descriptor_t *desc;
    gni_return_t           rc;

    MallocPostDesc(desc);
    desc->type            = GNI_POST_CQWRITE;
    desc->cq_mode         = GNI_CQMODE_SILENT;
    desc->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    desc->cqwrite_value   = data;
    desc->remote_mem_hndl = mem_hndl;

    rc = GNI_PostCqWrite(ep_hndl_array[destNode], desc);
    GNI_RC_CHECK("GNI_PostCqWrite", rc);
}
#endif
2469
2470 // register memory for a message
2471 // return mem handle
2472 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2473 {
2474     gni_return_t status = GNI_RC_SUCCESS;
2475
2476     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2477
2478 #if CMK_PERSISTENT_COMM_PUT
2479       // persistent message is always registered
2480       // BIG_MSG small pieces do not have malloc chunk header
2481     if (IS_PERSISTENT_MEMORY(msg)) {
2482         *memh = GetMemHndl(msg);
2483         return GNI_RC_SUCCESS;
2484     }
2485 #endif
2486     if(seqno == 0 
2487 #if CMK_PERSISTENT_COMM_PUT
2488          || seqno == PERSIST_SEQ
2489 #endif
2490       )
2491     {
2492         if(IsMemHndlZero((GetMemHndl(msg))))
2493         {
2494             msg = (void*)(msg);
2495             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2496             if(status == GNI_RC_SUCCESS)
2497                 *memh = GetMemHndl(msg);
2498         }
2499         else {
2500             *memh = GetMemHndl(msg);
2501         }
2502     }
2503     else {
2504         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2505         status = registerMemory(msg, size, memh, NULL); 
2506     }
2507     return status;
2508 }
2509
2510 static void getPersistentMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2511 {
2512 #if   PERSISTENT_GET_BASE
2513     CONTROL_MSG         *request_msg;
2514     gni_return_t        status;
2515     gni_post_descriptor_t *pd;
2516     request_msg = (CONTROL_MSG *) header;
2517
2518     MallocPostDesc(pd);
2519     pd->cqwrite_value = request_msg->seq_id;
2520     pd->first_operand = ALIGN64(request_msg->length); //  total length
2521     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2522         pd->type            = GNI_POST_FMA_GET;
2523     else
2524         pd->type            = GNI_POST_RDMA_GET;
2525     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
2526     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2527     pd->length          = ALIGN64(request_msg->length);
2528     pd->local_addr      = (uint64_t) request_msg->dest_addr;
2529     pd->local_mem_hndl  = request_msg->dest_mem_hndl;
2530     pd->remote_addr     = (uint64_t) request_msg->source_addr;
2531     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2532     pd->src_cq_hndl     = 0;
2533     pd->rdma_mode       = 0;
2534     pd->amo_cmd         = 0;
2535 #if REMOTE_EVENT
2536     bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2537 #else
2538     bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2539 #endif
2540
2541 #endif
2542 }
2543
// Receiver side of a large-message transfer: called with the CONTROL_MSG of an
// LMSG_INIT_TAG-style request (including the per-segment requests of pipelined
// BIG_MSG transfers, which carry seq_id > 0). It builds a GNI GET descriptor
// that pulls the payload from the sender's registered buffer
// (source_addr / source_mem_hndl) into local memory.
//
//   header          - the received CONTROL_MSG
//   inst_id         - sender's index into ep_hndl_array
//   tag             - LMSG_OOB_INIT_TAG requests complete on highprior_rdma_tx_cqh
//                     and, in the USE_LRTS_MEMPOOL build, are the only ones
//                     posted from here; every other tag is only queued
//   bufferRdmaQueue - receives the descriptors not posted here
//                     (USE_LRTS_MEMPOOL build; the other build uses sendRdmaBuf)
static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
{
#if     USE_LRTS_MEMPOOL
    CONTROL_MSG         *request_msg;
    gni_return_t        status = GNI_RC_SUCCESS;
    void                *msg_data;
    gni_post_descriptor_t *pd;
    gni_mem_handle_t    msg_mem_hndl;          // not used in this build
    int                 size, transaction_size, offset = 0;
    size_t              register_size = 0;     // only consumed by the MACHINE_DEBUG_LOG accounting

    // initiate a GET to transfer data from the sender side
    request_msg = (CONTROL_MSG *) header;
    size = request_msg->total_length;          // size of the complete message, not of one segment
    MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
    MallocPostDesc(pd);
#if CMK_WITH_STATS 
    pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
#endif
    // seq_id 0 (whole message) and seq_id 1 (first pipelined segment) allocate
    // the full destination buffer; later segments write into the buffer named
    // by dest_addr at offset ONE_SEG*(seq_id-1).
    if(request_msg->seq_id < 2)   {
        MACHSTATE2(8, "%d seq id in get large msg requrest %d\n", CmiMyRank(), request_msg->seq_id);
#if CMK_SMP_TRACE_COMMTHREAD 
        pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
#endif
        msg_data = CmiAlloc(size);
        // segment counter; PumpLocalTransactions advances it per completed BIG_MSG segment
        CmiSetMsgSeq(msg_data, 0);
        _MEMCHECK(msg_data);
    }
    else {
        offset = ONE_SEG*(request_msg->seq_id-1);
        msg_data = (char*)request_msg->dest_addr + offset;
    }
   
    // seq_id travels in cqwrite_value; PumpLocalTransactions reads it back on completion
    pd->cqwrite_value = request_msg->seq_id;

    // bytes moved by this GET: whole message for seq_id 0, otherwise this segment only
    transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
    SetMemHndlZero(pd->local_mem_hndl);
    status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
    if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
        if(NoMsgInRecv( (void*)(msg_data)))
            register_size = GetMempoolsize((void*)(msg_data));
    }

    pd->first_operand = ALIGN64(size);                   //  total length

    // FMA up to LRTS_GNI_RDMA_THRESHOLD bytes, RDMA above it
    if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
        pd->type            = GNI_POST_FMA_GET;
    else
        pd->type            = GNI_POST_RDMA_GET;
    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
    pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    pd->length          = transaction_size;
    pd->local_addr      = (uint64_t) msg_data;
    pd->remote_addr     = request_msg->source_addr + offset;
    pd->remote_mem_hndl = request_msg->source_mem_hndl;

    if (tag == LMSG_OOB_INIT_TAG) 
        pd->src_cq_hndl     = highprior_rdma_tx_cqh;
    else
    {
#if MULTI_THREAD_SEND
        pd->src_cq_hndl     = rdma_tx_cqh;
#else
        pd->src_cq_hndl     = 0;   // presumably: completion goes to the endpoint's own CQ — TODO confirm
#endif
    }

    pd->rdma_mode       = 0;
    pd->amo_cmd         = 0;
#if CMI_EXERT_RECV_RDMA_CAP
    // Too many GETs in flight: pretend a resource error so the request is queued below.
    // NOTE(review): when registerMessage took its direct-registration path
    // (seq_id neither 0 nor PERSIST_SEQ) the handle lives only in
    // pd->local_mem_hndl, and the SetMemHndlZero below drops it without
    // deregistering — confirm this is not a registration leak.
    if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
#endif
    //memory registration success: only OOB requests are posted from here
    if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
    {
        CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
        CMI_GNI_LOCK(lock)
#if REMOTE_EVENT
        // whole-message GET: request a remote event tagged with the sender's ack index
        if( request_msg->seq_id == 0)
        {
            pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
            int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
            GNI_RC_CHECK("GNI_EpSetEventData", sts);
        }
#endif

#if CMK_WITH_STATS
        RDMA_TRY_SEND(pd->type)
#endif
        if(pd->type == GNI_POST_RDMA_GET) 
        {
            status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
        }
        else
        {
            status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
        }
        CMI_GNI_UNLOCK(lock)

        if(status == GNI_RC_SUCCESS )
        {
#if CMI_EXERT_RECV_RDMA_CAP
            RDMA_pending++;
#endif
            if(pd->cqwrite_value == 0)
            {
#if MACHINE_DEBUG_LOG
                buffered_recv_msg += register_size;
                MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
#endif
                // whole-message GET in flight: bump the receive counter for msg_data
                IncreaseMsgInRecv(msg_data);
#if CMK_SMP_TRACE_COMMTHREAD 
                pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
#endif
            }
#if  CMK_WITH_STATS
            pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
            RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
#endif
        }
    }else if (status != GNI_RC_SUCCESS)
    {
        // registration failed (or the RDMA cap was hit): clear the handle so a
        // later registerMessage call on retry does not take it as already registered
        SetMemHndlZero((pd->local_mem_hndl));
    }
        if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
    {
        // transient failure, or a non-OOB request: queue the descriptor for a later retry.
        // NOTE(review): a non-OOB request whose registration failed with a
        // non-transient error is queued here too rather than reported.
#if REMOTE_EVENT
        bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
#else
        bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
#endif
    }else if (status != GNI_RC_SUCCESS) {
        // any other failure of an OOB request is reported through GNI_RC_CHECK
        // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
        GNI_RC_CHECK("GetLargeAFter posting", status);
    }
#else
    // Non-mempool build: allocate, register, and post the GET immediately
    // whatever the tag; bufferRdmaQueue is not used.
    CONTROL_MSG         *request_msg;
    gni_return_t        status;
    void                *msg_data;
    gni_post_descriptor_t *pd;
    RDMA_REQUEST        *rdma_request_msg;
    gni_mem_handle_t    msg_mem_hndl;
    //int source;
    // initiate a GET to transfer data from the sender side
    request_msg = (CONTROL_MSG *) header;
    msg_data = CmiAlloc(request_msg->length);
    _MEMCHECK(msg_data);

    MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)

    // invalid-parameter / permission errors are reported immediately; other
    // registration failures skip the post below
    if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
    {
        GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
    }

    MallocPostDesc(pd);
    if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
        pd->type            = GNI_POST_FMA_GET;
    else
        pd->type            = GNI_POST_RDMA_GET;
    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
    pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    pd->length          = ALIGN64(request_msg->length);
    pd->local_addr      = (uint64_t) msg_data;
    pd->remote_addr     = request_msg->source_addr;
    pd->remote_mem_hndl = request_msg->source_mem_hndl;
    if (tag == LMSG_OOB_INIT_TAG) 
        pd->src_cq_hndl     = highprior_rdma_tx_cqh;
    else
    {
#if MULTI_THREAD_SEND
        pd->src_cq_hndl     = rdma_tx_cqh;
#else
        pd->src_cq_hndl     = 0;
#endif
    }
    pd->rdma_mode       = 0;
    pd->amo_cmd         = 0;

    //memory registration successful
    if(status == GNI_RC_SUCCESS)
    {
        pd->local_mem_hndl  = msg_mem_hndl;
       
        if(pd->type == GNI_POST_RDMA_GET) 
        {
            CMI_GNI_LOCK(rdma_tx_cq_lock)
            status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
            CMI_GNI_UNLOCK(rdma_tx_cq_lock)
        }
        else
        {
            CMI_GNI_LOCK(default_tx_cq_lock)
            status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
            CMI_GNI_UNLOCK(default_tx_cq_lock)
        }

    }else
    {
        SetMemHndlZero(pd->local_mem_hndl);
    }
    if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
    {
        // defer onto the global sendRdmaBuf.
        // NOTE(review): rdma_request_msg->ack_index is not set here — confirm
        // SendRdmaMsg does not read it for these requests.
        MallocRdmaRequest(rdma_request_msg);
        rdma_request_msg->next = 0;
        rdma_request_msg->destNode = inst_id;
        rdma_request_msg->pd = pd;
        PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
    }else {
        GNI_RC_CHECK("AFter posting", status);
    }
#endif
}
2759
#if CQWRITE
/*
 * Drain CQ-write events from rdma_rx_cqh.  Each event's data carries a message
 * address (low 48 bits).  With CMK_PERSISTENT_COMM_PUT, a message whose
 * MEMHFIELD handle is set is treated as a received persistent message and
 * handed to handleOneRecvedMsg.  Any other message is released: its in-send
 * count is decreased (mempool build), buffered_send_msg is adjusted once
 * nothing is in send, and the buffer is CmiFree'd.
 * GNI_RC_ERROR_RESOURCE from GNI_CqGetEvent is reported through GNI_RC_CHECK.
 */
static void PumpCqWriteTransactions()
{
    gni_cq_entry_t ev;
    gni_return_t   status;

    for (;;) {
        status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
        if (status != GNI_RC_SUCCESS)
            break;

        void *msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
#if CMK_PERSISTENT_COMM_PUT
#if PRINT_SYH
        printf(" %d CQ write event %p\n", myrank, msg);
#endif
        if (!IsMemHndlZero(MEMHFIELD(msg))) {
#if PRINT_SYH
            printf(" %d Persistent CQ write event %p\n", myrank, msg);
#endif
            CmiReference(msg);
            int msg_size = CmiGetMsgSize(msg);
            CMI_CHECK_CHECKSUM(msg, msg_size);
            handleOneRecvedMsg(msg_size, msg); 
            continue;
        }
#endif
#if USE_LRTS_MEMPOOL
        DecreaseMsgInSend(msg);
#endif
        if (NoMsgInSend(msg))
            buffered_send_msg -= GetMempoolsize(msg);
        CmiFree(msg);
    }

    if (status == GNI_RC_ERROR_RESOURCE) {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
    }
}
#endif
2804
#if REMOTE_EVENT
/*
 * Drain remote-event completions from rx_cqh.
 *
 * GET_TYPE / GET_INDEX of each event's instance id select what completed:
 *   type 0 - ACK for a send buffer registered in ackPool at `index`: the
 *            buffer's send accounting is updated, it is CmiFree'd and the
 *            ackPool slot released;
 *   type 1 - (CMK_PERSISTENT_COMM_PUT) a persistent PUT into the receive slot
 *            `index` of persistPool: the message is passed to
 *            handleOneRecvedMsg.
 * Any other type aborts.
 *
 * With CMI_PUMPREMOTETRANSACTIONS_CAP at most PumpRemoteTransactions_cap
 * events are handled per call. GNI_RC_ERROR_RESOURCE from GNI_CqGetEvent
 * (receive CQ full) is reported through GNI_RC_CHECK.
 */
static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
{
    gni_cq_entry_t          ev;
    /* initialised: the cap check may stop the loop before the first
       GNI_CqGetEvent, and status is still inspected after the loop */
    gni_return_t            status = GNI_RC_SUCCESS;
    void                    *msg;   
    int                     inst_id, index, type, size;

#if CMI_PUMPREMOTETRANSACTIONS_CAP
    int                     pump_count = 0;
#endif
    while(1) {
#if CMI_PUMPREMOTETRANSACTIONS_CAP
        /* `>=`: handle at most cap events per call, the same bound
           PumpLocalTransactions applies with PumpLocalTransactions_cap */
        if (pump_count >= PumpRemoteTransactions_cap) break;
#endif
        CMI_GNI_LOCK(global_gni_lock)
        status = GNI_CqGetEvent(rx_cqh, &ev);
        CMI_GNI_UNLOCK(global_gni_lock)

        if(status != GNI_RC_SUCCESS) break;

#if CMI_PUMPREMOTETRANSACTIONS_CAP
        pump_count ++;
#endif

        inst_id = GNI_CQ_GET_INST_ID(ev);
        index = GET_INDEX(inst_id);
        type = GET_TYPE(inst_id);
        switch (type) {
        case 0:    // ACK for a send buffer tracked in ackPool
            CmiAssert(index>=0 && index<ackPool.size);
            CMI_GNI_LOCK(ackPool.lock);
            msg = GetIndexAddress(ackPool, index);
            CMI_GNI_UNLOCK(ackPool.lock);
#if PRINT_SYH
            MACHSTATE4(8,"[%d] PumpRemoteTransactions: ack: %lld index: %d type: %d.\n", myrank, msg, index, type);
#endif
#if USE_LRTS_MEMPOOL
            DecreaseMsgInSend(msg);
#endif
            if(NoMsgInSend(msg))
                buffered_send_msg -= GetMempoolsize(msg);
            CmiFree(msg);
            IndexPool_freeslot(&ackPool, index);
#if CMI_EXERT_SEND_LARGE_CAP
            SEND_large_pending--;
#endif
            break;
#if CMK_PERSISTENT_COMM_PUT
        case 1:  {    // PERSISTENT
            CmiLock(persistPool.lock);
            CmiAssert(GetIndexType(persistPool, index) == 2);
            PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
            CmiUnlock(persistPool.lock);
            msg = slot->destBuf[slot->addrIndex].destAddress;
            size = CmiGetMsgSize(msg);
            CmiReference(msg);
            CMI_CHECK_CHECKSUM(msg, size);
            handleOneRecvedMsg(size, msg); 
            break;
            }
#endif
        default:
            fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
            CmiAbort("PumpRemoteTransactions: unknown type");
        }
    }
    if(status == GNI_RC_ERROR_RESOURCE)
    {
        printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
        GNI_RC_CHECK("PumpRemoteTransactions: rx_cqh full", status);
    }
}
#endif
2884
2885 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2886 {
2887     gni_cq_entry_t          ev;
2888     gni_return_t            status;
2889     uint64_t                type, inst_id;
2890     gni_post_descriptor_t   *tmp_pd;
2891     MSG_LIST                *ptr;
2892     CONTROL_MSG             *ack_msg_tmp;
2893     ACK_MSG                 *ack_msg;
2894     uint8_t                 msg_tag;
2895 #if CMK_DIRECT
2896     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2897 #endif
2898     SMSG_QUEUE         *queue = &smsg_queue;
2899 #if CMI_PUMPLOCALTRANSACTIONS_CAP
2900     int         pump_count = 0;
2901     while(pump_count < PumpLocalTransactions_cap) {
2902         pump_count++;
2903 #else
2904     while(1) {
2905 #endif
2906         CMI_GNI_LOCK(my_cq_lock) 
2907         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2908         CMI_GNI_UNLOCK(my_cq_lock)
2909         if(status != GNI_RC_SUCCESS) break;
2910         
2911         type = GNI_CQ_GET_TYPE(ev);
2912         if (type == GNI_CQ_EVENT_TYPE_POST)
2913         {
2914
2915 #if CMI_EXERT_RECV_RDMA_CAP
2916             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2917             RDMA_pending--;
2918 #endif
2919             inst_id     = GNI_CQ_GET_INST_ID(ev);
2920 #if PRINT_SYH
2921             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2922 #endif
2923             CMI_GNI_LOCK(my_cq_lock)
2924             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2925             CMI_GNI_UNLOCK(my_cq_lock)
2926
2927             switch (tmp_pd->type) {
2928 #if CMK_PERSISTENT_COMM_PUT  || CMK_DIRECT
2929             case GNI_POST_RDMA_PUT:
2930 #if CMK_PERSISTENT_COMM_PUT && ! USE_LRTS_MEMPOOL
2931                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2932 #endif
2933             case GNI_POST_FMA_PUT:
2934                 if(tmp_pd->amo_cmd == 1) {
2935 #if CMK_DIRECT
2936                     //sender ACK to receiver to trigger it is done
2937                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2938                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2939                     msg_tag = DIRECT_PUT_DONE_TAG;
2940 #endif
2941                 }
2942                 else {
2943                     CmiFree((void *)tmp_pd->local_addr);
2944 #if REMOTE_EVENT
2945                     FreePostDesc(tmp_pd);
2946                     continue;
2947 #elif CQWRITE
2948                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2949                     FreePostDesc(tmp_pd);
2950                     continue;
2951 #else
2952                     MallocControlMsg(ack_msg_tmp);
2953                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2954                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2955                     ack_msg_tmp->length  = tmp_pd->length;
2956                     msg_tag = PUT_DONE_TAG;
2957 #endif
2958                 }
2959                 break;
2960 #endif
2961             case GNI_POST_RDMA_GET:
2962             case GNI_POST_FMA_GET:  {
2963                 MACHSTATE2(8, "PumpLocal Get done %lld=>%lld\n", tmp_pd->local_addr, tmp_pd->remote_addr);
2964 #if  ! USE_LRTS_MEMPOOL
2965                 MallocControlMsg(ack_msg_tmp);
2966                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2967                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2968                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2969                 msg_tag = ACK_TAG;  
2970 #else
2971 #if CMK_WITH_STATS
2972                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2973 #endif
2974                 int seq_id = tmp_pd->cqwrite_value;
2975                 if(seq_id > 0)      // BIG_MSG
2976                 {
2977                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2978                     MallocControlMsg(ack_msg_tmp);
2979                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2980                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2981                     ack_msg_tmp->seq_id = seq_id;
2982                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2983                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2984                     ack_msg_tmp->length = tmp_pd->length;
2985                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2986                     msg_tag = BIG_MSG_TAG; 
2987                 } 
2988                 else
2989                 {
2990                     if(seq_id < 0)
2991                         CmiReference((void*)tmp_pd->local_addr);
2992                     msg_tag = ACK_TAG; 
2993 #if  !REMOTE_EVENT && !CQWRITE
2994                     MallocAckMsg(ack_msg);
2995                     ack_msg->source_addr = tmp_pd->remote_addr;
2996 #endif
2997                 }
2998 #endif
2999                 break;
3000             }
3001             case  GNI_POST_CQWRITE:
3002                    FreePostDesc(tmp_pd);
3003                    continue;
3004             default:
3005                 CmiPrintf("type=%d\n", tmp_pd->type);
3006                 CmiAbort("PumpLocalTransactions: unknown type!");
3007             }      /* end of switch */
3008
3009 #if CMK_DIRECT
3010             if (tmp_pd->amo_cmd == 1) {
3011                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL, 0);
3012 #if ! CMK_SMSGS_FREE_AFTER_EVENT
3013                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
3014 #endif
3015             }
3016             else
3017 #endif
3018             if (msg_tag == ACK_TAG) {
3019 #if !REMOTE_EVENT
3020 #if   !CQWRITE
3021                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL, 0); 
3022 #if !CMK_SMSGS_FREE_AFTER_EVENT
3023                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
3024 #endif
3025 #else
3026                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
3027 #endif
3028 #endif
3029             }
3030             else {
3031                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL, 0); 
3032 #if !CMK_SMSGS_FREE_AFTER_EVENT
3033                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
3034 #endif
3035             }
3036 #if CMK_PERSISTENT_COMM_PUT
3037             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
3038 #endif
3039             {
3040                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
3041 #if PRINT_SYH
3042                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
3043 #endif
3044                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
3045                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
3046
3047                     //CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
3048                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
3049 #if MACHINE_DEBUG_LOG
3050                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
3051                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
3052                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
3053 #endif
3054                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, CmiGetMsgSize(tmp_pd->local_addr));
3055                     handleOneRecvedMsg(CmiGetMsgSize(tmp_pd->local_addr), (void*)tmp_pd->local_addr);
3056                 }else if(msg_tag == BIG_MSG_TAG){
3057                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
3058                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
3059                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
3060                         START_EVENT();
3061 #if PRINT_SYH
3062                         printf("Pipeline msg done [%d]\n", myrank);
3063 #endif
3064 #if     CMK_SMP_TRACE_COMMTHREAD
3065                         if( tmp_pd->cqwrite_value == 1)
3066                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
3067 #endif
3068                         CMI_CHECK_CHECKSUM(msg, CmiGetMsgSize(msg));
3069                         handleOneRecvedMsg(CmiGetMsgSize(msg), msg);
3070                     }
3071                 }
3072             }
3073             FreePostDesc(tmp_pd);
3074         }
3075 #if CMK_SMSGS_FREE_AFTER_EVENT
3076         else if  (type == GNI_CQ_EVENT_TYPE_SMSG) {
3077             // a SmsgsSend is done
3078             int msgid = GNI_CQ_GET_MSG_ID(ev);
3079             int type = GetSmsgsIndexType(smsgsPool, msgid);
3080             void *addr = GetSmsgsIndexAddress(smsgsPool, msgid);
3081             switch (type) {
3082             case 1:
3083                 CmiFree(addr);
3084                 break;
3085             case 2:
3086                 free(addr);
3087                 break;
3088             default:
3089                 CmiAbort("Invalid SmsgsIndex");
3090             }
3091             SmsgsPool_freeslot(&smsgsPool, msgid);
3092         }
3093 #endif
3094     } //end while
3095     if(status == GNI_RC_ERROR_RESOURCE)
3096     {
3097         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
3098         GNI_RC_CHECK("Smsg_tx_cq full", status);
3099     }
3100 }
3101
3102 static void  SendRdmaMsg( BufferList sendqueue)
3103 {
3104     gni_return_t            status = GNI_RC_SUCCESS;
3105     gni_mem_handle_t        msg_mem_hndl;
3106     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
3107     RDMA_REQUEST            *pre = 0;
3108     uint64_t                register_size = 0;
3109     void                    *msg;
3110     int                     i;
3111
3112     int len = PCQueueLength(sendqueue);
3113     for (i=0; i<len; i++)
3114     {
3115 #if CMI_EXERT_RECV_RDMA_CAP
3116         if( RDMA_pending >= RDMA_cap) break;
3117 #endif
3118         CMI_PCQUEUEPOP_LOCK( sendqueue)
3119         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
3120         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
3121         if (ptr == NULL) break;
3122         
3123         gni_post_descriptor_t *pd = ptr->pd;
3124         
3125         msg = (void*)(pd->local_addr);
3126         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
3127         register_size = 0;
3128         if(pd->cqwrite_value == 0) {
3129             if(NoMsgInRecv(msg))
3130                 register_size = GetMempoolsize(msg);
3131         }
3132
3133         if(status == GNI_RC_SUCCESS)        //mem register good
3134         {
3135             int destNode = ptr->destNode;
3136             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
3137             CMI_GNI_LOCK(lock);
3138 #if REMOTE_EVENT
3139             if( pd->cqwrite_value == 0 || pd->cqwrite_value == -1) {
3140                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3141                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
3142                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3143             }
3144 #if CMK_PERSISTENT_COMM_PUT
3145             else if (pd->cqwrite_value == PERSIST_SEQ) {
3146                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3147                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
3148                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3149             }
3150 #endif
3151 #if CMK_DIRECT
3152             else if (pd->cqwrite_value == DIRECT_SEQ) {
3153                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3154                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, DIRECT_EVENT(ptr->ack_index));
3155                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3156             }
3157 #endif
3158
3159 #endif
3160 #if CMK_WITH_STATS
3161             RDMA_TRY_SEND(pd->type)
3162 #endif
3163 #if CMK_SMP_TRACE_COMMTHREAD
3164             if(IS_PUT(pd->type))
3165             {
3166                  START_EVENT();
3167                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
3168             }
3169 #endif
3170             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
3171             {
3172                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3173             }
3174             else
3175             {
3176                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3177             }
3178             CMI_GNI_UNLOCK(lock);
3179             
3180             if(status == GNI_RC_SUCCESS)    //post good
3181             {
3182                 MACHSTATE4(8, "post noempty-rdma  %d (%lld==%lld,%d) \n", ptr->destNode, pd->local_addr, pd->remote_addr,  register_memory_size); 
3183 #if CMI_EXERT_RECV_RDMA_CAP
3184                 RDMA_pending ++;
3185 #endif
3186                 if(pd->cqwrite_value <= 0)
3187                 {
3188 #if CMK_SMP_TRACE_COMMTHREAD 
3189                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3190 #endif
3191                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3192                 }
3193 #if  CMK_WITH_STATS
3194                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3195                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3196 #endif
3197 #if MACHINE_DEBUG_LOG
3198                 buffered_recv_msg += register_size;
3199                 MACHSTATE(8, "GO request from buffered\n"); 
3200 #endif
3201 #if PRINT_SYH
3202                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
3203 #endif
3204                 FreeRdmaRequest(ptr);
3205             }else           // cannot post
3206             {
3207                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3208 #if PRINT_SYH
3209                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3210 #endif
3211                 break;
3212             }
3213         } else          //memory registration fails
3214         {
3215             PCQueuePush(sendqueue, (char*)ptr);
3216         }
3217     } //end while
3218 }
3219
3220 static 
3221 INLINE_KEYWORD gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
3222 {
3223     CONTROL_MSG         *control_msg_tmp;
3224     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
3225
3226     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3227     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
3228             /* connection not exists yet */
3229 #if CMK_SMP
3230             /* non-smp case, connect is issued in send_smsg_message */
3231         if (smsg_connected_flag[ptr->destNode] == 0)
3232             connect_to(ptr->destNode); 
3233 #endif
3234     }
3235     else
3236     switch(ptr->tag)
3237     {
3238     case SMALL_DATA_TAG:
3239         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr, 1);  
3240 #if !CMK_SMSGS_FREE_AFTER_EVENT
3241         if(status == GNI_RC_SUCCESS)
3242         {
3243             CmiFree(ptr->msg);
3244         }
3245 #endif
3246         break;
3247     case LMSG_PERSISTENT_INIT_TAG:
3248     case LMSG_INIT_TAG:
3249     case LMSG_OOB_INIT_TAG:
3250         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3251         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
3252         break;
3253 #if !REMOTE_EVENT && !CQWRITE
3254     case ACK_TAG:
3255         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, 0);  
3256 #if !CMK_SMSGS_FREE_AFTER_EVENT
3257         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3258 #endif
3259         break;
3260 #endif
3261     case BIG_MSG_TAG:
3262         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, 0);  
3263 #if !CMK_SMSGS_FREE_AFTER_EVENT
3264         if(status == GNI_RC_SUCCESS)
3265         {
3266             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3267         }
3268 #endif
3269         break;
3270 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE 
3271     case PUT_DONE_TAG:
3272         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, 0);
3273 #if !CMK_SMSGS_FREE_AFTER_EVENT
3274         if(status == GNI_RC_SUCCESS)
3275         {
3276             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3277         }
3278 #endif
3279         break;
3280 #endif
3281 #if CMK_DIRECT
3282     case DIRECT_PUT_DONE_TAG:
3283         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr, 0);
3284 #if !CMK_SMSGS_FREE_AFTER_EVENT
3285         if(status == GNI_RC_SUCCESS)
3286         {
3287             free((CMK_DIRECT_HEADER*)ptr->msg);
3288         }
3289 #endif
3290         break;
3291 #endif
3292     default:
3293         printf("Weird tag\n");
3294         CmiAbort("should not happen\n");
3295     }       // end switch
3296     return status;
3297 }
3298
3299 // return 1 if all messages are sent
3300
3301 #if ONE_SEND_QUEUE
3302
3303 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3304 {
3305     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3306     CONTROL_MSG         *control_msg_tmp;
3307     gni_return_t        status;
3308     int                 done = 1;
3309     uint64_t            register_size;
3310     void                *register_addr;
3311     int                 index_previous = -1;
3312 #if     CMI_SENDBUFFERSMSG_CAP
3313     int                 sent_length = 0;
3314 #endif
3315     int          index = 0;
3316     memset(destpe_avail, 0, mysize * sizeof(char));
3317     for (index=0; index<1; index++)
3318     {
3319         int i, len = PCQueueLength(queue->sendMsgBuf);
3320         for (i=0; i<len; i++) 
3321         {
3322             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3323             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3324             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3325             if(ptr == NULL) break;
3326             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3327                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3328                 continue;
3329             }
3330             status = _sendOneBufferedSmsg(queue, ptr);
3331 #if CMI_SENDBUFFERSMSG_CAP
3332             sent_length++;
3333 #endif
3334             if(status == GNI_RC_SUCCESS)
3335             {
3336 #if PRINT_SYH
3337                 buffered_smsg_counter--;
3338                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3339 #endif
3340                 FreeMsgList(ptr);
3341             }else {
3342                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3343                 done = 0;
3344                 if(status == GNI_RC_ERROR_RESOURCE)
3345                 {
3346                     destpe_avail[ptr->destNode] = 1;
3347                 }
3348             } 
3349         } //end while
3350     }   // end pooling for all cores
3351     return done;
3352 }
3353
3354 #else   /*  ! ONE_SEND_QUEUE  */
3355
/*
 * Per-destination variant: walk the per-destination SMSG send queues and try
 * to send the messages buffered in each one via _sendOneBufferedSmsg().
 *
 * queue      - SMSG queue whose buffered messages are sent.
 * prio_queue - may be NULL. A destination that still has messages pending in
 *              prio_queue is skipped in this pass.
 *
 * Returns 1 unless a send failed or the send cap (CMI_SENDBUFFERSMSG_CAP) was
 * reached during this pass, in which case it returns 0. The cap is checked
 * only before each destination, so one destination's batch may exceed it.
 * NOTE(review): skipping a destination because of prio_queue does not clear
 * `done`, so 1 can be returned while messages are still buffered in `queue`.
 */
static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
{
    MSG_LIST            *ptr;
    gni_return_t        status;
    int                 done = 1;
#if     CMI_SENDBUFFERSMSG_CAP
    int                 sent_length = 0;   /* send attempts made in this call */
#endif
    int idx;
#if SMP_LOCKS
    /* SMP_LOCKS: only destination lists queued on queue->nonEmptyQueues are visited.
     * NOTE(review): `index` is advanced below but never read in this branch. */
    int          index = -1;
    int nonempty = PCQueueLength(queue->nonEmptyQueues);
    for(idx =0; idx<nonempty; idx++) 
    {
        index++;  if (index >= nonempty) index = 0;
#if CMI_SENDBUFFERSMSG_CAP
        /* send budget used up: stop; unsent messages stay queued */
        if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
#endif
        CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
        MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
        CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
        if(current_list == NULL) break; 
        /* prio_queue still has messages for this destination: requeue the list and move on */
        if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
            PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
            continue;
        }
        PCQueue current_queue= current_list->sendSmsgBuf;
        /* Under the list lock: snapshot the length and clear `pushed`. The list
         * is put back on nonEmptyQueues after the send loop if messages remain. */
        CmiLock(current_list->lock);
        int i, len = PCQueueLength(current_queue);
        current_list->pushed = 0;
        CmiUnlock(current_list->lock);
#else      /* ! SMP_LOCKS */
    /* static: keeps its value across calls, so a pass cut short by the send
     * cap resumes with the next destination on the following call */
    static int          index = -1;
    for(idx =0; idx<mysize; idx++) 
    {
        index++;  if (index == mysize) index = 0;
#if CMI_SENDBUFFERSMSG_CAP
        /* send budget used up: stop; unsent messages stay queued */
        if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
#endif
        if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
        //if (index == myrank) continue;
        PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
        int i, len = PCQueueLength(current_queue);
#endif
        /* try each message that was in this destination's queue at the snapshot */
        for (i=0; i<len; i++)  {
            CMI_PCQUEUEPOP_LOCK(current_queue)
            ptr = (MSG_LIST*)PCQueuePop(current_queue);
            CMI_PCQUEUEPOP_UNLOCK(current_queue)
            if (ptr == 0) break;

            status = _sendOneBufferedSmsg(queue, ptr);
#if CMI_SENDBUFFERSMSG_CAP
            sent_length++;
#endif
            if(status == GNI_RC_SUCCESS)
            {
#if PRINT_SYH
                buffered_smsg_counter--;
                printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
#endif
                FreeMsgList(ptr);
            }else {
                /* NOTE(review): the failed message goes to the tail of
                 * current_queue, so it is retried after messages that were
                 * behind it. */
                PCQueuePush(current_queue, (char*)ptr);
                done = 0;
                if(status == GNI_RC_ERROR_RESOURCE)
                {
                    /* stop sending to this destination for the rest of this pass */
                    break;
                }
            } 
        } //end for i
#if SMP_LOCKS
        /* requeue this destination's list if it still holds messages and is not already queued */
        CmiLock(current_list->lock);
        if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
        {
            current_list->pushed = 1;
            PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
        }
        CmiUnlock(current_list->lock); 
#endif
    }   // end pooling for all cores
    return done;
}
3438
3439 #endif
3440
/* Forward declaration of the file-local ProcessDeadlock(); its definition is
 * outside this section of the file (presumably further down). */
static void ProcessDeadlock();
3442 void LrtsAdvanceCommunication(int whileidle)
3443 {
3444     static int count = 0;
3445     /*  Receive Msg first */
3446 #if CMK_SMP_TRACE_COMMTHREAD
3447     double startT, endT;
3448 #endif
3449     if (useDynamicSMSG && whileidle)
3450     {
3451 #if CMK_SMP_TRACE_COMMTHREAD
3452         startT = CmiWallTimer();
3453 #endif
3454         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3455 #if CMK_SMP_TRACE_COMMTHREAD
3456         endT = CmiWallTimer();
3457         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3458 #endif
3459     }
3460
3461     SEND_OOB_SMSG(smsg_oob_queue)
3462     PUMP_REMOTE_HIGHPRIORITY
3463     PUMP_LOCAL_HIGHPRIORITY
3464     POST_HIGHPRIORITY_RDMA
3465     // Receiving small messages and persistent
3466 #if CMK_SMP_TRACE_COMMTHREAD
3467     startT = CmiWallTimer();
3468 #endif
3469     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3470 #if CMK_SMP_TRACE_COMMTHREAD
3471     endT = CmiWallTimer();
3472     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3473 #endif
3474
3475     SEND_OOB_SMSG(smsg_oob_queue)
3476     PUMP_REMOTE_HIGHPRIORITY
3477     PUMP_LOCAL_HIGHPRIORITY
3478     POST_HIGHPRIORITY_RDMA
3479     
3480     ///* Send buffered Message */
3481 #if CMK_SMP_TRACE_COMMTHREAD
3482     startT = CmiWallTimer();
3483 #endif
3484 #if CMK_USE_OOB
3485     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
3486 #else
3487     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
3488 #endif
3489 #if CMK_SMP_TRACE_COMMTHREAD
3490     endT = CmiWallTimer();
3491     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3492 #endif
3493
3494     SEND_OOB_SMSG(smsg_oob_queue)
3495     PUMP_REMOTE_HIGHPRIORITY
3496     PUMP_LOCAL_HIGHPRIORITY
3497     POST_HIGHPRIORITY_RDMA
3498
3499     //Pump Get messages or PUT messages
3500 #if CMK_SMP_TRACE_COMMTHREAD
3501     startT = CmiWallTimer();
3502 #endif
3503     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3504 #if MULTI_THREAD_SEND
3505     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3506 #endif
3507 #if CMK_SMP_TRACE_COMMTHREAD
3508     endT = CmiWallTimer();
3509     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3510 #endif
3511     
3512     SEND_OOB_SMSG(smsg_oob_queue)
3513     PUMP_REMOTE_HIGHPRIORITY
3514     PUMP_LOCAL_HIGHPRIORITY
3515     POST_HIGHPRIORITY_RDMA
3516     //Pump Remote event
3517 #if CMK_SMP_TRACE_COMMTHREAD
3518     startT = CmiWallTimer();
3519 #endif
3520 #if CQWRITE
3521     PumpCqWriteTransactions();
3522 #endif
3523 #if REMOTE_EVENT
3524     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
3525 #endif
3526 #if CMK_SMP_TRACE_COMMTHREAD
3527     endT = CmiWallTimer();
3528     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3529 #endif
3530
3531     SEND_OOB_SMSG(smsg_oob_queue)
3532     PUMP_REMOTE_HIGHPRIORITY
3533     PUMP_LOCAL_HIGHPRIORITY
3534     POST_HIGHPRIORITY_RDMA
3535
3536 #if CMK_SMP_TRACE_COMMTHREAD
3537     startT = CmiWallTimer();
3538 #endif
3539     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3540 #if CMK_SMP_TRACE_COMMTHREAD
3541     endT = CmiWallTimer();
3542     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3543 #endif
3544
3545 #if CMK_SMP && ! LARGEPAGE