turn largepage off
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #define     LARGEPAGE              0
51
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     1
59 #endif
60
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
64 #define CMI_EXERT_SEND_LARGE_CAP   0
65 #define CMI_EXERT_RECV_RDMA_CAP    0
66
67
68 #define CMI_SENDBUFFERSMSG_CAP            0
69 #define CMI_PUMPNETWORKSMSG_CAP           0
70 #define CMI_PUMPREMOTETRANSACTIONS_CAP    0
71 #define CMI_PUMPLOCALTRANSACTIONS_CAP     0
72
73 #if CMI_SENDBUFFERSMSG_CAP
74 int     SendBufferMsg_cap  = 20;
75 #endif
76
77 #if CMI_PUMPNETWORKSMSG_CAP
78 int     PumpNetworkSmsg_cap = 20;
79 #endif
80
81 #if CMI_PUMPREMOTETRANSACTIONS_CAP
82 int     PumpRemoteTransactions_cap = 20;
83 #endif
84
/* Default value of the PumpLocalTransactions cap.  It is compiled in under
 * its own CMI_PUMPLOCALTRANSACTIONS_CAP switch so that enabling the local
 * cap alone is enough to define the variable. */
#if CMI_PUMPLOCALTRANSACTIONS_CAP
int     PumpLocalTransactions_cap = 15;
#endif
88
89 #if CMI_EXERT_SEND_LARGE_CAP
90 static int SEND_large_cap = 20;
91 static int SEND_large_pending = 0;
92 #endif
93
94 #if CMI_EXERT_RECV_RDMA_CAP
95 static int   RDMA_cap =   10;
96 static int   RDMA_pending = 0;
97 #endif
98
99 #define USE_LRTS_MEMPOOL                  1
100
101 #define PRINT_SYH                         0
102
103 // Trace communication thread
104 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
105 #define TRACE_THRESHOLD     0.00001
106 #define CMI_MPI_TRACE_MOREDETAILED 0
107 #undef CMI_MPI_TRACE_USEREVENTS
108 #define CMI_MPI_TRACE_USEREVENTS 1
109 #else
110 #undef CMK_SMP_TRACE_COMMTHREAD
111 #define CMK_SMP_TRACE_COMMTHREAD 0
112 #endif
113
114 #define CMK_TRACE_COMMOVERHEAD 0
115 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
116 #undef CMI_MPI_TRACE_USEREVENTS
117 #define CMI_MPI_TRACE_USEREVENTS 1
118 #else
119 #undef CMK_TRACE_COMMOVERHEAD
120 #define CMK_TRACE_COMMOVERHEAD 0
121 #endif
122
123 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
124 CpvStaticDeclare(double, projTraceStart);
125 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
126 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
127 #define  EVENT_TIME()   CpvAccess(projTraceStart)
128 #else
129 #define  START_EVENT()
130 #define  END_EVENT(x)
131 #define  EVENT_TIME()   (0.0)
132 #endif
133
134 #if USE_LRTS_MEMPOOL
135
136 #define oneMB (1024ll*1024)
137 #define oneGB (1024ll*1024*1024)
138
139 static CmiInt8 _mempool_size = 8*oneMB;
140 static CmiInt8 _expand_mem =  4*oneMB;
141 static CmiInt8 _mempool_size_limit = 0;
142
143 static CmiInt8 _totalmem = 0.8*oneGB;
144
145 #if LARGEPAGE
146 static CmiInt8 BIG_MSG  =  16*oneMB;
147 static CmiInt8 ONE_SEG  =  4*oneMB;
148 #else
149 static CmiInt8 BIG_MSG  =  4*oneMB;
150 static CmiInt8 ONE_SEG  =  2*oneMB;
151 #endif
152 #if MULTI_THREAD_SEND
153 static int BIG_MSG_PIPELINE = 1;
154 #else
155 static int BIG_MSG_PIPELINE = 4;
156 #endif
157
158 // dynamic flow control
159 static CmiInt8 buffered_send_msg = 0;
160 static CmiInt8 register_memory_size = 0;
161
162 #if LARGEPAGE
163 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
164 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
165 static CmiInt8  register_count = 0;
166 #else
167 #if CMK_SMP && COMM_THREAD_SEND 
168 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
169 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
170 #else
171 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
172 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
173 #endif
174
175
176 #endif
177
178 #endif     /* end USE_LRTS_MEMPOOL */
179
180 #if MULTI_THREAD_SEND 
181 #define     CMI_GNI_LOCK(x)       CmiLock(x);
182 #define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
183 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
184 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
185 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
186 #else
187 #define     CMI_GNI_LOCK(x)
188 #define     CMI_GNI_TRYLOCK(x)         (0)
189 #define     CMI_GNI_UNLOCK(x)
190 #define     CMI_PCQUEUEPOP_LOCK(Q)   
191 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
192 #endif
193
194 static int _tlbpagesize = 4096;
195
196 //static int _smpd_count  = 0;
197
198 static int   user_set_flag  = 0;
199
200 static int _checkProgress = 1;             /* check deadlock */
201 static int _detected_hang = 0;
202
203 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
204
205 // dynamic SMSG
206 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
207
208 static int avg_smsg_connection = 32;
209 static int                 *smsg_connected_flag= 0;
210 static gni_smsg_attr_t     **smsg_attr_vector_local;
211 static gni_smsg_attr_t     **smsg_attr_vector_remote;
212 static gni_ep_handle_t     ep_hndl_unbound;
213 static gni_smsg_attr_t     send_smsg_attr;
214 static gni_smsg_attr_t     recv_smsg_attr;
215
216 typedef struct _dynamic_smsg_mailbox{
217    void     *mailbox_base;
218    int      size;
219    int      offset;
220    gni_mem_handle_t  mem_hndl;
221    struct      _dynamic_smsg_mailbox  *next;
222 }dynamic_smsg_mailbox_t;
223
224 static dynamic_smsg_mailbox_t  *mailbox_list;
225
226 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
227 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
228
229 #if PRINT_SYH
230 int         lrts_send_msg_id = 0;
231 int         lrts_local_done_msg = 0;
232 int         lrts_send_rdma_success = 0;
233 #endif
234
235 #include "machine.h"
236
237 #include "pcqueue.h"
238
239 #include "mempool.h"
240
241 #if CMK_PERSISTENT_COMM
242 #include "machine-persistent.h"
243 #define  POST_HIGHPRIORITY_RDMA    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
244 #else  
245 #define  POST_HIGHPRIORITY_RDMA   
246 #endif
247
248 #if REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM) 
249 #define  PUMP_REMOTE_HIGHPRIORITY    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
250 #else
251 #define  PUMP_REMOTE_HIGHPRIORITY
252 #endif
253
254 //#define  USE_ONESIDED 1
255 #ifdef USE_ONESIDED
256 //onesided implementation is wrong, since no place to restore omdh
257 #include "onesided.h"
258 onesided_hnd_t   onesided_hnd;
259 onesided_md_t    omdh;
260 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
261
262 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
263
264 #else
265 uint8_t   onesided_hnd, omdh;
266
/*
 * MEMORY_REGISTER: register `size` bytes starting at `msg` with the NIC,
 * unless doing so would reach MAX_REG_MEM.
 *   status   - receives GNI_RC_ERROR_NOMEM when the budget check fails,
 *              otherwise the return code of GNI_MemRegister.
 *   mem_hndl - passed through to GNI_MemRegister as the output handle.
 *   cqh      - completion queue passed to GNI_MemRegister; ignored (NULL is
 *              used) when neither REMOTE_EVENT nor CQWRITE is enabled.
 *   handler, myomdh - accepted but unused in this variant.
 * On GNI_RC_SUCCESS, register_memory_size grows by `size`.
 *
 * Every argument that takes part in an expression is parenthesized so that
 * expression arguments (e.g. `ptr + off`) are cast and compared as a whole.
 *
 * NOTE: this expands to a bare if/else statement (not do { } while (0)), so
 * put braces around it when it is the body of an `if` that has its own `else`.
 */
#if REMOTE_EVENT || CQWRITE 
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    if (register_memory_size + (size) >= MAX_REG_MEM) { \
        (status) = GNI_RC_ERROR_NOMEM; \
    } else { \
        (status) = GNI_MemRegister(nic_hndl, (uint64_t)(msg), (uint64_t)(size), cqh, GNI_MEM_READWRITE, -1, mem_hndl); \
        if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
    }
#else
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
    if (register_memory_size + (size) >= MAX_REG_MEM) { \
        (status) = GNI_RC_ERROR_NOMEM; \
    } else { \
        (status) = GNI_MemRegister(nic_hndl, (uint64_t)(msg), (uint64_t)(size), NULL, GNI_MEM_READWRITE, -1, mem_hndl); \
        if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
    }
#endif

/*
 * MEMORY_DEREGISTER: deregister `mem_hndl` from the NIC.  On success
 * register_memory_size shrinks by `size`; any other return code aborts.
 * `handler` and `myomdh` are accepted but unused in this variant.
 */
#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
    do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
             register_memory_size -= (size); \
         else CmiAbort("MEM_DEregister");  \
    } while (0)
286 #endif
287
288 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
289 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
290 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
291 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
292 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
293 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
294 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
295 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
/* Predicates on the send / receive counters read through
 * MEMPOOL_GetMsgInSend / MEMPOOL_GetMsgInRecv for x's mempool header.
 * Each expansion is fully parenthesized so it can be combined with other
 * operators (arithmetic, comparison) without changing its meaning. */
#define   NoMsgInSend(x)          (MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0)
#define   NoMsgInRecv(x)          (MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0)
#define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
299 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
300 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
301 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
302
303 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
304 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
305
/* Accessors for the `size` and `seq` fields of a message viewed as a
 * CmiMsgHeaderExt.  The message argument is parenthesized before the cast so
 * that pointer expressions such as `buf + offset` are cast as a whole, and
 * the getters are wrapped so they compose safely inside larger expressions. */
#define CmiGetMsgSize(m)     (((CmiMsgHeaderExt*)(m))->size)
#define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)(m))->size)=(s))
#define CmiGetMsgSeq(m)      (((CmiMsgHeaderExt*)(m))->seq)
#define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)(m))->seq) = (s))
310
311 #define ALIGNBUF                64
312
313 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
314 /* If SMSG is not used */
315
316 #define FMA_PER_CORE  1024
317 #define FMA_BUFFER_SIZE 1024
318
319 /* If SMSG is used */
320 static int  SMSG_MAX_MSG = 1024;
321 #define SMSG_MAX_CREDIT    72
322
323 #define MSGQ_MAXSIZE       2048
324
325 /* large message transfer with FMA or BTE */
326 #if ! REMOTE_EVENT
327 #define LRTS_GNI_RDMA_THRESHOLD  1024 
328 #else
329    /* remote events only work with RDMA */
330 #define LRTS_GNI_RDMA_THRESHOLD  0 
331 #endif
332
333 #if CMK_SMP
334 static int  REMOTE_QUEUE_ENTRIES=163840; 
335 static int LOCAL_QUEUE_ENTRIES=163840; 
336 #else
337 static int  REMOTE_QUEUE_ENTRIES=20480;
338 static int LOCAL_QUEUE_ENTRIES=20480; 
339 #endif
340
341 #define BIG_MSG_TAG             0x26
342 #define PUT_DONE_TAG            0x28
343 #define DIRECT_PUT_DONE_TAG     0x29
344 #define ACK_TAG                 0x30
345 /* SMSG is data message */
346 #define SMALL_DATA_TAG          0x31
347 /* SMSG is a control message to initialize a BTE */
348 #define LMSG_INIT_TAG           0x33 
349 #define LMSG_OOB_INIT_TAG       0x35
350
351 #define DEBUG
352 #ifdef GNI_RC_CHECK
353 #undef GNI_RC_CHECK
354 #endif
355 #ifdef DEBUG
356 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
357 #else
358 #define GNI_RC_CHECK(msg,rc)
359 #endif
360
361 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
362 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
363 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
364
365 static int useStaticMSGQ = 0;
366 static int useStaticFMA = 0;
367 static int mysize, myrank;
368 static gni_nic_handle_t   nic_hndl;
369
370 typedef struct {
371     gni_mem_handle_t mdh;
372     uint64_t addr;
373 } mdh_addr_t ;
374 // this is related to dynamic SMSG
375
376 typedef struct mdh_addr_list{
377     gni_mem_handle_t mdh;
378    void *addr;
379     struct mdh_addr_list *next;
380 }mdh_addr_list_t;
381
382 static unsigned int         smsg_memlen;
383 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
384 mdh_addr_t          setup_mem;
385 mdh_addr_t          *smsg_connection_vec = 0;
386 gni_mem_handle_t    smsg_connection_memhndl;
387 static int          smsg_expand_slots = 10;
388 static int          smsg_available_slot = 0;
389 static void         *smsg_mailbox_mempool = 0;
390 mdh_addr_list_t     *smsg_dynamic_list = 0;
391
392 static void             *smsg_mailbox_base;
393 gni_msgq_attr_t         msgq_attrs;
394 gni_msgq_handle_t       msgq_handle;
395 gni_msgq_ep_attr_t      msgq_ep_attrs;
396 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
397
398 /* =====Beginning of Declarations of Machine Specific Variables===== */
399 static int cookie;
400 static int modes = 0;
401 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
402 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
403 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
404 static gni_cq_handle_t       highprior_rdma_tx_cqh = NULL;      // rdma - local event
405 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
406 static gni_cq_handle_t       highpriority_rx_cqh = NULL;      // mempool - remote event
407 static gni_ep_handle_t       *ep_hndl_array;
408
409 static CmiNodeLock           *ep_lock_array;
410 static CmiNodeLock           default_tx_cq_lock; 
411 static CmiNodeLock           rdma_tx_cq_lock; 
412 static CmiNodeLock           global_gni_lock; 
413 static CmiNodeLock           rx_cq_lock;
414 static CmiNodeLock           smsg_mailbox_lock;
415 static CmiNodeLock           smsg_rx_cq_lock;
416 static CmiNodeLock           *mempool_lock;
417 //#define     CMK_WITH_STATS      1
418 typedef struct msg_list
419 {
420     uint32_t destNode;
421     uint32_t size;
422     void *msg;
423     uint8_t tag;
424 #if CMK_WITH_STATS
425     double  creation_time;
426 #endif
427 }MSG_LIST;
428
429
/*
 * Control message describing one piece of a large-message transfer.
 * NOTE(review): presumably this is the payload carried by the LMSG_INIT_TAG /
 * LMSG_OOB_INIT_TAG / BIG_MSG_TAG small messages - confirm against the send
 * and receive paths.  Do not reorder fields without checking both sides.
 */
