fix remote_event bug on CrayXC
[charm.git] / src / arch / gni / machine.c
1
2 /** @file
3  * GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
    # limit on the total mempool size allocated; this keeps the mempool
    # from using too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
/* ---- Build-time feature switches ---- */

/* CmiDirect support: sequence marker plus the CmiDirect API header. */
#if CMK_DIRECT
#define DIRECT_SEQ 0xFFFFFFE
#include "cmidirect.h"
#endif

/* Large (huge) pages are the default; REGULARPAGE opts out. */
#if !REGULARPAGE
#define     LARGEPAGE              1
#else
#define     LARGEPAGE              0
#endif

/* SMP builds: MULTI_THREAD_SEND is off here, and COMM_THREAD_SEND is its
   complement. */
#if CMK_SMP
#define MULTI_THREAD_SEND          0
#define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
#endif

#if MULTI_THREAD_SEND
#define CMK_WORKER_SINGLE_TASK     1
#endif

/* Completion notification options. */
#define REMOTE_EVENT               1
#define CQWRITE                    0

/* Optional caps on outstanding large sends / RDMA receives (0 = off). */
#define CMI_EXERT_SEND_LARGE_CAP   0
#define CMI_EXERT_RECV_RDMA_CAP    0

/* Optional per-call work caps for the progress routines (0 = off). */
#define CMI_SENDBUFFERSMSG_CAP            0
#define CMI_PUMPNETWORKSMSG_CAP           0
#define CMI_PUMPREMOTETRANSACTIONS_CAP    0
#define CMI_PUMPLOCALTRANSACTIONS_CAP     0
78
/* Per-call work caps for the progress routines. Each counter exists only
 * when its own CMI_*_CAP switch is nonzero. */
#if CMI_SENDBUFFERSMSG_CAP
int     SendBufferMsg_cap  = 20;
#endif

#if CMI_PUMPNETWORKSMSG_CAP
int     PumpNetworkSmsg_cap = 20;
#endif

#if CMI_PUMPREMOTETRANSACTIONS_CAP
int     PumpRemoteTransactions_cap = 20;
#endif

/* Keyed on its own switch: enabling only the local-transactions cap must
   still declare this counter. */
#if CMI_PUMPLOCALTRANSACTIONS_CAP
int     PumpLocalTransactions_cap = 15;
#endif

/* Cap on concurrently pending large sends. */
#if CMI_EXERT_SEND_LARGE_CAP
static int SEND_large_cap = 20;
static int SEND_large_pending = 0;
#endif

/* Cap on concurrently pending RDMA receives. */
#if CMI_EXERT_RECV_RDMA_CAP
static int   RDMA_cap =   10;
static int   RDMA_pending = 0;
#endif
104
/* Registered memory is managed through the LRTS memory pool. */
#define USE_LRTS_MEMPOOL                  1

/* Debug-print switch (0 = off). */
#define PRINT_SYH                         0

/* Communication-thread tracing needs tracing support; otherwise force it to 0. */
#if !(CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD)
#undef CMK_SMP_TRACE_COMMTHREAD
#define CMK_SMP_TRACE_COMMTHREAD 0
#else
#define TRACE_THRESHOLD     0.00001
#define CMI_MPI_TRACE_MOREDETAILED 0
#undef CMI_MPI_TRACE_USEREVENTS
#define CMI_MPI_TRACE_USEREVENTS 1
#endif

/* Communication-overhead tracing: defined 0 here, so it always ends up 0
   unless this definition is changed. */
#define CMK_TRACE_COMMOVERHEAD 0
#if !(CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD)
#undef CMK_TRACE_COMMOVERHEAD
#define CMK_TRACE_COMMOVERHEAD 0
#else
#undef CMI_MPI_TRACE_USEREVENTS
#define CMI_MPI_TRACE_USEREVENTS 1
#endif

/* Bracketed user events; they compile to nothing (EVENT_TIME() == 0.0)
   unless user events and tracing are on outside Charm++ proper. */
#if !(CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM)
#define  START_EVENT()
#define  END_EVENT(x)
#define  EVENT_TIME()   (0.0)
#else
CpvStaticDeclare(double, projTraceStart);
#define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
#define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
#define  EVENT_TIME()   CpvAccess(projTraceStart)
#endif
139
#if USE_LRTS_MEMPOOL

#define oneMB (1024ll*1024)
#define oneGB (1024ll*1024*1024)

/* Pool defaults: initial size, growth step, cap on the total pool size
   (0 = no cap) and per-node memory budget. The CHARM_UGNI_* variables in
   the file header presumably override these (confirm where they are read). */
static CmiInt8 _mempool_size       = 8*oneMB;
static CmiInt8 _expand_mem         = 4*oneMB;
static CmiInt8 _mempool_size_limit = 0;
static CmiInt8 _totalmem           = 0.8*oneGB;

/* Big-message threshold and segment size depend on the page size. */
#if !LARGEPAGE
static CmiInt8 BIG_MSG  =  4*oneMB;
static CmiInt8 ONE_SEG  =  2*oneMB;
#else
static CmiInt8 BIG_MSG  =  16*oneMB;
static CmiInt8 ONE_SEG  =  4*oneMB;
#endif

/* Big-message pipeline length. */
#if !MULTI_THREAD_SEND
static int BIG_MSG_PIPELINE = 4;
#else
static int BIG_MSG_PIPELINE = 1;
#endif

/* Dynamic flow-control accounting. */
static CmiInt8 buffered_send_msg = 0;
static CmiInt8 register_memory_size = 0;

/* Limits on buffered sends and on registered memory. */
#if LARGEPAGE
static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
static CmiInt8  register_count = 0;
#elif CMK_SMP && COMM_THREAD_SEND
static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
static CmiInt8  MAX_REG_MEM    =  200*oneMB;
#else
static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
static CmiInt8  MAX_REG_MEM    =  25*oneMB;
#endif

#endif     /* end USE_LRTS_MEMPOOL */
185
/* GNI locking helpers. They are real locks only when worker threads send
   directly (MULTI_THREAD_SEND). Otherwise they expand to nothing, and
   CMI_GNI_TRYLOCK yields 0. */
#if !MULTI_THREAD_SEND
#define     CMI_GNI_LOCK(x)
#define     CMI_GNI_TRYLOCK(x)         (0)
#define     CMI_GNI_UNLOCK(x)
#define     CMI_PCQUEUEPOP_LOCK(Q)
#define     CMI_PCQUEUEPOP_UNLOCK(Q)
#else
#define     CMI_GNI_LOCK(x)            CmiLock(x);
#define     CMI_GNI_TRYLOCK(x)         CmiTryLock(x)
#define     CMI_GNI_UNLOCK(x)          CmiUnlock(x);
#define     CMI_PCQUEUEPOP_LOCK(Q)     CmiLock((Q)->lock);
#define     CMI_PCQUEUEPOP_UNLOCK(Q)   CmiUnlock((Q)->lock);
#endif
199
/* Page size used by ALIGNHUGEPAGE (starts at 4096 bytes). */
static int _tlbpagesize = 4096;

static int   user_set_flag  = 0;

/* Deadlock-check switch (on by default) and the hang-detected flag. */
static int _checkProgress = 1;
static int _detected_hang = 0;
208
209 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
210
211 // dynamic SMSG
212 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
213
214 static int avg_smsg_connection = 32;
215 static int                 *smsg_connected_flag= 0;
216 static gni_smsg_attr_t     **smsg_attr_vector_local;
217 static gni_smsg_attr_t     **smsg_attr_vector_remote;
218 static gni_ep_handle_t     ep_hndl_unbound;
219 static gni_smsg_attr_t     send_smsg_attr;
220 static gni_smsg_attr_t     recv_smsg_attr;
221
222 typedef struct _dynamic_smsg_mailbox{
223    void     *mailbox_base;
224    int      size;
225    int      offset;
226    gni_mem_handle_t  mem_hndl;
227    struct      _dynamic_smsg_mailbox  *next;
228 }dynamic_smsg_mailbox_t;
229
230 static dynamic_smsg_mailbox_t  *mailbox_list;
231
232 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
233 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
234
235 #if PRINT_SYH
236 int         lrts_send_msg_id = 0;
237 int         lrts_local_done_msg = 0;
238 int         lrts_send_rdma_success = 0;
239 #endif
240
241 #include "machine.h"
242
243 #include "pcqueue.h"
244
245 #include "mempool.h"
246
/* Persistent communication support. Without it, the high-priority RDMA hook
   expands to nothing. */
#if !CMK_PERSISTENT_COMM
#define  POST_HIGHPRIORITY_RDMA
#else
#define PERSISTENT_GET_BASE 0
#if !PERSISTENT_GET_BASE
#define CMK_PERSISTENT_COMM_PUT 1
#endif
#include "machine-persistent.h"
#define  POST_HIGHPRIORITY_RDMA    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
#endif

/* The high-priority remote-event CQ is pumped only when remote events are
   combined with OOB messages or persistent PUTs. */
#if !(REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM_PUT))
#define  PUMP_REMOTE_HIGHPRIORITY
#else
#define  PUMP_REMOTE_HIGHPRIORITY    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
#endif
263
/* Memory (de)registration wrappers.
 *
 * MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status)
 *   Stores GNI_RC_ERROR_NOMEM in `status` when registering `size` more bytes
 *   would reach MAX_REG_MEM. Otherwise it stores the result of
 *   GNI_MemRegister, and on success adds `size` to register_memory_size.
 *   The region is bound to `cqh` only when REMOTE_EVENT or CQWRITE is on;
 *   otherwise no CQ (NULL) is passed.
 * MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)
 *   Deregisters and subtracts `size`; aborts if deregistration fails.
 *
 * Both expand to a single statement (do { } while (0)), so they are safe
 * inside an unbraced if/else. Arguments are parenthesized so that expression
 * arguments (e.g. a ternary size) are evaluated as a unit.
 */
//#define  USE_ONESIDED 1
#ifdef USE_ONESIDED
//onesided implementation is wrong, since no place to restore omdh
#include "onesided.h"
onesided_hnd_t   onesided_hnd;
onesided_md_t    omdh;
#define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 

#define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)

#else
uint8_t   onesided_hnd, omdh;

/* CQ passed to GNI_MemRegister: the caller's cqh with remote events / CQ
   writes, none otherwise (cqh is then not evaluated). */
#if REMOTE_EVENT || CQWRITE
#define  MEMORY_REGISTER_CQ(cqh)   (cqh)
#else
#define  MEMORY_REGISTER_CQ(cqh)   NULL
#endif

#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    do { \
        if (register_memory_size + (size) >= MAX_REG_MEM) { \
            (status) = GNI_RC_ERROR_NOMEM; \
        } else { \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), \
                                       MEMORY_REGISTER_CQ(cqh), GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
        } \
    } while (0)

#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
    do { \
        if (GNI_MemDeregister((nic_hndl), (mem_hndl)) == GNI_RC_SUCCESS) \
            register_memory_size -= (size); \
        else \
            CmiAbort("MEM_DEregister"); \
    } while (0)
#endif
297
/* Accessors for the mempool header that precedes each pool-allocated buffer
 * (located with MEMPOOL_GetMempoolHeader using ALIGNBUF alignment).
 * The predicates are fully parenthesized so they compose safely with other
 * operators (e.g. NoMsgInSend(a) == NoMsgInRecv(b)). */
#define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   NoMsgInSend(x)          (MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0)
#define   NoMsgInRecv(x)          (MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0)
#define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
#define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
#define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
#define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))

#define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
#define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)

/* Extended message header fields. `m` is parenthesized before the cast, so
   pointer arithmetic such as CmiGetMsgSize(p + k) addresses p + k. */
#define CmiGetMsgSize(m)     (((CmiMsgHeaderExt*)(m))->size)
#define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)(m))->size)=(s))
#define CmiGetMsgSeq(m)      (((CmiMsgHeaderExt*)(m))->seq)
#define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)(m))->seq) = (s))

#define ALIGNBUF                64
322
/* ======= Performance-specific tuning ======= */

/* Settings used when SMSG is not. */
#define FMA_PER_CORE     1024
#define FMA_BUFFER_SIZE  1024

/* SMSG limits (SMSG_MAX_MSG is a variable rather than a macro). */
static int  SMSG_MAX_MSG = 1024;
#define SMSG_MAX_CREDIT    72

#define MSGQ_MAXSIZE       2048

/* Size threshold for large transfers via FMA or BTE. Remote events only
   work with RDMA, so with REMOTE_EVENT the threshold drops to 0. */
#if REMOTE_EVENT
#define LRTS_GNI_RDMA_THRESHOLD  0
#else
#define LRTS_GNI_RDMA_THRESHOLD  1024
#endif

/* Remote/local queue entry counts (larger for SMP builds). */
#if !CMK_SMP
static int REMOTE_QUEUE_ENTRIES = 20480;
static int LOCAL_QUEUE_ENTRIES  = 20480;
#else
static int REMOTE_QUEUE_ENTRIES = 163840;
static int LOCAL_QUEUE_ENTRIES  = 163840;
#endif
350
/* SMSG message tags. */
#define BIG_MSG_TAG             0x26
#define PUT_DONE_TAG            0x28
#define DIRECT_PUT_DONE_TAG     0x29
#define ACK_TAG                 0x30
/* SMSG is data message */
#define SMALL_DATA_TAG          0x31
/* SMSG is a control message to initialize a BTE */
#define LMSG_INIT_TAG           0x33
#define LMSG_PERSISTENT_INIT_TAG           0x34
#define LMSG_OOB_INIT_TAG       0x35

#define DEBUG
#ifdef GNI_RC_CHECK
#undef GNI_RC_CHECK
#endif
#ifdef DEBUG
/* If rc is not GNI_RC_SUCCESS, print the PE, msg and GNI error string, then
   abort. rc is evaluated exactly once, so passing a GNI call directly does
   not re-issue the call while formatting the error. */
#define GNI_RC_CHECK(msg,rc) do { \
        gni_return_t gni_rc_check_rc_ = (rc); \
        if (gni_rc_check_rc_ != GNI_RC_SUCCESS) { \
            printf("[%d] %s; err=%s\n", CmiMyPe(), (msg), gni_err_str[gni_rc_check_rc_]); \
            fflush(stdout); \
            CmiAbort("GNI_RC_CHECK"); \
        } \
    } while(0)
#else
#define GNI_RC_CHECK(msg,rc)
#endif
371
/* Round x up to a multiple of 64. */
#define ALIGN64(x)         (size_t)(((x)+63)&(~63))
/* Round x up to a multiple of _tlbpagesize (assumed to be a power of two). */
#define ALIGNHUGEPAGE(x)   (size_t)(((x)+_tlbpagesize-1)&(~(_tlbpagesize-1)))
375
376 static int useStaticMSGQ = 0;
377 static int useStaticFMA = 0;
378 static int mysize, myrank;
379 static gni_nic_handle_t   nic_hndl;
380
381 typedef struct {
382     gni_mem_handle_t mdh;
383     uint64_t addr;
384 } mdh_addr_t ;
385 // this is related to dynamic SMSG
386
387 typedef struct mdh_addr_list{
388     gni_mem_handle_t mdh;
389    void *addr;
390     struct mdh_addr_list *next;
391 }mdh_addr_list_t;
392
393 static unsigned int         smsg_memlen;
394 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
395 mdh_addr_t          setup_mem;
396 mdh_addr_t          *smsg_connection_vec = 0;
397 gni_mem_handle_t    smsg_connection_memhndl;
398 static int          smsg_expand_slots = 10;
399 static int          smsg_available_slot = 0;
400 static void         *smsg_mailbox_mempool = 0;
401 mdh_addr_list_t     *smsg_dynamic_list = 0;
402
403 static void             *smsg_mailbox_base;
404 gni_msgq_attr_t         msgq_attrs;
405 gni_msgq_handle_t       msgq_handle;
406 gni_msgq_ep_attr_t      msgq_ep_attrs;
407 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
408
409 /* =====Beginning of Declarations of Machine Specific Variables===== */
410 static int cookie;
411 static int modes = 0;
412 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
413 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
414 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
415 static gni_cq_handle_t       highprior_rdma_tx_cqh = NULL;      // rdma - local event
416 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
417 static gni_cq_handle_t       highpriority_rx_cqh = NULL;      // mempool - remote event
418 static gni_ep_handle_t       *ep_hndl_array;
419
420 static CmiNodeLock           *ep_lock_array;
421 static CmiNodeLock           default_tx_cq_lock; 
422 static CmiNodeLock           rdma_tx_cq_lock; 
423 static CmiNodeLock           global_gni_lock; 
424 static CmiNodeLock           rx_cq_lock;
425 static CmiNodeLock           smsg_mailbox_lock;
426 static CmiNodeLock           smsg_rx_cq_lock;
427 static CmiNodeLock           *mempool_lock;
428 //#define     CMK_WITH_STATS      1
429 typedef struct msg_list
430 {
431     uint32_t destNode;
432     uint32_t size;
433     void *msg;
434     uint8_t tag;
435 #if CMK_WITH_STATS
436     double  creation_time;
437 #endif
438 }MSG_LIST;
439
440
441 typedef struct control_msg
442 {
443     uint64_t            source_addr;    /* address from the start of buffer  */
444     uint64_t            dest_addr;      /* address from the start of buffer */
445     int                 total_length;   /* total length */
446     int                 length;         /* length of this packet */
447 #if REMOTE_EVENT
448     int                 ack_index;      /* index from integer to address */
449 #endif
450     int     seq_id;         //big message   0 meaning single message
451     gni_mem_handle_t    source_mem_hndl;
452 #if PERSISTENT_GET_BASE
453     gni_mem_handle_t    dest_mem_hndl;
454 #endif
455     struct control_msg  *next;
456 } CONTROL_MSG;
457
458 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
459
460 typedef struct ack_msg
461 {
462     uint64_t            source_addr;    /* address from the start of buffer  */
463 #if ! USE_LRTS_MEMPOOL
464     gni_mem_handle_t    source_mem_hndl;
465     int                 length;          /* total length */
466 #endif
467     struct ack_msg     *next;
468 } ACK_MSG;
469
470 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
471
#if CMK_DIRECT
/* Header carrying a CmiDirect handler address. */
typedef struct{
    uint64_t    handler_addr;
}CMK_DIRECT_HEADER;

/* Converse message whose payload is the address of a CmiDirectUserHandle. */
typedef struct {
    char core[CmiMsgHeaderSizeBytes];
    uint64_t handler;
}cmidirectMsg;

CpvDeclare(int, CmiHandleDirectIdx);

/* Converse handler: run the user callback named by msg->handler, then free msg. */
void CmiHandleDirectMsg(cmidirectMsg* msg)
{
    CmiDirectUserHandle *user = (CmiDirectUserHandle*)(msg->handler);
    (*(user->callbackFnPtr))(user->callbackData);
    CmiFree(msg);
}

/* Register CmiHandleDirectMsg and store its handler index per PE. */
void CmiDirectInit()
{
    int idx;
    CpvInitialize(int,  CmiHandleDirectIdx);
    idx = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
    CpvAccess(CmiHandleDirectIdx) = idx;
}

#endif
499 typedef struct  rmda_msg
500 {
501     int                   destNode;
502 #if REMOTE_EVENT
503     int                   ack_index;
504 #endif
505     gni_post_descriptor_t *pd;
506 }RDMA_REQUEST;
507
508
509 #if CMK_SMP
510 #define SMP_LOCKS                       1
511 #else
512 #define SMP_LOCKS                       0
513 #endif
514 #define ONE_SEND_QUEUE                  0
515
516 typedef PCQueue BufferList;
517 typedef struct  msg_list_index
518 {
519     PCQueue       sendSmsgBuf;
520 #if  SMP_LOCKS
521     CmiNodeLock   lock;
522     int           pushed;
523     int           destpe;
524 #endif
525 } MSG_LIST_INDEX;
526 char                *destpe_avail;
527 PCQueue sendRdmaBuf;
528 PCQueue sendHighPriorBuf;
529 // buffered send queue
530 #if ! ONE_SEND_QUEUE
531 typedef struct smsg_queue
532 {
533     MSG_LIST_INDEX   *smsg_msglist_index;
534     int               smsg_head_index;
535 #if  SMP_LOCKS
536     PCQueue     nonEmptyQueues;
537 #endif
538 } SMSG_QUEUE;
539 #else
540 typedef struct smsg_queue
541 {
542     PCQueue       sendMsgBuf;
543 }  SMSG_QUEUE;
544 #endif
545
546 SMSG_QUEUE                  smsg_queue;
547 #if CMK_USE_OOB
548 SMSG_QUEUE                  smsg_oob_queue;
549 #define SEND_OOB_SMSG(x)            SendBufferMsg(&x, NULL);
550 #define PUMP_LOCAL_HIGHPRIORITY    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh,  rdma_tx_cq_lock)); 
551 #else
552 #define SEND_OOB_SMSG(x)            
553 #define PUMP_LOCAL_HIGHPRIORITY     
554 #endif
555
556 #define FreeMsgList(d)   free(d);
557 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
558
559 #define FreeControlMsg(d)      free(d);
560 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
561
562 #define FreeAckMsg(d)      free(d);
563 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
564
565 #define FreeRdmaRequest(d)       free(d);
566 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
567 /* reuse gni_post_descriptor_t */
568 static gni_post_descriptor_t *post_freelist=0;
569
570 #define FreePostDesc(d)     free(d);
571 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
572
573
574 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
575 static int      buffered_smsg_counter = 0;
576
577 /* SmsgSend return success but message sent is not confirmed by remote side */
578 static MSG_LIST *buffered_fma_head = 0;
579 static MSG_LIST *buffered_fma_tail = 0;
580
/* Bit helpers on an integer mask `a`: IsFree tests whether bit `ind` is
   clear, SET_BITS sets it, Reset clears it. Arguments and results are fully
   parenthesized so that expression arguments (e.g. x | y) act as one operand. */
#define IsFree(a,ind)   (!((a) & (1<<(ind))))
#define SET_BITS(a,ind) ((a) = ((a) | (1<<(ind))))
#define Reset(a,ind)    ((a) = ((a) & (~(1<<(ind)))))
585
586 CpvDeclare(mempool_type*, mempool);
587
588 #if CMK_PERSISTENT_COMM_PUT
589 CpvDeclare(mempool_type*, persistent_mempool);
590 #endif
591
592 #if CMK_SMSGS_FREE_AFTER_EVENT
/*
  SMSGS pool:
  buffers sent SMSG messages until they can be freed.
*/
597
598 #if CMK_SMP
599 #define SMSG_INIT_SIZE                4096
600 #else
601 #define SMSG_INIT_SIZE                1024
602 #endif
603
604 struct SmsgsStruct {
605 void *addr;
606 int next;
607 int type;               /* 1: charm   2: control msg */
608 };
609
610 typedef struct SmsgsPool {
611     struct SmsgsStruct   *indexes;
612     int                   size;
613     int                   freehead;
614     CmiNodeLock           lock;
615 } SmsgsPool;
616
617 static SmsgsPool  smsgsPool;
618
619 #define  GetSmsgsIndexType(pool, s)             (pool.indexes[s].type)
620 #define  GetSmsgsIndexAddress(pool, s)          (pool.indexes[s].addr)
621
622 static void SmsgsPool_init(SmsgsPool *pool)
623 {
624     int i;
625     pool->size = SMSG_INIT_SIZE;
626     pool->indexes = (struct SmsgsStruct *)malloc(pool->size*sizeof(struct SmsgsStruct));
627     for (i=0; i<pool->size-1; i++) {
628         pool->indexes[i].next = i+1;
629     }
630     pool->indexes[i].next = -1;
631     pool->freehead = 0;
632 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM_PUT
633     pool->lock  = CmiCreateLock();
634 #else
635     pool->lock  = 0;
636 #endif
637 }
638
639 static int SmsgsPool_getslot(SmsgsPool *pool, void *addr, int type)
640 {
641     int s, i;
642 #if MULTI_THREAD_SEND  
643     CmiLock(pool->lock);
644 #endif
645     s = pool->freehead;
646     if (s == -1) {
647         int newsize = pool->size * 2;
648         //printf("[%d] SmsgsPool_getslot %p expand to: %d\n", myrank, pool, newsize);
649         struct SmsgsStruct *old_pool = pool->indexes;
650         pool->indexes = (struct SmsgsStruct *)malloc(newsize*sizeof(struct SmsgsStruct));
651         memcpy(pool->indexes, old_pool, pool->size*sizeof(struct SmsgsStruct));
652         for (i=pool->size; i<newsize-1; i++) {
653             pool->indexes[i].next = i+1;
654         }
655         pool->indexes[i].next = -1;
656         pool->freehead = pool->size;
657         s = pool->size;
658         pool->size = newsize;
659         free(old_pool);
660     }
661     pool->freehead = pool->indexes[s].next;
662     pool->indexes[s].addr = addr;
663     pool->indexes[s].type = type;
664 #if MULTI_THREAD_SEND
665     CmiUnlock(pool->lock);
666 #endif
667     return s;
668 }
669
670 static void SmsgsPool_freeslot(SmsgsPool *pool, int s)
671 {
672     CmiAssert(s>=0 && s<pool->size);
673 #if MULTI_THREAD_SEND
674     CmiLock(pool->lock);
675 #endif
676     pool->indexes[s].next = pool->freehead;
677     pool->freehead = s;
678 #if MULTI_THREAD_SEND
679     CmiUnlock(pool->lock);
680 #endif
681 }
682
683 #endif  /* CMK_SMSGS_FREE_AFTER_EVENT */
684
685
686 #if REMOTE_EVENT
687 /* ack pool for remote events */
688
/* Remote-event word layout (32 bits):
 *   bit 31          : GET_TYPE — 0 for ACK_EVENT, 1 for PERSIST_EVENT/DIRECT_EVENT
 *   bits SHIFT..30  : pool slot index (INDEX_MASK wide)
 *   bits 0..SHIFT-1 : sender rank (RANK_MASK wide)
 * Bit 31 is built from 1u: shifting a signed 1 into the sign bit (1<<31)
 * is undefined behavior in C. */
static int  SHIFT   =           18;
#define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
#define RANK_MASK               ((1<<SHIFT) - 1)
#define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)

#define GET_TYPE(evt)           (((evt) >> 31) & 1)
#define GET_RANK(evt)           ((evt) & RANK_MASK)
#define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)

#define PERSIST_EVENT(idx)      ((1u<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
#define DIRECT_EVENT(idx)       ((1u<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)

/* Initial pool size (larger for SMP builds). */
#if CMK_SMP
#define INIT_SIZE                4096
#else
#define INIT_SIZE                1024
#endif
706
707 struct IndexStruct {
708 void *addr;
709 int next;
710 int type;     // 1: ACK   2: Persistent
711 };
712
713 typedef struct IndexPool {
714     struct IndexStruct   *indexes;
715     int                   size;
716     int                   freehead;
717     CmiNodeLock           lock;
718 } IndexPool;
719
720 static IndexPool  ackPool;
721 #if CMK_PERSISTENT_COMM_PUT
722 static IndexPool  persistPool;
723 #else
724 #define persistPool ackPool 
725 #endif
726
727 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
728 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
729
730 static void IndexPool_init(IndexPool *pool)
731 {
732     int i;
733     if ((1<<SHIFT) < mysize) 
734         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
735     pool->size = INIT_SIZE;
736     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
737     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
738     for (i=0; i<pool->size-1; i++) {
739         pool->indexes[i].next = i+1;
740         pool->indexes[i].type = 0;
741     }
742     pool->indexes[i].next = -1;
743     pool->indexes[i].type = 0;
744     pool->freehead = 0;
745 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM_PUT
746     pool->lock  = CmiCreateLock();
747 #else
748     pool->lock  = 0;
749 #endif
750 }
751
752 static int IndexPool_getslot(IndexPool *pool, void *addr, int type)
753 {
754     int s, i;
755 #if MULTI_THREAD_SEND  
756     CmiLock(pool->lock);
757 #endif
758     CmiAssert(type == 1 || type == 2);
759     s = pool->freehead;
760     if (s == -1) {
761         int newsize = pool->size * 2;
762         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
763         if (newsize > (1<<(32-SHIFT-1))) {
764             static int warned = 0;
765             if (!warned)
766               printf("[%d] Warning: IndexPool_getslot %p overflow when expanding to: %d\n", myrank, pool, newsize);
767             warned = 1;
768             return -1;
769             CmiAbort("IndexPool for remote events overflows, try compile Charm++ with remote event disabled.");
770         }
771         struct IndexStruct *old_ackpool = pool->indexes;
772         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
773         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
774         for (i=pool->size; i<newsize-1; i++) {
775             pool->indexes[i].next = i+1;
776             pool->indexes[i].type = 0;
777         }
778         pool->indexes[i].next = -1;
779         pool->indexes[i].type = 0;
780         pool->freehead = pool->size;
781         s = pool->size;
782         pool->size = newsize;
783         free(old_ackpool);
784     }
785     pool->freehead = pool->indexes[s].next;
786     pool->indexes[s].addr = addr;
787     CmiAssert(pool->indexes[s].type == 0);
788     pool->indexes[s].type = type;
789 #if MULTI_THREAD_SEND
790     CmiUnlock(pool->lock);
791 #endif
792     return s;
793 }
794
795 static void IndexPool_freeslot(IndexPool *pool, int s)
796 {
797     CmiAssert(s>=0 && s<pool->size);
798 #if MULTI_THREAD_SEND
799     CmiLock(pool->lock);
800 #endif
801     pool->indexes[s].next = pool->freehead;
802     pool->indexes[s].type = 0;
803     pool->freehead = s;
804 #if MULTI_THREAD_SEND
805     CmiUnlock(pool->lock);
806 #endif
807 }
808
809
810 #endif   /* REMOTE_EVENT */
811
/* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
/* `msg` is parenthesized before the cast so CMI_MAGIC(p + k) reads at p + k. */
#define CMI_MAGIC(msg)                   (((CmiMsgHeaderBasic *)(msg))->magic)
#define CHARM_MAGIC_NUMBER               126

#if CMK_ERROR_CHECKING
extern unsigned char computeCheckSum(unsigned char *data, int len);
static int checksum_flag = 0;
/* When checksum_flag is set: zero the header cksum field, then store
   computeCheckSum over the first `len` bytes in it. */
#define CMI_SET_CHECKSUM(msg, len)      \
        if (checksum_flag)  {   \
          ((CmiMsgHeaderBasic *)(msg))->cksum = 0;        \
          ((CmiMsgHeaderBasic *)(msg))->cksum = computeCheckSum((unsigned char*)(msg), (len));        \
        }
/* When checksum_flag is set: abort unless computeCheckSum over the message
   yields 0. Written as one `if` to avoid a nested dangling else. */
#define CMI_CHECK_CHECKSUM(msg, len)    \
        if (checksum_flag && computeCheckSum((unsigned char*)(msg), (len)) != 0)   \
            CmiAbort("Fatal error: checksum doesn't agree!\n");
#else
#define CMI_SET_CHECKSUM(msg, len)
#define CMI_CHECK_CHECKSUM(msg, len)
#endif
/* =====End of Definitions of Message-Corruption Related Macros=====*/
833
/* Statistics switches: the stats macros count only while print_stats is set
   and stats_off is clear. */
static int print_stats = 0;
static int stats_off = 0;

/* Resume statistics counting. */
void CmiTurnOnStats()
{
    stats_off = 0;
}

/* Suspend statistics counting. */
void CmiTurnOffStats()
{
    stats_off = 1;
}

/* True for FMA or RDMA PUT post types. `type` is parenthesized so that
   expression arguments (e.g. a ternary) are compared as a whole. */
#define IS_PUT(type)    ((type) == GNI_POST_FMA_PUT || (type) == GNI_POST_RDMA_PUT)
848
849 #if CMK_WITH_STATS
/* Per-rank statistics output file (opened by init_comm_stats). */
FILE *counterLog = NULL;

/* Communication statistics counters and timings. */
typedef struct comm_thread_stats
{
    /* SMSGs sent, by tag, plus the total */
    uint64_t  smsg_data_count;
    uint64_t  lmsg_init_count;
    uint64_t  ack_count;
    uint64_t  big_msg_ack_count;
    uint64_t  smsg_count;
    uint64_t  direct_put_done_count;
    uint64_t  put_done_count;

    /* SmsgSend attempts, by tag, plus the total */
    uint64_t  try_smsg_data_count;
    uint64_t  try_lmsg_init_count;
    uint64_t  try_ack_count;
    uint64_t  try_big_msg_ack_count;
    uint64_t  try_direct_put_done_count;
    uint64_t  try_put_done_count;
    uint64_t  try_smsg_count;

    /* time messages spent buffered before being sent */
    double    max_time_in_send_buffered_smsg;
    double    all_time_in_send_buffered_smsg;

    /* RDMA posts and attempts */
    uint64_t  rdma_get_count;
    uint64_t  rdma_put_count;
    uint64_t  try_rdma_get_count;
    uint64_t  try_rdma_put_count;

    /* control message -> RDMA post latency */
    double    max_time_from_control_to_rdma_init;
    double    all_time_from_control_to_rdma_init;

    /* RDMA post -> completion latency */
    double    max_time_from_rdma_init_to_rdma_done;
    double    all_time_from_rdma_init_to_rdma_done;

    /* per-routine call counts, total and maximum time */
    int       count_in_PumpNetwork;
    double    time_in_PumpNetwork;
    double    max_time_in_PumpNetwork;
    int       count_in_SendBufferMsg_smsg;
    double    time_in_SendBufferMsg_smsg;
    double    max_time_in_SendBufferMsg_smsg;
    int       count_in_SendRdmaMsg;
    double    time_in_SendRdmaMsg;
    double    max_time_in_SendRdmaMsg;
    int       count_in_PumpRemoteTransactions;
    double    time_in_PumpRemoteTransactions;
    double    max_time_in_PumpRemoteTransactions;
    int       count_in_PumpLocalTransactions_rdma;
    double    time_in_PumpLocalTransactions_rdma;
    double    max_time_in_PumpLocalTransactions_rdma;
    int       count_in_PumpDatagramConnection;
    double    time_in_PumpDatagramConnection;
    double    max_time_in_PumpDatagramConnection;
} Comm_Thread_Stats;

static Comm_Thread_Stats   comm_stats;

/* Directory that receives the statistics files. */
static char *counters_dirname = "counters";
903
904 static void init_comm_stats()
905 {
906   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
907   if (print_stats){
908       char ln[200];
909       int code = mkdir(counters_dirname, 00777); 
910       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
911       counterLog=fopen(ln,"w");
912       if (counterLog == NULL) CmiAbort("Counter files open failed");
913   }
914 }
915
/* Statistics-recording macros. Each is active only while print_stats is set
 * (and, except SMSG_CREATION, stats_off is clear). Arguments are
 * parenthesized so expression arguments (e.g. `t0 - skew`) are used as a
 * unit, and IS_PUT receives a parenthesized argument regardless of how
 * IS_PUT itself is written. */

/* Stamp the creation time on message-list entry x. */
#define SMSG_CREATION( x ) if(print_stats) { (x)->creation_time = CmiWallTimer(); }

/* Count a sent SMSG by tag and accumulate the time it spent buffered. */
#define SMSG_SENT_DONE(creation_time, tag)  \
        if (print_stats && !stats_off) {   if( (tag) == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
            else  if( (tag) == LMSG_INIT_TAG || (tag) == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++;  \
            else  if( (tag) == ACK_TAG) comm_stats.ack_count++;  \
            else  if( (tag) == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
            else  if( (tag) == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
            else  if( (tag) == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
            comm_stats.smsg_count++; \
            double inbuff_time = CmiWallTimer() - (creation_time);   \
            if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
            comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
        }

/* Count an SMSG send attempt by tag. */
#define SMSG_TRY_SEND(tag)  \
        if (print_stats && !stats_off){   if( (tag) == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
            else  if( (tag) == LMSG_INIT_TAG || (tag) == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
            else  if( (tag) == ACK_TAG) comm_stats.try_ack_count++;  \
            else  if( (tag) == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
            else  if( (tag) == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
            else  if( (tag) == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
            comm_stats.try_smsg_count++; \
        }

/* Count an RDMA post attempt as PUT or GET. */
#define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT((type))?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}

/* Accumulate the time from RDMA post (start time x) to completion. */
#define  RDMA_TRANS_DONE(x)      \
         if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - (x) ; \
             if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
             comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
         }

/* Count an RDMA post as PUT or GET and accumulate the time since the
   control message (start time x). */
#define  RDMA_TRANS_INIT(type, x)      \
         if (print_stats && !stats_off) {   IS_PUT((type))?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
             double rdma_trans_time = CmiWallTimer() - (x) ; \
             if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
             comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
         }
955
/* Shared implementation of the STATS_*_TIME(x) wrappers below. It runs the
 * statement `x` exactly once and measures its duration with CmiWallTimer().
 * It then increments comm_stats.count_in_<name>, adds the duration to
 * time_in_<name>, and raises max_time_in_<name> when exceeded.
 * Expands to a braced block; note that `x` executes in a scope that declares
 * a local `double t`. */
#define STATS_TIMED_SECTION_(name, x)   \
        { double t = CmiWallTimer(); \
          x;        \
          t = CmiWallTimer() - t;          \
          comm_stats.count_in_##name ++;        \
          comm_stats.time_in_##name += t;   \
          if (t>comm_stats.max_time_in_##name)      \
              comm_stats.max_time_in_##name = t;    \
        }

#define STATS_PUMPNETWORK_TIME(x)                 STATS_TIMED_SECTION_(PumpNetwork, x)
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)      STATS_TIMED_SECTION_(PumpRemoteTransactions, x)
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)  STATS_TIMED_SECTION_(PumpLocalTransactions_rdma, x)
#define STATS_SEND_SMSGS_TIME(x)                  STATS_TIMED_SECTION_(SendBufferMsg_smsg, x)
#define STATS_SENDRDMAMSG_TIME(x)                 STATS_TIMED_SECTION_(SendRdmaMsg, x)
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)      STATS_TIMED_SECTION_(PumpDatagramConnection, x)
1015
1016 static void print_comm_stats()
1017 {
1018     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
1019     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
1020             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
1021             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
1022     
1023     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
1024             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
1025             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
1026
1027     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
1028     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
1029     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
1030
1031
1032     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
1033     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
1034     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
1035     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
1036     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
1037     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
1038     if (useDynamicSMSG)
1039     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
1040
1041     fclose(counterLog);
1042 }
1043
1044 #else
/* Statistics compiled out: each timing wrapper just evaluates its statement. */
#define STATS_PUMPNETWORK_TIME(x)                  x
#define STATS_SEND_SMSGS_TIME(x)                   x
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
#define STATS_SENDRDMAMSG_TIME(x)                  x
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
1051 #endif
1052
1053 static void
1054 allgather(void *in,void *out, int len)
1055 {
1056     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1057     int i,rc;
1058     int my_rank;
1059     char *tmp_buf,*out_ptr;
1060
1061     if(!already_called) {
1062
1063         rc = PMI_Get_size(&job_size);
1064         CmiAssert(rc == PMI_SUCCESS);
1065         rc = PMI_Get_rank(&my_rank);
1066         CmiAssert(rc == PMI_SUCCESS);
1067
1068         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1069         CmiAssert(ivec_ptr != NULL);
1070
1071         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1072         CmiAssert(rc == PMI_SUCCESS);
1073
1074         already_called = 1;
1075
1076     }
1077
1078     tmp_buf = (char *)malloc(job_size * len);
1079     CmiAssert(tmp_buf);
1080
1081     rc = PMI_Allgather(in,tmp_buf,len);
1082     CmiAssert(rc == PMI_SUCCESS);
1083
1084     out_ptr = out;
1085
1086     for(i=0;i<job_size;i++) {
1087
1088         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1089
1090     }
1091
1092     free(tmp_buf);
1093 }
1094
1095 static void
1096 allgather_2(void *in,void *out, int len)
1097 {
1098     //PMI_Allgather is out of order
1099     int i,rc, extend_len;
1100     int  rank_index;
1101     char *out_ptr, *out_ref;
1102     char *in2;
1103
1104     extend_len = sizeof(int) + len;
1105     in2 = (char*)malloc(extend_len);
1106
1107     memcpy(in2, &myrank, sizeof(int));
1108     memcpy(in2+sizeof(int), in, len);
1109
1110     out_ptr = (char*)malloc(mysize*extend_len);
1111
1112     rc = PMI_Allgather(in2, out_ptr, extend_len);
1113     GNI_RC_CHECK("allgather", rc);
1114
1115     out_ref = out;
1116
1117     for(i=0;i<mysize;i++) {
1118         //rank index 
1119         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1120         //copy to the rank index slot
1121         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1122     }
1123
1124     free(out_ptr);
1125     free(in2);
1126
1127 }
1128
1129 static unsigned int get_gni_nic_address(int device_id)
1130 {
1131     unsigned int address, cpu_id;
1132     gni_return_t status;
1133     int i, alps_dev_id=-1,alps_address=-1;
1134     char *token, *p_ptr;
1135
1136     p_ptr = getenv("PMI_GNI_DEV_ID");
1137     if (!p_ptr) {
1138         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1139        
1140         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1141     } else {
1142         while ((token = strtok(p_ptr,":")) != NULL) {
1143             alps_dev_id = atoi(token);
1144             if (alps_dev_id == device_id) {
1145                 break;
1146             }
1147             p_ptr = NULL;
1148         }
1149         CmiAssert(alps_dev_id != -1);
1150         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1151         CmiAssert(p_ptr != NULL);
1152         i = 0;
1153         while ((token = strtok(p_ptr,":")) != NULL) {
1154             if (i == alps_dev_id) {
1155                 alps_address = atoi(token);
1156                 break;
1157             }
1158             p_ptr = NULL;
1159             ++i;
1160         }
1161         CmiAssert(alps_address != -1);
1162         address = alps_address;
1163     }
1164     return address;
1165 }
1166
/* Return the GNI ptag: the first ':'-separated entry of PMI_GNI_PTAG.
 * strtoul() stops at the next ':', so the list is parsed without
 * tokenizing, and therefore without modifying, the getenv() string.
 * Aborts if PMI_GNI_PTAG is not set. */
static uint8_t get_ptag(void)
{
    const char *p_ptr = getenv("PMI_GNI_PTAG");

    if (p_ptr == NULL)
        CmiAbort("get_ptag: PMI_GNI_PTAG is not set");
    p_ptr += strspn(p_ptr, ":");    /* skip any leading ':' separators */
    return (uint8_t)strtoul(p_ptr, NULL, 10);
}
1179
/* Return the GNI cookie: the first ':'-separated entry of PMI_GNI_COOKIE,
 * parsed as an unsigned decimal.
 * strtoul() is used because a uint32_t cookie may exceed INT_MAX, which
 * atoi() cannot represent. It also stops at the next ':', so the
 * getenv() string is not modified. Aborts if PMI_GNI_COOKIE is not set. */
static uint32_t get_cookie(void)
{
    const char *p_ptr = getenv("PMI_GNI_COOKIE");

    if (p_ptr == NULL)
        CmiAbort("get_cookie: PMI_GNI_COOKIE is not set");
    p_ptr += strspn(p_ptr, ":");    /* skip any leading ':' separators */
    return (uint32_t)strtoul(p_ptr, NULL, 10);
}
1192
1193 #if LARGEPAGE
1194
1195 /* directly mmap memory from hugetlbfs for large pages */
1196
1197 #include <sys/stat.h>
1198 #include <fcntl.h>
1199 #include <sys/mman.h>
1200 #include <hugetlbfs.h>
1201
1202 // size must be _tlbpagesize aligned
1203 void *my_get_huge_pages(size_t size)
1204 {
1205     char filename[512];
1206     int fd;
1207     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1208     void *ptr = NULL;
1209
1210     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1211     fd = open(filename, O_RDWR | O_CREAT, mode);
1212     if (fd == -1) {
1213         CmiAbort("my_get_huge_pages: open filed");
1214     }
1215     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1216     if (ptr == MAP_FAILED) ptr = NULL;
1217 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1218     close(fd);
1219     unlink(filename);
1220     return ptr;
1221 }
1222
/* Unmap a region obtained from my_get_huge_pages(); aborts if munmap fails. */
void my_free_huge_pages(void *ptr, int size)
{
    if (munmap(ptr, size) != -1)
        return;
    CmiAbort("munmap failed in my_free_huge_pages");
}
1229
1230 #endif
1231
1232 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1233 /* TODO: add any that are related */
1234 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1235
1236
1237 #include "machine-lrts.h"
1238 #include "machine-common-core.c"
1239
1240
/* Forward declarations of static routines that are used before their
 * definitions in this file. */
static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
static void SendRdmaMsg(PCQueue );
static void PumpNetworkSmsg();
static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
#if CQWRITE
static void PumpCqWriteTransactions();
#endif
#if REMOTE_EVENT
static void PumpRemoteTransactions(gni_cq_handle_t);
#endif

#if MACHINE_DEBUG_LOG
/* Debug-only counters, compiled in when MACHINE_DEBUG_LOG is set
   (buffered_recv_msg is reported by the MACHSTATE3 trace in registerFromMempool). */
static CmiInt8 buffered_recv_msg = 0;
int         lrts_smsg_success = 0;
int         lrts_received_msg = 0;
#endif
1257
1258 static void sweep_mempool(mempool_type *mptr)
1259 {
1260     int n = 0;
1261     block_header *current = &(mptr->block_head);
1262
1263     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1264     while( current!= NULL) {
1265         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1266         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1267     }
1268     printf("[n %d] sweep_mempool slot END.\n", myrank);
1269 }
1270
1271
1272 static INLINE_KEYWORD  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1273 {
1274     block_header *current = *from;
1275
1276     //while(register_memory_size>= MAX_REG_MEM)
1277     //{
1278         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1279             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1280
1281         *from = current;
1282         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1283         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1284         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1285     //}
1286     return GNI_RC_SUCCESS;
1287 }
1288
/* Register `size` bytes at `blockaddr` with the NIC, storing the handle in
 * *memhndl, after making room by deregistering idle blocks of mempool mptr.
 *
 * Phase 1: while register_memory_size >= MAX_REG_MEM, deregister idle blocks
 *          of mptr; if none is left and the limit is still exceeded, return
 *          deregisterMemory's error.
 * Phase 2: retry MEMORY_REGISTER. On success stop. On GNI_RC_INVALID_PARAM or
 *          GNI_RC_PERMISSION_ERROR report through GNI_RC_CHECK. Otherwise
 *          deregister one more idle block and retry, returning the error once
 *          none is left.
 * NOTE(review): register_memory_size is presumably updated inside the
 * MEMORY_REGISTER/MEMORY_DEREGISTER macros — confirm; the loops depend on it.
 */
static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
{
    gni_return_t status = GNI_RC_SUCCESS;
    //int size = GetMempoolsize(msg);
    //void *blockaddr = GetMempoolBlockPtr(msg);
    //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
   
    /* Scan cursor shared by both phases so each deregistration resumes
       where the previous one stopped. */
    block_header *current = &(mptr->block_head);
    while(register_memory_size>= MAX_REG_MEM)
    {
        status = deregisterMemory(mptr, &current);
        if (status != GNI_RC_SUCCESS) break;
    }
    if(register_memory_size>= MAX_REG_MEM) return status;

    MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
    while(1)
    {
        /* MEMORY_REGISTER writes its result into `status`. */
        MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
        if(status == GNI_RC_SUCCESS)
        {
            break;
        }
        else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
        {
            /* Not fixable by freeing registrations. */
            GNI_RC_CHECK("registerFromMempool", status);
        }
        else
        {
            /* Treat any other failure as resource exhaustion: free one more idle block. */
            status = deregisterMemory(mptr, &current);
            if (status != GNI_RC_SUCCESS) break;
        }
    }; 
    return status;
}
1324
1325 INLINE_KEYWORD
1326 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1327 {
1328     static int rank = -1;
1329     int i;
1330     gni_return_t status;
1331     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1332     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1333     mempool_type *mptr;
1334
1335     status = registerFromMempool(mptr1, msg, size, t, cqh);
1336     if (status == GNI_RC_SUCCESS) return status;
1337 #if CMK_SMP 
1338     for (i=0; i<CmiMyNodeSize()+1; i++) {
1339       rank = (rank+1)%(CmiMyNodeSize()+1);
1340       mptr = CpvAccessOther(mempool, rank);
1341       if (mptr == mptr1) continue;
1342       status = registerFromMempool(mptr, msg, size, t, cqh);
1343       if (status == GNI_RC_SUCCESS) return status;
1344     }
1345 #endif
1346     return  GNI_RC_ERROR_RESOURCE;
1347 }
1348
/* Defer an SMSG that cannot be sent now.
 *
 * (msg, size, destNode, tag) is wrapped in a new MSG_LIST entry and appended
 * to `queue`. With ONE_SEND_QUEUE there is one shared queue; otherwise the
 * entry goes to destNode's per-destination queue. With SMP_LOCKS that queue
 * is also registered in queue->nonEmptyQueues under the per-destination lock.
 * Ownership of `msg` passes to the queue entry.
 */
INLINE_KEYWORD
static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
{
    MSG_LIST        *msg_tmp;
    MallocMsgList(msg_tmp);
    msg_tmp->destNode = destNode;
    msg_tmp->size   = size;
    msg_tmp->msg    = msg;
    msg_tmp->tag    = tag;
#if CMK_WITH_STATS
    /* Timestamp used to measure how long the message stays buffered. */
    SMSG_CREATION(msg_tmp)
#endif

#if ONE_SEND_QUEUE
    PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
#else
#if SMP_LOCKS
    CmiLock(queue->smsg_msglist_index[destNode].lock);
    /* NOTE(review): `pushed` is tested but not set here; confirm the code
       draining nonEmptyQueues maintains it, otherwise the same per-destination
       entry may be pushed onto nonEmptyQueues more than once. */
    if(queue->smsg_msglist_index[destNode].pushed == 0)
    {
        PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
    }
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
    CmiUnlock(queue->smsg_msglist_index[destNode].lock);
#else
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
#endif
#endif

#if PRINT_SYH
    buffered_smsg_counter++;
#endif
}
1382
1383 INLINE_KEYWORD static void print_smsg_attr(gni_smsg_attr_t     *a)
1384 {
1385     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1386 }
/* Disabled by #if 0. It sets up a dynamic SMSG connection by FMA-PUTting
 * this rank's mailbox attribute into destNode's smsg_connection_vec slot.
 * The live dynamic-connection path is alloc_smsg_attr()/connect_to().
 * NOTE(review): if revived, note that the GNI_RC_ERROR_RESOURCE branch
 * allocates an RDMA_REQUEST that is never queued or freed, and that neither
 * the malloc/memalign results nor the GNI_MemRegister status are checked. */
#if 0
INLINE_KEYWORD
static void setup_smsg_connection(int destNode)
{
    mdh_addr_list_t  *new_entry = 0;
    gni_post_descriptor_t *pd;
    gni_smsg_attr_t      *smsg_attr;
    gni_return_t status = GNI_RC_NOT_DONE;
    RDMA_REQUEST        *rdma_request_msg;
    
    /* All slots of the current mailbox chunk are used: allocate and register
       a new 64-byte-aligned chunk of smsg_expand_slots mailboxes. */
    if(smsg_available_slot == smsg_expand_slots)
    {
        new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
        new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
        memset(new_entry->addr, 0, smsg_memlen*smsg_expand_slots);

        status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
            smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
            GNI_MEM_READWRITE,   
            -1,
            &(new_entry->mdh));
        smsg_available_slot = 0; 
        new_entry->next = smsg_dynamic_list;
        smsg_dynamic_list = new_entry;
    }
    /* Describe the next free mailbox slot for destNode. */
    smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
    smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
    smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
    smsg_attr->msg_maxsize = SMSG_MAX_MSG;
    smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
    smsg_attr->buff_size = smsg_memlen;
    smsg_attr->msg_buffer = smsg_dynamic_list->addr;
    smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
    smsg_local_attr_vec[destNode] = smsg_attr;
    smsg_available_slot++;
    /* PUT the attribute into entry [myrank] of destNode's connection vector. */
    MallocPostDesc(pd);
    pd->type            = GNI_POST_FMA_PUT;
    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
    pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    pd->length          = sizeof(gni_smsg_attr_t);
    pd->local_addr      = (uint64_t) smsg_attr;
    pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
    pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
    pd->src_cq_hndl     = 0;

    pd->rdma_mode       = 0;
    status = GNI_PostFma(ep_hndl_array[destNode],  pd);
    print_smsg_attr(smsg_attr);
    if(status == GNI_RC_ERROR_RESOURCE )
    {
        MallocRdmaRequest(rdma_request_msg);
        rdma_request_msg->destNode = destNode;
        rdma_request_msg->pd = pd;
        /* buffer this request */
    }
#if PRINT_SYH
    if(status != GNI_RC_SUCCESS)
       printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
    else
        printf("[%d=%d]OK send post FMA \n", myrank, destNode);
#endif
}
#endif
1450 /* useDynamicSMSG */
1451 INLINE_KEYWORD
1452 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1453 {
1454     gni_return_t status = GNI_RC_NOT_DONE;
1455
1456     if(mailbox_list->offset == mailbox_list->size)
1457     {
1458         dynamic_smsg_mailbox_t *new_mailbox_entry;
1459         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1460         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1461         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1462         memset(new_mailbox_entry->mailbox_base, 0, new_mailbox_entry->size);
1463         new_mailbox_entry->offset = 0;
1464         
1465         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1466             new_mailbox_entry->size, smsg_rx_cqh,
1467             GNI_MEM_READWRITE,   
1468             -1,
1469             &(new_mailbox_entry->mem_hndl));
1470
1471         GNI_RC_CHECK("register", status);
1472         new_mailbox_entry->next = mailbox_list;
1473         mailbox_list = new_mailbox_entry;
1474     }
1475     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1476     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1477     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1478     local_smsg_attr->mbox_offset = mailbox_list->offset;
1479     mailbox_list->offset += smsg_memlen;
1480     local_smsg_attr->buff_size = smsg_memlen;
1481     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1482     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1483 }
1484
1485 /* useDynamicSMSG */
1486 INLINE_KEYWORD
1487 static int connect_to(int destNode)
1488 {
1489     gni_return_t status = GNI_RC_NOT_DONE;
1490     CmiAssert(smsg_connected_flag[destNode] == 0);
1491     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1492     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1493     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1494     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1495     
1496     CMI_GNI_LOCK(global_gni_lock)
1497     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1498     CMI_GNI_UNLOCK(global_gni_lock)
1499     if (status == GNI_RC_ERROR_RESOURCE) {
1500       /* possibly destNode is making connection at the same time */
1501       free(smsg_attr_vector_local[destNode]);
1502       smsg_attr_vector_local[destNode] = NULL;
1503       free(smsg_attr_vector_remote[destNode]);
1504       smsg_attr_vector_remote[destNode] = NULL;
1505       mailbox_list->offset -= smsg_memlen;
1506 #if PRINT_SYH
1507     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1508 #endif
1509       return 0;
1510     }
1511     GNI_RC_CHECK("GNI_Post", status);
1512     smsg_connected_flag[destNode] = 1;
1513 #if PRINT_SYH
1514     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1515 #endif
1516     return 1;
1517 }
1518
1519 INLINE_KEYWORD
1520 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr, int ischarm )
1521 {
1522     unsigned int          remote_address;
1523     uint32_t              remote_id;
1524     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1525     gni_smsg_attr_t       *smsg_attr;
1526     gni_post_descriptor_t *pd;
1527     gni_post_state_t      post_state;
1528     char                  *real_data; 
1529     int                   msgid = 0;
1530
1531     if (useDynamicSMSG) {
1532         switch (smsg_connected_flag[destNode]) {
1533         case 0: 
1534             connect_to(destNode);         /* continue to case 1 */
1535         case 1:                           /* pending connection, do nothing */
1536             status = GNI_RC_NOT_DONE;
1537             if(inbuff ==0)
1538                 buffer_small_msgs(queue, msg, size, destNode, tag);
1539             return status;
1540         }
1541     }
1542 #if ! ONE_SEND_QUEUE
1543     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1544 #endif
1545     {
1546         //CMI_GNI_LOCK(smsg_mailbox_lock)
1547         CMI_GNI_LOCK(default_tx_cq_lock)
1548 #if CMK_SMP_TRACE_COMMTHREAD
1549         int oldpe = -1;
1550         int oldeventid = -1;
1551         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG)
1552         { 
1553             START_EVENT();
1554             if ( tag == SMALL_DATA_TAG)
1555                 real_data = (char*)msg; 
1556             else 
1557                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1558             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1559             TRACE_COMM_SET_COMM_MSGID(real_data);
1560         }
1561 #endif
1562 #if REMOTE_EVENT
1563         if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG) {
1564             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1565             if (control_msg_tmp->seq_id <= 0 && control_msg_tmp->ack_index == -1)
1566             {
1567                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1568                 if (control_msg_tmp->ack_index == -1) {    /* table overflow */
1569                     status = GNI_RC_NOT_DONE;
1570                     if (inbuff ==0)
1571                         buffer_small_msgs(queue, msg, size, destNode, tag);
1572                     return status;
1573                 }
1574             }
1575         }
1576 #endif
1577 #if     CMK_WITH_STATS
1578         SMSG_TRY_SEND(tag)
1579 #endif
1580 #if CMK_WITH_STATS
1581         double              creation_time;
1582         if (ptr == NULL)
1583             creation_time = CmiWallTimer();
1584         else
1585             creation_time = ptr->creation_time;
1586 #endif
1587
1588 #if CMK_SMSGS_FREE_AFTER_EVENT
1589         msgid = SmsgsPool_getslot(&smsgsPool, msg, ischarm?1:2);
1590 #endif
1591         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, msgid, tag);
1592 #if CMK_SMP_TRACE_COMMTHREAD
1593         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1594 #endif
1595         CMI_GNI_UNLOCK(default_tx_cq_lock)
1596         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1597         if(status == GNI_RC_SUCCESS)
1598         {
1599 #if     CMK_WITH_STATS
1600             SMSG_SENT_DONE(creation_time,tag) 
1601 #endif
1602 #if CMK_SMP_TRACE_COMMTHREAD
1603             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG )
1604             { 
1605                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1606             }
1607 #endif
1608         }else {
1609             status = GNI_RC_ERROR_RESOURCE;
1610 #if CMK_SMSGS_FREE_AFTER_EVENT
1611             SmsgsPool_freeslot(&smsgsPool, msgid);
1612 #endif
1613         }
1614     }
1615     if(status != GNI_RC_SUCCESS && inbuff ==0)
1616         buffer_small_msgs(queue, msg, size, destNode, tag);
1617     return status;
1618 }
1619
1620 INLINE_KEYWORD
1621 static CONTROL_MSG* construct_control_msg(int size, char *msg, int seqno)
1622 {
1623     /* construct a control message and send */
1624     CONTROL_MSG         *control_msg_tmp;
1625     MallocControlMsg(control_msg_tmp);
1626     control_msg_tmp->source_addr = (uint64_t)msg;
1627     control_msg_tmp->seq_id    = seqno;
1628     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1629 #if REMOTE_EVENT
1630     control_msg_tmp->ack_index    =  -1;
1631 #endif
1632 #if     USE_LRTS_MEMPOOL
1633     if(size < BIG_MSG)
1634     {
1635         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1636     }
1637     else
1638     {
1639         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1640         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1641         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1642     }
1643 #else
1644     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1645 #endif
1646     return control_msg_tmp;
1647 }
1648
/* When nonzero, send_large_messages() makes a fresh send (inbuff == 0) whose
 * source memory is not yet registered spin in LrtsAdvanceCommunication(0)
 * while the send buffer budget (MAX_BUFF_SEND) is exhausted, instead of
 * immediately buffering the control message. */
#define BLOCKING_SEND_CONTROL    0
1650
1651 // Large message, send control to receiver, receiver register memory and do a GET, 
1652 // return 1 - send no success
1653 INLINE_KEYWORD static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
1654 {
1655     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1656     uint32_t            vmdh_index  = -1;
1657     int                 size;
1658     int                 offset = 0;
1659     uint64_t            source_addr;
1660     int                 register_size; 
1661     void                *msg;
1662
1663     size    =   control_msg_tmp->total_length;
1664     source_addr = control_msg_tmp->source_addr;
1665     register_size = control_msg_tmp->length;
1666
1667 #if  USE_LRTS_MEMPOOL
1668     if( control_msg_tmp->seq_id <=0 ){
1669 #if BLOCKING_SEND_CONTROL
1670         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1671             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1672                 LrtsAdvanceCommunication(0);
1673         }
1674 #endif
1675         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1676         {
1677             msg = (void*)source_addr;
1678             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1679             {
1680                 if(!inbuff)
1681                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1682                 return GNI_RC_ERROR_NOMEM;
1683             }
1684             //register the corresponding mempool
1685             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1686             if(status == GNI_RC_SUCCESS)
1687             {
1688                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1689             }
1690         }else
1691         {
1692             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1693             status = GNI_RC_SUCCESS;
1694         }
1695         if(NoMsgInSend(source_addr))
1696             register_size = GetMempoolsize((void*)(source_addr));
1697         else
1698             register_size = 0;
1699     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1700     {
1701         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1702         source_addr += offset;
1703         size = control_msg_tmp->length;
1704 #if BLOCKING_SEND_CONTROL
1705         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1706             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1707                 LrtsAdvanceCommunication(0);
1708         }
1709 #endif
1710         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1711             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1712             {
1713                 if(!inbuff)
1714                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1715                 return GNI_RC_ERROR_NOMEM;
1716             }
1717             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1718             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1719         }
1720         else
1721         {
1722             status = GNI_RC_SUCCESS;
1723         }
1724         register_size = 0;  
1725     }
1726
1727 #if CMI_EXERT_SEND_LARGE_CAP
1728     if(SEND_large_pending >= SEND_large_cap)
1729     {
1730         status = GNI_RC_ERROR_NOMEM;
1731     }
1732 #endif
1733  
1734     if(status == GNI_RC_SUCCESS)
1735     {
1736        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr, 0); 
1737         if(status == GNI_RC_SUCCESS)
1738         {
1739 #if CMI_EXERT_SEND_LARGE_CAP
1740             SEND_large_pending++;
1741 #endif
1742             buffered_send_msg += register_size;
1743             if(control_msg_tmp->seq_id == 0)
1744             {
1745                 IncreaseMsgInSend(source_addr);
1746             }
1747 #if ! CMK_SMSGS_FREE_AFTER_EVENT
1748             FreeControlMsg(control_msg_tmp);
1749 #endif
1750             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag); 
1751         }else
1752             status = GNI_RC_ERROR_RESOURCE;
1753
1754     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1755     {
1756         CmiAbort("Memory registor for large msg\n");
1757     }else 
1758     {
1759         status = GNI_RC_ERROR_NOMEM; 
1760         if(!inbuff)
1761             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1762     }
1763     return status;
1764 #else
1765     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1766     if(status == GNI_RC_SUCCESS)
1767     {
1768         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL, 0);
1769 #if ! CMK_SMSGS_FREE_AFTER_EVENT
1770         if(status == GNI_RC_SUCCESS)
1771         {
1772             FreeControlMsg(control_msg_tmp);
1773         }
1774 #endif
1775     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1776     {
1777         CmiAbort("Memory registor for large msg\n");
1778     }else 
1779     {
1780         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1781     }
1782     return status;
1783 #endif
1784 }
1785
1786 INLINE_KEYWORD void LrtsBeginIdle() {}
1787
1788 INLINE_KEYWORD void LrtsStillIdle() {}
1789
1790 INLINE_KEYWORD void LrtsNotifyIdle() {}
1791
1792 INLINE_KEYWORD void LrtsPrepareEnvelope(char *msg, int size)
1793 {
1794     CmiSetMsgSize(msg, size);
1795     CMI_SET_CHECKSUM(msg, size);
1796 }
1797
1798 CmiCommHandle LrtsSendFunc(int destNode, int destPE, int size, char *msg, int mode)
1799 {
1800     gni_return_t        status  =   GNI_RC_SUCCESS;
1801     uint8_t tag;
1802     CONTROL_MSG         *control_msg_tmp;
1803     int                 oob = ( mode & OUT_OF_BAND);
1804     SMSG_QUEUE          *queue;
1805
1806     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1807 #if CMK_USE_OOB
1808     queue = oob? &smsg_oob_queue : &smsg_queue;
1809     tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
1810 #else
1811     queue = &smsg_queue;
1812     tag = LMSG_INIT_TAG;
1813 #endif
1814
1815     LrtsPrepareEnvelope(msg, size);
1816
1817 #if PRINT_SYH
1818     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1819 #endif 
1820
1821 #if CMK_SMP 
1822     if(size <= SMSG_MAX_MSG)
1823         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1824     else if (size < BIG_MSG) {
1825         control_msg_tmp =  construct_control_msg(size, msg, 0);
1826         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1827     }
1828     else {
1829         CmiSetMsgSeq(msg, 0);
1830         control_msg_tmp =  construct_control_msg(size, msg, 1);
1831         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1832     }
1833 #else   //non-smp, smp(worker sending)
1834     if(size <= SMSG_MAX_MSG)
1835     {
1836         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL, 1))
1837         {  
1838 #if !CMK_SMSGS_FREE_AFTER_EVENT
1839             CmiFree(msg); 
1840 #endif
1841         }
1842     }
1843     else if (size < BIG_MSG) {
1844         control_msg_tmp =  construct_control_msg(size, msg, 0);
1845         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1846     }
1847     else {
1848 #if     USE_LRTS_MEMPOOL
1849         CmiSetMsgSeq(msg, 0);
1850         control_msg_tmp =  construct_control_msg(size, msg, 1);
1851         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1852 #else
1853         control_msg_tmp =  construct_control_msg(size, msg, 0);
1854         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1855 #endif
1856     }
1857 #endif
1858     return 0;
1859 }
1860
/*
 * Disabled (#if 0) machine-specific list-send implementations.  They are
 * kept for reference only; none of these definitions is compiled.
 */
#if 0
// this is no different from the common code
/* Send a copy of msg to each of the npes PEs in pes; msg stays owned by the
 * caller.  With CMK_BROADCAST_USE_CMIREFERENCE, remote PEs get a reference
 * (CmiReference + CmiSyncSendAndFree) instead of a copy. */
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
  int i;
#if CMK_BROADCAST_USE_CMIREFERENCE
  for(i=0;i<npes;i++) {
    if (pes[i] == CmiMyPe())
      CmiSyncSend(pes[i], len, msg);
    else {
      CmiReference(msg);
      CmiSyncSendAndFree(pes[i], len, msg);
    }
  }
#else
  for(i=0;i<npes;i++) {
    CmiSyncSend(pes[i], len, msg);
  }
#endif
}

/* "Asynchronous" list send: delegates to the synchronous version and
 * returns a null handle. */
CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
{
  /* A better asynchronous implementation may be wanted, but at least it works */
  CmiSyncListSendFn(npes, pes, len, msg);
  return (CmiCommHandle) 0;
}

/* List send that takes ownership of msg and frees it once every PE in pes
 * has been sent to. */
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
  if (npes == 1) {
      CmiSyncSendAndFree(pes[0], len, msg);
      return;
  }
#if CMK_PERSISTENT_COMM
  /* persistent-channel path for large messages */
  if (CpvAccess(phs) && len > PERSIST_MIN_SIZE 
#if CMK_SMP
            && IS_PERSISTENT_MEMORY(msg)
#endif
     ){
      int i;
      for(i=0;i<npes;i++) {
        if (pes[i] == CmiMyPe())
          CmiSyncSend(pes[i], len, msg);
        else {
          CmiReference(msg);
          CmiSyncSendAndFree(pes[i], len, msg);
        }
      }
      CmiFree(msg);
      return;
  }
#endif
  
#if CMK_BROADCAST_USE_CMIREFERENCE
  CmiSyncListSendFn(npes, pes, len, msg);
  CmiFree(msg);
#else
  /* copy to all but the last PE, then hand the buffer itself to the last */
  int i;
  for(i=0;i<npes-1;i++) {
    CmiSyncSend(pes[i], len, msg);
  }
  if (npes>0)
    CmiSyncSendAndFree(pes[npes-1], len, msg);
  else 
    CmiFree(msg);
#endif
}
#endif
1930
/* Forward declaration: the dynamic-SMSG datagram pump is defined later but is
 * referenced earlier (LrtsPostCommonInit, PumpNetworkSmsg). */
static void PumpDatagramConnection();

/*
 * Trace user-event ids for machine-layer activity.  These are initial
 * values; registerUserTraceEvents() replaces them with ids returned by
 * traceRegisterUserEvent() when machine user-event tracing is compiled in.
 */
static int event_SetupConnect          = 111;
static int event_PumpSmsg              = 222;
static int event_PumpTransaction       = 333;
static int event_PumpRdmaTransaction   = 444;
static int event_SendBufferSmsg        = 484;
static int event_SendFmaRdmaMsg        = 555;
static int event_AdvanceCommunication  = 666;
1939
/*
 * Register the machine-layer trace user events and store the returned ids
 * in the event_* variables.  The events are registered in table order.
 * Compiled to an empty function unless CMI_MPI_TRACE_USEREVENTS and
 * CMK_TRACE_ENABLED are set and CMK_TRACE_IN_CHARM is not.
 */
static void registerUserTraceEvents() {
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    static struct { int *id; char *name; } const events[] = {
        { &event_SetupConnect,         "setting up connections" },
        { &event_PumpSmsg,             "Pump network small msgs" },
        { &event_PumpTransaction,      "Pump FMA/RDMA local transaction" },
        { &event_PumpRdmaTransaction,  "Pump RDMA remote event" },
        { &event_SendBufferSmsg,       "Sending buffered small msgs" },
        { &event_SendFmaRdmaMsg,       "Sending buffered fma/rdma transactions" },
        { &event_AdvanceCommunication, "Worker thread in sending/receiving" },
    };
    int k;

    for (k = 0; k < (int)(sizeof(events) / sizeof(events[0])); k++)
        *events[k].id = traceRegisterUserEvent(events[k].name, -1);
#endif
}
1951
1952 static void ProcessDeadlock()
1953 {
1954     static CmiUInt8 *ptr = NULL;
1955     static CmiUInt8  last = 0, mysum, sum;
1956     static int count = 0;
1957     gni_return_t status;
1958     int i;
1959
1960 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1961 //sweep_mempool(CpvAccess(mempool));
1962     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1963     mysum = smsg_send_count + smsg_recv_count;
1964     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1965     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1966     GNI_RC_CHECK("PMI_Allgather", status);
1967     sum = 0;
1968     for (i=0; i<mysize; i++)  sum+= ptr[i];
1969     if (last == 0 || sum == last) 
1970         count++;
1971     else
1972         count = 0;
1973     last = sum;
1974     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1975     if (count == 2) { 
1976         /* detected twice, it is a real deadlock */
1977         if (myrank == 0)  {
1978             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1979             CmiAbort("Fatal> Deadlock detected.");
1980         }
1981
1982     }
1983     _detected_hang = 0;
1984 }
1985
1986 static void CheckProgress()
1987 {
1988     if (smsg_send_count == last_smsg_send_count &&
1989         smsg_recv_count == last_smsg_recv_count ) 
1990     {
1991         _detected_hang = 1;
1992 #if !CMK_SMP
1993         if (_detected_hang) ProcessDeadlock();
1994 #endif
1995
1996     }
1997     else {
1998         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1999         last_smsg_send_count = smsg_send_count;
2000         last_smsg_recv_count = smsg_recv_count;
2001         _detected_hang = 0;
2002     }
2003 }
2004
2005 static void set_limit()
2006 {
2007     //if (!user_set_flag && CmiMyRank() == 0) {
2008     if (CmiMyRank() == 0) {
2009         int mynode = CmiPhysicalNodeID(CmiMyPe());
2010         int numpes = CmiNumPesOnPhysicalNode(mynode);
2011         int numprocesses = numpes / CmiMyNodeSize();
2012         MAX_REG_MEM  = _totalmem / numprocesses;
2013         MAX_BUFF_SEND = MAX_REG_MEM / 2;
2014         if (CmiMyPe() == 0)
2015            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
2016         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
2017         {
2018              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
2019              CmiAbort("memory registration\n");
2020         }
2021     }
2022 }
2023
2024 void LrtsPostCommonInit(int everReturn)
2025 {
2026 #if CMK_DIRECT
2027     CmiDirectInit();
2028 #endif
2029 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
2030     CpvInitialize(double, projTraceStart);
2031     /* only PE 0 needs to care about registration (to generate sts file). */
2032     //if (CmiMyPe() == 0) 
2033     {
2034         registerMachineUserEventsFunction(&registerUserTraceEvents);
2035     }
2036 #endif
2037
2038 #if !CMK_SMP
2039     if (useDynamicSMSG)
2040     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
2041 #endif
2042
2043 #if ! LARGEPAGE
2044     if (_checkProgress)
2045 #if CMK_SMP
2046     if (CmiMyRank() == 0)
2047 #endif
2048     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
2049 #endif
2050  
2051 #if !LARGEPAGE
2052     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2053 #endif
2054 }
2055
/*
 * Called by worker threads.  Only MULTI_THREAD_SEND builds do anything here:
 * when mysize != 1, the worker whose rank satisfies rank % 6 == 3 drives
 * network progress through CmiMachineProgressImpl().  With
 * CMK_SMP_TRACE_COMMTHREAD the call is recorded as the
 * event_AdvanceCommunication user event and the idle trace state is
 * suspended around it.
 */
void LrtsPostNonLocal()
{
#if MULTI_THREAD_SEND
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif

    if (mysize == 1 || CmiMyRank() % 6 != 3)
        return;

#if CMK_SMP_TRACE_COMMTHREAD
    traceEndIdle();
    startT = CmiWallTimer();
#endif

    CmiMachineProgressImpl();

#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
    traceBeginIdle();
#endif
#endif /* MULTI_THREAD_SEND */
}
2086
/* Network progress function used to poll the network for messages.
   This flushes receive buffers on some implementations. */
#if CMK_MACHINE_PROGRESS_DEFINED
/*
 * Progress from the calling thread.  Active only in non-SMP builds or with
 * MULTI_THREAD_SEND; otherwise it is empty.
 * It pumps incoming SMSGs, then expands SEND_OOB_SMSG(smsg_oob_queue),
 * PUMP_REMOTE_HIGHPRIORITY, PUMP_LOCAL_HIGHPRIORITY and
 * POST_HIGHPRIORITY_RDMA.  Those macros are defined elsewhere; judging by
 * their names they handle out-of-band / high-priority work (TODO confirm).
 */
void CmiMachineProgressImpl() {
#if ! CMK_SMP || MULTI_THREAD_SEND
    STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA
#endif
}
#endif
2147
2148
2149 /* useDynamicSMSG */
2150 static void    PumpDatagramConnection()
2151 {
2152     uint32_t          remote_address;
2153     uint32_t          remote_id;
2154     gni_return_t status;
2155     gni_post_state_t  post_state;
2156     uint64_t          datagram_id;
2157     int i;
2158
2159    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2160    {
2161        if (datagram_id >= mysize) {           /* bound endpoint */
2162            int pe = datagram_id - mysize;
2163            CMI_GNI_LOCK(global_gni_lock)
2164            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2165            CMI_GNI_UNLOCK(global_gni_lock)
2166            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2167            {
2168                CmiAssert(remote_id == pe);
2169                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2170                GNI_RC_CHECK("Dynamic SMSG Init", status);
2171 #if PRINT_SYH
2172                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2173 #endif
2174                CmiAssert(smsg_connected_flag[pe] == 1);
2175                smsg_connected_flag[pe] = 2;
2176            }
2177        }
2178        else {         /* unbound ep */
2179            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2180            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2181            {
2182                CmiAssert(remote_id<mysize);
2183                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2184                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2185                GNI_RC_CHECK("Dynamic SMSG Init", status);
2186 #if PRINT_SYH
2187                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2188 #endif
2189                smsg_connected_flag[remote_id] = 2;
2190
2191                alloc_smsg_attr(&send_smsg_attr);
2192                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2193                GNI_RC_CHECK("post unbound datagram", status);
2194            }
2195        }
2196    }
2197 }
2198
/* Polling a CQ to receive network messages.  Currently an empty stub: it
 * performs no work and has no side effects. */
static void PumpNetworkRdmaMsgs()
{
}
2206
2207 INLINE_KEYWORD
2208 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2209 {
2210     RDMA_REQUEST        *rdma_request_msg;
2211     MallocRdmaRequest(rdma_request_msg);
2212     rdma_request_msg->destNode = inst_id;
2213     rdma_request_msg->pd = pd;
2214 #if REMOTE_EVENT
2215     rdma_request_msg->ack_index = ack_index;
2216 #endif
2217     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2218 }
2219
2220 static void getLargeMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2221 static void getPersistentMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2222 static void PRINT_CONTROL(void *header)
2223 {
2224     CONTROL_MSG *control_msg = (CONTROL_MSG *) header;
2225
2226     printf(" length=%d , seq_id = %d, addr = %lld:%lld:%lld  \n", control_msg->length, control_msg->seq_id, control_msg->source_addr, (control_msg->source_mem_hndl).qword1, (control_msg->source_mem_hndl).qword2 );
2227 #if PERSISTENT_GET_BASE
2228     printf(" memhdl = %lld:%lld:%lld \n", control_msg->dest_addr, (control_msg->dest_mem_hndl).qword1, (control_msg->dest_mem_hndl).qword2);
2229 #endif
2230 }
2231 static void PumpNetworkSmsg()
2232 {
2233     uint64_t            inst_id;
2234     gni_cq_entry_t      event_data;
2235     gni_return_t        status;
2236     void                *header;
2237     uint8_t             msg_tag;
2238     int                 msg_nbytes;
2239     void                *msg_data;
2240     gni_mem_handle_t    msg_mem_hndl;
2241     gni_smsg_attr_t     *smsg_attr;
2242     gni_smsg_attr_t     *remote_smsg_attr;
2243     int                 init_flag;
2244     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2245     uint64_t            source_addr;
2246     SMSG_QUEUE         *queue = &smsg_queue;
2247     PCQueue             tmp_queue;
2248 #if  CMK_DIRECT
2249     cmidirectMsg        *direct_msg;
2250 #endif
2251 #if CMI_PUMPNETWORKSMSG_CAP 
2252     int                  recv_cnt = 0;
2253     while(recv_cnt< PumpNetworkSmsg_cap) {
2254 #else
2255     while(1) {
2256 #endif
2257         CMI_GNI_LOCK(smsg_rx_cq_lock)
2258         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2259         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2260         if(status != GNI_RC_SUCCESS) break;
2261
2262         inst_id = GNI_CQ_GET_REM_INST_ID(event_data);
2263 #if REMOTE_EVENT
2264         inst_id = GET_RANK(inst_id);      /* important */
2265 #endif
2266         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2267 #if PRINT_SYH
2268         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2269 #endif
2270         if (useDynamicSMSG) {
2271             /* subtle: smsg may come before connection is setup */
2272             while (smsg_connected_flag[inst_id] != 2) 
2273                PumpDatagramConnection();
2274         }
2275         msg_tag = GNI_SMSG_ANY_TAG;
2276         while(1) {
2277             CMI_GNI_LOCK(smsg_mailbox_lock)
2278             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2279             if (status != GNI_RC_SUCCESS)
2280             {
2281                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2282                 break;
2283             }
2284 #if         CMI_PUMPNETWORKSMSG_CAP
2285             recv_cnt++; 
2286 #endif
2287 #if PRINT_SYH
2288             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2289 #endif
2290             /* copy msg out and then put into queue (small message) */
2291             switch (msg_tag) {
2292             case SMALL_DATA_TAG:
2293             {
2294                 msg_nbytes = CmiGetMsgSize(header);
2295                 CmiAssert(msg_nbytes > 0);
2296                 msg_data    = CmiAlloc(msg_nbytes);
2297                 memcpy(msg_data, (char*)header, msg_nbytes);
2298                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2299                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2300                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2301                 handleOneRecvedMsg(msg_nbytes, msg_data);
2302                 break;
2303             }
2304             case LMSG_PERSISTENT_INIT_TAG:
2305             {   CMI_GNI_UNLOCK(smsg_mailbox_lock)
2306                 getPersistentMsgRequest(header, inst_id, msg_tag, sendRdmaBuf);
2307                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2308                 break;
2309             }
2310             case LMSG_INIT_TAG:
2311             case LMSG_OOB_INIT_TAG:
2312             {
2313                 tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf; 
2314 #if MULTI_THREAD_SEND
2315                 MallocControlMsg(control_msg_tmp);
2316                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2317                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2318                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2319                 getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
2320                 FreeControlMsg(control_msg_tmp);
2321 #else
2322                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2323                 getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
2324                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2325 #endif
2326                 break;
2327             }
2328 #if !REMOTE_EVENT && !CQWRITE
2329             case ACK_TAG:   //msg fit into mempool
2330             {
2331                 /* Get is done, release message . Now put is not used yet*/
2332                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2333                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2334                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2335 #if ! USE_LRTS_MEMPOOL
2336                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2337 #else
2338                 DecreaseMsgInSend(msg);
2339 #endif
2340                 if(NoMsgInSend(msg))
2341                     buffered_send_msg -= GetMempoolsize(msg);
2342                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2343                 CmiFree(msg);
2344 #if CMI_EXERT_SEND_LARGE_CAP
2345                 SEND_large_pending--;
2346 #endif
2347                 break;
2348             }
2349 #endif
2350             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2351             {
2352 #if MULTI_THREAD_SEND
2353                 MallocControlMsg(header_tmp);
2354                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2355                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2356 #else
2357                 header_tmp = (CONTROL_MSG *) header;
2358 #endif
2359                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2360 #if CMI_EXERT_SEND_LARGE_CAP
2361                 SEND_large_pending--;
2362 #endif
2363                 void *msg = (void*)(header_tmp->source_addr);
2364                 int cur_seq = CmiGetMsgSeq(msg);
2365                 int offset = ONE_SEG*(cur_seq+1);
2366                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2367                 buffered_send_msg -= header_tmp->length;
2368                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2369                 if (remain_size < 0) remain_size = 0;
2370                 CmiSetMsgSize(msg, remain_size);
2371                 if(remain_size <= 0) //transaction done
2372                 {
2373                     CmiFree(msg);
2374                 }else if (header_tmp->total_length > offset)
2375                 {
2376                     CmiSetMsgSeq(msg, cur_seq+1);
2377                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2378                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2379                     //send next seg
2380                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2381                          // pipelining
2382                     if (header_tmp->seq_id == 1) {
2383                       int i;
2384                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2385                         int seq = cur_seq+i+2;
2386                         CmiSetMsgSeq(msg, seq-1);
2387                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2388                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2389                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2390                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2391                       }
2392                     }
2393                 }
2394 #if MULTI_THREAD_SEND
2395                 FreeControlMsg(header_tmp);
2396 #else
2397                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2398 #endif
2399                 break;
2400             }
2401 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE
2402             case PUT_DONE_TAG:  {   //persistent message
2403                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2404                 int size = ((CONTROL_MSG *) header)->length;
2405                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2406                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2407                 CmiReference(msg);
2408                 CMI_CHECK_CHECKSUM(msg, size);
2409                 handleOneRecvedMsg(size, msg); 
2410 #if PRINT_SYH
2411                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2412 #endif
2413                 break;
2414             }
2415 #endif
2416 #if CMK_DIRECT
2417             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2418                 //create a trigger message
2419                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2420                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2421                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2422                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2423                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2424                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2425                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2426                 break;
2427 #endif
2428             default:
2429                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2430                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2431                 printf("weird tag problem %d \n", msg_tag);
2432                 CmiAbort("Unknown tag\n");
2433             }               // end switch
2434 #if PRINT_SYH
2435             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2436 #endif
2437             smsg_recv_count ++;
2438             msg_tag = GNI_SMSG_ANY_TAG;
2439         } //endwhile GNI_SmsgGetNextWTag
2440     }   //end while GetEvent
2441     if(status == GNI_RC_ERROR_RESOURCE)
2442     {
2443         printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
2444         GNI_RC_CHECK("Smsg_rx_cq full", status);
2445     }
2446 }
2447
2448 static void printDesc(gni_post_descriptor_t *pd)
2449 {
2450     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2451 }
2452
#if CQWRITE
/*
 * Post a CQ write carrying `data` to destNode, addressed through the remote
 * memory handle mem_hndl.  The post uses GNI_CQMODE_SILENT and
 * GNI_DLVMODE_PERFORMANCE; a failed post is reported via GNI_RC_CHECK.
 * NOTE(review): the descriptor from MallocPostDesc is not freed here.  With
 * a silent post it is presumably never returned by a local completion event
 * either, so confirm whether it leaks.
 */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_post_descriptor_t *desc;
    gni_return_t           rc;

    MallocPostDesc(desc);
    desc->type            = GNI_POST_CQWRITE;
    desc->cq_mode         = GNI_CQMODE_SILENT;
    desc->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    desc->cqwrite_value   = data;
    desc->remote_mem_hndl = mem_hndl;

    rc = GNI_PostCqWrite(ep_hndl_array[destNode], desc);
    GNI_RC_CHECK("GNI_PostCqWrite", rc);
}
#endif
2470
2471 // register memory for a message
2472 // return mem handle
2473 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2474 {
2475     gni_return_t status = GNI_RC_SUCCESS;
2476
2477     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2478
2479 #if CMK_PERSISTENT_COMM_PUT
2480       // persistent message is always registered
2481       // BIG_MSG small pieces do not have malloc chunk header
2482     if (IS_PERSISTENT_MEMORY(msg)) {
2483         *memh = GetMemHndl(msg);
2484         return GNI_RC_SUCCESS;
2485     }
2486 #endif
2487     if(seqno == 0 
2488 #if CMK_PERSISTENT_COMM_PUT
2489          || seqno == PERSIST_SEQ
2490 #endif
2491       )
2492     {
2493         if(IsMemHndlZero((GetMemHndl(msg))))
2494         {
2495             msg = (void*)(msg);
2496             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2497             if(status == GNI_RC_SUCCESS)
2498                 *memh = GetMemHndl(msg);
2499         }
2500         else {
2501             *memh = GetMemHndl(msg);
2502         }
2503     }
2504     else {
2505         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2506         status = registerMemory(msg, size, memh, NULL); 
2507     }
2508     return status;
2509 }
2510
2511 static void getPersistentMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2512 {
2513 #if   PERSISTENT_GET_BASE
2514     CONTROL_MSG         *request_msg;
2515     gni_return_t        status;
2516     gni_post_descriptor_t *pd;
2517     request_msg = (CONTROL_MSG *) header;
2518
2519     MallocPostDesc(pd);
2520     pd->cqwrite_value = request_msg->seq_id;
2521     pd->first_operand = ALIGN64(request_msg->length); //  total length
2522     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2523         pd->type            = GNI_POST_FMA_GET;
2524     else
2525         pd->type            = GNI_POST_RDMA_GET;
2526     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
2527     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2528     pd->length          = ALIGN64(request_msg->length);
2529     pd->local_addr      = (uint64_t) request_msg->dest_addr;
2530     pd->local_mem_hndl  = request_msg->dest_mem_hndl;
2531     pd->remote_addr     = (uint64_t) request_msg->source_addr;
2532     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2533     pd->src_cq_hndl     = 0;
2534     pd->rdma_mode       = 0;
2535     pd->amo_cmd         = 0;
2536 #if REMOTE_EVENT
2537     bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2538 #else
2539     bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2540 #endif
2541
2542 #endif
2543 }
2544
// Receiver side of the large-message GET protocol (control message tags
// LMSG_INIT_TAG / LMSG_OOB_INIT_TAG, also used for BIG_MSG segments).
//
// `header` is the CONTROL_MSG sent by node `inst_id`.  This function picks
// (or allocates) the local destination buffer, registers it, and builds an
// FMA/RDMA GET descriptor that pulls the data from the sender's
// request_msg->source_addr.
//  - seq_id 0: the whole message (total_length) in one transfer.
//  - seq_id < 2 (0, 1, or negative): a fresh destination buffer of
//    total_length bytes is CmiAlloc'ed.
//  - seq_id >= 2: a BIG_MSG segment written into request_msg->dest_addr at
//    offset ONE_SEG*(seq_id-1).
// tag == LMSG_OOB_INIT_TAG: the GET is posted immediately (source CQ
// highprior_rdma_tx_cqh); on a resource/no-memory error it is buffered
// instead.  Any other tag: the descriptor is always handed to bufferRdmaMsg
// with bufferRdmaQueue to be posted later.
static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
{
#if     USE_LRTS_MEMPOOL
    CONTROL_MSG         *request_msg;
    gni_return_t        status = GNI_RC_SUCCESS;
    void                *msg_data;
    gni_post_descriptor_t *pd;
    gni_mem_handle_t    msg_mem_hndl;
    int                 size, transaction_size, offset = 0;
    size_t              register_size = 0;   // only consumed by MACHINE_DEBUG_LOG accounting

    // initiate a GET to transfer data from the sender side
    request_msg = (CONTROL_MSG *) header;
    size = request_msg->total_length;
    MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
    MallocPostDesc(pd);
#if CMK_WITH_STATS 
    pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
#endif
    // seq_id < 2: first (or only) transfer of this message -> allocate the
    // full-size destination buffer.  Otherwise write this segment into the
    // buffer whose address the sender echoed back in dest_addr
    // (presumably the one allocated for seq_id 1).
    if(request_msg->seq_id < 2)   {
        MACHSTATE2(8, "%d seq id in get large msg requrest %d\n", CmiMyRank(), request_msg->seq_id);
#if CMK_SMP_TRACE_COMMTHREAD 
        pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
#endif
        msg_data = CmiAlloc(size);
        CmiSetMsgSeq(msg_data, 0);
        _MEMCHECK(msg_data);
    }
    else {
        offset = ONE_SEG*(request_msg->seq_id-1);
        msg_data = (char*)request_msg->dest_addr + offset;
    }
   
    // the seq_id travels in cqwrite_value so PumpLocalTransactions can tell
    // a whole-message GET (0) from a BIG_MSG segment (> 0) on completion
    pd->cqwrite_value = request_msg->seq_id;

    // whole message: full aligned size; segment: this segment's length
    transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
    SetMemHndlZero(pd->local_mem_hndl);
    status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
    if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
        if(NoMsgInRecv( (void*)(msg_data)))
            register_size = GetMempoolsize((void*)(msg_data));
    }

    pd->first_operand = ALIGN64(size);                   //  total length

    // FMA vs RDMA is decided by the total message length, not this segment
    if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
        pd->type            = GNI_POST_FMA_GET;
    else
        pd->type            = GNI_POST_RDMA_GET;
    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
    pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    pd->length          = transaction_size;
    pd->local_addr      = (uint64_t) msg_data;
    pd->remote_addr     = request_msg->source_addr + offset;
    pd->remote_mem_hndl = request_msg->source_mem_hndl;

    // out-of-band requests complete on the high-priority CQ
    if (tag == LMSG_OOB_INIT_TAG) 
        pd->src_cq_hndl     = highprior_rdma_tx_cqh;
    else
    {
#if MULTI_THREAD_SEND
        pd->src_cq_hndl     = rdma_tx_cqh;
#else
        pd->src_cq_hndl     = 0;
#endif
    }

    pd->rdma_mode       = 0;
    pd->amo_cmd         = 0;
#if CMI_EXERT_RECV_RDMA_CAP
    // too many GETs in flight: treat like a resource shortage so the
    // descriptor is buffered below instead of posted
    // NOTE(review): for seq_id != 0, registerMessage registered the segment
    // directly into pd->local_mem_hndl; the else-if below then zeroes that
    // handle without deregistering it -- confirm this is not a leak.
    if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
#endif
    //memory registration success: only OOB requests are posted right away
    if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
    {
        // serialize with the CQ lock that matches the post type
        CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
        CMI_GNI_LOCK(lock)
#if REMOTE_EVENT
        // whole-message GET: request a remote event on the sender tagged
        // with ACK_EVENT(ack_index) (handled by PumpRemoteTransactions there)
        if( request_msg->seq_id == 0)
        {
            pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
            int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
            GNI_RC_CHECK("GNI_EpSetEventData", sts);
        }
#endif

#if CMK_WITH_STATS
        RDMA_TRY_SEND(pd->type)
#endif
        if(pd->type == GNI_POST_RDMA_GET) 
        {
            status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
        }
        else
        {
            status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
        }
        CMI_GNI_UNLOCK(lock)

        if(status == GNI_RC_SUCCESS )
        {
#if CMI_EXERT_RECV_RDMA_CAP
            RDMA_pending++;
#endif
            // whole-message GET in flight: count it against the receive
            // buffer (decremented in PumpLocalTransactions on completion)
            if(pd->cqwrite_value == 0)
            {
#if MACHINE_DEBUG_LOG
                buffered_recv_msg += register_size;
                MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
#endif
                IncreaseMsgInRecv(msg_data);
#if CMK_SMP_TRACE_COMMTHREAD 
                pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
#endif
            }
#if  CMK_WITH_STATS
            pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
            RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
#endif
        }
    }else if (status != GNI_RC_SUCCESS)
    {
        // registration failed (or RDMA cap hit): clear the handle so it is
        // registered again when the buffered descriptor is retried
        SetMemHndlZero((pd->local_mem_hndl));
    }
        // buffer for a later retry (SendRdmaMsg re-registers and posts it):
        // resource / no-memory failures, and every non-OOB request -- note
        // that for non-OOB tags this includes other registration errors too
        if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
    {
#if REMOTE_EVENT
        bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
#else
        bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
#endif
    }else if (status != GNI_RC_SUCCESS) {
        // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
        GNI_RC_CHECK("GetLargeAFter posting", status);
    }
#else
    // Non-mempool variant: register a freshly allocated buffer of
    // request_msg->length bytes and post the GET immediately for every tag.
    // NOTE(review): seq_id / BIG_MSG pipelining is not handled here, and
    // retries go to the global sendRdmaBuf rather than bufferRdmaQueue.
    CONTROL_MSG         *request_msg;
    gni_return_t        status;
    void                *msg_data;
    gni_post_descriptor_t *pd;
    RDMA_REQUEST        *rdma_request_msg;
    gni_mem_handle_t    msg_mem_hndl;
    //int source;
    // initiate a GET to transfer data from the sender side
    request_msg = (CONTROL_MSG *) header;
    msg_data = CmiAlloc(request_msg->length);
    _MEMCHECK(msg_data);

    MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)

    // bad parameters / permissions are fatal; resource shortages are retried
    if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
    {
        GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
    }

    MallocPostDesc(pd);
    if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
        pd->type            = GNI_POST_FMA_GET;
    else
        pd->type            = GNI_POST_RDMA_GET;
    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
    pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    pd->length          = ALIGN64(request_msg->length);
    pd->local_addr      = (uint64_t) msg_data;
    pd->remote_addr     = request_msg->source_addr;
    pd->remote_mem_hndl = request_msg->source_mem_hndl;
    if (tag == LMSG_OOB_INIT_TAG) 
        pd->src_cq_hndl     = highprior_rdma_tx_cqh;
    else
    {
#if MULTI_THREAD_SEND
        pd->src_cq_hndl     = rdma_tx_cqh;
#else
        pd->src_cq_hndl     = 0;
#endif
    }
    pd->rdma_mode       = 0;
    pd->amo_cmd         = 0;

    //memory registration successful
    if(status == GNI_RC_SUCCESS)
    {
        pd->local_mem_hndl  = msg_mem_hndl;
       
        if(pd->type == GNI_POST_RDMA_GET) 
        {
            CMI_GNI_LOCK(rdma_tx_cq_lock)
            status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
            CMI_GNI_UNLOCK(rdma_tx_cq_lock)
        }
        else
        {
            CMI_GNI_LOCK(default_tx_cq_lock)
            status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
            CMI_GNI_UNLOCK(default_tx_cq_lock)
        }

    }else
    {
        SetMemHndlZero(pd->local_mem_hndl);
    }
    // out of resources (registration or post): queue for a later retry
    if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
    {
        MallocRdmaRequest(rdma_request_msg);
        rdma_request_msg->next = 0;
        rdma_request_msg->destNode = inst_id;
        rdma_request_msg->pd = pd;
        PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
    }else {
        GNI_RC_CHECK("AFter posting", status);
    }
#endif
}
2760
#if CQWRITE
/* Drain rdma_rx_cqh, where CQ-write notifications arrive (CQWRITE mode).
 *
 * The low 48 bits of each event's data are a local message address.
 *  - CMK_PERSISTENT_COMM_PUT and the message has a non-zero MEMHFIELD
 *    (presumably a persistent receive buffer): the buffer now holds a
 *    delivered message, which is referenced and handed to handleOneRecvedMsg.
 *  - otherwise the write acknowledges one of our outgoing messages (the peer
 *    issues sendCqWrite with our address after its GET completes): update
 *    the mempool send accounting and free the message.
 * A full CQ (GNI_RC_ERROR_RESOURCE) is fatal. */
static void PumpCqWriteTransactions()
{
    gni_cq_entry_t          ev;
    gni_return_t            status;
    void                    *msg;
    int                     msg_size;

    for (;;) {
        status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
        if (status != GNI_RC_SUCCESS)
            break;

        msg = (void*) (GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);

#if CMK_PERSISTENT_COMM_PUT
#if PRINT_SYH
        printf(" %d CQ write event %p\n", myrank, msg);
#endif
        if (!IsMemHndlZero(MEMHFIELD(msg))) {
#if PRINT_SYH
            printf(" %d Persistent CQ write event %p\n", myrank, msg);
#endif
            CmiReference(msg);
            msg_size = CmiGetMsgSize(msg);
            CMI_CHECK_CHECKSUM(msg, msg_size);
            handleOneRecvedMsg(msg_size, msg);
            continue;
        }
#endif

#if USE_LRTS_MEMPOOL
        DecreaseMsgInSend(msg);
#endif
        if (NoMsgInSend(msg))
            buffered_send_msg -= GetMempoolsize(msg);
        CmiFree(msg);
    }

    if (status == GNI_RC_ERROR_RESOURCE)
    {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
    }
}
#endif
2805
#if REMOTE_EVENT
/* Drain remote-event completions from rx_cqh (REMOTE_EVENT mode).
 *
 * Each event carries the remote event data the peer attached with
 * GNI_EpSetEventData when it posted its transaction.  GET_TYPE/GET_INDEX
 * split that value into a slot type and a pool index:
 *   type 0 (ACK)        - a transaction on one of our ackPool messages has
 *                         completed: update the mempool send accounting,
 *                         free the message and release its ackPool slot.
 *   type 1 (PERSISTENT) - (CMK_PERSISTENT_COMM_PUT only) the persistPool
 *                         slot's current destination buffer received a
 *                         message; it is referenced and delivered via
 *                         handleOneRecvedMsg.
 * Any other type aborts.
 * NOTE(review): SendRdmaMsg can also tag posts with DIRECT_EVENT() under
 * CMK_DIRECT and no branch handles that type here -- confirm such events
 * never reach this CQ.
 *
 * With CMI_PUMPREMOTETRANSACTIONS_CAP, at most PumpRemoteTransactions_cap+1
 * events are handled per call.  A full CQ (GNI_RC_ERROR_RESOURCE) prints a
 * hint to enlarge the receive queue and aborts. */
static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
{
    gni_cq_entry_t          ev;
    gni_return_t            status;
    void                    *msg;
    int                     inst_id, index, type, size;
#if CMI_PUMPREMOTETRANSACTIONS_CAP
    int                     pump_count = 0;
#endif

    for (;;) {
#if CMI_PUMPREMOTETRANSACTIONS_CAP
        if (pump_count > PumpRemoteTransactions_cap) break;
#endif
        CMI_GNI_LOCK(global_gni_lock)
        status = GNI_CqGetEvent(rx_cqh, &ev);
        CMI_GNI_UNLOCK(global_gni_lock)
        if (status != GNI_RC_SUCCESS) break;
#if CMI_PUMPREMOTETRANSACTIONS_CAP
        pump_count ++;
#endif

        inst_id = GNI_CQ_GET_REM_INST_ID(ev);
        type    = GET_TYPE(inst_id);
        index   = GET_INDEX(inst_id);

        if (type == 0) {                        /* ACK */
            CmiAssert(index>=0 && index<ackPool.size);
            CMI_GNI_LOCK(ackPool.lock);
            msg = GetIndexAddress(ackPool, index);
            CMI_GNI_UNLOCK(ackPool.lock);
#if PRINT_SYH
            MACHSTATE4(8,"[%d] PumpRemoteTransactions: ack: %lld index: %d type: %d.\n", myrank, msg, index, type);
#endif
#if USE_LRTS_MEMPOOL
            DecreaseMsgInSend(msg);
#endif
            if (NoMsgInSend(msg))
                buffered_send_msg -= GetMempoolsize(msg);
            CmiFree(msg);
            IndexPool_freeslot(&ackPool, index);
#if CMI_EXERT_SEND_LARGE_CAP
            SEND_large_pending--;
#endif
        }
#if CMK_PERSISTENT_COMM_PUT
        else if (type == 1) {                   /* PERSISTENT */
            PersistentReceivesTable *slot;
            CmiLock(persistPool.lock);
            CmiAssert(GetIndexType(persistPool, index) == 2);
            slot = GetIndexAddress(persistPool, index);
            CmiUnlock(persistPool.lock);
            msg  = slot->destBuf[slot->addrIndex].destAddress;
            size = CmiGetMsgSize(msg);
            CmiReference(msg);
            CMI_CHECK_CHECKSUM(msg, size);
            handleOneRecvedMsg(size, msg);
        }
#endif
        else {
            fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
            CmiAbort("PumpRemoteTransactions: unknown type");
        }
    }

    if (status == GNI_RC_ERROR_RESOURCE)
    {
        printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
        GNI_RC_CHECK("PumpRemoteTransactions: rx_cqh full", status);
    }
}
#endif
2885
2886 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2887 {
2888     gni_cq_entry_t          ev;
2889     gni_return_t            status;
2890     uint64_t                type, inst_id;
2891     gni_post_descriptor_t   *tmp_pd;
2892     MSG_LIST                *ptr;
2893     CONTROL_MSG             *ack_msg_tmp;
2894     ACK_MSG                 *ack_msg;
2895     uint8_t                 msg_tag;
2896 #if CMK_DIRECT
2897     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2898 #endif
2899     SMSG_QUEUE         *queue = &smsg_queue;
2900 #if CMI_PUMPLOCALTRANSACTIONS_CAP
2901     int         pump_count = 0;
2902     while(pump_count < PumpLocalTransactions_cap) {
2903         pump_count++;
2904 #else
2905     while(1) {
2906 #endif
2907         CMI_GNI_LOCK(my_cq_lock) 
2908         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2909         CMI_GNI_UNLOCK(my_cq_lock)
2910         if(status != GNI_RC_SUCCESS) break;
2911         
2912         type = GNI_CQ_GET_TYPE(ev);
2913         if (type == GNI_CQ_EVENT_TYPE_POST)
2914         {
2915
2916 #if CMI_EXERT_RECV_RDMA_CAP
2917             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2918             RDMA_pending--;
2919 #endif
2920             inst_id     = GNI_CQ_GET_INST_ID(ev);
2921 #if PRINT_SYH
2922             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2923 #endif
2924             CMI_GNI_LOCK(my_cq_lock)
2925             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2926             CMI_GNI_UNLOCK(my_cq_lock)
2927
2928             switch (tmp_pd->type) {
2929 #if CMK_PERSISTENT_COMM_PUT  || CMK_DIRECT
2930             case GNI_POST_RDMA_PUT:
2931 #if CMK_PERSISTENT_COMM_PUT && ! USE_LRTS_MEMPOOL
2932                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2933 #endif
2934             case GNI_POST_FMA_PUT:
2935                 if(tmp_pd->amo_cmd == 1) {
2936 #if CMK_DIRECT
2937                     //sender ACK to receiver to trigger it is done
2938                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2939                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2940                     msg_tag = DIRECT_PUT_DONE_TAG;
2941 #endif
2942                 }
2943                 else {
2944                     CmiFree((void *)tmp_pd->local_addr);
2945 #if REMOTE_EVENT
2946                     FreePostDesc(tmp_pd);
2947                     continue;
2948 #elif CQWRITE
2949                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2950                     FreePostDesc(tmp_pd);
2951                     continue;
2952 #else
2953                     MallocControlMsg(ack_msg_tmp);
2954                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2955                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2956                     ack_msg_tmp->length  = tmp_pd->length;
2957                     msg_tag = PUT_DONE_TAG;
2958 #endif
2959                 }
2960                 break;
2961 #endif
2962             case GNI_POST_RDMA_GET:
2963             case GNI_POST_FMA_GET:  {
2964                 MACHSTATE2(8, "PumpLocal Get done %lld=>%lld\n", tmp_pd->local_addr, tmp_pd->remote_addr);
2965 #if  ! USE_LRTS_MEMPOOL
2966                 MallocControlMsg(ack_msg_tmp);
2967                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2968                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2969                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2970                 msg_tag = ACK_TAG;  
2971 #else
2972 #if CMK_WITH_STATS
2973                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2974 #endif
2975                 int seq_id = tmp_pd->cqwrite_value;
2976                 if(seq_id > 0)      // BIG_MSG
2977                 {
2978                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2979                     MallocControlMsg(ack_msg_tmp);
2980                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2981                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2982                     ack_msg_tmp->seq_id = seq_id;
2983                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2984                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2985                     ack_msg_tmp->length = tmp_pd->length;
2986                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2987                     msg_tag = BIG_MSG_TAG; 
2988                 } 
2989                 else
2990                 {
2991                     if(seq_id < 0)
2992                         CmiReference((void*)tmp_pd->local_addr);
2993                     msg_tag = ACK_TAG; 
2994 #if  !REMOTE_EVENT && !CQWRITE
2995                     MallocAckMsg(ack_msg);
2996                     ack_msg->source_addr = tmp_pd->remote_addr;
2997 #endif
2998                 }
2999 #endif
3000                 break;
3001             }
3002             case  GNI_POST_CQWRITE:
3003                    FreePostDesc(tmp_pd);
3004                    continue;
3005             default:
3006                 CmiPrintf("type=%d\n", tmp_pd->type);
3007                 CmiAbort("PumpLocalTransactions: unknown type!");
3008             }      /* end of switch */
3009
3010 #if CMK_DIRECT
3011             if (tmp_pd->amo_cmd == 1) {
3012                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL, 0);
3013 #if ! CMK_SMSGS_FREE_AFTER_EVENT
3014                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
3015 #endif
3016             }
3017             else
3018 #endif
3019             if (msg_tag == ACK_TAG) {
3020 #if !REMOTE_EVENT
3021 #if   !CQWRITE
3022                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL, 0); 
3023 #if !CMK_SMSGS_FREE_AFTER_EVENT
3024                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
3025 #endif
3026 #else
3027                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
3028 #endif
3029 #endif
3030             }
3031             else {
3032                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL, 0); 
3033 #if !CMK_SMSGS_FREE_AFTER_EVENT
3034                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
3035 #endif
3036             }
3037 #if CMK_PERSISTENT_COMM_PUT
3038             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
3039 #endif
3040             {
3041                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
3042 #if PRINT_SYH
3043                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
3044 #endif
3045                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
3046                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
3047
3048                     //CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
3049                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
3050 #if MACHINE_DEBUG_LOG
3051                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
3052                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
3053                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
3054 #endif
3055                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, CmiGetMsgSize(tmp_pd->local_addr));
3056                     handleOneRecvedMsg(CmiGetMsgSize(tmp_pd->local_addr), (void*)tmp_pd->local_addr);
3057                 }else if(msg_tag == BIG_MSG_TAG){
3058                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
3059                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
3060                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
3061                         START_EVENT();
3062 #if PRINT_SYH
3063                         printf("Pipeline msg done [%d]\n", myrank);
3064 #endif
3065 #if     CMK_SMP_TRACE_COMMTHREAD
3066                         if( tmp_pd->cqwrite_value == 1)
3067                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
3068 #endif
3069                         CMI_CHECK_CHECKSUM(msg, CmiGetMsgSize(msg));
3070                         handleOneRecvedMsg(CmiGetMsgSize(msg), msg);
3071                     }
3072                 }
3073             }
3074             FreePostDesc(tmp_pd);
3075         }
3076 #if CMK_SMSGS_FREE_AFTER_EVENT
3077         else if  (type == GNI_CQ_EVENT_TYPE_SMSG) {
3078             // a SmsgsSend is done
3079             int msgid = GNI_CQ_GET_MSG_ID(ev);
3080             int type = GetSmsgsIndexType(smsgsPool, msgid);
3081             void *addr = GetSmsgsIndexAddress(smsgsPool, msgid);
3082             switch (type) {
3083             case 1:
3084                 CmiFree(addr);
3085                 break;
3086             case 2:
3087                 free(addr);
3088                 break;
3089             default:
3090                 CmiAbort("Invalid SmsgsIndex");
3091             }
3092             SmsgsPool_freeslot(&smsgsPool, msgid);
3093         }
3094 #endif
3095     } //end while
3096     if(status == GNI_RC_ERROR_RESOURCE)
3097     {
3098         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
3099         GNI_RC_CHECK("Smsg_tx_cq full", status);
3100     }
3101 }
3102
3103 static void  SendRdmaMsg( BufferList sendqueue)
3104 {
3105     gni_return_t            status = GNI_RC_SUCCESS;
3106     gni_mem_handle_t        msg_mem_hndl;
3107     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
3108     RDMA_REQUEST            *pre = 0;
3109     uint64_t                register_size = 0;
3110     void                    *msg;
3111     int                     i;
3112
3113     int len = PCQueueLength(sendqueue);
3114     for (i=0; i<len; i++)
3115     {
3116 #if CMI_EXERT_RECV_RDMA_CAP
3117         if( RDMA_pending >= RDMA_cap) break;
3118 #endif
3119         CMI_PCQUEUEPOP_LOCK( sendqueue)
3120         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
3121         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
3122         if (ptr == NULL) break;
3123         
3124         gni_post_descriptor_t *pd = ptr->pd;
3125         
3126         msg = (void*)(pd->local_addr);
3127         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
3128         register_size = 0;
3129         if(pd->cqwrite_value == 0) {
3130             if(NoMsgInRecv(msg))
3131                 register_size = GetMempoolsize(msg);
3132         }
3133
3134         if(status == GNI_RC_SUCCESS)        //mem register good
3135         {
3136             int destNode = ptr->destNode;
3137             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
3138             CMI_GNI_LOCK(lock);
3139 #if REMOTE_EVENT
3140             if( pd->cqwrite_value == 0 || pd->cqwrite_value == -1) {
3141                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3142                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
3143                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3144             }
3145 #if CMK_PERSISTENT_COMM_PUT
3146             else if (pd->cqwrite_value == PERSIST_SEQ) {
3147                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3148                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
3149                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3150             }
3151 #endif
3152 #if CMK_DIRECT
3153             else if (pd->cqwrite_value == DIRECT_SEQ) {
3154                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3155                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, DIRECT_EVENT(ptr->ack_index));
3156                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3157             }
3158 #endif
3159
3160 #endif
3161 #if CMK_WITH_STATS
3162             RDMA_TRY_SEND(pd->type)
3163 #endif
3164 #if CMK_SMP_TRACE_COMMTHREAD
3165             if(IS_PUT(pd->type))
3166             {
3167                  START_EVENT();
3168                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
3169             }
3170 #endif
3171             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
3172             {
3173                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3174             }
3175             else
3176             {
3177                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3178             }
3179             CMI_GNI_UNLOCK(lock);
3180             
3181             if(status == GNI_RC_SUCCESS)    //post good
3182             {
3183                 MACHSTATE4(8, "post noempty-rdma  %d (%lld==%lld,%d) \n", ptr->destNode, pd->local_addr, pd->remote_addr,  register_memory_size); 
3184 #if CMI_EXERT_RECV_RDMA_CAP
3185                 RDMA_pending ++;
3186 #endif
3187                 if(pd->cqwrite_value <= 0)
3188                 {
3189 #if CMK_SMP_TRACE_COMMTHREAD 
3190                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3191 #endif
3192                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3193                 }
3194 #if  CMK_WITH_STATS
3195                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3196                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3197 #endif
3198 #if MACHINE_DEBUG_LOG
3199                 buffered_recv_msg += register_size;
3200                 MACHSTATE(8, "GO request from buffered\n"); 
3201 #endif
3202 #if PRINT_SYH
3203                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
3204 #endif
3205                 FreeRdmaRequest(ptr);
3206             }else           // cannot post
3207             {
3208                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3209 #if PRINT_SYH
3210                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3211 #endif
3212                 break;
3213             }
3214         } else          //memory registration fails
3215         {
3216             PCQueuePush(sendqueue, (char*)ptr);
3217         }
3218     } //end while
3219 }
3220
3221 static 
3222 INLINE_KEYWORD gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
3223 {
3224     CONTROL_MSG         *control_msg_tmp;
3225     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
3226
3227     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3228     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
3229             /* connection not exists yet */
3230 #if CMK_SMP
3231             /* non-smp case, connect is issued in send_smsg_message */
3232         if (smsg_connected_flag[ptr->destNode] == 0)
3233             connect_to(ptr->destNode); 
3234 #endif
3235     }
3236     else
3237     switch(ptr->tag)
3238     {
3239     case SMALL_DATA_TAG:
3240         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr, 1);  
3241 #if !CMK_SMSGS_FREE_AFTER_EVENT
3242         if(status == GNI_RC_SUCCESS)
3243         {
3244             CmiFree(ptr->msg);
3245         }
3246 #endif
3247         break;
3248     case LMSG_PERSISTENT_INIT_TAG:
3249     case LMSG_INIT_TAG:
3250     case LMSG_OOB_INIT_TAG:
3251         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3252         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
3253         break;
3254 #if !REMOTE_EVENT && !CQWRITE
3255     case ACK_TAG:
3256         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, 0);  
3257 #if !CMK_SMSGS_FREE_AFTER_EVENT
3258         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3259 #endif
3260         break;
3261 #endif
3262     case BIG_MSG_TAG:
3263         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, 0);  
3264 #if !CMK_SMSGS_FREE_AFTER_EVENT
3265         if(status == GNI_RC_SUCCESS)
3266         {
3267             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3268         }
3269 #endif
3270         break;
3271 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE 
3272     case PUT_DONE_TAG:
3273         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, 0);
3274 #if !CMK_SMSGS_FREE_AFTER_EVENT
3275         if(status == GNI_RC_SUCCESS)
3276         {
3277             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3278         }
3279 #endif
3280         break;
3281 #endif
3282 #if CMK_DIRECT
3283     case DIRECT_PUT_DONE_TAG:
3284         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr, 0);
3285 #if !CMK_SMSGS_FREE_AFTER_EVENT
3286         if(status == GNI_RC_SUCCESS)
3287         {
3288             free((CMK_DIRECT_HEADER*)ptr->msg);
3289         }
3290 #endif
3291         break;
3292 #endif
3293     default:
3294         printf("Weird tag\n");
3295         CmiAbort("should not happen\n");
3296     }       // end switch
3297     return status;
3298 }
3299
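// SendBufferMsg: try to flush buffered SMSG messages; returns 0 if some send
// attempt failed (the message was re-queued) or the send cap was hit, 1 otherwise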
3301
3302 #if ONE_SEND_QUEUE
3303
3304 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3305 {
3306     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3307     CONTROL_MSG         *control_msg_tmp;
3308     gni_return_t        status;
3309     int                 done = 1;
3310     uint64_t            register_size;
3311     void                *register_addr;
3312     int                 index_previous = -1;
3313 #if     CMI_SENDBUFFERSMSG_CAP
3314     int                 sent_length = 0;
3315 #endif
3316     int          index = 0;
3317     memset(destpe_avail, 0, mysize * sizeof(char));
3318     for (index=0; index<1; index++)
3319     {
3320         int i, len = PCQueueLength(queue->sendMsgBuf);
3321         for (i=0; i<len; i++) 
3322         {
3323             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3324             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3325             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3326             if(ptr == NULL) break;
3327             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3328                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3329                 continue;
3330             }
3331             status = _sendOneBufferedSmsg(queue, ptr);
3332 #if CMI_SENDBUFFERSMSG_CAP
3333             sent_length++;
3334 #endif
3335             if(status == GNI_RC_SUCCESS)
3336             {
3337 #if PRINT_SYH
3338                 buffered_smsg_counter--;
3339                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3340 #endif
3341                 FreeMsgList(ptr);
3342             }else {
3343                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3344                 done = 0;
3345                 if(status == GNI_RC_ERROR_RESOURCE)
3346                 {
3347                     destpe_avail[ptr->destNode] = 1;
3348                 }
3349             } 
3350         } //end while
3351     }   // end pooling for all cores
3352     return done;
3353 }
3354
3355 #else   /*  ! ONE_SEND_QUEUE  */
3356
/*
 * SendBufferMsg (per-destination variant) -- one pass over the buffered SMSG
 * queues in queue->smsg_msglist_index[].
 *
 * SMP_LOCKS:
 *   Pops entries from queue->nonEmptyQueues, as many times as it held entries
 *   at entry.
 *   - An entry whose destination also has messages waiting in prio_queue is
 *     pushed back untouched.
 *   - Otherwise its 'pushed' flag is cleared under its lock and its messages
 *     are tried.  Afterwards the entry is re-enqueued on nonEmptyQueues if it
 *     still holds messages and is not already marked 'pushed'.
 *
 * !SMP_LOCKS:
 *   Visits all mysize destinations round-robin.  The static 'index' makes
 *   each call start just after the destination where the previous call
 *   stopped.  Destinations with messages waiting in prio_queue are skipped.
 *
 * For each destination, sending stops at the first GNI_RC_ERROR_RESOURCE.
 * Failed messages are pushed back onto that destination's queue.  With
 * CMI_SENDBUFFERSMSG_CAP, the call returns 0 early once SendBufferMsg_cap
 * attempts have been made (checked between destinations, not per message).
 *
 * Returns 0 if any send attempt failed or the cap was hit, 1 otherwise.
 * NOTE(review): destinations skipped because of prio_queue do not clear
 * 'done', so a return of 1 does not guarantee every buffered message was sent.
 */
static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
{
    MSG_LIST            *ptr;
    gni_return_t        status;
    int                 done = 1;
#if     CMI_SENDBUFFERSMSG_CAP
    int                 sent_length = 0;
#endif
    int idx;
#if SMP_LOCKS
    int          index = -1;
    /* snapshot: bounds this pass even though entries get re-enqueued below */
    int nonempty = PCQueueLength(queue->nonEmptyQueues);
    for(idx =0; idx<nonempty; idx++) 
    {
        /* NOTE(review): 'index' is not read again in the SMP_LOCKS path */
        index++;  if (index >= nonempty) index = 0;
#if CMI_SENDBUFFERSMSG_CAP
        if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
#endif
        CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
        MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
        CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
        if(current_list == NULL) break; 
        /* urgent messages for this destination are pending: leave it for later */
        if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
            PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
            continue;
        }
        PCQueue current_queue= current_list->sendSmsgBuf;
        /* read the length and clear 'pushed' together under the entry's lock;
           presumably this lets a producer re-enqueue the entry while it is
           being drained -- confirm against the enqueue side */
        CmiLock(current_list->lock);
        int i, len = PCQueueLength(current_queue);
        current_list->pushed = 0;
        CmiUnlock(current_list->lock);
#else      /* ! SMP_LOCKS */
    /* static: round-robin position persists across calls for fairness */
    static int          index = -1;
    for(idx =0; idx<mysize; idx++) 
    {
        index++;  if (index == mysize) index = 0;
#if CMI_SENDBUFFERSMSG_CAP
        if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
#endif
        if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
        //if (index == myrank) continue;
        PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
        int i, len = PCQueueLength(current_queue);
#endif
        /* try at most 'len' messages; failures are pushed back to the tail */
        for (i=0; i<len; i++)  {
            CMI_PCQUEUEPOP_LOCK(current_queue)
            ptr = (MSG_LIST*)PCQueuePop(current_queue);
            CMI_PCQUEUEPOP_UNLOCK(current_queue)
            if (ptr == 0) break;

            status = _sendOneBufferedSmsg(queue, ptr);
#if CMI_SENDBUFFERSMSG_CAP
            sent_length++;
#endif
            if(status == GNI_RC_SUCCESS)
            {
#if PRINT_SYH
                buffered_smsg_counter--;
                printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
#endif
                FreeMsgList(ptr);
            }else {
                PCQueuePush(current_queue, (char*)ptr);
                done = 0;
                /* destination is out of resources: move on to the next one */
                if(status == GNI_RC_ERROR_RESOURCE)
                {
                    break;
                }
            } 
        } //end for i
#if SMP_LOCKS
        /* re-register this destination if it still has work and nobody has
           already marked it as pushed */
        CmiLock(current_list->lock);
        if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
        {
            current_list->pushed = 1;
            PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
        }
        CmiUnlock(current_list->lock); 
#endif
    }   // end loop over destinations
    return done;
}
3439
3440 #endif
3441
/* Forward declaration: ProcessDeadlock is a static function defined
   elsewhere in this file. */
static void ProcessDeadlock();
3443 void LrtsAdvanceCommunication(int whileidle)
3444 {
3445     static int count = 0;
3446     /*  Receive Msg first */
3447 #if CMK_SMP_TRACE_COMMTHREAD
3448     double startT, endT;
3449 #endif
3450     if (useDynamicSMSG && whileidle)
3451     {
3452 #if CMK_SMP_TRACE_COMMTHREAD
3453         startT = CmiWallTimer();
3454 #endif
3455         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3456 #if CMK_SMP_TRACE_COMMTHREAD
3457         endT = CmiWallTimer();
3458         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3459 #endif
3460     }
3461
3462     SEND_OOB_SMSG(smsg_oob_queue)
3463     PUMP_REMOTE_HIGHPRIORITY
3464     PUMP_LOCAL_HIGHPRIORITY
3465     POST_HIGHPRIORITY_RDMA
3466     // Receiving small messages and persistent
3467 #if CMK_SMP_TRACE_COMMTHREAD
3468     startT = CmiWallTimer();
3469 #endif
3470     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3471 #if CMK_SMP_TRACE_COMMTHREAD
3472     endT = CmiWallTimer();
3473     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3474 #endif
3475
3476     SEND_OOB_SMSG(smsg_oob_queue)
3477     PUMP_REMOTE_HIGHPRIORITY
3478     PUMP_LOCAL_HIGHPRIORITY
3479     POST_HIGHPRIORITY_RDMA
3480     
3481     ///* Send buffered Message */
3482 #if CMK_SMP_TRACE_COMMTHREAD
3483     startT = CmiWallTimer();
3484 #endif
3485 #if CMK_USE_OOB
3486     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
3487 #else
3488     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
3489 #endif
3490 #if CMK_SMP_TRACE_COMMTHREAD
3491     endT = CmiWallTimer();
3492     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3493 #endif
3494
3495     SEND_OOB_SMSG(smsg_oob_queue)
3496     PUMP_REMOTE_HIGHPRIORITY
3497     PUMP_LOCAL_HIGHPRIORITY
3498     POST_HIGHPRIORITY_RDMA
3499
3500     //Pump Get messages or PUT messages
3501 #if CMK_SMP_TRACE_COMMTHREAD
3502     startT = CmiWallTimer();
3503 #endif
3504     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3505 #if MULTI_THREAD_SEND
3506     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3507 #endif
3508 #if CMK_SMP_TRACE_COMMTHREAD
3509     endT = CmiWallTimer();
3510     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3511 #endif
3512     
3513     SEND_OOB_SMSG(smsg_oob_queue)
3514     PUMP_REMOTE_HIGHPRIORITY
3515     PUMP_LOCAL_HIGHPRIORITY
3516     POST_HIGHPRIORITY_RDMA
3517     //Pump Remote event
3518 #if CMK_SMP_TRACE_COMMTHREAD
3519     startT = CmiWallTimer();
3520 #endif
3521 #if CQWRITE
3522     PumpCqWriteTransactions();
3523 #endif
3524 #if REMOTE_EVENT
3525     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
3526 #endif
3527 #if CMK_SMP_TRACE_COMMTHREAD
3528     endT = CmiWallTimer();
3529     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3530 #endif
3531
3532     SEND_OOB_SMSG(smsg_oob_queue)
3533     PUMP_REMOTE_HIGHPRIORITY
3534     PUMP_LOCAL_HIGHPRIORITY
3535     POST_HIGHPRIORITY_RDMA
3536
3537 #if CMK_SMP_TRACE_COMMTHREAD
3538     startT = CmiWallTimer();
3539 #endif
3540     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3541 #if CMK_SMP_TRACE_COMMTHREAD
3542     endT = CmiWallTimer();
3543     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3544 #endif
3545