typedef struct control_msg
{
    uint64_t            source_addr;    /* address from the start of buffer  */
    uint64_t            dest_addr;      /* address from the start of buffer */
    int                 total_length;   /* total length */
    int                 length;         /* length of this packet */
#if REMOTE_EVENT
    int                 ack_index;      /* index from integer to address; presumably a slot number handed out by IndexPool_getslot - confirm */
#endif
    uint8_t             seq_id;         //big message   0 meaning single message
    gni_mem_handle_t    source_mem_hndl; /* memory handle associated with source_addr */
    struct control_msg *next;            /* link to another CONTROL_MSG */
} CONTROL_MSG;

/* Size of a CONTROL_MSG minus one pointer.  Presumably this excludes the
 * trailing `next` link from what is transmitted - confirm; it relies on
 * `next` being the last member with no padding after it. */
#define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
445
446 typedef struct ack_msg
447 {
448     uint64_t            source_addr;    /* address from the start of buffer  */
449 #if ! USE_LRTS_MEMPOOL
450     gni_mem_handle_t    source_mem_hndl;
451     int                 length;          /* total length */
452 #endif
453     struct ack_msg     *next;
454 } ACK_MSG;
455
456 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
457
458 #if CMK_DIRECT
459 typedef struct{
460     uint64_t    handler_addr;
461 }CMK_DIRECT_HEADER;
462
463 typedef struct {
464     char core[CmiMsgHeaderSizeBytes];
465     uint64_t handler;
466 }cmidirectMsg;
467
468 //SYH
469 CpvDeclare(int, CmiHandleDirectIdx);
470 void CmiHandleDirectMsg(cmidirectMsg* msg)
471 {
472
473     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
474    (*(_handle->callbackFnPtr))(_handle->callbackData);
475    CmiFree(msg);
476 }
477
478 void CmiDirectInit()
479 {
480     CpvInitialize(int,  CmiHandleDirectIdx);
481     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
482 }
483
484 #endif
485 typedef struct  rmda_msg
486 {
487     int                   destNode;
488 #if REMOTE_EVENT
489     int                   ack_index;
490 #endif
491     gni_post_descriptor_t *pd;
492 }RDMA_REQUEST;
493
494
495 #define SMP_LOCKS                       1
496 #define ONE_SEND_QUEUE                  0
497 typedef PCQueue BufferList;
498 typedef struct  msg_list_index
499 {
500     PCQueue       sendSmsgBuf;
501 #if  SMP_LOCKS
502     CmiNodeLock   lock;
503     int           pushed;
504     int           destpe;
505 #endif
506 } MSG_LIST_INDEX;
507 char                *destpe_avail;
508 PCQueue sendRdmaBuf;
509 PCQueue sendHighPriorBuf;
510 // buffered send queue
511 #if ! ONE_SEND_QUEUE
512 typedef struct smsg_queue
513 {
514     MSG_LIST_INDEX   *smsg_msglist_index;
515     int               smsg_head_index;
516 #if  SMP_LOCKS
517     PCQueue     nonEmptyQueues;
518 #endif
519 } SMSG_QUEUE;
520 #else
521 typedef struct smsg_queue
522 {
523     PCQueue       sendMsgBuf;
524 }  SMSG_QUEUE;
525 #endif
526
527 SMSG_QUEUE                  smsg_queue;
528 #if CMK_USE_OOB
529 SMSG_QUEUE                  smsg_oob_queue;
530 #define SEND_OOB_SMSG(x)            SendBufferMsg(&x, NULL);
531 #define PUMP_LOCAL_HIGHPRIORITY    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh,  rdma_tx_cq_lock)); 
532 #else
533 #define SEND_OOB_SMSG(x)            
534 #define PUMP_LOCAL_HIGHPRIORITY     
535 #endif
536
537 #define FreeMsgList(d)   free(d);
538 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
539
540 #define FreeControlMsg(d)      free(d);
541 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
542
543 #define FreeAckMsg(d)      free(d);
544 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
545
546 #define FreeRdmaRequest(d)       free(d);
547 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
548 /* reuse gni_post_descriptor_t */
549 static gni_post_descriptor_t *post_freelist=0;
550
551 #define FreePostDesc(d)     free(d);
552 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
553
554
/* LrtsSent was called, but the message could not be sent by SMSGSend because the mailbox was full or there was no credit */
556 static int      buffered_smsg_counter = 0;
557
/* SmsgSend returned success, but the sent message has not yet been confirmed by the remote side */
559 static MSG_LIST *buffered_fma_head = 0;
560 static MSG_LIST *buffered_fma_tail = 0;
561
562 /* functions  */
/* Single-bit helpers on an integer bit set `a`; `ind` is the bit position.
 *   IsFree(a,ind)   - non-zero when bit `ind` of `a` is clear.
 *   SET_BITS(a,ind) - sets bit `ind` in `a` (`a` must be an lvalue).
 *   Reset(a,ind)    - clears bit `ind` in `a` (`a` must be an lvalue).
 * Arguments are parenthesized so expression arguments keep their meaning. */
#define IsFree(a,ind)   (!((a) & (1<<(ind))))
#define SET_BITS(a,ind) ((a) = ((a) | (1<<(ind))))
#define Reset(a,ind)    ((a) = ((a) & (~(1<<(ind)))))
566
567 CpvDeclare(mempool_type*, mempool);
568
569 #if CMK_PERSISTENT_COMM
570 CpvDeclare(mempool_type*, persistent_mempool);
571 #endif
572
573 #if REMOTE_EVENT
574 /* ack pool for remote events */
575
/*
 * Encoding of a 32-bit remote event value:
 *   bit 31              - type: 0 for ACK_EVENT, 1 for PERSIST_EVENT
 *   bits SHIFT .. 30    - slot index (masked with INDEX_MASK)
 *   bits 0 .. SHIFT-1   - rank of the node that built the event (myrank)
 * SHIFT is a variable, so the masks are computed at run time.
 */
static int  SHIFT   =           18;
#define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
#define RANK_MASK               ((1<<SHIFT) - 1)
#define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)

#define GET_TYPE(evt)           (((evt) >> 31) & 1)
#define GET_RANK(evt)           ((evt) & RANK_MASK)
#define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)

/* The type bit is built from an unsigned constant: shifting a signed 1 into
 * bit 31 is undefined behavior in C. */
#define PERSIST_EVENT(idx)      ( (1u<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
586
587 #if CMK_SMP
588 #define INIT_SIZE                4096
589 #else
590 #define INIT_SIZE                1024
591 #endif
592
/* One entry of an IndexPool table. */
struct IndexStruct {
void *addr;   // pointer recorded for this slot by IndexPool_getslot
int next;     // index of the next free slot; -1 marks the end of the free list
int type;     // 0: free   1: ACK   2: Persistent
};
598
/* Growable table of IndexStruct entries; unused entries are chained into a
 * free list through their `next` fields. */
typedef struct IndexPool {
    struct IndexStruct   *indexes;    // heap-allocated array of `size` entries
    int                   size;       // current number of entries in `indexes`
    int                   freehead;   // index of the first free entry, -1 when none is left
    CmiNodeLock           lock;       // taken by IndexPool_getslot/IndexPool_freeslot only when MULTI_THREAD_SEND is set
} IndexPool;
605
606 static IndexPool  ackPool;
607 #if CMK_PERSISTENT_COMM
608 static IndexPool  persistPool;
609 #endif
610
611 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
612 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
613
614 static void IndexPool_init(IndexPool *pool)
615 {
616     int i;
617     if ((1<<SHIFT) < mysize) 
618         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
619     pool->size = INIT_SIZE;
620     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
621     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
622     for (i=0; i<pool->size-1; i++) {
623         pool->indexes[i].next = i+1;
624         pool->indexes[i].type = 0;
625     }
626     pool->indexes[i].next = -1;
627     pool->freehead = 0;
628 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
629     pool->lock  = CmiCreateLock();
630 #else
631     pool->lock  = 0;
632 #endif
633 }
634
635 static
636 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
637 {
638     int s, i;
639 #if MULTI_THREAD_SEND  
640     CmiLock(pool->lock);
641 #endif
642     s = pool->freehead;
643     if (s == -1) {
644         int newsize = pool->size * 2;
645         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
646         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
647         struct IndexStruct *old_ackpool = pool->indexes;
648         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
649         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
650         for (i=pool->size; i<newsize-1; i++) {
651             pool->indexes[i].next = i+1;
652             pool->indexes[i].type = 0;
653         }
654         pool->indexes[i].next = -1;
655         pool->indexes[i].type = 0;
656         pool->freehead = pool->size;
657         s = pool->size;
658         pool->size = newsize;
659         free(old_ackpool);
660     }
661     pool->freehead = pool->indexes[s].next;
662     pool->indexes[s].addr = addr;
663     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
664     pool->indexes[s].type = type;
665 #if MULTI_THREAD_SEND
666     CmiUnlock(pool->lock);
667 #endif
668     return s;
669 }
670
671 static
672 inline  void IndexPool_freeslot(IndexPool *pool, int s)
673 {
674     CmiAssert(s>=0 && s<pool->size);
675 #if MULTI_THREAD_SEND
676     CmiLock(pool->lock);
677 #endif
678     pool->indexes[s].next = pool->freehead;
679     pool->indexes[s].type = 0;
680     pool->freehead = s;
681 #if MULTI_THREAD_SEND
682     CmiUnlock(pool->lock);
683 #endif
684 }
685
686
687 #endif
688
689 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
690 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
691 #define CHARM_MAGIC_NUMBER               126
692
693 #if CMK_ERROR_CHECKING
694 extern unsigned char computeCheckSum(unsigned char *data, int len);
695 static int checksum_flag = 0;
696 #define CMI_SET_CHECKSUM(msg, len)      \
697         if (checksum_flag)  {   \
698           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
699           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
700         }
701 #define CMI_CHECK_CHECKSUM(msg, len)    \
702         if (checksum_flag)      \
703           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
704             CmiAbort("Fatal error: checksum doesn't agree!\n");
705 #else
706 #define CMI_SET_CHECKSUM(msg, len)
707 #define CMI_CHECK_CHECKSUM(msg, len)
708 #endif
709 /* =====End of Definitions of Message-Corruption Related Macros=====*/
710
/* print_stats: non-zero enables the statistics code paths that test it.
 * stats_off:   non-zero suspends counting while print_stats is enabled. */
static int print_stats = 0;
static int stats_off = 0;

/* Resume statistics collection by clearing stats_off. */
void CmiTurnOnStats()
{
    stats_off = 0;
}

/* Suspend statistics collection by setting stats_off. */
void CmiTurnOffStats()
{
    stats_off = 1;
}
723
/* Non-zero when `type` is one of the two PUT post types.  `type` is
 * parenthesized so expression arguments compare as a whole; note that it is
 * still evaluated twice. */
#define IS_PUT(type)    ((type) == GNI_POST_FMA_PUT || (type) == GNI_POST_RDMA_PUT)
725
726 #if CMK_WITH_STATS
727 FILE *counterLog = NULL;
728 typedef struct comm_thread_stats
729 {
730     uint64_t  smsg_data_count;
731     uint64_t  lmsg_init_count;
732     uint64_t  ack_count;
733     uint64_t  big_msg_ack_count;
734     uint64_t  smsg_count;
735     uint64_t  direct_put_done_count;
736     uint64_t  put_done_count;
737     //times of calling SmsgSend
738     uint64_t  try_smsg_data_count;
739     uint64_t  try_lmsg_init_count;
740     uint64_t  try_ack_count;
741     uint64_t  try_big_msg_ack_count;
742     uint64_t  try_direct_put_done_count;
743     uint64_t  try_put_done_count;
744     uint64_t  try_smsg_count;
745     
746     double    max_time_in_send_buffered_smsg;
747     double    all_time_in_send_buffered_smsg;
748
749     uint64_t  rdma_get_count, rdma_put_count;
750     uint64_t  try_rdma_get_count, try_rdma_put_count;
751     double    max_time_from_control_to_rdma_init;
752     double    all_time_from_control_to_rdma_init;
753
754     double    max_time_from_rdma_init_to_rdma_done;
755     double    all_time_from_rdma_init_to_rdma_done;
756
757     int      count_in_PumpNetwork;
758     double   time_in_PumpNetwork;
759     double   max_time_in_PumpNetwork;
760     int      count_in_SendBufferMsg_smsg;
761     double   time_in_SendBufferMsg_smsg;
762     double   max_time_in_SendBufferMsg_smsg;
763     int      count_in_SendRdmaMsg;
764     double   time_in_SendRdmaMsg;
765     double   max_time_in_SendRdmaMsg;
766     int      count_in_PumpRemoteTransactions;
767     double   time_in_PumpRemoteTransactions;
768     double   max_time_in_PumpRemoteTransactions;
769     int      count_in_PumpLocalTransactions_rdma;
770     double   time_in_PumpLocalTransactions_rdma;
771     double   max_time_in_PumpLocalTransactions_rdma;
772     int      count_in_PumpDatagramConnection;
773     double   time_in_PumpDatagramConnection;
774     double   max_time_in_PumpDatagramConnection;
775 } Comm_Thread_Stats;
776
777 static Comm_Thread_Stats   comm_stats;
778
779 static char *counters_dirname = "counters";
780
781 static void init_comm_stats()
782 {
783   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
784   if (print_stats){
785       char ln[200];
786       int code = mkdir(counters_dirname, 00777); 
787       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
788       counterLog=fopen(ln,"w");
789       if (counterLog == NULL) CmiAbort("Counter files open failed");
790   }
791 }
792
793 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
794
795 #define SMSG_SENT_DONE(creation_time, tag)  \
796         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
797             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++;  \
798             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
799             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
800             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
801             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
802             comm_stats.smsg_count++; \
803             double inbuff_time = CmiWallTimer() - creation_time;   \
804             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
805             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
806         }
807
808 #define SMSG_TRY_SEND(tag)  \
809         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
810             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
811             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
812             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
813             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
814             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
815             comm_stats.try_smsg_count++; \
816         }
817
818 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
819
820 #define  RDMA_TRANS_DONE(x)      \
821          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
822              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
823              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
824          }
825
826 #define  RDMA_TRANS_INIT(type, x)      \
827          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
828              double rdma_trans_time = CmiWallTimer() - x ; \
829              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
830              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
831          }
832
833 #define STATS_PUMPNETWORK_TIME(x)   \
834         { double t = CmiWallTimer(); \
835           x;        \
836           t = CmiWallTimer() - t;          \
837           comm_stats.count_in_PumpNetwork++;        \
838           comm_stats.time_in_PumpNetwork += t;   \
839           if (t>comm_stats.max_time_in_PumpNetwork)      \
840               comm_stats.max_time_in_PumpNetwork = t;    \
841         }
842
/*
 * Shared body of the STATS_*_TIME wrappers below: run the statement(s) passed
 * after `name`, time them with CmiWallTimer(), and fold the elapsed time into
 * the count_in_<name> / time_in_<name> / max_time_in_<name> fields of
 * comm_stats.
 *
 * The expansion is a bare brace block (not do/while(0)) so that it remains
 * interchangeable with the stats-disabled variants, which expand to plain `x`.
 * `name` is only ever token-pasted, so it is never macro-expanded itself.
 */
#define STATS_TIMED_SECTION(name, ...)   \
        { double t = CmiWallTimer(); \
          __VA_ARGS__;        \
          t = CmiWallTimer() - t;          \
          comm_stats.count_in_##name ++;        \
          comm_stats.time_in_##name += t;   \
          if (t>comm_stats.max_time_in_##name)      \
              comm_stats.max_time_in_##name = t;    \
        }

/* Time `x` and account it to the PumpRemoteTransactions counters. */
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
        STATS_TIMED_SECTION(PumpRemoteTransactions, x)

/* Time `x` and account it to the PumpLocalTransactions_rdma counters. */
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
        STATS_TIMED_SECTION(PumpLocalTransactions_rdma, x)

/* Time `x` and account it to the SendBufferMsg_smsg counters. */
#define STATS_SEND_SMSGS_TIME(x)   \
        STATS_TIMED_SECTION(SendBufferMsg_smsg, x)

/* Time `x` and account it to the SendRdmaMsg counters. */
#define STATS_SENDRDMAMSG_TIME(x)   \
        STATS_TIMED_SECTION(SendRdmaMsg, x)

/* Time `x` and account it to the PumpDatagramConnection counters. */
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
        STATS_TIMED_SECTION(PumpDatagramConnection, x)
892
893 static void print_comm_stats()
894 {
895     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
896     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
897             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
898             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
899     
900     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
901             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
902             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
903
904     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
905     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
906     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
907
908
909     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
910     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
911     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
912     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
913     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
914     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
915     if (useDynamicSMSG)
916     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
917
918     fclose(counterLog);
919 }
920
921 #else
922 #define STATS_PUMPNETWORK_TIME(x)                  x
923 #define STATS_SEND_SMSGS_TIME(x)                   x
924 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
925 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
926 #define STATS_SENDRDMAMSG_TIME(x)                  x
927 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
928 #endif
929
930 static void
931 allgather(void *in,void *out, int len)
932 {
933     static int *ivec_ptr=NULL,already_called=0,job_size=0;
934     int i,rc;
935     int my_rank;
936     char *tmp_buf,*out_ptr;
937
938     if(!already_called) {
939
940         rc = PMI_Get_size(&job_size);
941         CmiAssert(rc == PMI_SUCCESS);
942         rc = PMI_Get_rank(&my_rank);
943         CmiAssert(rc == PMI_SUCCESS);
944
945         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
946         CmiAssert(ivec_ptr != NULL);
947
948         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
949         CmiAssert(rc == PMI_SUCCESS);
950
951         already_called = 1;
952
953     }
954
955     tmp_buf = (char *)malloc(job_size * len);
956     CmiAssert(tmp_buf);
957
958     rc = PMI_Allgather(in,tmp_buf,len);
959     CmiAssert(rc == PMI_SUCCESS);
960
961     out_ptr = out;
962
963     for(i=0;i<job_size;i++) {
964
965         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
966
967     }
968
969     free(tmp_buf);
970 }
971
972 static void
973 allgather_2(void *in,void *out, int len)
974 {
975     //PMI_Allgather is out of order
976     int i,rc, extend_len;
977     int  rank_index;
978     char *out_ptr, *out_ref;
979     char *in2;
980
981     extend_len = sizeof(int) + len;
982     in2 = (char*)malloc(extend_len);
983
984     memcpy(in2, &myrank, sizeof(int));
985     memcpy(in2+sizeof(int), in, len);
986
987     out_ptr = (char*)malloc(mysize*extend_len);
988
989     rc = PMI_Allgather(in2, out_ptr, extend_len);
990     GNI_RC_CHECK("allgather", rc);
991
992     out_ref = out;
993
994     for(i=0;i<mysize;i++) {
995         //rank index 
996         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
997         //copy to the rank index slot
998         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
999     }
1000
1001     free(out_ptr);
1002     free(in2);
1003
1004 }
1005
1006 static unsigned int get_gni_nic_address(int device_id)
1007 {
1008     unsigned int address, cpu_id;
1009     gni_return_t status;
1010     int i, alps_dev_id=-1,alps_address=-1;
1011     char *token, *p_ptr;
1012
1013     p_ptr = getenv("PMI_GNI_DEV_ID");
1014     if (!p_ptr) {
1015         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1016        
1017         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1018     } else {
1019         while ((token = strtok(p_ptr,":")) != NULL) {
1020             alps_dev_id = atoi(token);
1021             if (alps_dev_id == device_id) {
1022                 break;
1023             }
1024             p_ptr = NULL;
1025         }
1026         CmiAssert(alps_dev_id != -1);
1027         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1028         CmiAssert(p_ptr != NULL);
1029         i = 0;
1030         while ((token = strtok(p_ptr,":")) != NULL) {
1031             if (i == alps_dev_id) {
1032                 alps_address = atoi(token);
1033                 break;
1034             }
1035             p_ptr = NULL;
1036             ++i;
1037         }
1038         CmiAssert(alps_address != -1);
1039         address = alps_address;
1040     }
1041     return address;
1042 }
1043
/*
 * Return the protection tag taken from the first ':'-separated field of the
 * PMI_GNI_PTAG environment variable.
 *
 * The value is parsed in place with strtol() (which stops at the first ':'),
 * so the environment string is no longer modified by strtok(), and an empty
 * value is caught by an assertion instead of reaching atoi(NULL).
 */
static uint8_t get_ptag(void)
{
    const char *p_ptr;

    p_ptr = getenv("PMI_GNI_PTAG");
    CmiAssert(p_ptr != NULL);
    /* strtok() used to skip leading separators; keep that behaviour */
    while (*p_ptr == ':') p_ptr++;
    CmiAssert(*p_ptr != '\0');
    return (uint8_t)strtol(p_ptr, NULL, 10);
}
1056
/*
 * Return the cookie taken from the first ':'-separated field of the
 * PMI_GNI_COOKIE environment variable.
 *
 * The value is parsed with strtoul() rather than atoi(): the cookie is a
 * 32-bit unsigned quantity, and atoi() has undefined behaviour for values
 * that do not fit in an int.  Parsing in place also avoids strtok() writing
 * into the environment string.
 */
static uint32_t get_cookie(void)
{
    const char *p_ptr;

    p_ptr = getenv("PMI_GNI_COOKIE");
    CmiAssert(p_ptr != NULL);
    /* strtok() used to skip leading separators; keep that behaviour */
    while (*p_ptr == ':') p_ptr++;
    CmiAssert(*p_ptr != '\0');

    return (uint32_t)strtoul(p_ptr, NULL, 10);
}
1069
1070 #if LARGEPAGE
1071
1072 /* directly mmap memory from hugetlbfs for large pages */
1073
1074 #include <sys/stat.h>
1075 #include <fcntl.h>
1076 #include <sys/mman.h>
1077 #include <hugetlbfs.h>
1078
1079 // size must be _tlbpagesize aligned
1080 void *my_get_huge_pages(size_t size)
1081 {
1082     char filename[512];
1083     int fd;
1084     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1085     void *ptr = NULL;
1086
1087     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1088     fd = open(filename, O_RDWR | O_CREAT, mode);
1089     if (fd == -1) {
1090         CmiAbort("my_get_huge_pages: open filed");
1091     }
1092     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1093     if (ptr == MAP_FAILED) ptr = NULL;
1094 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1095     close(fd);
1096     unlink(filename);
1097     return ptr;
1098 }
1099
/*
 * Unmap a region of `size` bytes at `ptr`; aborts if munmap() reports an
 * error.
 */
void my_free_huge_pages(void *ptr, int size)
{
    if (munmap(ptr, size) == -1) {
        CmiAbort("munmap failed in my_free_huge_pages");
    }
}
1106
1107 #endif
1108
1109 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1110 /* TODO: add any that are related */
1111 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1112
1113
1114 #include "machine-lrts.h"
1115 #include "machine-common-core.c"
1116
1117
1118 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
1119 static void SendRdmaMsg(PCQueue );
1120 static void PumpNetworkSmsg();
1121 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1122 #if CQWRITE
1123 static void PumpCqWriteTransactions();
1124 #endif
1125 #if REMOTE_EVENT
1126 static void PumpRemoteTransactions(gni_cq_handle_t);
1127 #endif
1128
1129 #if MACHINE_DEBUG_LOG
1130 FILE *debugLog = NULL;
1131 static CmiInt8 buffered_recv_msg = 0;
1132 int         lrts_smsg_success = 0;
1133 int         lrts_received_msg = 0;
1134 #endif
1135
1136 static void sweep_mempool(mempool_type *mptr)
1137 {
1138     int n = 0;
1139     block_header *current = &(mptr->block_head);
1140
1141     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1142     while( current!= NULL) {
1143         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1144         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1145     }
1146     printf("[n %d] sweep_mempool slot END.\n", myrank);
1147 }
1148
1149 inline
1150 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1151 {
1152     block_header *current = *from;
1153
1154     //while(register_memory_size>= MAX_REG_MEM)
1155     //{
1156         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1157             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1158
1159         *from = current;
1160         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1161         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1162         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1163     //}
1164     return GNI_RC_SUCCESS;
1165 }
1166
1167 inline 
1168 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1169 {
1170     gni_return_t status = GNI_RC_SUCCESS;
1171     //int size = GetMempoolsize(msg);
1172     //void *blockaddr = GetMempoolBlockPtr(msg);
1173     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1174    
1175     block_header *current = &(mptr->block_head);
1176     while(register_memory_size>= MAX_REG_MEM)
1177     {
1178         status = deregisterMemory(mptr, &current);
1179         if (status != GNI_RC_SUCCESS) break;
1180     }
1181     if(register_memory_size>= MAX_REG_MEM) return status;
1182
1183     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1184     while(1)
1185     {
1186         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1187         if(status == GNI_RC_SUCCESS)
1188         {
1189             break;
1190         }
1191         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1192         {
1193             GNI_RC_CHECK("registerFromMempool", status);
1194         }
1195         else
1196         {
1197             status = deregisterMemory(mptr, &current);
1198             if (status != GNI_RC_SUCCESS) break;
1199         }
1200     }; 
1201     return status;
1202 }
1203
1204 inline 
1205 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1206 {
1207     static int rank = -1;
1208     int i;
1209     gni_return_t status;
1210     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1211     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1212     mempool_type *mptr;
1213
1214     status = registerFromMempool(mptr1, msg, size, t, cqh);
1215     if (status == GNI_RC_SUCCESS) return status;
1216 #if CMK_SMP 
1217     for (i=0; i<CmiMyNodeSize()+1; i++) {
1218       rank = (rank+1)%(CmiMyNodeSize()+1);
1219       mptr = CpvAccessOther(mempool, rank);
1220       if (mptr == mptr1) continue;
1221       status = registerFromMempool(mptr, msg, size, t, cqh);
1222       if (status == GNI_RC_SUCCESS) return status;
1223     }
1224 #endif
1225     return  GNI_RC_ERROR_RESOURCE;
1226 }
1227
1228 inline
1229 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1230 {
1231     MSG_LIST        *msg_tmp;
1232     MallocMsgList(msg_tmp);
1233     msg_tmp->destNode = destNode;
1234     msg_tmp->size   = size;
1235     msg_tmp->msg    = msg;
1236     msg_tmp->tag    = tag;
1237 #if CMK_WITH_STATS
1238     SMSG_CREATION(msg_tmp)
1239 #endif
1240
1241 #if ONE_SEND_QUEUE
1242     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1243 #else
1244 #if SMP_LOCKS
1245     CmiLock(queue->smsg_msglist_index[destNode].lock);
1246     if(queue->smsg_msglist_index[destNode].pushed == 0)
1247     {
1248         PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1249     }
1250     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1251     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1252 #else
1253     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1254 #endif
1255 #endif
1256
1257 #if PRINT_SYH
1258     buffered_smsg_counter++;
1259 #endif
1260 }
1261
1262 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1263 {
1264     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1265 }
1266
1267 inline
1268 static void setup_smsg_connection(int destNode)
1269 {
1270     mdh_addr_list_t  *new_entry = 0;
1271     gni_post_descriptor_t *pd;
1272     gni_smsg_attr_t      *smsg_attr;
1273     gni_return_t status = GNI_RC_NOT_DONE;
1274     RDMA_REQUEST        *rdma_request_msg;
1275     
1276     if(smsg_available_slot == smsg_expand_slots)
1277     {
1278         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1279         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1280         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1281
1282         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1283             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1284             GNI_MEM_READWRITE,   
1285             -1,
1286             &(new_entry->mdh));
1287         smsg_available_slot = 0; 
1288         new_entry->next = smsg_dynamic_list;
1289         smsg_dynamic_list = new_entry;
1290     }
1291     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1292     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1293     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1294     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1295     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1296     smsg_attr->buff_size = smsg_memlen;
1297     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1298     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1299     smsg_local_attr_vec[destNode] = smsg_attr;
1300     smsg_available_slot++;
1301     MallocPostDesc(pd);
1302     pd->type            = GNI_POST_FMA_PUT;
1303     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1304     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1305     pd->length          = sizeof(gni_smsg_attr_t);
1306     pd->local_addr      = (uint64_t) smsg_attr;
1307     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1308     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1309     pd->src_cq_hndl     = 0;
1310
1311     pd->rdma_mode       = 0;
1312     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1313     print_smsg_attr(smsg_attr);
1314     if(status == GNI_RC_ERROR_RESOURCE )
1315     {
1316         MallocRdmaRequest(rdma_request_msg);
1317         rdma_request_msg->destNode = destNode;
1318         rdma_request_msg->pd = pd;
1319         /* buffer this request */
1320     }
1321 #if PRINT_SYH
1322     if(status != GNI_RC_SUCCESS)
1323        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1324     else
1325         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1326 #endif
1327 }
1328
1329 /* useDynamicSMSG */
1330 inline 
1331 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1332 {
1333     gni_return_t status = GNI_RC_NOT_DONE;
1334
1335     if(mailbox_list->offset == mailbox_list->size)
1336     {
1337         dynamic_smsg_mailbox_t *new_mailbox_entry;
1338         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1339         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1340         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1341         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1342         new_mailbox_entry->offset = 0;
1343         
1344         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1345             new_mailbox_entry->size, smsg_rx_cqh,
1346             GNI_MEM_READWRITE,   
1347             -1,
1348             &(new_mailbox_entry->mem_hndl));
1349
1350         GNI_RC_CHECK("register", status);
1351         new_mailbox_entry->next = mailbox_list;
1352         mailbox_list = new_mailbox_entry;
1353     }
1354     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1355     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1356     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1357     local_smsg_attr->mbox_offset = mailbox_list->offset;
1358     mailbox_list->offset += smsg_memlen;
1359     local_smsg_attr->buff_size = smsg_memlen;
1360     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1361     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1362 }
1363
1364 /* useDynamicSMSG */
1365 inline 
1366 static int connect_to(int destNode)
1367 {
1368     gni_return_t status = GNI_RC_NOT_DONE;
1369     CmiAssert(smsg_connected_flag[destNode] == 0);
1370     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1371     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1372     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1373     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1374     
1375     CMI_GNI_LOCK(global_gni_lock)
1376     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1377     CMI_GNI_UNLOCK(global_gni_lock)
1378     if (status == GNI_RC_ERROR_RESOURCE) {
1379       /* possibly destNode is making connection at the same time */
1380       free(smsg_attr_vector_local[destNode]);
1381       smsg_attr_vector_local[destNode] = NULL;
1382       free(smsg_attr_vector_remote[destNode]);
1383       smsg_attr_vector_remote[destNode] = NULL;
1384       mailbox_list->offset -= smsg_memlen;
1385 #if PRINT_SYH
1386     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1387 #endif
1388       return 0;
1389     }
1390     GNI_RC_CHECK("GNI_Post", status);
1391     smsg_connected_flag[destNode] = 1;
1392 #if PRINT_SYH
1393     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1394 #endif
1395     return 1;
1396 }
1397
1398 inline 
1399 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1400 {
1401     unsigned int          remote_address;
1402     uint32_t              remote_id;
1403     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1404     gni_smsg_attr_t       *smsg_attr;
1405     gni_post_descriptor_t *pd;
1406     gni_post_state_t      post_state;
1407     char                  *real_data; 
1408
1409     if (useDynamicSMSG) {
1410         switch (smsg_connected_flag[destNode]) {
1411         case 0: 
1412             connect_to(destNode);         /* continue to case 1 */
1413         case 1:                           /* pending connection, do nothing */
1414             status = GNI_RC_NOT_DONE;
1415             if(inbuff ==0)
1416                 buffer_small_msgs(queue, msg, size, destNode, tag);
1417             return status;
1418         }
1419     }
1420 #if ! ONE_SEND_QUEUE
1421     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1422 #endif
1423     {
1424         //CMI_GNI_LOCK(smsg_mailbox_lock)
1425         CMI_GNI_LOCK(default_tx_cq_lock)
1426 #if CMK_SMP_TRACE_COMMTHREAD
1427         int oldpe = -1;
1428         int oldeventid = -1;
1429         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
1430         { 
1431             START_EVENT();
1432             if ( tag == SMALL_DATA_TAG)
1433                 real_data = (char*)msg; 
1434             else 
1435                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1436             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1437             TRACE_COMM_SET_COMM_MSGID(real_data);
1438         }
1439 #endif
1440 #if REMOTE_EVENT
1441         if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) {
1442             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1443             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1444                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1445         }
1446         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1447 #endif
1448 #if     CMK_WITH_STATS
1449         SMSG_TRY_SEND(tag)
1450 #endif
1451 #if CMK_WITH_STATS
1452     double              creation_time;
1453     if (ptr == NULL)
1454         creation_time = CmiWallTimer();
1455     else
1456         creation_time = ptr->creation_time;
1457 #endif
1458
1459     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1460 #if CMK_SMP_TRACE_COMMTHREAD
1461         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1462 #endif
1463         CMI_GNI_UNLOCK(default_tx_cq_lock)
1464         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1465         if(status == GNI_RC_SUCCESS)
1466         {
1467 #if     CMK_WITH_STATS
1468             SMSG_SENT_DONE(creation_time,tag) 
1469 #endif
1470 #if CMK_SMP_TRACE_COMMTHREAD
1471             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
1472             { 
1473                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1474             }
1475 #endif
1476         }else
1477             status = GNI_RC_ERROR_RESOURCE;
1478     }
1479     if(status != GNI_RC_SUCCESS && inbuff ==0)
1480         buffer_small_msgs(queue, msg, size, destNode, tag);
1481     return status;
1482 }
1483
1484 inline 
1485 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1486 {
1487     /* construct a control message and send */
1488     CONTROL_MSG         *control_msg_tmp;
1489     MallocControlMsg(control_msg_tmp);
1490     control_msg_tmp->source_addr = (uint64_t)msg;
1491     control_msg_tmp->seq_id    = seqno;
1492     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1493 #if REMOTE_EVENT
1494     control_msg_tmp->ack_index    =  -1;
1495 #endif
1496 #if     USE_LRTS_MEMPOOL
1497     if(size < BIG_MSG)
1498     {
1499         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1500     }
1501     else
1502     {
1503         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1504         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1505         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1506     }
1507 #else
1508     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1509 #endif
1510     return control_msg_tmp;
1511 }
1512
1513 #define BLOCKING_SEND_CONTROL    0
1514
1515 // Large message, send control to receiver, receiver register memory and do a GET, 
1516 // return 1 - send no success
1517 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
1518 {
1519     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1520     uint32_t            vmdh_index  = -1;
1521     int                 size;
1522     int                 offset = 0;
1523     uint64_t            source_addr;
1524     int                 register_size; 
1525     void                *msg;
1526
1527     size    =   control_msg_tmp->total_length;
1528     source_addr = control_msg_tmp->source_addr;
1529     register_size = control_msg_tmp->length;
1530
1531 #if  USE_LRTS_MEMPOOL
1532     if( control_msg_tmp->seq_id == 0 ){
1533 #if BLOCKING_SEND_CONTROL
1534         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1535             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1536                 LrtsAdvanceCommunication(0);
1537         }
1538 #endif
1539         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1540         {
1541             msg = (void*)source_addr;
1542             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1543             {
1544                 if(!inbuff)
1545                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1546                 return GNI_RC_ERROR_NOMEM;
1547             }
1548             //register the corresponding mempool
1549             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1550             if(status == GNI_RC_SUCCESS)
1551             {
1552                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1553             }
1554         }else
1555         {
1556             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1557             status = GNI_RC_SUCCESS;
1558         }
1559         if(NoMsgInSend(source_addr))
1560             register_size = GetMempoolsize((void*)(source_addr));
1561         else
1562             register_size = 0;
1563     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1564     {
1565         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1566         source_addr += offset;
1567         size = control_msg_tmp->length;
1568 #if BLOCKING_SEND_CONTROL
1569         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1570             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1571                 LrtsAdvanceCommunication(0);
1572         }
1573 #endif
1574         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1575             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1576             {
1577                 if(!inbuff)
1578                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1579                 return GNI_RC_ERROR_NOMEM;
1580             }
1581             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1582             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1583         }
1584         else
1585         {
1586             status = GNI_RC_SUCCESS;
1587         }
1588         register_size = 0;  
1589     }
1590
1591 #if CMI_EXERT_SEND_LARGE_CAP
1592     if(SEND_large_pending >= SEND_large_cap)
1593     {
1594         status = GNI_RC_ERROR_NOMEM;
1595     }
1596 #endif
1597  
1598     if(status == GNI_RC_SUCCESS)
1599     {
1600        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr); 
1601         if(status == GNI_RC_SUCCESS)
1602         {
1603 #if CMI_EXERT_SEND_LARGE_CAP
1604             SEND_large_pending++;
1605 #endif
1606             buffered_send_msg += register_size;
1607             if(control_msg_tmp->seq_id == 0)
1608             {
1609                 IncreaseMsgInSend(source_addr);
1610             }
1611             FreeControlMsg(control_msg_tmp);
1612             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag); 
1613         }else
1614             status = GNI_RC_ERROR_RESOURCE;
1615
1616     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1617     {
1618         CmiAbort("Memory registor for large msg\n");
1619     }else 
1620     {
1621         status = GNI_RC_ERROR_NOMEM; 
1622         if(!inbuff)
1623             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1624     }
1625     return status;
1626 #else
1627     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1628     if(status == GNI_RC_SUCCESS)
1629     {
1630         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL);  
1631         if(status == GNI_RC_SUCCESS)
1632         {
1633             FreeControlMsg(control_msg_tmp);
1634         }
1635     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1636     {
1637         CmiAbort("Memory registor for large msg\n");
1638     }else 
1639     {
1640         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1641     }
1642     return status;
1643 #endif
1644 }
1645
/*
 * Stamp the envelope of an outgoing message.
 *
 * msg  : message buffer whose envelope is updated in place
 * size : total message size in bytes
 *
 * Records `size` in the message via CmiSetMsgSize and then invokes
 * CMI_SET_CHECKSUM on the same buffer.  LrtsSendFunc calls this before it
 * picks a transfer protocol.
 */
1646 inline void LrtsPrepareEnvelope(char *msg, int size)
1647 {
1648     CmiSetMsgSize(msg, size);
     /* checksum is applied after the size field has been written */
1649     CMI_SET_CHECKSUM(msg, size);
1650 }
1651
1652 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1653 {
1654     gni_return_t        status  =   GNI_RC_SUCCESS;
1655     uint8_t tag;
1656     CONTROL_MSG         *control_msg_tmp;
1657     int                 oob = ( mode & OUT_OF_BAND);
1658     SMSG_QUEUE          *queue;
1659
1660     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1661 #if CMK_USE_OOB
1662     queue = oob? &smsg_oob_queue : &smsg_queue;
1663     tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
1664 #else
1665     queue = &smsg_queue;
1666     tag = LMSG_INIT_TAG;
1667 #endif
1668
1669     LrtsPrepareEnvelope(msg, size);
1670
1671 #if PRINT_SYH
1672     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1673 #endif 
1674
1675 #if CMK_SMP 
1676     if(size <= SMSG_MAX_MSG)
1677         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1678     else if (size < BIG_MSG) {
1679         control_msg_tmp =  construct_control_msg(size, msg, 0);
1680         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1681     }
1682     else {
1683         CmiSetMsgSeq(msg, 0);
1684         control_msg_tmp =  construct_control_msg(size, msg, 1);
1685         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1686     }
1687 #else   //non-smp, smp(worker sending)
1688     if(size <= SMSG_MAX_MSG)
1689     {
1690         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1691             CmiFree(msg);
1692     }
1693     else if (size < BIG_MSG) {
1694         control_msg_tmp =  construct_control_msg(size, msg, 0);
1695         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1696     }
1697     else {
1698 #if     USE_LRTS_MEMPOOL
1699         CmiSetMsgSeq(msg, 0);
1700         control_msg_tmp =  construct_control_msg(size, msg, 1);
1701         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1702 #else
1703         control_msg_tmp =  construct_control_msg(size, msg, 0);
1704         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1705 #endif
1706     }
1707 #endif
1708     return 0;
1709 }
1710
/*
 * Send `msg` (length `len`) to each of the `npes` processors listed in `pes`.
 *
 * With CMK_BROADCAST_USE_CMIREFERENCE, remote destinations get an extra
 * reference (CmiReference) followed by CmiSyncSendAndFree, while a send to
 * the calling PE uses plain CmiSyncSend.  Without it every destination uses
 * CmiSyncSend.
 */
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
  int idx = 0;

  while (idx < npes) {
    const int dest = pes[idx++];
#if CMK_BROADCAST_USE_CMIREFERENCE
    if (dest != CmiMyPe()) {
      CmiReference(msg);
      CmiSyncSendAndFree(dest, len, msg);
      continue;
    }
#endif
    CmiSyncSend(dest, len, msg);
  }
}
1729
1730 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1731 {
1732   /* A better asynchronous implementation may be wanted, but at least it works */
1733   CmiSyncListSendFn(npes, pes, len, msg);
1734   return (CmiCommHandle) 0;
1735 }
1736
/*
 * Send `msg` to every processor in `pes[0..npes-1]` and give up the caller's
 * ownership of the message.
 *
 * - A single destination is handled directly with CmiSyncSendAndFree.
 * - With CMK_PERSISTENT_COMM, when CpvAccess(phs) is set and
 *   len > PERSIST_MIN_SIZE (and, in SMP builds, the message is persistent
 *   memory), remote sends are reference counted and the message is freed once
 *   at the end.
 * - With CMK_BROADCAST_USE_CMIREFERENCE the list send is delegated to
 *   CmiSyncListSendFn, then the message is freed.
 * - Otherwise all but the last destination use CmiSyncSend and the last one
 *   uses CmiSyncSendAndFree; an empty list just frees the message.
 */
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
  if (npes == 1) {
      CmiSyncSendAndFree(pes[0], len, msg);
      return;
  }

#if CMK_PERSISTENT_COMM
  if (CpvAccess(phs) && len > PERSIST_MIN_SIZE 
#if CMK_SMP
            && IS_PERSISTENT_MEMORY(msg)
#endif
     ){
      int k = 0;
      while (k < npes) {
        const int dest = pes[k++];
        if (dest != CmiMyPe()) {
          CmiReference(msg);
          CmiSyncSendAndFree(dest, len, msg);
        } else {
          CmiSyncSend(dest, len, msg);
        }
      }
      CmiFree(msg);
      return;
  }
#endif

#if CMK_BROADCAST_USE_CMIREFERENCE
  CmiSyncListSendFn(npes, pes, len, msg);
  CmiFree(msg);
#else
  if (npes <= 0) {
    /* nothing to send to: just release the message */
    CmiFree(msg);
    return;
  }
  {
    const int last = npes - 1;
    int k;
    for (k = 0; k < last; k++)
      CmiSyncSend(pes[k], len, msg);
    CmiSyncSendAndFree(pes[last], len, msg);
  }
#endif
}
1777
/* Forward declaration; the definition appears later in this file. */
1778 static void    PumpDatagramConnection();
/* Trace user-event ids.  These literals are the initial values;
   registerUserTraceEvents() replaces them with the ids returned by
   traceRegisterUserEvent() when user-event tracing is compiled in. */
1779 static      int         event_SetupConnect = 111;
1780 static      int         event_PumpSmsg = 222 ;
1781 static      int         event_PumpTransaction = 333;
1782 static      int         event_PumpRdmaTransaction = 444;
1783 static      int         event_SendBufferSmsg = 484;
1784 static      int         event_SendFmaRdmaMsg = 555;
1785 static      int         event_AdvanceCommunication = 666;
1786
/*
 * Register the machine layer's trace user events and store the returned ids
 * in the event_* variables.  The body is compiled only when
 * CMI_MPI_TRACE_USEREVENTS and CMK_TRACE_ENABLED are set and
 * CMK_TRACE_IN_CHARM is not; otherwise this function does nothing.
 */
1787 static void registerUserTraceEvents() {
1788 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1789     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1790     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1791     event_PumpTransaction = traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
1792     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA remote event" , -1);
1793     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1794     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1795     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1796 #endif
1797 }
1798
1799 static void ProcessDeadlock()
1800 {
1801     static CmiUInt8 *ptr = NULL;
1802     static CmiUInt8  last = 0, mysum, sum;
1803     static int count = 0;
1804     gni_return_t status;
1805     int i;
1806
1807 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1808 //sweep_mempool(CpvAccess(mempool));
1809     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1810     mysum = smsg_send_count + smsg_recv_count;
1811     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1812     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1813     GNI_RC_CHECK("PMI_Allgather", status);
1814     sum = 0;
1815     for (i=0; i<mysize; i++)  sum+= ptr[i];
1816     if (last == 0 || sum == last) 
1817         count++;
1818     else
1819         count = 0;
1820     last = sum;
1821     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1822     if (count == 2) { 
1823         /* detected twice, it is a real deadlock */
1824         if (myrank == 0)  {
1825             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1826             CmiAbort("Fatal> Deadlock detected.");
1827         }
1828
1829     }
1830     _detected_hang = 0;
1831 }
1832
1833 static void CheckProgress()
1834 {
1835     if (smsg_send_count == last_smsg_send_count &&
1836         smsg_recv_count == last_smsg_recv_count ) 
1837     {
1838         _detected_hang = 1;
1839 #if !CMK_SMP
1840         if (_detected_hang) ProcessDeadlock();
1841 #endif
1842
1843     }
1844     else {
1845         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1846         last_smsg_send_count = smsg_send_count;
1847         last_smsg_recv_count = smsg_recv_count;
1848         _detected_hang = 0;
1849     }
1850 }
1851
1852 static void set_limit()
1853 {
1854     //if (!user_set_flag && CmiMyRank() == 0) {
1855     if (CmiMyRank() == 0) {
1856         int mynode = CmiPhysicalNodeID(CmiMyPe());
1857         int numpes = CmiNumPesOnPhysicalNode(mynode);
1858         int numprocesses = numpes / CmiMyNodeSize();
1859         MAX_REG_MEM  = _totalmem / numprocesses;
1860         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1861         if (CmiMyPe() == 0)
1862            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1863         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1864         {
1865              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1866              CmiAbort("memory registration\n");
1867         }
1868     }
1869 }
1870
1871 void LrtsPostCommonInit(int everReturn)
1872 {
1873 #if CMK_DIRECT
1874     CmiDirectInit();
1875 #endif
1876 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1877     CpvInitialize(double, projTraceStart);
1878     /* only PE 0 needs to care about registration (to generate sts file). */
1879     //if (CmiMyPe() == 0) 
1880     {
1881         registerMachineUserEventsFunction(&registerUserTraceEvents);
1882     }
1883 #endif
1884
1885 #if CMK_SMP
1886     CmiIdleState *s=CmiNotifyGetState();
1887     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1888     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1889 #else
1890     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1891     if (useDynamicSMSG)
1892     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1893 #endif
1894
1895 #if ! LARGEPAGE
1896     if (_checkProgress)
1897 #if CMK_SMP
1898     if (CmiMyRank() == 0)
1899 #endif
1900     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1901 #endif
1902  
1903 #if !LARGEPAGE
1904     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1905 #endif
1906 }
1907
/*
 * Called by a worker thread.
 *
 * Does work only when MULTI_THREAD_SEND is enabled: if there is more than one
 * process and this thread's rank satisfies rank % 6 == 3, it runs one pass of
 * CmiMachineProgressImpl().  With CMK_SMP_TRACE_COMMTHREAD the pass is
 * bracketed as an event_AdvanceCommunication user event.
 */
void LrtsPostNonLocal()
{
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif

#if MULTI_THREAD_SEND
    if (mysize == 1 || CmiMyRank() % 6 != 3)
        return;

#if CMK_SMP_TRACE_COMMTHREAD
    traceEndIdle();
    startT = CmiWallTimer();
#endif

    CmiMachineProgressImpl();

#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
    traceBeginIdle();
#endif
#endif
}
1938
1939 /* The network progress function is used to poll the network for
1940    messages. This flushes receive buffers on some implementations. */
1941 #if CMK_MACHINE_PROGRESS_DEFINED
/*
 * One pass of machine-layer progress.
 *
 * The body is compiled only for non-SMP builds or when MULTI_THREAD_SEND is
 * enabled; otherwise the function is empty.  The active part pumps incoming
 * small messages (PumpNetworkSmsg, wrapped in STATS_PUMPNETWORK_TIME) and then
 * expands four statement-like macros whose definitions are outside this
 * section.  The per-rank task split further down is disabled with "#if 0".
 */
1942 void CmiMachineProgressImpl() {
1943 #if ! CMK_SMP || MULTI_THREAD_SEND
1944
1945     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
     /* the next four lines are macros that expand to complete statements
        (note the missing semicolons) */
1946     SEND_OOB_SMSG(smsg_oob_queue)
1947     PUMP_REMOTE_HIGHPRIORITY
1948     PUMP_LOCAL_HIGHPRIORITY
1949     POST_HIGHPRIORITY_RDMA
1950
     /* Disabled: variant in which, under CMK_WORKER_SINGLE_TASK, each
        progress task is gated on a different value of CmiMyRank() % 6 */
1951 #if 0
1952 #if CMK_WORKER_SINGLE_TASK
1953     if (CmiMyRank() % 6 == 0)
1954 #endif
1955     PumpNetworkSmsg();
1956
1957 #if CMK_WORKER_SINGLE_TASK
1958     if (CmiMyRank() % 6 == 1)
1959 #endif
1960     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1961
1962 #if CMK_WORKER_SINGLE_TASK
1963     if (CmiMyRank() % 6 == 2)
1964 #endif
1965     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1966
1967 #if REMOTE_EVENT
1968 #if CMK_WORKER_SINGLE_TASK
1969     if (CmiMyRank() % 6 == 3)
1970 #endif
1971     PumpRemoteTransactions(rdma_rx_cqh);         // rdma_rx_cqh
1972 #endif
1973
1974 #if CMK_WORKER_SINGLE_TASK
1975     if (CmiMyRank() % 6 == 4)
1976 #endif
1977     {
1978 #if CMK_USE_OOB
1979     SendBufferMsg(&smsg_oob_queue, NULL);
1980     SendBufferMsg(&smsg_queue, &smsg_oob_queue);
1981 #else
1982     SendBufferMsg(&smsg_queue, NULL);
1983 #endif
1984     }
1985
1986 #if CMK_WORKER_SINGLE_TASK
1987     if (CmiMyRank() % 6 == 5)
1988 #endif
1989 #if CMK_SMP
1990     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
1991 #else
1992     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
1993 #endif
1994
1995 #endif
1996 #endif
1997 }
1998 #endif
1999
2000
2001 /* useDynamicSMSG */
2002 static void    PumpDatagramConnection()
2003 {
2004     uint32_t          remote_address;
2005     uint32_t          remote_id;
2006     gni_return_t status;
2007     gni_post_state_t  post_state;
2008     uint64_t          datagram_id;
2009     int i;
2010
2011    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2012    {
2013        if (datagram_id >= mysize) {           /* bound endpoint */
2014            int pe = datagram_id - mysize;
2015            CMI_GNI_LOCK(global_gni_lock)
2016            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2017            CMI_GNI_UNLOCK(global_gni_lock)
2018            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2019            {
2020                CmiAssert(remote_id == pe);
2021                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2022                GNI_RC_CHECK("Dynamic SMSG Init", status);
2023 #if PRINT_SYH
2024                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2025 #endif
2026                CmiAssert(smsg_connected_flag[pe] == 1);
2027                smsg_connected_flag[pe] = 2;
2028            }
2029        }
2030        else {         /* unbound ep */
2031            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2032            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2033            {
2034                CmiAssert(remote_id<mysize);
2035                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2036                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2037                GNI_RC_CHECK("Dynamic SMSG Init", status);
2038 #if PRINT_SYH
2039                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2040 #endif
2041                smsg_connected_flag[remote_id] = 2;
2042
2043                alloc_smsg_attr(&send_smsg_attr);
2044                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2045                GNI_RC_CHECK("post unbound datagram", status);
2046            }
2047        }
2048    }
2049 }
2050
/*
 * Polling the CQ to receive network (RDMA) messages.
 *
 * This function has no body in this version of the file; the two locals it
 * used to declare were never referenced and have been removed.
 */
static void PumpNetworkRdmaMsgs()
{
}
2058
2059 inline 
2060 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2061 {
2062     RDMA_REQUEST        *rdma_request_msg;
2063     MallocRdmaRequest(rdma_request_msg);
2064     rdma_request_msg->destNode = inst_id;
2065     rdma_request_msg->pd = pd;
2066 #if REMOTE_EVENT
2067     rdma_request_msg->ack_index = ack_index;
2068 #endif
2069     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2070 }
2071
/* Forward declaration: PumpNetworkSmsg() calls this for LMSG_INIT_TAG and
   LMSG_OOB_INIT_TAG control messages before the definition appears. */
2072 static void getLargeMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2073
/*
 * Drain the SMSG receive completion queue and dispatch every pending small
 * message.
 *
 * Outer loop: pulls events from smsg_rx_cqh (GNI_CqGetEvent) until it stops
 * returning GNI_RC_SUCCESS, or, with CMI_PUMPNETWORKSMSG_CAP, until
 * PumpNetworkSmsg_cap messages have been received.
 * Inner loop: for the sender identified by the event, fetches messages from
 * its mailbox with GNI_SmsgGetNextWTag until none are left and dispatches on
 * the tag:
 *   SMALL_DATA_TAG                   - copy payload out and deliver it
 *   LMSG_INIT_TAG/LMSG_OOB_INIT_TAG  - control message of a large transfer;
 *                                      handed to getLargeMsgRequest
 *   ACK_TAG (conditional)            - sender-side completion of a GET
 *   BIG_MSG_TAG                      - a big-message segment finished; send
 *                                      the next segment(s) or free the message
 *   PUT_DONE_TAG (conditional)       - persistent message delivered
 *   DIRECT_PUT_DONE_TAG (CMK_DIRECT) - CmiDirect completion trigger
 * Every dispatched message increments smsg_recv_count.
 *
 * smsg_mailbox_lock is taken before GNI_SmsgGetNextWTag and released inside
 * each case, so the unlock placement differs per case.
 */
2074 static void PumpNetworkSmsg()
2075 {
2076     uint64_t            inst_id;
2077     gni_cq_entry_t      event_data;
2078     gni_return_t        status;
2079     void                *header;
2080     uint8_t             msg_tag;
2081     int                 msg_nbytes;
2082     void                *msg_data;
2083     gni_mem_handle_t    msg_mem_hndl;
2084     gni_smsg_attr_t     *smsg_attr;
2085     gni_smsg_attr_t     *remote_smsg_attr;
2086     int                 init_flag;
2087     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2088     uint64_t            source_addr;
2089     SMSG_QUEUE         *queue = &smsg_queue;
2090     PCQueue             tmp_queue;
2091 #if  CMK_DIRECT
2092     cmidirectMsg        *direct_msg;
2093 #endif
2094 #if CMI_PUMPNETWORKSMSG_CAP 
2095     int                  recv_cnt = 0;
2096     while(recv_cnt< PumpNetworkSmsg_cap) {
2097 #else
2098     while(1) {
2099 #endif
2100         CMI_GNI_LOCK(smsg_rx_cq_lock)
2101         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2102         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
         /* any non-success status (including "no event") ends the pump;
            GNI_RC_ERROR_RESOURCE is reported after the loop */
2103         if(status != GNI_RC_SUCCESS) break;
2104
2105         inst_id = GNI_CQ_GET_INST_ID(event_data);
2106 #if REMOTE_EVENT
2107         inst_id = GET_RANK(inst_id);      /* important */
2108 #endif
2109         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2110 #if PRINT_SYH
2111         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2112 #endif
2113         if (useDynamicSMSG) {
2114             /* subtle: smsg may come before connection is setup */
             /* spins here until PumpDatagramConnection marks the peer connected (flag == 2) */
2115             while (smsg_connected_flag[inst_id] != 2) 
2116                PumpDatagramConnection();
2117         }
2118         msg_tag = GNI_SMSG_ANY_TAG;
2119         while(1) {
2120             CMI_GNI_LOCK(smsg_mailbox_lock)
2121             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2122             if (status != GNI_RC_SUCCESS)
2123             {
2124                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2125                 break;
2126             }
2127 #if         CMI_PUMPNETWORKSMSG_CAP
2128             recv_cnt++; 
2129 #endif
2130 #if PRINT_SYH
2131             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2132 #endif
2133             /* copy msg out and then put into queue (small message) */
2134             switch (msg_tag) {
2135             case SMALL_DATA_TAG:
2136             {
2137                 START_EVENT();
2138                 msg_nbytes = CmiGetMsgSize(header);
2139                 msg_data    = CmiAlloc(msg_nbytes);
2140                 memcpy(msg_data, (char*)header, msg_nbytes);
                 /* payload has been copied, so the mailbox slot can be released */
2141                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2142                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2143                 TRACE_COMM_CREATION(EVENT_TIME(), msg_data);
2144                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2145                 handleOneRecvedMsg(msg_nbytes, msg_data);
2146                 break;
2147             }
2148             case LMSG_INIT_TAG:
2149             case LMSG_OOB_INIT_TAG:
2150             {
                 /* out-of-band requests are buffered on the high-priority queue */
2151                 tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf; 
2152 #if MULTI_THREAD_SEND
                 /* copy the control message so the mailbox slot is released
                    before the request is processed */
2153                 MallocControlMsg(control_msg_tmp);
2154                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2155                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2156                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2157                 getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
2158                 FreeControlMsg(control_msg_tmp);
2159 #else
                 /* header is used in place; the slot is released only after the call */
2160                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2161                 getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
2162                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2163 #endif
2164                 break;
2165             }
2166 #if !REMOTE_EVENT && !CQWRITE
2167             case ACK_TAG:   //msg fit into mempool
2168             {
2169                 /* Get is done, release message . Now put is not used yet*/
2170                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2171                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2172                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2173 #if ! USE_LRTS_MEMPOOL
                 /* NOTE(review): header is dereferenced here after GNI_SmsgRelease
                    above - confirm the mailbox slot is still valid at this point */
2174                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2175 #else
2176                 DecreaseMsgInSend(msg);
2177 #endif
2178                 if(NoMsgInSend(msg))
2179                     buffered_send_msg -= GetMempoolsize(msg);
2180                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2181                 CmiFree(msg);
2182 #if CMI_EXERT_SEND_LARGE_CAP
2183                 SEND_large_pending--;
2184 #endif
2185                 break;
2186             }
2187 #endif
2188             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2189             {
2190 #if MULTI_THREAD_SEND
2191                 MallocControlMsg(header_tmp);
2192                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2193                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2194 #else
2195                 header_tmp = (CONTROL_MSG *) header;
2196 #endif
2197                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2198 #if CMI_EXERT_SEND_LARGE_CAP
2199                 SEND_large_pending--;
2200 #endif
2201                 void *msg = (void*)(header_tmp->source_addr);
2202                 int cur_seq = CmiGetMsgSeq(msg);
2203                 int offset = ONE_SEG*(cur_seq+1);
2204                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2205                 buffered_send_msg -= header_tmp->length;
                 /* the message's size field is reused as "bytes still outstanding" */
2206                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2207                 if (remain_size < 0) remain_size = 0;
2208                 CmiSetMsgSize(msg, remain_size);
2209                 if(remain_size <= 0) //transaction done
2210                 {
2211                     CmiFree(msg);
2212                 }else if (header_tmp->total_length > offset)
2213                 {
2214                     CmiSetMsgSeq(msg, cur_seq+1);
2215                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2216                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2217                     //send next seg
2218                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2219                          // pipelining
                     /* after the first segment, start up to BIG_MSG_PIPELINE-1
                        further segments, stopping once the whole message is covered */
2220                     if (header_tmp->seq_id == 1) {
2221                       int i;
2222                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2223                         int seq = cur_seq+i+2;
2224                         CmiSetMsgSeq(msg, seq-1);
2225                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2226                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2227                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2228                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2229                       }
2230                     }
2231                 }
2232 #if MULTI_THREAD_SEND
2233                 FreeControlMsg(header_tmp);
2234 #else
                 /* header_tmp aliased the mailbox slot, so release it only now */
2235                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2236 #endif
2237                 break;
2238             }
2239 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2240             case PUT_DONE_TAG:  {   //persistent message
2241                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2242                 int size = ((CONTROL_MSG *) header)->length;
2243                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2244                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2245                 CmiReference(msg);
2246                 CMI_CHECK_CHECKSUM(msg, size);
2247                 handleOneRecvedMsg(size, msg); 
2248 #if PRINT_SYH
2249                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2250 #endif
2251                 break;
2252             }
2253 #endif
2254 #if CMK_DIRECT
2255             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2256                 //create a trigger message
2257                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2258                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2259                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2260                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2261                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2262                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2263                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2264                 break;
2265 #endif
2266             default:
                 /* unknown tag: release the slot and lock, then abort */
2267                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2268                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2269                 printf("weird tag problem\n");
2270                 CmiAbort("Unknown tag\n");
2271             }               // end switch
2272 #if PRINT_SYH
2273             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2274 #endif
2275             smsg_recv_count ++;
             /* reset the tag filter before fetching the next message */
2276             msg_tag = GNI_SMSG_ANY_TAG;
2277         } //endwhile GNI_SmsgGetNextWTag
2278     }   //end while GetEvent
2279     if(status == GNI_RC_ERROR_RESOURCE)
2280     {
2281         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2282         GNI_RC_CHECK("Smsg_rx_cq full", status);
2283     }
2284 }
2285
2286 static void printDesc(gni_post_descriptor_t *pd)
2287 {
2288     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2289 }
2290
#if CQWRITE
/*
 * Post a CQ-write of `data` to node `destNode`, targeting the remote memory
 * handle `mem_hndl`.  Only compiled when CQWRITE is enabled.
 *
 * The descriptor is obtained with MallocPostDesc and posted with
 * GNI_PostCqWrite using silent CQ mode and performance delivery mode; the
 * return code is checked with GNI_RC_CHECK.
 */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_post_descriptor_t *desc;
    gni_return_t           rc = GNI_RC_SUCCESS;

    MallocPostDesc(desc);
    desc->type            = GNI_POST_CQWRITE;
    desc->cq_mode         = GNI_CQMODE_SILENT;
    desc->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    desc->cqwrite_value   = data;
    desc->remote_mem_hndl = mem_hndl;

    rc = GNI_PostCqWrite(ep_hndl_array[destNode], desc);
    GNI_RC_CHECK("GNI_PostCqWrite", rc);
}
#endif
2308
2309 // register memory for a message
2310 // return mem handle
2311 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2312 {
2313     gni_return_t status = GNI_RC_SUCCESS;
2314
2315     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2316
2317 #if CMK_PERSISTENT_COMM
2318       // persistent message is always registered
2319       // BIG_MSG small pieces do not have malloc chunk header
2320     if (IS_PERSISTENT_MEMORY(msg)) {
2321         *memh = GetMemHndl(msg);
2322         return GNI_RC_SUCCESS;
2323     }
2324 #endif
2325     if(seqno == 0 
2326 #if CMK_PERSISTENT_COMM
2327          || seqno == PERSIST_SEQ
2328 #endif
2329       )
2330     {
2331         if(IsMemHndlZero((GetMemHndl(msg))))
2332         {
2333             msg = (void*)(msg);
2334             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2335             if(status == GNI_RC_SUCCESS)
2336                 *memh = GetMemHndl(msg);
2337         }
2338         else {
2339             *memh = GetMemHndl(msg);
2340         }
2341     }
2342     else {
2343         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2344         status = registerMemory(msg, size, memh, NULL); 
2345     }
2346     return status;
2347 }
2348
2349 // for BIG_MSG called on receiver side for receiving control message
2350 // LMSG_INIT_TAG
2351 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2352 {
2353 #if     USE_LRTS_MEMPOOL
2354     CONTROL_MSG         *request_msg;
2355     gni_return_t        status = GNI_RC_SUCCESS;
2356     void                *msg_data;
2357     gni_post_descriptor_t *pd;
2358     gni_mem_handle_t    msg_mem_hndl;
2359     int                 size, transaction_size, offset = 0;
2360     size_t              register_size = 0;
2361
2362     // initial a get to transfer data from the sender side */
2363     request_msg = (CONTROL_MSG *) header;
2364     size = request_msg->total_length;
2365     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2366     MallocPostDesc(pd);
2367 #if CMK_WITH_STATS 
2368     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2369 #endif
2370     if(request_msg->seq_id < 2)   {
2371 #if CMK_SMP_TRACE_COMMTHREAD 
2372         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2373 #endif
2374         msg_data = CmiAlloc(size);
2375         CmiSetMsgSeq(msg_data, 0);
2376         _MEMCHECK(msg_data);
2377     }
2378     else {
2379         offset = ONE_SEG*(request_msg->seq_id-1);
2380         msg_data = (char*)request_msg->dest_addr + offset;
2381     }
2382    
2383     pd->cqwrite_value = request_msg->seq_id;
2384
2385     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2386     SetMemHndlZero(pd->local_mem_hndl);
2387     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2388     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2389         if(NoMsgInRecv( (void*)(msg_data)))
2390             register_size = GetMempoolsize((void*)(msg_data));
2391     }
2392
2393     pd->first_operand = ALIGN64(size);                   //  total length
2394
2395     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2396         pd->type            = GNI_POST_FMA_GET;
2397     else
2398         pd->type            = GNI_POST_RDMA_GET;
2399     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2400     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2401     pd->length          = transaction_size;
2402     pd->local_addr      = (uint64_t) msg_data;
2403     pd->remote_addr     = request_msg->source_addr + offset;
2404     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2405
2406     if (tag == LMSG_OOB_INIT_TAG) 
2407         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2408     else
2409     {
2410 #if MULTI_THREAD_SEND
2411         pd->src_cq_hndl     = rdma_tx_cqh;
2412 #else
2413         pd->src_cq_hndl     = 0;
2414 #endif
2415     }
2416
2417     pd->rdma_mode       = 0;
2418     pd->amo_cmd         = 0;
2419 #if CMI_EXERT_RECV_RDMA_CAP
2420     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2421 #endif
2422     //memory registration success
2423     if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
2424     {
2425         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2426         CMI_GNI_LOCK(lock)
2427 #if REMOTE_EVENT
2428         if( request_msg->seq_id == 0)
2429         {
2430             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2431             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2432             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2433         }
2434 #endif
2435
2436 #if CMK_WITH_STATS
2437         RDMA_TRY_SEND(pd->type)
2438 #endif
2439         if(pd->type == GNI_POST_RDMA_GET) 
2440         {
2441             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2442         }
2443         else
2444         {
2445             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2446         }
2447         CMI_GNI_UNLOCK(lock)
2448
2449         if(status == GNI_RC_SUCCESS )
2450         {
2451 #if CMI_EXERT_RECV_RDMA_CAP
2452             RDMA_pending++;
2453 #endif
2454             if(pd->cqwrite_value == 0)
2455             {
2456 #if MACHINE_DEBUG_LOG
2457                 buffered_recv_msg += register_size;
2458                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2459 #endif
2460                 IncreaseMsgInRecv(msg_data);
2461 #if CMK_SMP_TRACE_COMMTHREAD 
2462                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2463 #endif
2464             }
2465 #if  CMK_WITH_STATS
2466             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2467             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2468 #endif
2469         }
2470     }else if (status != GNI_RC_SUCCESS)
2471     {
2472         SetMemHndlZero((pd->local_mem_hndl));
2473     }
2474         if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
2475     {
2476 #if REMOTE_EVENT
2477         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2478 #else
2479         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2480 #endif
2481     }else if (status != GNI_RC_SUCCESS) {
2482         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2483         GNI_RC_CHECK("GetLargeAFter posting", status);
2484     }
2485 #else
2486     CONTROL_MSG         *request_msg;
2487     gni_return_t        status;
2488     void                *msg_data;
2489     gni_post_descriptor_t *pd;
2490     RDMA_REQUEST        *rdma_request_msg;
2491     gni_mem_handle_t    msg_mem_hndl;
2492     //int source;
2493     // initial a get to transfer data from the sender side */
2494     request_msg = (CONTROL_MSG *) header;
2495     msg_data = CmiAlloc(request_msg->length);
2496     _MEMCHECK(msg_data);
2497
2498     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2499
2500     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2501     {
2502         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2503     }
2504
2505     MallocPostDesc(pd);
2506     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2507         pd->type            = GNI_POST_FMA_GET;
2508     else
2509         pd->type            = GNI_POST_RDMA_GET;
2510     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2511     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2512     pd->length          = ALIGN64(request_msg->length);
2513     pd->local_addr      = (uint64_t) msg_data;
2514     pd->remote_addr     = request_msg->source_addr;
2515     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2516     if (tag == LMSG_OOB_INIT_TAG) 
2517         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2518     else
2519     {
2520 #if MULTI_THREAD_SEND
2521         pd->src_cq_hndl     = rdma_tx_cqh;
2522 #else
2523         pd->src_cq_hndl     = 0;
2524 #endif
2525     }
2526     pd->rdma_mode       = 0;
2527     pd->amo_cmd         = 0;
2528
2529     //memory registration successful
2530     if(status == GNI_RC_SUCCESS)
2531     {
2532         pd->local_mem_hndl  = msg_mem_hndl;
2533        
2534         if(pd->type == GNI_POST_RDMA_GET) 
2535         {
2536             CMI_GNI_LOCK(rdma_tx_cq_lock)
2537             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2538             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2539         }
2540         else
2541         {
2542             CMI_GNI_LOCK(default_tx_cq_lock)
2543             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2544             CMI_GNI_UNLOCK(default_tx_cq_lock)
2545         }
2546
2547     }else
2548     {
2549         SetMemHndlZero(pd->local_mem_hndl);
2550     }
2551     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2552     {
2553         MallocRdmaRequest(rdma_request_msg);
2554         rdma_request_msg->next = 0;
2555         rdma_request_msg->destNode = inst_id;
2556         rdma_request_msg->pd = pd;
2557         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2558     }else {
2559         GNI_RC_CHECK("AFter posting", status);
2560     }
2561 #endif
2562 }
2563
2564 #if CQWRITE
2565 static void PumpCqWriteTransactions()
2566 {
2567
2568     gni_cq_entry_t          ev;
2569     gni_return_t            status;
2570     void                    *msg;  
2571     int                     msg_size;
2572     while(1) {
2573         //CMI_GNI_LOCK(my_cq_lock) 
2574         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2575         //CMI_GNI_UNLOCK(my_cq_lock)
2576         if(status != GNI_RC_SUCCESS) break;
2577         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2578 #if CMK_PERSISTENT_COMM
2579 #if PRINT_SYH
2580         printf(" %d CQ write event %p\n", myrank, msg);
2581 #endif
2582         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2583 #if PRINT_SYH
2584             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2585 #endif
2586             CmiReference(msg);
2587             msg_size = CmiGetMsgSize(msg);
2588             CMI_CHECK_CHECKSUM(msg, msg_size);
2589             handleOneRecvedMsg(msg_size, msg); 
2590             continue;
2591         }
2592 #endif
2593 #if ! USE_LRTS_MEMPOOL
2594        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2595 #else
2596         DecreaseMsgInSend(msg);
2597 #endif
2598         if(NoMsgInSend(msg))
2599             buffered_send_msg -= GetMempoolsize(msg);
2600         CmiFree(msg);
2601     };
2602     if(status == GNI_RC_ERROR_RESOURCE)
2603     {
2604         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2605     }
2606 }
2607 #endif
2608
2609 #if REMOTE_EVENT
2610 static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
2611 {
2612     gni_cq_entry_t          ev;
2613     gni_return_t            status;
2614     void                    *msg;   
2615     int                     inst_id, index, type, size;
2616
2617 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2618     int                     pump_count = 0;
2619 #endif
2620     while(1) {
2621 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2622         if (pump_count > PumpRemoteTransactions_cap) break;
2623 #endif
2624         CMI_GNI_LOCK(global_gni_lock)
2625 //        CMI_GNI_LOCK(rdma_tx_cq_lock)
2626         status = GNI_CqGetEvent(rx_cqh, &ev);
2627 //        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2628         CMI_GNI_UNLOCK(global_gni_lock)
2629
2630         if(status != GNI_RC_SUCCESS) break;
2631
2632 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2633         pump_count ++;
2634 #endif
2635
2636         inst_id = GNI_CQ_GET_INST_ID(ev);
2637         index = GET_INDEX(inst_id);
2638         type = GET_TYPE(inst_id);
2639         switch (type) {
2640         case 0:    // ACK
2641             CmiAssert(index>=0 && index<ackPool.size);
2642             CMI_GNI_LOCK(ackPool.lock);
2643             CmiAssert(GetIndexType(ackPool, index) == 1);
2644             msg = GetIndexAddress(ackPool, index);
2645             CMI_GNI_UNLOCK(ackPool.lock);
2646 #if PRINT_SYH
2647             printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2648 #endif
2649 #if ! USE_LRTS_MEMPOOL
2650            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2651 #else
2652             DecreaseMsgInSend(msg);
2653 #endif
2654             if(NoMsgInSend(msg))
2655                 buffered_send_msg -= GetMempoolsize(msg);
2656             CmiFree(msg);
2657             IndexPool_freeslot(&ackPool, index);
2658 #if CMI_EXERT_SEND_LARGE_CAP
2659             SEND_large_pending--;
2660 #endif
2661             break;
2662 #if CMK_PERSISTENT_COMM
2663         case 1:  {    // PERSISTENT
2664             CmiLock(persistPool.lock);
2665             CmiAssert(GetIndexType(persistPool, index) == 2);
2666             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2667             CmiUnlock(persistPool.lock);
2668             START_EVENT();
2669             msg = slot->destBuf[0].destAddress;
2670             size = CmiGetMsgSize(msg);
2671             CmiReference(msg);
2672             CMI_CHECK_CHECKSUM(msg, size);
2673             TRACE_COMM_CREATION(EVENT_TIME(), msg);
2674             handleOneRecvedMsg(size, msg); 
2675             break;
2676             }
2677 #endif
2678         default:
2679             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2680             CmiAbort("PumpRemoteTransactions: unknown type");
2681         }
2682     }
2683     if(status == GNI_RC_ERROR_RESOURCE)
2684     {
2685         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2686     }
2687 }
2688 #endif
2689
2690 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2691 {
2692     gni_cq_entry_t          ev;
2693     gni_return_t            status;
2694     uint64_t                type, inst_id;
2695     gni_post_descriptor_t   *tmp_pd;
2696     MSG_LIST                *ptr;
2697     CONTROL_MSG             *ack_msg_tmp;
2698     ACK_MSG                 *ack_msg;
2699     uint8_t                 msg_tag;
2700 #if CMK_DIRECT
2701     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2702 #endif
2703     SMSG_QUEUE         *queue = &smsg_queue;
2704 #if CMI_PUMPLOCALTRANSACTIONS_CAP
2705     int         pump_count = 0;
2706     while(pump_count < PumpLocalTransactions_cap) {
2707         pump_count++;
2708 #else
2709     while(1) {
2710 #endif
2711         CMI_GNI_LOCK(my_cq_lock) 
2712         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2713         CMI_GNI_UNLOCK(my_cq_lock)
2714         if(status != GNI_RC_SUCCESS) break;
2715         
2716         type = GNI_CQ_GET_TYPE(ev);
2717         if (type == GNI_CQ_EVENT_TYPE_POST)
2718         {
2719
2720 #if CMI_EXERT_RECV_RDMA_CAP
2721             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2722             RDMA_pending--;
2723 #endif
2724             inst_id     = GNI_CQ_GET_INST_ID(ev);
2725 #if PRINT_SYH
2726             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2727 #endif
2728             CMI_GNI_LOCK(my_cq_lock)
2729             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2730             CMI_GNI_UNLOCK(my_cq_lock)
2731
2732             switch (tmp_pd->type) {
2733 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2734             case GNI_POST_RDMA_PUT:
2735 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2736                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2737 #endif
2738             case GNI_POST_FMA_PUT:
2739                 if(tmp_pd->amo_cmd == 1) {
2740 #if CMK_DIRECT
2741                     //sender ACK to receiver to trigger it is done
2742                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2743                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2744                     msg_tag = DIRECT_PUT_DONE_TAG;
2745 #endif
2746                 }
2747                 else {
2748                     CmiFree((void *)tmp_pd->local_addr);
2749 #if REMOTE_EVENT
2750                     FreePostDesc(tmp_pd);
2751                     continue;
2752 #elif CQWRITE
2753                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2754                     FreePostDesc(tmp_pd);
2755                     continue;
2756 #else
2757                     MallocControlMsg(ack_msg_tmp);
2758                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2759                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2760                     ack_msg_tmp->length  = tmp_pd->length;
2761                     msg_tag = PUT_DONE_TAG;
2762 #endif
2763                 }
2764                 break;
2765 #endif
2766             case GNI_POST_RDMA_GET:
2767             case GNI_POST_FMA_GET:  {
2768 #if  ! USE_LRTS_MEMPOOL
2769                 MallocControlMsg(ack_msg_tmp);
2770                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2771                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2772                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2773                 msg_tag = ACK_TAG;  
2774 #else
2775 #if CMK_WITH_STATS
2776                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2777 #endif
2778                 int seq_id = tmp_pd->cqwrite_value;
2779                 if(seq_id > 0)      // BIG_MSG
2780                 {
2781                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2782                     MallocControlMsg(ack_msg_tmp);
2783                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2784                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2785                     ack_msg_tmp->seq_id = seq_id;
2786                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2787                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2788                     ack_msg_tmp->length = tmp_pd->length;
2789                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2790                     msg_tag = BIG_MSG_TAG; 
2791                 } 
2792                 else
2793                 {
2794                     msg_tag = ACK_TAG; 
2795 #if  !REMOTE_EVENT && !CQWRITE
2796                     MallocAckMsg(ack_msg);
2797                     ack_msg->source_addr = tmp_pd->remote_addr;
2798 #endif
2799                 }
2800 #endif
2801                 break;
2802             }
2803             case  GNI_POST_CQWRITE:
2804                    FreePostDesc(tmp_pd);
2805                    continue;
2806             default:
2807                 CmiPrintf("type=%d\n", tmp_pd->type);
2808                 CmiAbort("PumpLocalTransactions: unknown type!");
2809             }      /* end of switch */
2810
2811 #if CMK_DIRECT
2812             if (tmp_pd->amo_cmd == 1) {
2813                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2814                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2815             }
2816             else
2817 #endif
2818             if (msg_tag == ACK_TAG) {
2819 #if !REMOTE_EVENT
2820 #if   !CQWRITE
2821                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2822                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2823 #else
2824                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2825 #endif
2826 #endif
2827             }
2828             else {
2829                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2830                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2831             }
2832 #if CMK_PERSISTENT_COMM
2833             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2834 #endif
2835             {
2836                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2837 #if PRINT_SYH
2838                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2839 #endif
2840                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2841                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2842
2843                     START_EVENT();
2844                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2845                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2846 #if MACHINE_DEBUG_LOG
2847                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2848                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2849                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2850 #endif
2851                     TRACE_COMM_CREATION(EVENT_TIME(), (void*)tmp_pd->local_addr);
2852                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2853                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2854                 }else if(msg_tag == BIG_MSG_TAG){
2855                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2856                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2857                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2858                         START_EVENT();
2859 #if PRINT_SYH
2860                         printf("Pipeline msg done [%d]\n", myrank);
2861 #endif
2862 #if     CMK_SMP_TRACE_COMMTHREAD
2863                         if( tmp_pd->cqwrite_value == 1)
2864                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2865 #endif
2866                         TRACE_COMM_CREATION(EVENT_TIME(), msg);
2867                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2868                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2869                     }
2870                 }
2871             }
2872             FreePostDesc(tmp_pd);
2873         }
2874     } //end while
2875     if(status == GNI_RC_ERROR_RESOURCE)
2876     {
2877         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2878         GNI_RC_CHECK("Smsg_tx_cq full", status);
2879     }
2880 }
2881
2882 static void  SendRdmaMsg( BufferList sendqueue)
2883 {
2884     gni_return_t            status = GNI_RC_SUCCESS;
2885     gni_mem_handle_t        msg_mem_hndl;
2886     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2887     RDMA_REQUEST            *pre = 0;
2888     uint64_t                register_size = 0;
2889     void                    *msg;
2890     int                     i;
2891
2892     int len = PCQueueLength(sendqueue);
2893     for (i=0; i<len; i++)
2894     {
2895 #if CMI_EXERT_RECV_RDMA_CAP
2896         if( RDMA_pending >= RDMA_cap) break;
2897 #endif
2898         CMI_PCQUEUEPOP_LOCK( sendqueue)
2899         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
2900         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
2901         if (ptr == NULL) break;
2902         
2903         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2904         gni_post_descriptor_t *pd = ptr->pd;
2905         
2906         msg = (void*)(pd->local_addr);
2907         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2908         register_size = 0;
2909         if(pd->cqwrite_value == 0) {
2910             if(NoMsgInRecv(msg))
2911                 register_size = GetMempoolsize(msg);
2912         }
2913
2914         if(status == GNI_RC_SUCCESS)        //mem register good
2915         {
2916             int destNode = ptr->destNode;
2917             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2918             CMI_GNI_LOCK(lock);
2919 #if REMOTE_EVENT
2920             if( pd->cqwrite_value == 0) {
2921                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2922                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
2923                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2924             }
2925 #if CMK_PERSISTENT_COMM
2926             else if (pd->cqwrite_value == PERSIST_SEQ) {
2927                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2928                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
2929                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2930             }
2931 #endif
2932 #endif
2933 #if CMK_WITH_STATS
2934             RDMA_TRY_SEND(pd->type)
2935 #endif
2936 #if CMK_SMP_TRACE_COMMTHREAD
2937             if(IS_PUT(pd->type))
2938             {
2939                  START_EVENT();
2940                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
2941             }
2942 #endif
2943
2944             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2945             {
2946                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
2947             }
2948             else
2949             {
2950                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
2951             }
2952             CMI_GNI_UNLOCK(lock);
2953             
2954             if(status == GNI_RC_SUCCESS)    //post good
2955             {
2956 #if CMI_EXERT_RECV_RDMA_CAP
2957                 RDMA_pending ++;
2958 #endif
2959                 if(pd->cqwrite_value == 0)
2960                 {
2961 #if CMK_SMP_TRACE_COMMTHREAD 
2962                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2963 #endif
2964                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2965                 }
2966 #if  CMK_WITH_STATS
2967                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2968                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2969 #endif
2970 #if MACHINE_DEBUG_LOG
2971                 buffered_recv_msg += register_size;
2972                 MACHSTATE(8, "GO request from buffered\n"); 
2973 #endif
2974 #if PRINT_SYH
2975                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
2976 #endif
2977                 FreeRdmaRequest(ptr);
2978             }else           // cannot post
2979             {
2980                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2981 #if PRINT_SYH
2982                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
2983 #endif
2984                 break;
2985             }
2986         } else          //memory registration fails
2987         {
2988             PCQueuePush(sendqueue, (char*)ptr);
2989         }
2990     } //end while
2991 }
2992
2993 static 
2994 inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
2995 {
2996     CONTROL_MSG         *control_msg_tmp;
2997     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
2998
2999     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3000     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
3001             /* connection not exists yet */
3002 #if CMK_SMP
3003             /* non-smp case, connect is issued in send_smsg_message */
3004         if (smsg_connected_flag[ptr->destNode] == 0)
3005             connect_to(ptr->destNode); 
3006 #endif
3007     }
3008     else
3009     switch(ptr->tag)
3010     {
3011     case SMALL_DATA_TAG:
3012         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3013         if(status == GNI_RC_SUCCESS)
3014         {
3015             CmiFree(ptr->msg);
3016         }
3017         break;
3018     case LMSG_INIT_TAG:
3019     case LMSG_OOB_INIT_TAG:
3020         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3021         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
3022         break;
3023 #if !REMOTE_EVENT && !CQWRITE
3024     case ACK_TAG:
3025         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3026         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3027         break;
3028 #endif
3029     case BIG_MSG_TAG:
3030         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3031         if(status == GNI_RC_SUCCESS)
3032         {
3033             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3034         }
3035         break;
3036 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3037     case PUT_DONE_TAG:
3038         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3039         if(status == GNI_RC_SUCCESS)
3040         {
3041             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3042         }
3043         break;
3044 #endif
3045 #if CMK_DIRECT
3046     case DIRECT_PUT_DONE_TAG:
3047         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3048         if(status == GNI_RC_SUCCESS)
3049         {
3050             free((CMK_DIRECT_HEADER*)ptr->msg);
3051         }
3052         break;
3053 #endif
3054     default:
3055         printf("Weird tag\n");
3056         CmiAbort("should not happen\n");
3057     }       // end switch
3058     return status;
3059 }
3060
3061 // return 1 if all messages are sent
3062
3063 #if ONE_SEND_QUEUE
3064
3065 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3066 {
3067     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3068     CONTROL_MSG         *control_msg_tmp;
3069     gni_return_t        status;
3070     int                 done = 1;
3071     uint64_t            register_size;
3072     void                *register_addr;
3073     int                 index_previous = -1;
3074 #if     CMI_SENDBUFFERSMSG_CAP
3075     int                 sent_length = 0;
3076 #endif
3077     int          index = 0;
3078     memset(destpe_avail, 0, mysize * sizeof(char));
3079     for (index=0; index<1; index++)
3080     {
3081         int i, len = PCQueueLength(queue->sendMsgBuf);
3082         for (i=0; i<len; i++) 
3083         {
3084             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3085             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3086             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3087             if(ptr == NULL) break;
3088             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3089                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3090                 continue;
3091             }
3092             status = _sendOneBufferedSmsg(queue, ptr);
3093 #if CMI_SENDBUFFERSMSG_CAP
3094             sent_length++;
3095 #endif
3096             if(status == GNI_RC_SUCCESS)
3097             {
3098 #if PRINT_SYH
3099                 buffered_smsg_counter--;
3100                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3101 #endif
3102                 FreeMsgList(ptr);
3103             }else {
3104                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3105                 done = 0;
3106                 if(status == GNI_RC_ERROR_RESOURCE)
3107                 {
3108                     destpe_avail[ptr->destNode] = 1;
3109                 }
3110             } 
3111         } //end while
3112     }   // end pooling for all cores
3113     return done;
3114 }
3115
3116 #else   /*  ! ONE_SEND_QUEUE  */
3117
3118 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3119 {
3120     MSG_LIST            *ptr;
3121     gni_return_t        status;
3122     int                 done = 1;
3123 #if     CMI_SENDBUFFERSMSG_CAP
3124     int                 sent_length = 0;
3125 #endif
3126     int idx;
3127 #if SMP_LOCKS
3128     int          index = -1;
3129     int nonempty = PCQueueLength(queue->nonEmptyQueues);
3130     for(idx =0; idx<nonempty; idx++) 
3131     {
3132         index++;  if (index >= nonempty) index = 0;
3133 #if CMI_SENDBUFFERSMSG_CAP
3134         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3135 #endif
3136         CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
3137         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
3138         CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
3139         if(current_list == NULL) break; 
3140         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
3141             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3142             continue;
3143         }
3144         PCQueue current_queue= current_list->sendSmsgBuf;
3145         CmiLock(current_list->lock);
3146         int i, len = PCQueueLength(current_queue);
3147         current_list->pushed = 0;
3148         CmiUnlock(current_list->lock);
3149 #else      /* ! SMP_LOCKS */
3150     static int          index = -1;
3151     for(idx =0; idx<mysize; idx++) 
3152     {
3153         index++;  if (index == mysize) index = 0;
3154 #if CMI_SENDBUFFERSMSG_CAP
3155         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3156 #endif
3157         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
3158         //if (index == myrank) continue;
3159         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3160         int i, len = PCQueueLength(current_queue);
3161 #endif
3162         for (i=0; i<len; i++)  {
3163             CMI_PCQUEUEPOP_LOCK(current_queue)
3164             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3165             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3166             if (ptr == 0) break;
3167
3168             status = _sendOneBufferedSmsg(queue, ptr);
3169 #if CMI_SENDBUFFERSMSG_CAP
3170             sent_length++;
3171 #endif
3172             if(status == GNI_RC_SUCCESS)
3173             {
3174 #if PRINT_SYH
3175                 buffered_smsg_counter--;
3176                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3177 #endif
3178                 FreeMsgList(ptr);
3179             }else {
3180                 PCQueuePush(current_queue, (char*)ptr);
3181                 done = 0;
3182                 if(status == GNI_RC_ERROR_RESOURCE)
3183                 {
3184                     break;
3185                 }
3186             } 
3187         } //end for i
3188 #if SMP_LOCKS
3189         CmiLock(current_list->lock);
3190         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3191         {
3192             current_list->pushed = 1;
3193             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3194         }
3195         CmiUnlock(current_list->lock); 
3196 #endif
3197     }   // end pooling for all cores
3198     return done;
3199 }
3200
3201 #endif
3202
3203 static void ProcessDeadlock();
/**
 * Run one pass of the machine layer's communication progress sequence.
 *
 * The stages, in the order they appear below:
 *   1. PumpDatagramConnection()  - only if useDynamicSMSG and whileidle are
 *                                  both non-zero.
 *   2. PumpNetworkSmsg()         - receive small / persistent messages.
 *   3. SendBufferMsg(&smsg_queue, ...) - the second argument is
 *                                  &smsg_oob_queue when CMK_USE_OOB is set,
 *                                  NULL otherwise.
 *   4. PumpLocalTransactions()   - on default_tx_cqh, and additionally on
 *                                  rdma_tx_cqh when MULTI_THREAD_SEND is set.
 *   5. PumpCqWriteTransactions() when CQWRITE is set, and
 *      PumpRemoteTransactions(rdma_rx_cqh) when REMOTE_EVENT is set.
 *   6. SendRdmaMsg(sendRdmaBuf).
 *   7. ProcessDeadlock()         - only in CMK_SMP && !LARGEPAGE builds and
 *                                  only when _detected_hang is non-zero.
 *
 * Before each of stages 2-6 the macro group SEND_OOB_SMSG /
 * PUMP_REMOTE_HIGHPRIORITY / PUMP_LOCAL_HIGHPRIORITY / POST_HIGHPRIORITY_RDMA
 * is invoked.  These macros are defined elsewhere in the file and are used
 * here without a trailing semicolon, so presumably each expands to a complete
 * statement (or to nothing) - confirm at their definitions.
 *
 * When CMK_SMP_TRACE_COMMTHREAD is set, stages 1-6 are each bracketed with
 * CmiWallTimer() calls and reported through traceUserBracketEvent() if the
 * stage took at least TRACE_THRESHOLD.
 *
 * @param whileidle  non-zero enables stage 1 (together with useDynamicSMSG);
 *                   presumably set when called from the idle loop - confirm
 *                   against callers.
 */
3204 void LrtsAdvanceCommunication(int whileidle)
3205 {
         /* NOTE(review): `count` is not referenced by name anywhere in the
          * visible body; unless one of the macros below uses it, it is dead.
          * Confirm against the macro definitions before removing. */
3206     static int count = 0;
3207     /*  Receive Msg first */
3208 #if CMK_SMP_TRACE_COMMTHREAD
3209     double startT, endT;
3210 #endif
         /* Stage 1: progress datagram-based connection setup (dynamic SMSG only). */
3211     if (useDynamicSMSG && whileidle)
3212     {
3213 #if CMK_SMP_TRACE_COMMTHREAD
3214         startT = CmiWallTimer();
3215 #endif
3216         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3217 #if CMK_SMP_TRACE_COMMTHREAD
3218         endT = CmiWallTimer();
3219         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3220 #endif
3221     }
3222
         /* High-priority macro group; repeated between the stages below. */
3223     SEND_OOB_SMSG(smsg_oob_queue)
3224     PUMP_REMOTE_HIGHPRIORITY
3225     PUMP_LOCAL_HIGHPRIORITY
3226     POST_HIGHPRIORITY_RDMA
3227     // Receiving small messages and persistent
3228 #if CMK_SMP_TRACE_COMMTHREAD
3229     startT = CmiWallTimer();
3230 #endif
3231     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3232 #if CMK_SMP_TRACE_COMMTHREAD
3233     endT = CmiWallTimer();
3234     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3235 #endif
3236
3237     SEND_OOB_SMSG(smsg_oob_queue)
3238     PUMP_REMOTE_HIGHPRIORITY
3239     PUMP_LOCAL_HIGHPRIORITY
3240     POST_HIGHPRIORITY_RDMA
3241     
3242     ///* Send buffered Message */
3243 #if CMK_SMP_TRACE_COMMTHREAD
3244     startT = CmiWallTimer();
3245 #endif
         /* The out-of-band queue is only handed to SendBufferMsg when CMK_USE_OOB is set. */
3246 #if CMK_USE_OOB
3247     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
3248 #else
3249     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
3250 #endif
3251 #if CMK_SMP_TRACE_COMMTHREAD
3252     endT = CmiWallTimer();
3253     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3254 #endif
3255
3256     SEND_OOB_SMSG(smsg_oob_queue)
3257     PUMP_REMOTE_HIGHPRIORITY
3258     PUMP_LOCAL_HIGHPRIORITY
3259     POST_HIGHPRIORITY_RDMA
3260
3261     //Pump Get messages or PUT messages
3262 #if CMK_SMP_TRACE_COMMTHREAD
3263     startT = CmiWallTimer();
3264 #endif
3265     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
         /* A second local completion queue (rdma_tx_cqh) is pumped only when MULTI_THREAD_SEND is set. */
3266 #if MULTI_THREAD_SEND
3267     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3268 #endif
3269 #if CMK_SMP_TRACE_COMMTHREAD
3270     endT = CmiWallTimer();
3271     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3272 #endif
3273     
3274     SEND_OOB_SMSG(smsg_oob_queue)
3275     PUMP_REMOTE_HIGHPRIORITY
3276     PUMP_LOCAL_HIGHPRIORITY
3277     POST_HIGHPRIORITY_RDMA
3278     //Pump Remote event
3279 #if CMK_SMP_TRACE_COMMTHREAD
3280     startT = CmiWallTimer();
3281 #endif
3282 #if CQWRITE
3283     PumpCqWriteTransactions();
3284 #endif
3285 #if REMOTE_EVENT
3286     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
3287 #endif
3288 #if CMK_SMP_TRACE_COMMTHREAD
3289     endT = CmiWallTimer();
3290     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3291 #endif
3292
3293     SEND_OOB_SMSG(smsg_oob_queue)
3294     PUMP_REMOTE_HIGHPRIORITY
3295     PUMP_LOCAL_HIGHPRIORITY
3296     POST_HIGHPRIORITY_RDMA
3297
         /* Stage 6: SendRdmaMsg() on sendRdmaBuf. */
3298 #if CMK_SMP_TRACE_COMMTHREAD
3299     startT = CmiWallTimer();
3300 #endif
3301     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3302 #if CMK_SMP_TRACE_COMMTHREAD
3303     endT = CmiWallTimer();
3304     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3305 #endif
3306
         /* Deadlock handling is compiled in only for SMP builds with LARGEPAGE
          * disabled, and runs only once _detected_hang has been set. */
3307 #if CMK_SMP && ! LARGEPAGE
3308     if (_detected_hang)  ProcessDeadlock();
3309 #endif
3310 }
3311
3312 static void set_smsg_max()
3313 {
3314     char *env;
3315
3316     if(mysize <=512)
3317     {
3318         SMSG_MAX_MSG = 1024;
3319     }else if (mysize <= 4096)
3320     {
3321         SMSG_MAX_MSG = 1024;
3322     }else if (mysize <= 16384)
3323     {
3324         SMSG_MAX_MSG = 512;
3325     }else {
3326         SMSG_MAX_MSG = 256;
3327     }
3328
3329     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3330     if (env) SMSG_MAX_MSG = atoi(env);
3331     CmiAssert(SMSG_MAX_MSG > 0);
3332 }    
3333
3334 /* useDynamicSMSG */
3335 static void _init_dynamic_smsg()
3336 {
3337     gni_return_t status;
3338     uint32_t     vmdh_index = -1;
3339     int i;
3340
3341     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3342     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3343     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3344     for(i=0; i<mysize; i++) {
3345         smsg_connected_flag[i] = 0;
3346         smsg_attr_vector_local[i] = NULL;
3347         smsg_attr_vector_remote[i] = NULL;
3348     }
3349
3350     set_smsg_max();
3351
3352     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3353     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3354     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3355     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3356     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3357
3358     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3359     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3360     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3361     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3362     mailbox_list->offset = 0;
3363     mailbox_list->next = 0;
3364     
3365     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3366         mailbox_list->size, smsg_rx_cqh,
3367         GNI_MEM_READWRITE,   
3368         vmdh_index,
3369         &(mailbox_list->mem_hndl));
3370     GNI_RC_CHECK("MEMORY registration for smsg", status);
3371
3372     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3373     GNI_RC_CHECK("Unbound EP", status);
3374     
3375     alloc_smsg_attr(&send_smsg_attr);
3376
3377     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3378     GNI_RC_CHECK("post unbound datagram", status);
3379
3380       /* always pre-connect to proc 0 */
3381     //if (myrank != 0) connect_to(0);
3382
3383     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3384     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3385 }
3386
3387 static void _init_static_smsg()
3388 {
3389     gni_smsg_attr_t      *smsg_attr;
3390     gni_smsg_attr_t      remote_smsg_attr;
3391     gni_smsg_attr_t      *smsg_attr_vec;
3392     gni_mem_handle_t     my_smsg_mdh_mailbox;
3393     int      ret, i;
3394     gni_return_t status;
3395     uint32_t              vmdh_index = -1;
3396     mdh_addr_t            base_infor;
3397     mdh_addr_t            *base_addr_vec;
3398
3399     set_smsg_max();
3400     
3401     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3402     
3403     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3404     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3405     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3406     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3407     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3408     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3409     CmiAssert(ret == 0);
3410     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3411     
3412     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3413             smsg_memlen*(mysize), smsg_rx_cqh,
3414             GNI_MEM_READWRITE,   
3415             vmdh_index,
3416             &my_smsg_mdh_mailbox);
3417     register_memory_size += smsg_memlen*(mysize);
3418     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3419
3420     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3421     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3422
3423     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3424     base_infor.mdh =  my_smsg_mdh_mailbox;
3425     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3426
3427     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3428  
3429     for(i=0; i<mysize; i++)
3430     {
3431         if(i==myrank)
3432             continue;
3433         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3434         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3435         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3436         smsg_attr[i].mbox_offset = i*smsg_memlen;
3437         smsg_attr[i].buff_size = smsg_memlen;
3438         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3439         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3440     }
3441
3442     for(i=0; i<mysize; i++)
3443     {
3444         if (myrank == i) continue;
3445
3446         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3447         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3448         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3449         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3450         remote_smsg_attr.buff_size = smsg_memlen;
3451         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3452         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3453
3454         /* initialize the smsg channel */
3455         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3456         GNI_RC_CHECK("SMSG Init", status);
3457     } //end initialization
3458
3459     free(base_addr_vec);
3460     free(smsg_attr);
3461
3462     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);