9e00a539fe3aa323ad1a2d9e146030bc17d6dad4
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on the total mempool size allocated; this prevents the mempool
18     # from using too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #if !defined(LARGEPAGE)
51 #define     LARGEPAGE              0
52 #endif
53
54 #if CMK_SMP
55 #define MULTI_THREAD_SEND          0
56 #define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
57 #endif
58
59 #if MULTI_THREAD_SEND
60 #define CMK_WORKER_SINGLE_TASK     1
61 #endif
62
63 #define REMOTE_EVENT               1
64 #define CQWRITE                    0
65
66 #define CMI_EXERT_SEND_LARGE_CAP   0
67 #define CMI_EXERT_RECV_RDMA_CAP    0
68
69
70 #define CMI_SENDBUFFERSMSG_CAP            0
71 #define CMI_PUMPNETWORKSMSG_CAP           0
72 #define CMI_PUMPREMOTETRANSACTIONS_CAP    0
73 #define CMI_PUMPLOCALTRANSACTIONS_CAP     0
74
75 #if CMI_SENDBUFFERSMSG_CAP
76 int     SendBufferMsg_cap  = 20;
77 #endif
78
79 #if CMI_PUMPNETWORKSMSG_CAP
80 int     PumpNetworkSmsg_cap = 20;
81 #endif
82
83 #if CMI_PUMPREMOTETRANSACTIONS_CAP
84 int     PumpRemoteTransactions_cap = 20;
85 #endif
86
/* Cap value compiled in only when CMI_PUMPLOCALTRANSACTIONS_CAP is enabled.
 * The guard previously tested CMI_PUMPREMOTETRANSACTIONS_CAP (copy-paste
 * slip), so enabling only the local-transactions switch left this variable
 * undefined. */
#if CMI_PUMPLOCALTRANSACTIONS_CAP
int     PumpLocalTransactions_cap = 15;
#endif
90
91 #if CMI_EXERT_SEND_LARGE_CAP
92 static int SEND_large_cap = 20;
93 static int SEND_large_pending = 0;
94 #endif
95
96 #if CMI_EXERT_RECV_RDMA_CAP
97 static int   RDMA_cap =   10;
98 static int   RDMA_pending = 0;
99 #endif
100
101 #define USE_LRTS_MEMPOOL                  1
102
103 #define PRINT_SYH                         0
104
105 // Trace communication thread
106 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
107 #define TRACE_THRESHOLD     0.00001
108 #define CMI_MPI_TRACE_MOREDETAILED 0
109 #undef CMI_MPI_TRACE_USEREVENTS
110 #define CMI_MPI_TRACE_USEREVENTS 1
111 #else
112 #undef CMK_SMP_TRACE_COMMTHREAD
113 #define CMK_SMP_TRACE_COMMTHREAD 0
114 #endif
115
116 #define CMK_TRACE_COMMOVERHEAD 0
117 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
118 #undef CMI_MPI_TRACE_USEREVENTS
119 #define CMI_MPI_TRACE_USEREVENTS 1
120 #else
121 #undef CMK_TRACE_COMMOVERHEAD
122 #define CMK_TRACE_COMMOVERHEAD 0
123 #endif
124
125 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
126 CpvStaticDeclare(double, projTraceStart);
127 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
128 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
129 #define  EVENT_TIME()   CpvAccess(projTraceStart)
130 #else
131 #define  START_EVENT()
132 #define  END_EVENT(x)
133 #define  EVENT_TIME()   (0.0)
134 #endif
135
136 #if USE_LRTS_MEMPOOL
137
138 #define oneMB (1024ll*1024)
139 #define oneGB (1024ll*1024*1024)
140
141 static CmiInt8 _mempool_size = 8*oneMB;
142 static CmiInt8 _expand_mem =  4*oneMB;
143 static CmiInt8 _mempool_size_limit = 0;
144
145 static CmiInt8 _totalmem = 0.8*oneGB;
146
147 #if LARGEPAGE
148 static CmiInt8 BIG_MSG  =  16*oneMB;
149 static CmiInt8 ONE_SEG  =  4*oneMB;
150 #else
151 static CmiInt8 BIG_MSG  =  4*oneMB;
152 static CmiInt8 ONE_SEG  =  2*oneMB;
153 #endif
154 #if MULTI_THREAD_SEND
155 static int BIG_MSG_PIPELINE = 1;
156 #else
157 static int BIG_MSG_PIPELINE = 4;
158 #endif
159
160 // dynamic flow control
161 static CmiInt8 buffered_send_msg = 0;
162 static CmiInt8 register_memory_size = 0;
163
164 #if LARGEPAGE
165 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
166 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
167 static CmiInt8  register_count = 0;
168 #else
169 #if CMK_SMP && COMM_THREAD_SEND 
170 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
171 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
172 #else
173 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
174 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
175 #endif
176
177
178 #endif
179
180 #endif     /* end USE_LRTS_MEMPOOL */
181
182 #if MULTI_THREAD_SEND 
183 #define     CMI_GNI_LOCK(x)       CmiLock(x);
184 #define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
185 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
186 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
187 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
188 #else
189 #define     CMI_GNI_LOCK(x)
190 #define     CMI_GNI_TRYLOCK(x)         (0)
191 #define     CMI_GNI_UNLOCK(x)
192 #define     CMI_PCQUEUEPOP_LOCK(Q)   
193 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
194 #endif
195
196 static int _tlbpagesize = 4096;
197
198 //static int _smpd_count  = 0;
199
200 static int   user_set_flag  = 0;
201
202 static int _checkProgress = 1;             /* check deadlock */
203 static int _detected_hang = 0;
204
205 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
206
207 // dynamic SMSG
208 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
209
210 static int avg_smsg_connection = 32;
211 static int                 *smsg_connected_flag= 0;
212 static gni_smsg_attr_t     **smsg_attr_vector_local;
213 static gni_smsg_attr_t     **smsg_attr_vector_remote;
214 static gni_ep_handle_t     ep_hndl_unbound;
215 static gni_smsg_attr_t     send_smsg_attr;
216 static gni_smsg_attr_t     recv_smsg_attr;
217
218 typedef struct _dynamic_smsg_mailbox{
219    void     *mailbox_base;
220    int      size;
221    int      offset;
222    gni_mem_handle_t  mem_hndl;
223    struct      _dynamic_smsg_mailbox  *next;
224 }dynamic_smsg_mailbox_t;
225
226 static dynamic_smsg_mailbox_t  *mailbox_list;
227
228 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
229 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
230
231 #if PRINT_SYH
232 int         lrts_send_msg_id = 0;
233 int         lrts_local_done_msg = 0;
234 int         lrts_send_rdma_success = 0;
235 #endif
236
237 #include "machine.h"
238
239 #include "pcqueue.h"
240
241 #include "mempool.h"
242
243 #if CMK_PERSISTENT_COMM
244 #include "machine-persistent.h"
245 #define  POST_HIGHPRIORITY_RDMA    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
246 #else  
247 #define  POST_HIGHPRIORITY_RDMA   
248 #endif
249
250 #if REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM) 
251 #define  PUMP_REMOTE_HIGHPRIORITY    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
252 #else
253 #define  PUMP_REMOTE_HIGHPRIORITY
254 #endif
255
256 //#define  USE_ONESIDED 1
257 #ifdef USE_ONESIDED
258 //onesided implementation is wrong, since no place to restore omdh
259 #include "onesided.h"
260 onesided_hnd_t   onesided_hnd;
261 onesided_md_t    omdh;
262 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
263
264 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
265
266 #else
267 uint8_t   onesided_hnd, omdh;
268
269 #if REMOTE_EVENT || CQWRITE 
270 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
271     if(register_memory_size+size>= MAX_REG_MEM) { \
272         status = GNI_RC_ERROR_NOMEM;} \
273     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
274         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
275 #else
276 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
277         if (register_memory_size + size >= MAX_REG_MEM) { \
278             status = GNI_RC_ERROR_NOMEM; \
279         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
280             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
281 #endif
282
283 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
284     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
285              register_memory_size -= size; \
286          else CmiAbort("MEM_DEregister");  \
287     } while (0)
288 #endif
289
290 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
291 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
292 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
293 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
294 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
295 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
296 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
297 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
298 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
299 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
300 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
301 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
302 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
303 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
304
305 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
306 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
307
/* Accessors for the `size` and `seq` fields of a message viewed as a
 * CmiMsgHeaderExt.  The message argument is parenthesized before the cast so
 * that an expression argument such as `buf + offset` is cast as a whole
 * instead of only its first operand.  The getters remain usable as lvalues. */
#define CmiGetMsgSize(m)     (((CmiMsgHeaderExt*)(m))->size)
#define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)(m))->size)=(s))
#define CmiGetMsgSeq(m)      (((CmiMsgHeaderExt*)(m))->seq)
#define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)(m))->seq) = (s))
312
313 #define ALIGNBUF                64
314
315 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
316 /* If SMSG is not used */
317
318 #define FMA_PER_CORE  1024
319 #define FMA_BUFFER_SIZE 1024
320
321 /* If SMSG is used */
322 static int  SMSG_MAX_MSG = 1024;
323 #define SMSG_MAX_CREDIT    72
324
325 #define MSGQ_MAXSIZE       2048
326
327 /* large message transfer with FMA or BTE */
328 #if ! REMOTE_EVENT
329 #define LRTS_GNI_RDMA_THRESHOLD  1024 
330 #else
331    /* remote events only work with RDMA */
332 #define LRTS_GNI_RDMA_THRESHOLD  0 
333 #endif
334
335 #if CMK_SMP
336 static int  REMOTE_QUEUE_ENTRIES=163840; 
337 static int LOCAL_QUEUE_ENTRIES=163840; 
338 #else
339 static int  REMOTE_QUEUE_ENTRIES=20480;
340 static int LOCAL_QUEUE_ENTRIES=20480; 
341 #endif
342
343 #define BIG_MSG_TAG             0x26
344 #define PUT_DONE_TAG            0x28
345 #define DIRECT_PUT_DONE_TAG     0x29
346 #define ACK_TAG                 0x30
347 /* SMSG is data message */
348 #define SMALL_DATA_TAG          0x31
349 /* SMSG is a control message to initialize a BTE */
350 #define LMSG_INIT_TAG           0x33 
351 #define LMSG_OOB_INIT_TAG       0x35
352
353 #define DEBUG
354 #ifdef GNI_RC_CHECK
355 #undef GNI_RC_CHECK
356 #endif
357 #ifdef DEBUG
358 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
359 #else
360 #define GNI_RC_CHECK(msg,rc)
361 #endif
362
363 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
364 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
365 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
366
367 static int useStaticMSGQ = 0;
368 static int useStaticFMA = 0;
369 static int mysize, myrank;
370 static gni_nic_handle_t   nic_hndl;
371
372 typedef struct {
373     gni_mem_handle_t mdh;
374     uint64_t addr;
375 } mdh_addr_t ;
376 // this is related to dynamic SMSG
377
378 typedef struct mdh_addr_list{
379     gni_mem_handle_t mdh;
380    void *addr;
381     struct mdh_addr_list *next;
382 }mdh_addr_list_t;
383
384 static unsigned int         smsg_memlen;
385 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
386 mdh_addr_t          setup_mem;
387 mdh_addr_t          *smsg_connection_vec = 0;
388 gni_mem_handle_t    smsg_connection_memhndl;
389 static int          smsg_expand_slots = 10;
390 static int          smsg_available_slot = 0;
391 static void         *smsg_mailbox_mempool = 0;
392 mdh_addr_list_t     *smsg_dynamic_list = 0;
393
394 static void             *smsg_mailbox_base;
395 gni_msgq_attr_t         msgq_attrs;
396 gni_msgq_handle_t       msgq_handle;
397 gni_msgq_ep_attr_t      msgq_ep_attrs;
398 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
399
400 /* =====Beginning of Declarations of Machine Specific Variables===== */
401 static int cookie;
402 static int modes = 0;
403 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
404 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
405 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
406 static gni_cq_handle_t       highprior_rdma_tx_cqh = NULL;      // rdma - local event
407 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
408 static gni_cq_handle_t       highpriority_rx_cqh = NULL;      // mempool - remote event
409 static gni_ep_handle_t       *ep_hndl_array;
410
411 static CmiNodeLock           *ep_lock_array;
412 static CmiNodeLock           default_tx_cq_lock; 
413 static CmiNodeLock           rdma_tx_cq_lock; 
414 static CmiNodeLock           global_gni_lock; 
415 static CmiNodeLock           rx_cq_lock;
416 static CmiNodeLock           smsg_mailbox_lock;
417 static CmiNodeLock           smsg_rx_cq_lock;
418 static CmiNodeLock           *mempool_lock;
419 //#define     CMK_WITH_STATS      1
/* Record describing one message together with its destination node, its
 * size and the SMSG tag it is sent under.  When CMK_WITH_STATS is enabled
 * the record also carries the wall-clock time stored by SMSG_CREATION. */
typedef struct msg_list
{
    uint32_t    destNode;
    uint32_t    size;
    void       *msg;
    uint8_t     tag;
#if CMK_WITH_STATS
    double      creation_time;
#endif
} MSG_LIST;
430
431
432 typedef struct control_msg
433 {
434     uint64_t            source_addr;    /* address from the start of buffer  */
435     uint64_t            dest_addr;      /* address from the start of buffer */
436     int                 total_length;   /* total length */
437     int                 length;         /* length of this packet */
438 #if REMOTE_EVENT
439     int                 ack_index;      /* index from integer to address */
440 #endif
441     uint8_t             seq_id;         //big message   0 meaning single message
442     gni_mem_handle_t    source_mem_hndl;
443     struct control_msg *next;
444 } CONTROL_MSG;
445
446 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
447
448 typedef struct ack_msg
449 {
450     uint64_t            source_addr;    /* address from the start of buffer  */
451 #if ! USE_LRTS_MEMPOOL
452     gni_mem_handle_t    source_mem_hndl;
453     int                 length;          /* total length */
454 #endif
455     struct ack_msg     *next;
456 } ACK_MSG;
457
458 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
459
460 #if CMK_DIRECT
461 typedef struct{
462     uint64_t    handler_addr;
463 }CMK_DIRECT_HEADER;
464
465 typedef struct {
466     char core[CmiMsgHeaderSizeBytes];
467     uint64_t handler;
468 }cmidirectMsg;
469
470 //SYH
471 CpvDeclare(int, CmiHandleDirectIdx);
472 void CmiHandleDirectMsg(cmidirectMsg* msg)
473 {
474
475     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
476    (*(_handle->callbackFnPtr))(_handle->callbackData);
477    CmiFree(msg);
478 }
479
480 void CmiDirectInit()
481 {
482     CpvInitialize(int,  CmiHandleDirectIdx);
483     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
484 }
485
486 #endif
487 typedef struct  rmda_msg
488 {
489     int                   destNode;
490 #if REMOTE_EVENT
491     int                   ack_index;
492 #endif
493     gni_post_descriptor_t *pd;
494 }RDMA_REQUEST;
495
496
497 #define SMP_LOCKS                       1
498 #define ONE_SEND_QUEUE                  0
499 typedef PCQueue BufferList;
500 typedef struct  msg_list_index
501 {
502     PCQueue       sendSmsgBuf;
503 #if  SMP_LOCKS
504     CmiNodeLock   lock;
505     int           pushed;
506     int           destpe;
507 #endif
508 } MSG_LIST_INDEX;
509 char                *destpe_avail;
510 PCQueue sendRdmaBuf;
511 PCQueue sendHighPriorBuf;
512 // buffered send queue
513 #if ! ONE_SEND_QUEUE
514 typedef struct smsg_queue
515 {
516     MSG_LIST_INDEX   *smsg_msglist_index;
517     int               smsg_head_index;
518 #if  SMP_LOCKS
519     PCQueue     nonEmptyQueues;
520 #endif
521 } SMSG_QUEUE;
522 #else
523 typedef struct smsg_queue
524 {
525     PCQueue       sendMsgBuf;
526 }  SMSG_QUEUE;
527 #endif
528
529 SMSG_QUEUE                  smsg_queue;
530 #if CMK_USE_OOB
531 SMSG_QUEUE                  smsg_oob_queue;
532 #define SEND_OOB_SMSG(x)            SendBufferMsg(&x, NULL);
533 #define PUMP_LOCAL_HIGHPRIORITY    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh,  rdma_tx_cq_lock)); 
534 #else
535 #define SEND_OOB_SMSG(x)            
536 #define PUMP_LOCAL_HIGHPRIORITY     
537 #endif
538
539 #define FreeMsgList(d)   free(d);
540 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
541
542 #define FreeControlMsg(d)      free(d);
543 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
544
545 #define FreeAckMsg(d)      free(d);
546 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
547
548 #define FreeRdmaRequest(d)       free(d);
549 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
550 /* reuse gni_post_descriptor_t */
551 static gni_post_descriptor_t *post_freelist=0;
552
553 #define FreePostDesc(d)     free(d);
554 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
555
556
557 /* LrtsSent was called, but the message could not be sent by SMSGSend because the mailbox was full or there was no credit */
558 static int      buffered_smsg_counter = 0;
559
560 /* SmsgSend returned success, but the sent message has not yet been confirmed by the remote side */
561 static MSG_LIST *buffered_fma_head = 0;
562 static MSG_LIST *buffered_fma_tail = 0;
563
564 /* functions  */
/* Single-bit helpers on an integer bitmask `a`, addressing bit `ind`:
 *   IsFree(a, ind)   - non-zero when bit `ind` of `a` is clear
 *   SET_BITS(a, ind) - sets bit `ind` in the lvalue `a`
 *   Reset(a, ind)    - clears bit `ind` in the lvalue `a`
 * Arguments and expansions are fully parenthesized so that expression
 * arguments (e.g. `x ^ y`) and surrounding operators keep their intended
 * precedence. */
#define IsFree(a,ind)   (!((a) & (1<<(ind))))
#define SET_BITS(a,ind) ((a) = ((a) | (1<<(ind))))
#define Reset(a,ind)    ((a) = ((a) & (~(1<<(ind)))))
568
569 CpvDeclare(mempool_type*, mempool);
570
571 #if CMK_PERSISTENT_COMM
572 CpvDeclare(mempool_type*, persistent_mempool);
573 #endif
574
575 #if REMOTE_EVENT
576 /* ack pool for remote events */
577
/* Layout of a 32-bit remote event value, as encoded/decoded below:
 *   bit 31               : type flag (0 for ACK_EVENT, 1 for PERSIST_EVENT)
 *   bits SHIFT .. 30     : slot index (32-SHIFT-1 bits, see INDEX_MASK)
 *   bits 0 .. SHIFT-1    : rank (see RANK_MASK), taken from `myrank`
 */
static int  SHIFT   =           18;
#define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
#define RANK_MASK               ((1<<SHIFT) - 1)
#define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)

#define GET_TYPE(evt)           (((evt) >> 31) & 1)
#define GET_RANK(evt)           ((evt) & RANK_MASK)
#define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)

/* The type flag is built with an unsigned constant: `1<<31` on a signed int
 * shifts into the sign bit, which is undefined behaviour in C. */
#define PERSIST_EVENT(idx)      ( (1u<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
588
589 #if CMK_SMP
590 #define INIT_SIZE                4096
591 #else
592 #define INIT_SIZE                1024
593 #endif
594
/* One slot of an IndexPool. */
struct IndexStruct {
    /* pointer value recorded for the slot when it is handed out */
    void *addr;
    /* index of the next free slot; -1 terminates the free list */
    int   next;
    /* 0 while the slot is free; otherwise 1: ACK   2: Persistent */
    int   type;
};
600
601 typedef struct IndexPool {
602     struct IndexStruct   *indexes;
603     int                   size;
604     int                   freehead;
605     CmiNodeLock           lock;
606 } IndexPool;
607
608 static IndexPool  ackPool;
609 #if CMK_PERSISTENT_COMM
610 static IndexPool  persistPool;
611 #endif
612
613 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
614 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
615
616 static void IndexPool_init(IndexPool *pool)
617 {
618     int i;
619     if ((1<<SHIFT) < mysize) 
620         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
621     pool->size = INIT_SIZE;
622     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
623     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
624     for (i=0; i<pool->size-1; i++) {
625         pool->indexes[i].next = i+1;
626         pool->indexes[i].type = 0;
627     }
628     pool->indexes[i].next = -1;
629     pool->indexes[i].type = 0;
630     pool->freehead = 0;
631 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
632     pool->lock  = CmiCreateLock();
633 #else
634     pool->lock  = 0;
635 #endif
636 }
637
638 static
639 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
640 {
641     int s, i;
642 #if MULTI_THREAD_SEND  
643     CmiLock(pool->lock);
644 #endif
645     CmiAssert(type == 1 || type == 2);
646     s = pool->freehead;
647     if (s == -1) {
648         int newsize = pool->size * 2;
649         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
650         if (newsize > (1<<(32-SHIFT-1))) {
651             printf("[%d] Warning: IndexPool_getslot %p overflow when expanding to: %d\n", myrank, pool, newsize);
652             return -1;
653             CmiAbort("IndexPool for remote events overflows, try compile Charm++ with remote event disabled.");
654         }
655         struct IndexStruct *old_ackpool = pool->indexes;
656         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
657         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
658         for (i=pool->size; i<newsize-1; i++) {
659             pool->indexes[i].next = i+1;
660             pool->indexes[i].type = 0;
661         }
662         pool->indexes[i].next = -1;
663         pool->indexes[i].type = 0;
664         pool->freehead = pool->size;
665         s = pool->size;
666         pool->size = newsize;
667         free(old_ackpool);
668     }
669     pool->freehead = pool->indexes[s].next;
670     pool->indexes[s].addr = addr;
671     CmiAssert(pool->indexes[s].type == 0);
672     pool->indexes[s].type = type;
673 #if MULTI_THREAD_SEND
674     CmiUnlock(pool->lock);
675 #endif
676     return s;
677 }
678
679 static
680 inline  void IndexPool_freeslot(IndexPool *pool, int s)
681 {
682     CmiAssert(s>=0 && s<pool->size);
683 #if MULTI_THREAD_SEND
684     CmiLock(pool->lock);
685 #endif
686     pool->indexes[s].next = pool->freehead;
687     pool->indexes[s].type = 0;
688     pool->freehead = s;
689 #if MULTI_THREAD_SEND
690     CmiUnlock(pool->lock);
691 #endif
692 }
693
694
695 #endif
696
697 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
698 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
699 #define CHARM_MAGIC_NUMBER               126
700
701 #if CMK_ERROR_CHECKING
702 extern unsigned char computeCheckSum(unsigned char *data, int len);
703 static int checksum_flag = 0;
704 #define CMI_SET_CHECKSUM(msg, len)      \
705         if (checksum_flag)  {   \
706           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
707           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
708         }
709 #define CMI_CHECK_CHECKSUM(msg, len)    \
710         if (checksum_flag)      \
711           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
712             CmiAbort("Fatal error: checksum doesn't agree!\n");
713 #else
714 #define CMI_SET_CHECKSUM(msg, len)
715 #define CMI_CHECK_CHECKSUM(msg, len)
716 #endif
717 /* =====End of Definitions of Message-Corruption Related Macros=====*/
718
/* Switches consulted by the statistics macros in this file, which record
 * only while print_stats is non-zero and stats_off is zero. */
static int print_stats = 0;
static int stats_off = 0;

/* Re-enable statistics recording by clearing stats_off. */
void CmiTurnOnStats()
{
    stats_off = 0;
}

/* Suspend statistics recording by setting stats_off. */
void CmiTurnOffStats()
{
    stats_off = 1;
}
731
732 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
733
734 #if CMK_WITH_STATS
735 FILE *counterLog = NULL;
736 typedef struct comm_thread_stats
737 {
738     uint64_t  smsg_data_count;
739     uint64_t  lmsg_init_count;
740     uint64_t  ack_count;
741     uint64_t  big_msg_ack_count;
742     uint64_t  smsg_count;
743     uint64_t  direct_put_done_count;
744     uint64_t  put_done_count;
745     //times of calling SmsgSend
746     uint64_t  try_smsg_data_count;
747     uint64_t  try_lmsg_init_count;
748     uint64_t  try_ack_count;
749     uint64_t  try_big_msg_ack_count;
750     uint64_t  try_direct_put_done_count;
751     uint64_t  try_put_done_count;
752     uint64_t  try_smsg_count;
753     
754     double    max_time_in_send_buffered_smsg;
755     double    all_time_in_send_buffered_smsg;
756
757     uint64_t  rdma_get_count, rdma_put_count;
758     uint64_t  try_rdma_get_count, try_rdma_put_count;
759     double    max_time_from_control_to_rdma_init;
760     double    all_time_from_control_to_rdma_init;
761
762     double    max_time_from_rdma_init_to_rdma_done;
763     double    all_time_from_rdma_init_to_rdma_done;
764
765     int      count_in_PumpNetwork;
766     double   time_in_PumpNetwork;
767     double   max_time_in_PumpNetwork;
768     int      count_in_SendBufferMsg_smsg;
769     double   time_in_SendBufferMsg_smsg;
770     double   max_time_in_SendBufferMsg_smsg;
771     int      count_in_SendRdmaMsg;
772     double   time_in_SendRdmaMsg;
773     double   max_time_in_SendRdmaMsg;
774     int      count_in_PumpRemoteTransactions;
775     double   time_in_PumpRemoteTransactions;
776     double   max_time_in_PumpRemoteTransactions;
777     int      count_in_PumpLocalTransactions_rdma;
778     double   time_in_PumpLocalTransactions_rdma;
779     double   max_time_in_PumpLocalTransactions_rdma;
780     int      count_in_PumpDatagramConnection;
781     double   time_in_PumpDatagramConnection;
782     double   max_time_in_PumpDatagramConnection;
783 } Comm_Thread_Stats;
784
785 static Comm_Thread_Stats   comm_stats;
786
787 static char *counters_dirname = "counters";
788
789 static void init_comm_stats()
790 {
791   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
792   if (print_stats){
793       char ln[200];
794       int code = mkdir(counters_dirname, 00777); 
795       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
796       counterLog=fopen(ln,"w");
797       if (counterLog == NULL) CmiAbort("Counter files open failed");
798   }
799 }
800
801 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
802
803 #define SMSG_SENT_DONE(creation_time, tag)  \
804         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
805             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++;  \
806             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
807             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
808             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
809             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
810             comm_stats.smsg_count++; \
811             double inbuff_time = CmiWallTimer() - creation_time;   \
812             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
813             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
814         }
815
816 #define SMSG_TRY_SEND(tag)  \
817         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
818             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
819             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
820             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
821             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
822             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
823             comm_stats.try_smsg_count++; \
824         }
825
826 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
827
828 #define  RDMA_TRANS_DONE(x)      \
829          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
830              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
831              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
832          }
833
834 #define  RDMA_TRANS_INIT(type, x)      \
835          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
836              double rdma_trans_time = CmiWallTimer() - x ; \
837              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
838              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
839          }
840
841 #define STATS_PUMPNETWORK_TIME(x)   \
842         { double t = CmiWallTimer(); \
843           x;        \
844           t = CmiWallTimer() - t;          \
845           comm_stats.count_in_PumpNetwork++;        \
846           comm_stats.time_in_PumpNetwork += t;   \
847           if (t>comm_stats.max_time_in_PumpNetwork)      \
848               comm_stats.max_time_in_PumpNetwork = t;    \
849         }
850
851 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
852         { double t = CmiWallTimer(); \
853           x;        \
854           t = CmiWallTimer() - t;          \
855           comm_stats.count_in_PumpRemoteTransactions ++;        \
856           comm_stats.time_in_PumpRemoteTransactions += t;   \
857           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
858               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
859         }
860
861 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
862         { double t = CmiWallTimer(); \
863           x;        \
864           t = CmiWallTimer() - t;          \
865           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
866           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
867           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
868               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
869         }
870
/*
 * Execute x once and charge its wall-clock duration (measured with
 * CmiWallTimer) to the SendBufferMsg_smsg counters in comm_stats: call count,
 * accumulated time and maximum single duration.
 * Expands to one statement (do/while(0)); the caller supplies the trailing
 * semicolon.
 */
#define STATS_SEND_SMSGS_TIME(x)   \
        do { double stats_elapsed_ = CmiWallTimer(); \
          x;        \
          stats_elapsed_ = CmiWallTimer() - stats_elapsed_;          \
          comm_stats.count_in_SendBufferMsg_smsg ++;        \
          comm_stats.time_in_SendBufferMsg_smsg += stats_elapsed_;   \
          if (stats_elapsed_ > comm_stats.max_time_in_SendBufferMsg_smsg)      \
              comm_stats.max_time_in_SendBufferMsg_smsg = stats_elapsed_;    \
        } while (0)
880
/*
 * Execute x once and charge its wall-clock duration (measured with
 * CmiWallTimer) to the SendRdmaMsg counters in comm_stats: call count,
 * accumulated time and maximum single duration.
 * Expands to one statement (do/while(0)); the caller supplies the trailing
 * semicolon.
 */
#define STATS_SENDRDMAMSG_TIME(x)   \
        do { double stats_elapsed_ = CmiWallTimer(); \
          x;        \
          stats_elapsed_ = CmiWallTimer() - stats_elapsed_;          \
          comm_stats.count_in_SendRdmaMsg ++;        \
          comm_stats.time_in_SendRdmaMsg += stats_elapsed_;   \
          if (stats_elapsed_ > comm_stats.max_time_in_SendRdmaMsg)      \
              comm_stats.max_time_in_SendRdmaMsg = stats_elapsed_;    \
        } while (0)
890
/*
 * Execute x once and charge its wall-clock duration (measured with
 * CmiWallTimer) to the PumpDatagramConnection counters in comm_stats: call
 * count, accumulated time and maximum single duration.
 * Expands to one statement (do/while(0)); the caller supplies the trailing
 * semicolon.
 */
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
        do { double stats_elapsed_ = CmiWallTimer(); \
          x;        \
          stats_elapsed_ = CmiWallTimer() - stats_elapsed_;          \
          comm_stats.count_in_PumpDatagramConnection ++;        \
          comm_stats.time_in_PumpDatagramConnection += stats_elapsed_;   \
          if (stats_elapsed_ > comm_stats.max_time_in_PumpDatagramConnection)      \
              comm_stats.max_time_in_PumpDatagramConnection = stats_elapsed_;    \
        } while (0)
900
901 static void print_comm_stats()
902 {
903     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
904     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
905             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
906             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
907     
908     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
909             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
910             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
911
912     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
913     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
914     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
915
916
917     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
918     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
919     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
920     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
921     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
922     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
923     if (useDynamicSMSG)
924     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
925
926     fclose(counterLog);
927 }
928
929 #else
/*
 * Statistics disabled: every timing wrapper simply executes x.
 * Each one still expands to a single do/while(0) statement so that a call
 * site is parsed the same way as with the timing variants (statement context,
 * trailing semicolon supplied by the caller).
 */
#define STATS_PUMPNETWORK_TIME(x)                  do { x; } while (0)
#define STATS_SEND_SMSGS_TIME(x)                   do { x; } while (0)
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       do { x; } while (0)
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   do { x; } while (0)
#define STATS_SENDRDMAMSG_TIME(x)                  do { x; } while (0)
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       do { x; } while (0)
936 #endif
937
938 static void
939 allgather(void *in,void *out, int len)
940 {
941     static int *ivec_ptr=NULL,already_called=0,job_size=0;
942     int i,rc;
943     int my_rank;
944     char *tmp_buf,*out_ptr;
945
946     if(!already_called) {
947
948         rc = PMI_Get_size(&job_size);
949         CmiAssert(rc == PMI_SUCCESS);
950         rc = PMI_Get_rank(&my_rank);
951         CmiAssert(rc == PMI_SUCCESS);
952
953         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
954         CmiAssert(ivec_ptr != NULL);
955
956         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
957         CmiAssert(rc == PMI_SUCCESS);
958
959         already_called = 1;
960
961     }
962
963     tmp_buf = (char *)malloc(job_size * len);
964     CmiAssert(tmp_buf);
965
966     rc = PMI_Allgather(in,tmp_buf,len);
967     CmiAssert(rc == PMI_SUCCESS);
968
969     out_ptr = out;
970
971     for(i=0;i<job_size;i++) {
972
973         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
974
975     }
976
977     free(tmp_buf);
978 }
979
980 static void
981 allgather_2(void *in,void *out, int len)
982 {
983     //PMI_Allgather is out of order
984     int i,rc, extend_len;
985     int  rank_index;
986     char *out_ptr, *out_ref;
987     char *in2;
988
989     extend_len = sizeof(int) + len;
990     in2 = (char*)malloc(extend_len);
991
992     memcpy(in2, &myrank, sizeof(int));
993     memcpy(in2+sizeof(int), in, len);
994
995     out_ptr = (char*)malloc(mysize*extend_len);
996
997     rc = PMI_Allgather(in2, out_ptr, extend_len);
998     GNI_RC_CHECK("allgather", rc);
999
1000     out_ref = out;
1001
1002     for(i=0;i<mysize;i++) {
1003         //rank index 
1004         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1005         //copy to the rank index slot
1006         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1007     }
1008
1009     free(out_ptr);
1010     free(in2);
1011
1012 }
1013
1014 static unsigned int get_gni_nic_address(int device_id)
1015 {
1016     unsigned int address, cpu_id;
1017     gni_return_t status;
1018     int i, alps_dev_id=-1,alps_address=-1;
1019     char *token, *p_ptr;
1020
1021     p_ptr = getenv("PMI_GNI_DEV_ID");
1022     if (!p_ptr) {
1023         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1024        
1025         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1026     } else {
1027         while ((token = strtok(p_ptr,":")) != NULL) {
1028             alps_dev_id = atoi(token);
1029             if (alps_dev_id == device_id) {
1030                 break;
1031             }
1032             p_ptr = NULL;
1033         }
1034         CmiAssert(alps_dev_id != -1);
1035         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1036         CmiAssert(p_ptr != NULL);
1037         i = 0;
1038         while ((token = strtok(p_ptr,":")) != NULL) {
1039             if (i == alps_dev_id) {
1040                 alps_address = atoi(token);
1041                 break;
1042             }
1043             p_ptr = NULL;
1044             ++i;
1045         }
1046         CmiAssert(alps_address != -1);
1047         address = alps_address;
1048     }
1049     return address;
1050 }
1051
/*
 * Return the protection tag taken from the first ':'-separated field of the
 * PMI_GNI_PTAG environment variable.
 *
 * The environment string is only read, never modified, and an empty value is
 * rejected by assertion instead of being dereferenced as a NULL token.
 */
static uint8_t get_ptag(void)
{
    const char *p_ptr;
    uint8_t ptag;

    p_ptr = getenv("PMI_GNI_PTAG");
    CmiAssert(p_ptr != NULL);
    while (*p_ptr == ':') p_ptr++;          /* skip leading separators */
    CmiAssert(*p_ptr != '\0');
    /* strtol stops at the next ':' (or end of string) on its own. */
    ptag = (uint8_t)strtol(p_ptr, NULL, 10);
    return ptag;

}
1064
/*
 * Return the cookie taken from the first ':'-separated field of the
 * PMI_GNI_COOKIE environment variable.
 *
 * The environment string is only read, never modified.  The value is parsed
 * with strtoul so the whole 32-bit unsigned range is converted without
 * relying on int overflow behaviour.
 */
static uint32_t get_cookie(void)
{
    uint32_t cookie;
    const char *p_ptr;

    p_ptr = getenv("PMI_GNI_COOKIE");
    CmiAssert(p_ptr != NULL);
    while (*p_ptr == ':') p_ptr++;          /* skip leading separators */
    CmiAssert(*p_ptr != '\0');
    /* strtoul stops at the next ':' (or end of string) on its own. */
    cookie = (uint32_t)strtoul(p_ptr, NULL, 10);

    return cookie;
}
1077
1078 #if LARGEPAGE
1079
1080 /* directly mmap memory from hugetlbfs for large pages */
1081
1082 #include <sys/stat.h>
1083 #include <fcntl.h>
1084 #include <sys/mman.h>
1085 #include <hugetlbfs.h>
1086
1087 // size must be _tlbpagesize aligned
1088 void *my_get_huge_pages(size_t size)
1089 {
1090     char filename[512];
1091     int fd;
1092     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1093     void *ptr = NULL;
1094
1095     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1096     fd = open(filename, O_RDWR | O_CREAT, mode);
1097     if (fd == -1) {
1098         CmiAbort("my_get_huge_pages: open filed");
1099     }
1100     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1101     if (ptr == MAP_FAILED) ptr = NULL;
1102 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1103     close(fd);
1104     unlink(filename);
1105     return ptr;
1106 }
1107
/*
 * Unmap a region of size bytes starting at ptr (munmap); aborts the program
 * if the unmap fails.
 */
void my_free_huge_pages(void *ptr, int size)
{
    if (munmap(ptr, size) == -1)
        CmiAbort("munmap failed in my_free_huge_pages");
}
1114
1115 #endif
1116
1117 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1118 /* TODO: add any that are related */
1119 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1120
1121
1122 #include "machine-lrts.h"
1123 #include "machine-common-core.c"
1124
1125
1126 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
1127 static void SendRdmaMsg(PCQueue );
1128 static void PumpNetworkSmsg();
1129 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1130 #if CQWRITE
1131 static void PumpCqWriteTransactions();
1132 #endif
1133 #if REMOTE_EVENT
1134 static void PumpRemoteTransactions(gni_cq_handle_t);
1135 #endif
1136
#if MACHINE_DEBUG_LOG
/* Variables that exist only when MACHINE_DEBUG_LOG is enabled. */
FILE *debugLog = NULL;                  /* debug log stream, initially unset */
int         lrts_received_msg = 0;
int         lrts_smsg_success = 0;
static CmiInt8 buffered_recv_msg = 0;
#endif
1143
1144 static void sweep_mempool(mempool_type *mptr)
1145 {
1146     int n = 0;
1147     block_header *current = &(mptr->block_head);
1148
1149     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1150     while( current!= NULL) {
1151         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1152         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1153     }
1154     printf("[n %d] sweep_mempool slot END.\n", myrank);
1155 }
1156
1157 inline
1158 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1159 {
1160     block_header *current = *from;
1161
1162     //while(register_memory_size>= MAX_REG_MEM)
1163     //{
1164         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1165             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1166
1167         *from = current;
1168         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1169         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1170         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1171     //}
1172     return GNI_RC_SUCCESS;
1173 }
1174
1175 inline 
1176 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1177 {
1178     gni_return_t status = GNI_RC_SUCCESS;
1179     //int size = GetMempoolsize(msg);
1180     //void *blockaddr = GetMempoolBlockPtr(msg);
1181     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1182    
1183     block_header *current = &(mptr->block_head);
1184     while(register_memory_size>= MAX_REG_MEM)
1185     {
1186         status = deregisterMemory(mptr, &current);
1187         if (status != GNI_RC_SUCCESS) break;
1188     }
1189     if(register_memory_size>= MAX_REG_MEM) return status;
1190
1191     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1192     while(1)
1193     {
1194         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1195         if(status == GNI_RC_SUCCESS)
1196         {
1197             break;
1198         }
1199         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1200         {
1201             GNI_RC_CHECK("registerFromMempool", status);
1202         }
1203         else
1204         {
1205             status = deregisterMemory(mptr, &current);
1206             if (status != GNI_RC_SUCCESS) break;
1207         }
1208     }; 
1209     return status;
1210 }
1211
1212 inline 
1213 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1214 {
1215     static int rank = -1;
1216     int i;
1217     gni_return_t status;
1218     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1219     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1220     mempool_type *mptr;
1221
1222     status = registerFromMempool(mptr1, msg, size, t, cqh);
1223     if (status == GNI_RC_SUCCESS) return status;
1224 #if CMK_SMP 
1225     for (i=0; i<CmiMyNodeSize()+1; i++) {
1226       rank = (rank+1)%(CmiMyNodeSize()+1);
1227       mptr = CpvAccessOther(mempool, rank);
1228       if (mptr == mptr1) continue;
1229       status = registerFromMempool(mptr, msg, size, t, cqh);
1230       if (status == GNI_RC_SUCCESS) return status;
1231     }
1232 #endif
1233     return  GNI_RC_ERROR_RESOURCE;
1234 }
1235
1236 inline
1237 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1238 {
1239     MSG_LIST        *msg_tmp;
1240     MallocMsgList(msg_tmp);
1241     msg_tmp->destNode = destNode;
1242     msg_tmp->size   = size;
1243     msg_tmp->msg    = msg;
1244     msg_tmp->tag    = tag;
1245 #if CMK_WITH_STATS
1246     SMSG_CREATION(msg_tmp)
1247 #endif
1248
1249 #if ONE_SEND_QUEUE
1250     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1251 #else
1252 #if SMP_LOCKS
1253     CmiLock(queue->smsg_msglist_index[destNode].lock);
1254     if(queue->smsg_msglist_index[destNode].pushed == 0)
1255     {
1256         PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1257     }
1258     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1259     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1260 #else
1261     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1262 #endif
1263 #endif
1264
1265 #if PRINT_SYH
1266     buffered_smsg_counter++;
1267 #endif
1268 }
1269
1270 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1271 {
1272     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1273 }
1274
1275 inline
1276 static void setup_smsg_connection(int destNode)
1277 {
1278     mdh_addr_list_t  *new_entry = 0;
1279     gni_post_descriptor_t *pd;
1280     gni_smsg_attr_t      *smsg_attr;
1281     gni_return_t status = GNI_RC_NOT_DONE;
1282     RDMA_REQUEST        *rdma_request_msg;
1283     
1284     if(smsg_available_slot == smsg_expand_slots)
1285     {
1286         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1287         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1288         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1289
1290         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1291             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1292             GNI_MEM_READWRITE,   
1293             -1,
1294             &(new_entry->mdh));
1295         smsg_available_slot = 0; 
1296         new_entry->next = smsg_dynamic_list;
1297         smsg_dynamic_list = new_entry;
1298     }
1299     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1300     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1301     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1302     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1303     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1304     smsg_attr->buff_size = smsg_memlen;
1305     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1306     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1307     smsg_local_attr_vec[destNode] = smsg_attr;
1308     smsg_available_slot++;
1309     MallocPostDesc(pd);
1310     pd->type            = GNI_POST_FMA_PUT;
1311     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1312     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1313     pd->length          = sizeof(gni_smsg_attr_t);
1314     pd->local_addr      = (uint64_t) smsg_attr;
1315     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1316     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1317     pd->src_cq_hndl     = 0;
1318
1319     pd->rdma_mode       = 0;
1320     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1321     print_smsg_attr(smsg_attr);
1322     if(status == GNI_RC_ERROR_RESOURCE )
1323     {
1324         MallocRdmaRequest(rdma_request_msg);
1325         rdma_request_msg->destNode = destNode;
1326         rdma_request_msg->pd = pd;
1327         /* buffer this request */
1328     }
1329 #if PRINT_SYH
1330     if(status != GNI_RC_SUCCESS)
1331        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1332     else
1333         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1334 #endif
1335 }
1336
1337 /* useDynamicSMSG */
1338 inline 
1339 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1340 {
1341     gni_return_t status = GNI_RC_NOT_DONE;
1342
1343     if(mailbox_list->offset == mailbox_list->size)
1344     {
1345         dynamic_smsg_mailbox_t *new_mailbox_entry;
1346         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1347         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1348         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1349         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1350         new_mailbox_entry->offset = 0;
1351         
1352         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1353             new_mailbox_entry->size, smsg_rx_cqh,
1354             GNI_MEM_READWRITE,   
1355             -1,
1356             &(new_mailbox_entry->mem_hndl));
1357
1358         GNI_RC_CHECK("register", status);
1359         new_mailbox_entry->next = mailbox_list;
1360         mailbox_list = new_mailbox_entry;
1361     }
1362     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1363     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1364     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1365     local_smsg_attr->mbox_offset = mailbox_list->offset;
1366     mailbox_list->offset += smsg_memlen;
1367     local_smsg_attr->buff_size = smsg_memlen;
1368     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1369     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1370 }
1371
1372 /* useDynamicSMSG */
1373 inline 
1374 static int connect_to(int destNode)
1375 {
1376     gni_return_t status = GNI_RC_NOT_DONE;
1377     CmiAssert(smsg_connected_flag[destNode] == 0);
1378     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1379     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1380     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1381     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1382     
1383     CMI_GNI_LOCK(global_gni_lock)
1384     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1385     CMI_GNI_UNLOCK(global_gni_lock)
1386     if (status == GNI_RC_ERROR_RESOURCE) {
1387       /* possibly destNode is making connection at the same time */
1388       free(smsg_attr_vector_local[destNode]);
1389       smsg_attr_vector_local[destNode] = NULL;
1390       free(smsg_attr_vector_remote[destNode]);
1391       smsg_attr_vector_remote[destNode] = NULL;
1392       mailbox_list->offset -= smsg_memlen;
1393 #if PRINT_SYH
1394     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1395 #endif
1396       return 0;
1397     }
1398     GNI_RC_CHECK("GNI_Post", status);
1399     smsg_connected_flag[destNode] = 1;
1400 #if PRINT_SYH
1401     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1402 #endif
1403     return 1;
1404 }
1405
1406 inline 
1407 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1408 {
1409     unsigned int          remote_address;
1410     uint32_t              remote_id;
1411     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1412     gni_smsg_attr_t       *smsg_attr;
1413     gni_post_descriptor_t *pd;
1414     gni_post_state_t      post_state;
1415     char                  *real_data; 
1416
1417     if (useDynamicSMSG) {
1418         switch (smsg_connected_flag[destNode]) {
1419         case 0: 
1420             connect_to(destNode);         /* continue to case 1 */
1421         case 1:                           /* pending connection, do nothing */
1422             status = GNI_RC_NOT_DONE;
1423             if(inbuff ==0)
1424                 buffer_small_msgs(queue, msg, size, destNode, tag);
1425             return status;
1426         }
1427     }
1428 #if ! ONE_SEND_QUEUE
1429     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1430 #endif
1431     {
1432         //CMI_GNI_LOCK(smsg_mailbox_lock)
1433         CMI_GNI_LOCK(default_tx_cq_lock)
1434 #if CMK_SMP_TRACE_COMMTHREAD
1435         int oldpe = -1;
1436         int oldeventid = -1;
1437         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
1438         { 
1439             START_EVENT();
1440             if ( tag == SMALL_DATA_TAG)
1441                 real_data = (char*)msg; 
1442             else 
1443                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1444             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1445             TRACE_COMM_SET_COMM_MSGID(real_data);
1446         }
1447 #endif
1448 #if REMOTE_EVENT
1449         if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) {
1450             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1451             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1452             {
1453                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1454                 if (control_msg_tmp->ack_index == -1) {    /* table overflow */
1455                     status = GNI_RC_NOT_DONE;
1456                     if (inbuff ==0)
1457                         buffer_small_msgs(queue, msg, size, destNode, tag);
1458                     return status;
1459                 }
1460             }
1461         }
1462         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1463 #endif
1464 #if     CMK_WITH_STATS
1465         SMSG_TRY_SEND(tag)
1466 #endif
1467 #if CMK_WITH_STATS
1468     double              creation_time;
1469     if (ptr == NULL)
1470         creation_time = CmiWallTimer();
1471     else
1472         creation_time = ptr->creation_time;
1473 #endif
1474
1475     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1476 #if CMK_SMP_TRACE_COMMTHREAD
1477         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1478 #endif
1479         CMI_GNI_UNLOCK(default_tx_cq_lock)
1480         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1481         if(status == GNI_RC_SUCCESS)
1482         {
1483 #if     CMK_WITH_STATS
1484             SMSG_SENT_DONE(creation_time,tag) 
1485 #endif
1486 #if CMK_SMP_TRACE_COMMTHREAD
1487             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
1488             { 
1489                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1490             }
1491 #endif
1492         }else
1493             status = GNI_RC_ERROR_RESOURCE;
1494     }
1495     if(status != GNI_RC_SUCCESS && inbuff ==0)
1496         buffer_small_msgs(queue, msg, size, destNode, tag);
1497     return status;
1498 }
1499
1500 inline 
1501 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1502 {
1503     /* construct a control message and send */
1504     CONTROL_MSG         *control_msg_tmp;
1505     MallocControlMsg(control_msg_tmp);
1506     control_msg_tmp->source_addr = (uint64_t)msg;
1507     control_msg_tmp->seq_id    = seqno;
1508     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1509 #if REMOTE_EVENT
1510     control_msg_tmp->ack_index    =  -1;
1511 #endif
1512 #if     USE_LRTS_MEMPOOL
1513     if(size < BIG_MSG)
1514     {
1515         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1516     }
1517     else
1518     {
1519         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1520         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1521         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1522     }
1523 #else
1524     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1525 #endif
1526     return control_msg_tmp;
1527 }
1528
1529 #define BLOCKING_SEND_CONTROL    0
1530
1531 // Large message, send control to receiver, receiver register memory and do a GET, 
1532 // return 1 - send no success
1533 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
1534 {
1535     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1536     uint32_t            vmdh_index  = -1;
1537     int                 size;
1538     int                 offset = 0;
1539     uint64_t            source_addr;
1540     int                 register_size; 
1541     void                *msg;
1542
1543     size    =   control_msg_tmp->total_length;
1544     source_addr = control_msg_tmp->source_addr;
1545     register_size = control_msg_tmp->length;
1546
1547 #if  USE_LRTS_MEMPOOL
1548     if( control_msg_tmp->seq_id == 0 ){
1549 #if BLOCKING_SEND_CONTROL
1550         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1551             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1552                 LrtsAdvanceCommunication(0);
1553         }
1554 #endif
1555         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1556         {
1557             msg = (void*)source_addr;
1558             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1559             {
1560                 if(!inbuff)
1561                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1562                 return GNI_RC_ERROR_NOMEM;
1563             }
1564             //register the corresponding mempool
1565             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1566             if(status == GNI_RC_SUCCESS)
1567             {
1568                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1569             }
1570         }else
1571         {
1572             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1573             status = GNI_RC_SUCCESS;
1574         }
1575         if(NoMsgInSend(source_addr))
1576             register_size = GetMempoolsize((void*)(source_addr));
1577         else
1578             register_size = 0;
1579     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1580     {
1581         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1582         source_addr += offset;
1583         size = control_msg_tmp->length;
1584 #if BLOCKING_SEND_CONTROL
1585         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1586             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1587                 LrtsAdvanceCommunication(0);
1588         }
1589 #endif
1590         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1591             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1592             {
1593                 if(!inbuff)
1594                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1595                 return GNI_RC_ERROR_NOMEM;
1596             }
1597             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1598             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1599         }
1600         else
1601         {
1602             status = GNI_RC_SUCCESS;
1603         }
1604         register_size = 0;  
1605     }
1606
1607 #if CMI_EXERT_SEND_LARGE_CAP
1608     if(SEND_large_pending >= SEND_large_cap)
1609     {
1610         status = GNI_RC_ERROR_NOMEM;
1611     }
1612 #endif
1613  
1614     if(status == GNI_RC_SUCCESS)
1615     {
1616        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr); 
1617         if(status == GNI_RC_SUCCESS)
1618         {
1619 #if CMI_EXERT_SEND_LARGE_CAP
1620             SEND_large_pending++;
1621 #endif
1622             buffered_send_msg += register_size;
1623             if(control_msg_tmp->seq_id == 0)
1624             {
1625                 IncreaseMsgInSend(source_addr);
1626             }
1627             FreeControlMsg(control_msg_tmp);
1628             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag); 
1629         }else
1630             status = GNI_RC_ERROR_RESOURCE;
1631
1632     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1633     {
1634         CmiAbort("Memory registor for large msg\n");
1635     }else 
1636     {
1637         status = GNI_RC_ERROR_NOMEM; 
1638         if(!inbuff)
1639             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1640     }
1641     return status;
1642 #else
1643     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1644     if(status == GNI_RC_SUCCESS)
1645     {
1646         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL);  
1647         if(status == GNI_RC_SUCCESS)
1648         {
1649             FreeControlMsg(control_msg_tmp);
1650         }
1651     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1652     {
1653         CmiAbort("Memory registor for large msg\n");
1654     }else 
1655     {
1656         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1657     }
1658     return status;
1659 #endif
1660 }
1661
/*
 * Stamp the outgoing message header before it is handed to the network:
 * record the payload length and let CMI_SET_CHECKSUM compute whatever
 * checksum the build is configured for.
 */
inline void LrtsPrepareEnvelope(char *msg, int size) {
    CmiSetMsgSize(msg, size);       /* length goes into the envelope */
    CMI_SET_CHECKSUM(msg, size);    /* checksum over the same `size` bytes */
}
1667
1668 CmiCommHandle LrtsSendFunc(int destPE, int size, char *msg, int mode)
1669 {
1670     int destNode = CmiNodeOf(destPE);
1671     gni_return_t        status  =   GNI_RC_SUCCESS;
1672     uint8_t tag;
1673     CONTROL_MSG         *control_msg_tmp;
1674     int                 oob = ( mode & OUT_OF_BAND);
1675     SMSG_QUEUE          *queue;
1676
1677     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1678 #if CMK_USE_OOB
1679     queue = oob? &smsg_oob_queue : &smsg_queue;
1680     tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
1681 #else
1682     queue = &smsg_queue;
1683     tag = LMSG_INIT_TAG;
1684 #endif
1685
1686     LrtsPrepareEnvelope(msg, size);
1687
1688 #if PRINT_SYH
1689     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1690 #endif 
1691
1692 #if CMK_SMP 
1693     if(size <= SMSG_MAX_MSG)
1694         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1695     else if (size < BIG_MSG) {
1696         control_msg_tmp =  construct_control_msg(size, msg, 0);
1697         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1698     }
1699     else {
1700         CmiSetMsgSeq(msg, 0);
1701         control_msg_tmp =  construct_control_msg(size, msg, 1);
1702         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1703     }
1704 #else   //non-smp, smp(worker sending)
1705     if(size <= SMSG_MAX_MSG)
1706     {
1707         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1708             CmiFree(msg);
1709     }
1710     else if (size < BIG_MSG) {
1711         control_msg_tmp =  construct_control_msg(size, msg, 0);
1712         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1713     }
1714     else {
1715 #if     USE_LRTS_MEMPOOL
1716         CmiSetMsgSeq(msg, 0);
1717         control_msg_tmp =  construct_control_msg(size, msg, 1);
1718         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1719 #else
1720         control_msg_tmp =  construct_control_msg(size, msg, 0);
1721         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1722 #endif
1723     }
1724 #endif
1725     return 0;
1726 }
1727
1728 #if 0
1729 // this is no different from the common code
/*
 * Send a copy of `msg` (`len` bytes) to each of the `npes` PEs listed in
 * `pes`.  With CMK_BROADCAST_USE_CMIREFERENCE, remote destinations get an
 * extra reference plus a send-and-free instead of a plain sync send.
 */
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
  int idx;

  for (idx = 0; idx < npes; idx++) {
#if CMK_BROADCAST_USE_CMIREFERENCE
    if (pes[idx] != CmiMyPe()) {
      CmiReference(msg);
      CmiSyncSendAndFree(pes[idx], len, msg);
      continue;
    }
#endif
    CmiSyncSend(pes[idx], len, msg);
  }
}
1748
1749 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1750 {
1751   /* A better asynchronous implementation may be wanted, but at least it works */
1752   CmiSyncListSendFn(npes, pes, len, msg);
1753   return (CmiCommHandle) 0;
1754 }
1755
/*
 * Send `msg` to every PE in `pes` and give up the caller's ownership of
 * the buffer: on every path the last use is either a send-and-free or an
 * explicit CmiFree().
 */
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
  /* single destination: one send-and-free does it all */
  if (npes == 1) {
      CmiSyncSendAndFree(pes[0], len, msg);
      return;
  }

#if CMK_PERSISTENT_COMM
  if (CpvAccess(phs) && len > PERSIST_MIN_SIZE 
#if CMK_SMP
            && IS_PERSISTENT_MEMORY(msg)
#endif
     ){
      int k;
      for (k = 0; k < npes; k++) {
        if (pes[k] != CmiMyPe()) {
          /* remote PE: take a reference, the send drops it again */
          CmiReference(msg);
          CmiSyncSendAndFree(pes[k], len, msg);
        } else {
          CmiSyncSend(pes[k], len, msg);
        }
      }
      CmiFree(msg);
      return;
  }
#endif

#if CMK_BROADCAST_USE_CMIREFERENCE
  CmiSyncListSendFn(npes, pes, len, msg);
  CmiFree(msg);
#else
  if (npes <= 0) {
    /* nobody to send to, just release the buffer */
    CmiFree(msg);
    return;
  }

  {
    int k;
    const int last = npes - 1;
    for (k = 0; k < last; k++) {
      CmiSyncSend(pes[k], len, msg);
    }
    /* the final destination consumes the buffer */
    CmiSyncSendAndFree(pes[last], len, msg);
  }
#endif
}
1796 #endif
1797
static void PumpDatagramConnection();

/*
 * Ids of the machine-layer trace user events.  The literals below are the
 * initial values; registerUserTraceEvents() overwrites them with ids handed
 * back by traceRegisterUserEvent() in builds where that code is compiled.
 */
static int event_SetupConnect         = 111;
static int event_PumpSmsg             = 222;
static int event_PumpTransaction      = 333;
static int event_PumpRdmaTransaction  = 444;
static int event_SendBufferSmsg       = 484;
static int event_SendFmaRdmaMsg       = 555;
static int event_AdvanceCommunication = 666;

/*
 * Register one trace user event per machine-layer activity and remember
 * the returned ids.  Compiles to an empty function unless
 * CMI_MPI_TRACE_USEREVENTS and CMK_TRACE_ENABLED are set and
 * CMK_TRACE_IN_CHARM is not.
 */
static void registerUserTraceEvents()
{
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    event_SetupConnect =
        traceRegisterUserEvent("setting up connections", -1 );
    event_PumpSmsg =
        traceRegisterUserEvent("Pump network small msgs", -1);
    event_PumpTransaction =
        traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
    event_PumpRdmaTransaction =
        traceRegisterUserEvent("Pump RDMA remote event" , -1);
    event_SendBufferSmsg =
        traceRegisterUserEvent("Sending buffered small msgs", -1);
    event_SendFmaRdmaMsg =
        traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
    event_AdvanceCommunication =
        traceRegisterUserEvent("Worker thread in sending/receiving", -1);
#endif
}
1818
1819 static void ProcessDeadlock()
1820 {
1821     static CmiUInt8 *ptr = NULL;
1822     static CmiUInt8  last = 0, mysum, sum;
1823     static int count = 0;
1824     gni_return_t status;
1825     int i;
1826
1827 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1828 //sweep_mempool(CpvAccess(mempool));
1829     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1830     mysum = smsg_send_count + smsg_recv_count;
1831     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1832     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1833     GNI_RC_CHECK("PMI_Allgather", status);
1834     sum = 0;
1835     for (i=0; i<mysize; i++)  sum+= ptr[i];
1836     if (last == 0 || sum == last) 
1837         count++;
1838     else
1839         count = 0;
1840     last = sum;
1841     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1842     if (count == 2) { 
1843         /* detected twice, it is a real deadlock */
1844         if (myrank == 0)  {
1845             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1846             CmiAbort("Fatal> Deadlock detected.");
1847         }
1848
1849     }
1850     _detected_hang = 0;
1851 }
1852
1853 static void CheckProgress()
1854 {
1855     if (smsg_send_count == last_smsg_send_count &&
1856         smsg_recv_count == last_smsg_recv_count ) 
1857     {
1858         _detected_hang = 1;
1859 #if !CMK_SMP
1860         if (_detected_hang) ProcessDeadlock();
1861 #endif
1862
1863     }
1864     else {
1865         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1866         last_smsg_send_count = smsg_send_count;
1867         last_smsg_recv_count = smsg_recv_count;
1868         _detected_hang = 0;
1869     }
1870 }
1871
1872 static void set_limit()
1873 {
1874     //if (!user_set_flag && CmiMyRank() == 0) {
1875     if (CmiMyRank() == 0) {
1876         int mynode = CmiPhysicalNodeID(CmiMyPe());
1877         int numpes = CmiNumPesOnPhysicalNode(mynode);
1878         int numprocesses = numpes / CmiMyNodeSize();
1879         MAX_REG_MEM  = _totalmem / numprocesses;
1880         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1881         if (CmiMyPe() == 0)
1882            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1883         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1884         {
1885              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1886              CmiAbort("memory registration\n");
1887         }
1888     }
1889 }
1890
1891 void LrtsPostCommonInit(int everReturn)
1892 {
1893 #if CMK_DIRECT
1894     CmiDirectInit();
1895 #endif
1896 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1897     CpvInitialize(double, projTraceStart);
1898     /* only PE 0 needs to care about registration (to generate sts file). */
1899     //if (CmiMyPe() == 0) 
1900     {
1901         registerMachineUserEventsFunction(&registerUserTraceEvents);
1902     }
1903 #endif
1904
1905 #if CMK_SMP
1906     CmiIdleState *s=CmiNotifyGetState();
1907     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1908     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1909 #else
1910     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1911     if (useDynamicSMSG)
1912     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1913 #endif
1914
1915 #if ! LARGEPAGE
1916     if (_checkProgress)
1917 #if CMK_SMP
1918     if (CmiMyRank() == 0)
1919 #endif
1920     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1921 #endif
1922  
1923 #if !LARGEPAGE
1924     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1925 #endif
1926 }
1927
/*
 * Called by worker threads.  Does something only when MULTI_THREAD_SEND is
 * enabled: on a multi-process run, workers whose rank is 3 modulo 6 drive
 * the network via CmiMachineProgressImpl(), optionally bracketing the call
 * with a trace user event.
 */
void LrtsPostNonLocal()
{
#if MULTI_THREAD_SEND
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif

    /* nothing to do when running alone, or for the other workers */
    if (mysize == 1 || CmiMyRank() % 6 != 3)
        return;

#if CMK_SMP_TRACE_COMMTHREAD
    traceEndIdle();
    startT = CmiWallTimer();
#endif

    CmiMachineProgressImpl();

#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
    traceBeginIdle();
#endif
#endif /* MULTI_THREAD_SEND */
}
1958
1959 /* Network progress function: used to poll the network for incoming
1960    messages. This flushes receive buffers on some implementations. */
1961 #if CMK_MACHINE_PROGRESS_DEFINED
/*
 * The body is compiled only for non-SMP builds or when MULTI_THREAD_SEND is
 * set; otherwise the function is empty.  The active part pumps incoming
 * SMSGs (wrapped in the STATS_PUMPNETWORK_TIME macro) and then expands four
 * statement macros (SEND_OOB_SMSG, PUMP_REMOTE_HIGHPRIORITY,
 * PUMP_LOCAL_HIGHPRIORITY, POST_HIGHPRIORITY_RDMA) whose definitions are
 * not in this part of the file.  They are written without trailing
 * semicolons, so they presumably expand to complete statements -- confirm
 * before editing.
 */
1962 void CmiMachineProgressImpl() {
1963 #if ! CMK_SMP || MULTI_THREAD_SEND
1964
1965     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
1966     SEND_OOB_SMSG(smsg_oob_queue)
1967     PUMP_REMOTE_HIGHPRIORITY
1968     PUMP_LOCAL_HIGHPRIORITY
1969     POST_HIGHPRIORITY_RDMA
1970
/* Disabled code: a fuller progress sequence in which, under
   CMK_WORKER_SINGLE_TASK, each step is restricted to workers with a
   particular value of (rank % 6). */
1971 #if 0
1972 #if CMK_WORKER_SINGLE_TASK
1973     if (CmiMyRank() % 6 == 0)
1974 #endif
1975     PumpNetworkSmsg();
1976
1977 #if CMK_WORKER_SINGLE_TASK
1978     if (CmiMyRank() % 6 == 1)
1979 #endif
1980     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1981
1982 #if CMK_WORKER_SINGLE_TASK
1983     if (CmiMyRank() % 6 == 2)
1984 #endif
1985     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1986
1987 #if REMOTE_EVENT
1988 #if CMK_WORKER_SINGLE_TASK
1989     if (CmiMyRank() % 6 == 3)
1990 #endif
1991     PumpRemoteTransactions(rdma_rx_cqh);         // rdma_rx_cqh
1992 #endif
1993
1994 #if CMK_WORKER_SINGLE_TASK
1995     if (CmiMyRank() % 6 == 4)
1996 #endif
1997     {
1998 #if CMK_USE_OOB
1999     SendBufferMsg(&smsg_oob_queue, NULL);
2000     SendBufferMsg(&smsg_queue, &smsg_oob_queue);
2001 #else
2002     SendBufferMsg(&smsg_queue, NULL);
2003 #endif
2004     }
2005
2006 #if CMK_WORKER_SINGLE_TASK
2007     if (CmiMyRank() % 6 == 5)
2008 #endif
2009 #if CMK_SMP
2010     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
2011 #else
2012     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
2013 #endif
2014
2015 #endif
2016 #endif
2017 }
2018 #endif
2019
2020
2021 /* useDynamicSMSG */
2022 static void    PumpDatagramConnection()
2023 {
2024     uint32_t          remote_address;
2025     uint32_t          remote_id;
2026     gni_return_t status;
2027     gni_post_state_t  post_state;
2028     uint64_t          datagram_id;
2029     int i;
2030
2031    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2032    {
2033        if (datagram_id >= mysize) {           /* bound endpoint */
2034            int pe = datagram_id - mysize;
2035            CMI_GNI_LOCK(global_gni_lock)
2036            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2037            CMI_GNI_UNLOCK(global_gni_lock)
2038            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2039            {
2040                CmiAssert(remote_id == pe);
2041                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2042                GNI_RC_CHECK("Dynamic SMSG Init", status);
2043 #if PRINT_SYH
2044                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2045 #endif
2046                CmiAssert(smsg_connected_flag[pe] == 1);
2047                smsg_connected_flag[pe] = 2;
2048            }
2049        }
2050        else {         /* unbound ep */
2051            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2052            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2053            {
2054                CmiAssert(remote_id<mysize);
2055                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2056                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2057                GNI_RC_CHECK("Dynamic SMSG Init", status);
2058 #if PRINT_SYH
2059                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2060 #endif
2061                smsg_connected_flag[remote_id] = 2;
2062
2063                alloc_smsg_attr(&send_smsg_attr);
2064                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2065                GNI_RC_CHECK("post unbound datagram", status);
2066            }
2067        }
2068    }
2069 }
2070
/* polling CQ to receive network message */
/* Currently an empty stub: it performs no work. */
static void PumpNetworkRdmaMsgs()
{
}
2078
2079 inline 
2080 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2081 {
2082     RDMA_REQUEST        *rdma_request_msg;
2083     MallocRdmaRequest(rdma_request_msg);
2084     rdma_request_msg->destNode = inst_id;
2085     rdma_request_msg->pd = pd;
2086 #if REMOTE_EVENT
2087     rdma_request_msg->ack_index = ack_index;
2088 #endif
2089     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2090 }
2091
2092 static void getLargeMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2093
/*
 * Drain the SMSG receive completion queue and dispatch every message found.
 *
 * Outer loop: take one event from smsg_rx_cqh (stops on the first
 * non-success status, or after PumpNetworkSmsg_cap messages when
 * CMI_PUMPNETWORKSMSG_CAP is enabled) and derive the sender from the
 * event's instance id.
 * Inner loop: call GNI_SmsgGetNextWTag() on that sender's endpoint until
 * it stops returning success, dispatching on the tag:
 *   SMALL_DATA_TAG       copy the payload into a CmiAlloc'd buffer and hand
 *                        it to handleOneRecvedMsg()
 *   LMSG_INIT_TAG /
 *   LMSG_OOB_INIT_TAG    pass the control header to getLargeMsgRequest()
 *                        with sendRdmaBuf or sendHighPriorBuf respectively
 *   ACK_TAG              (only without REMOTE_EVENT and CQWRITE) sender-side
 *                        completion: release the sent message
 *   BIG_MSG_TAG          a segment of a big message is done: deregister it
 *                        and send control messages for following segments
 *   PUT_DONE_TAG         (persistent comm, only without REMOTE_EVENT and
 *                        CQWRITE) deliver the message that was put
 *   DIRECT_PUT_DONE_TAG  (CMK_DIRECT) push a trigger message to the target
 *                        rank
 *   anything else        abort
 * smsg_mailbox_lock is taken before each GNI_SmsgGetNextWTag() call and
 * released on every path inside the dispatch; the order of
 * GNI_SmsgRelease() relative to the unlock differs per tag and per
 * MULTI_THREAD_SEND setting.
 */
2094 static void PumpNetworkSmsg()
2095 {
2096     uint64_t            inst_id;
2097     gni_cq_entry_t      event_data;
2098     gni_return_t        status;
2099     void                *header;
2100     uint8_t             msg_tag;
2101     int                 msg_nbytes;
2102     void                *msg_data;
2103     gni_mem_handle_t    msg_mem_hndl;
2104     gni_smsg_attr_t     *smsg_attr;
2105     gni_smsg_attr_t     *remote_smsg_attr;
2106     int                 init_flag;
2107     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2108     uint64_t            source_addr;
2109     SMSG_QUEUE         *queue = &smsg_queue;
2110     PCQueue             tmp_queue;
2111 #if  CMK_DIRECT
2112     cmidirectMsg        *direct_msg;
2113 #endif
2114 #if CMI_PUMPNETWORKSMSG_CAP 
2115     int                  recv_cnt = 0;
2116     while(recv_cnt< PumpNetworkSmsg_cap) {
2117 #else
2118     while(1) {
2119 #endif
2120         CMI_GNI_LOCK(smsg_rx_cq_lock)
2121         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2122         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2123         if(status != GNI_RC_SUCCESS) break;
2124
2125         inst_id = GNI_CQ_GET_INST_ID(event_data);
2126 #if REMOTE_EVENT
2127         inst_id = GET_RANK(inst_id);      /* important */
2128 #endif
2129         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2130 #if PRINT_SYH
2131         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2132 #endif
2133         if (useDynamicSMSG) {
2134             /* subtle: smsg may come before connection is setup */
2135             while (smsg_connected_flag[inst_id] != 2) 
2136                PumpDatagramConnection();
2137         }
             /* msg_tag is in/out: reset to "any" before every GetNextWTag call */
2138         msg_tag = GNI_SMSG_ANY_TAG;
2139         while(1) {
2140             CMI_GNI_LOCK(smsg_mailbox_lock)
2141             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2142             if (status != GNI_RC_SUCCESS)
2143             {
2144                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2145                 break;
2146             }
2147 #if         CMI_PUMPNETWORKSMSG_CAP
2148             recv_cnt++; 
2149 #endif
2150 #if PRINT_SYH
2151             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2152 #endif
2153             /* copy msg out and then put into queue (small message) */
2154             switch (msg_tag) {
2155             case SMALL_DATA_TAG:
2156             {
2157                 START_EVENT();
2158                 msg_nbytes = CmiGetMsgSize(header);
2159                 msg_data    = CmiAlloc(msg_nbytes);
2160                 memcpy(msg_data, (char*)header, msg_nbytes);
2161                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2162                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2163                 TRACE_COMM_CREATION(EVENT_TIME(), msg_data);
2164                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2165                 handleOneRecvedMsg(msg_nbytes, msg_data);
2166                 break;
2167             }
2168             case LMSG_INIT_TAG:
2169             case LMSG_OOB_INIT_TAG:
2170             {
2171                 tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf; 
2172 #if MULTI_THREAD_SEND
                     /* copy the header out so the mailbox slot can be released
                        before the request is processed */
2173                 MallocControlMsg(control_msg_tmp);
2174                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2175                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2176                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2177                 getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
2178                 FreeControlMsg(control_msg_tmp);
2179 #else
                     /* header is used in place; the slot is released only afterwards */
2180                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2181                 getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
2182                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2183 #endif
2184                 break;
2185             }
2186 #if !REMOTE_EVENT && !CQWRITE
2187             case ACK_TAG:   //msg fit into mempool
2188             {
2189                 /* Get is done, release message . Now put is not used yet*/
2190                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2191                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2192                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2193 #if ! USE_LRTS_MEMPOOL
                     /* NOTE(review): header is dereferenced here after
                        GNI_SmsgRelease() above -- confirm the mailbox slot
                        cannot be reused in between. */
2194                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2195 #else
2196                 DecreaseMsgInSend(msg);
2197 #endif
2198                 if(NoMsgInSend(msg))
2199                     buffered_send_msg -= GetMempoolsize(msg);
2200                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2201                 CmiFree(msg);
2202 #if CMI_EXERT_SEND_LARGE_CAP
2203                 SEND_large_pending--;
2204 #endif
2205                 break;
2206             }
2207 #endif
2208             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2209             {
2210 #if MULTI_THREAD_SEND
2211                 MallocControlMsg(header_tmp);
2212                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2213                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2214 #else
2215                 header_tmp = (CONTROL_MSG *) header;
2216 #endif
2217                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2218 #if CMI_EXERT_SEND_LARGE_CAP
2219                 SEND_large_pending--;
2220 #endif
2221                 void *msg = (void*)(header_tmp->source_addr);
2222                 int cur_seq = CmiGetMsgSeq(msg);
2223                 int offset = ONE_SEG*(cur_seq+1);
2224                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2225                 buffered_send_msg -= header_tmp->length;
                     /* the size field of the message is reused to track the bytes still to go */
2226                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2227                 if (remain_size < 0) remain_size = 0;
2228                 CmiSetMsgSize(msg, remain_size);
2229                 if(remain_size <= 0) //transaction done
2230                 {
2231                     CmiFree(msg);
2232                 }else if (header_tmp->total_length > offset)
2233                 {
2234                     CmiSetMsgSeq(msg, cur_seq+1);
2235                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2236                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2237                     //send next seg
2238                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2239                          // pipelining
                         /* after the first segment, start up to BIG_MSG_PIPELINE-1
                            further segments right away */
2240                     if (header_tmp->seq_id == 1) {
2241                       int i;
2242                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2243                         int seq = cur_seq+i+2;
2244                         CmiSetMsgSeq(msg, seq-1);
2245                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2246                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2247                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2248                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2249                       }
2250                     }
2251                 }
2252 #if MULTI_THREAD_SEND
2253                 FreeControlMsg(header_tmp);
2254 #else
2255                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2256 #endif
2257                 break;
2258             }
2259 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2260             case PUT_DONE_TAG:  {   //persistent message
2261                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2262                 int size = ((CONTROL_MSG *) header)->length;
2263                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2264                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2265                 CmiReference(msg);
2266                 CMI_CHECK_CHECKSUM(msg, size);
2267                 handleOneRecvedMsg(size, msg); 
2268 #if PRINT_SYH
2269                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2270 #endif
2271                 break;
2272             }
2273 #endif
2274 #if CMK_DIRECT
2275             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2276                 //create a trigger message
2277                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2278                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2279                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2280                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2281                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2282                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2283                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2284                 break;
2285 #endif
2286             default:
2287                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2288                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2289                 printf("weird tag problem\n");
2290                 CmiAbort("Unknown tag\n");
2291             }               // end switch
2292 #if PRINT_SYH
2293             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2294 #endif
2295             smsg_recv_count ++;
2296             msg_tag = GNI_SMSG_ANY_TAG;
2297         } //endwhile GNI_SmsgGetNextWTag
2298     }   //end while GetEvent
         /* status here is whatever the last GNI call in the loops returned */
2299     if(status == GNI_RC_ERROR_RESOURCE)
2300     {
2301         printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
2302         GNI_RC_CHECK("Smsg_rx_cq full", status);
2303     }
2304 }
2305
2306 static void printDesc(gni_post_descriptor_t *pd)
2307 {
2308     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2309 }
2310
2311 #if CQWRITE
2312 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2313 {
2314     gni_post_descriptor_t *pd;
2315     gni_return_t        status = GNI_RC_SUCCESS;
2316     
2317     MallocPostDesc(pd);
2318     pd->type = GNI_POST_CQWRITE;
2319     pd->cq_mode = GNI_CQMODE_SILENT;
2320     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2321     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2322     pd->cqwrite_value = data;
2323     pd->remote_mem_hndl = mem_hndl;
2324     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2325     GNI_RC_CHECK("GNI_PostCqWrite", status);
2326 }
2327 #endif
2328
2329 // register memory for a message
2330 // return mem handle
2331 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2332 {
2333     gni_return_t status = GNI_RC_SUCCESS;
2334
2335     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2336
2337 #if CMK_PERSISTENT_COMM
2338       // persistent message is always registered
2339       // BIG_MSG small pieces do not have malloc chunk header
2340     if (IS_PERSISTENT_MEMORY(msg)) {
2341         *memh = GetMemHndl(msg);
2342         return GNI_RC_SUCCESS;
2343     }
2344 #endif
2345     if(seqno == 0 
2346 #if CMK_PERSISTENT_COMM
2347          || seqno == PERSIST_SEQ
2348 #endif
2349       )
2350     {
2351         if(IsMemHndlZero((GetMemHndl(msg))))
2352         {
2353             msg = (void*)(msg);
2354             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2355             if(status == GNI_RC_SUCCESS)
2356                 *memh = GetMemHndl(msg);
2357         }
2358         else {
2359             *memh = GetMemHndl(msg);
2360         }
2361     }
2362     else {
2363         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2364         status = registerMemory(msg, size, memh, NULL); 
2365     }
2366     return status;
2367 }
2368
2369 // for BIG_MSG called on receiver side for receiving control message
2370 // LMSG_INIT_TAG
2371 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2372 {
2373 #if     USE_LRTS_MEMPOOL
2374     CONTROL_MSG         *request_msg;
2375     gni_return_t        status = GNI_RC_SUCCESS;
2376     void                *msg_data;
2377     gni_post_descriptor_t *pd;
2378     gni_mem_handle_t    msg_mem_hndl;
2379     int                 size, transaction_size, offset = 0;
2380     size_t              register_size = 0;
2381
2382     // initial a get to transfer data from the sender side */
2383     request_msg = (CONTROL_MSG *) header;
2384     size = request_msg->total_length;
2385     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2386     MallocPostDesc(pd);
2387 #if CMK_WITH_STATS 
2388     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2389 #endif
2390     if(request_msg->seq_id < 2)   {
2391 #if CMK_SMP_TRACE_COMMTHREAD 
2392         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2393 #endif
2394         msg_data = CmiAlloc(size);
2395         CmiSetMsgSeq(msg_data, 0);
2396         _MEMCHECK(msg_data);
2397     }
2398     else {
2399         offset = ONE_SEG*(request_msg->seq_id-1);
2400         msg_data = (char*)request_msg->dest_addr + offset;
2401     }
2402    
2403     pd->cqwrite_value = request_msg->seq_id;
2404
2405     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2406     SetMemHndlZero(pd->local_mem_hndl);
2407     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2408     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2409         if(NoMsgInRecv( (void*)(msg_data)))
2410             register_size = GetMempoolsize((void*)(msg_data));
2411     }
2412
2413     pd->first_operand = ALIGN64(size);                   //  total length
2414
2415     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2416         pd->type            = GNI_POST_FMA_GET;
2417     else
2418         pd->type            = GNI_POST_RDMA_GET;
2419     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2420     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2421     pd->length          = transaction_size;
2422     pd->local_addr      = (uint64_t) msg_data;
2423     pd->remote_addr     = request_msg->source_addr + offset;
2424     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2425
2426     if (tag == LMSG_OOB_INIT_TAG) 
2427         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2428     else
2429     {
2430 #if MULTI_THREAD_SEND
2431         pd->src_cq_hndl     = rdma_tx_cqh;
2432 #else
2433         pd->src_cq_hndl     = 0;
2434 #endif
2435     }
2436
2437     pd->rdma_mode       = 0;
2438     pd->amo_cmd         = 0;
2439 #if CMI_EXERT_RECV_RDMA_CAP
2440     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2441 #endif
2442     //memory registration success
2443     if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
2444     {
2445         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2446         CMI_GNI_LOCK(lock)
2447 #if REMOTE_EVENT
2448         if( request_msg->seq_id == 0)
2449         {
2450             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2451             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2452             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2453         }
2454 #endif
2455
2456 #if CMK_WITH_STATS
2457         RDMA_TRY_SEND(pd->type)
2458 #endif
2459         if(pd->type == GNI_POST_RDMA_GET) 
2460         {
2461             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2462         }
2463         else
2464         {
2465             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2466         }
2467         CMI_GNI_UNLOCK(lock)
2468
2469         if(status == GNI_RC_SUCCESS )
2470         {
2471 #if CMI_EXERT_RECV_RDMA_CAP
2472             RDMA_pending++;
2473 #endif
2474             if(pd->cqwrite_value == 0)
2475             {
2476 #if MACHINE_DEBUG_LOG
2477                 buffered_recv_msg += register_size;
2478                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2479 #endif
2480                 IncreaseMsgInRecv(msg_data);
2481 #if CMK_SMP_TRACE_COMMTHREAD 
2482                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2483 #endif
2484             }
2485 #if  CMK_WITH_STATS
2486             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2487             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2488 #endif
2489         }
2490     }else if (status != GNI_RC_SUCCESS)
2491     {
2492         SetMemHndlZero((pd->local_mem_hndl));
2493     }
2494         if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
2495     {
2496 #if REMOTE_EVENT
2497         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2498 #else
2499         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2500 #endif
2501     }else if (status != GNI_RC_SUCCESS) {
2502         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2503         GNI_RC_CHECK("GetLargeAFter posting", status);
2504     }
2505 #else
2506     CONTROL_MSG         *request_msg;
2507     gni_return_t        status;
2508     void                *msg_data;
2509     gni_post_descriptor_t *pd;
2510     RDMA_REQUEST        *rdma_request_msg;
2511     gni_mem_handle_t    msg_mem_hndl;
2512     //int source;
2513     // initial a get to transfer data from the sender side */
2514     request_msg = (CONTROL_MSG *) header;
2515     msg_data = CmiAlloc(request_msg->length);
2516     _MEMCHECK(msg_data);
2517
2518     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2519
2520     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2521     {
2522         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2523     }
2524
2525     MallocPostDesc(pd);
2526     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2527         pd->type            = GNI_POST_FMA_GET;
2528     else
2529         pd->type            = GNI_POST_RDMA_GET;
2530     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2531     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2532     pd->length          = ALIGN64(request_msg->length);
2533     pd->local_addr      = (uint64_t) msg_data;
2534     pd->remote_addr     = request_msg->source_addr;
2535     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2536     if (tag == LMSG_OOB_INIT_TAG) 
2537         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2538     else
2539     {
2540 #if MULTI_THREAD_SEND
2541         pd->src_cq_hndl     = rdma_tx_cqh;
2542 #else
2543         pd->src_cq_hndl     = 0;
2544 #endif
2545     }
2546     pd->rdma_mode       = 0;
2547     pd->amo_cmd         = 0;
2548
2549     //memory registration successful
2550     if(status == GNI_RC_SUCCESS)
2551     {
2552         pd->local_mem_hndl  = msg_mem_hndl;
2553        
2554         if(pd->type == GNI_POST_RDMA_GET) 
2555         {
2556             CMI_GNI_LOCK(rdma_tx_cq_lock)
2557             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2558             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2559         }
2560         else
2561         {
2562             CMI_GNI_LOCK(default_tx_cq_lock)
2563             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2564             CMI_GNI_UNLOCK(default_tx_cq_lock)
2565         }
2566
2567     }else
2568     {
2569         SetMemHndlZero(pd->local_mem_hndl);
2570     }
2571     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2572     {
2573         MallocRdmaRequest(rdma_request_msg);
2574         rdma_request_msg->next = 0;
2575         rdma_request_msg->destNode = inst_id;
2576         rdma_request_msg->pd = pd;
2577         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2578     }else {
2579         GNI_RC_CHECK("AFter posting", status);
2580     }
2581 #endif
2582 }
2583
2584 #if CQWRITE
/**
 * Drain the rdma_rx_cqh completion queue.
 *
 * Each event's data word is masked to its low 48 bits and used as a message
 * address.  With CMK_PERSISTENT_COMM, a message whose MEMHFIELD is non-zero
 * is referenced and handed to handleOneRecvedMsg().  Every other event
 * releases the message it names: the in-send counter is decremented (mempool
 * builds), buffered_send_msg is adjusted, and the buffer goes to CmiFree().
 *
 * The loop stops at the first GNI_CqGetEvent() result that is not
 * GNI_RC_SUCCESS; a GNI_RC_ERROR_RESOURCE result is then passed to
 * GNI_RC_CHECK.
 */
2585 static void PumpCqWriteTransactions()
2586 {
2587
2588     gni_cq_entry_t          ev;
2589     gni_return_t            status;
2590     void                    *msg;  
2591     int                     msg_size;
2592     while(1) {
2593         //CMI_GNI_LOCK(my_cq_lock) 
2594         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2595         //CMI_GNI_UNLOCK(my_cq_lock)
2596         if(status != GNI_RC_SUCCESS) break;
              // Only the low 48 bits of the event data are kept as the address.
2597         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2598 #if CMK_PERSISTENT_COMM
2599 #if PRINT_SYH
2600         printf(" %d CQ write event %p\n", myrank, msg);
2601 #endif
              // Non-zero memory handle field: deliver the message instead of
              // releasing it, then move on to the next event.
2602         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2603 #if PRINT_SYH
2604             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2605 #endif
2606             CmiReference(msg);
2607             msg_size = CmiGetMsgSize(msg);
2608             CMI_CHECK_CHECKSUM(msg, msg_size);
2609             handleOneRecvedMsg(msg_size, msg); 
2610             continue;
2611         }
2612 #endif
2613 #if ! USE_LRTS_MEMPOOL
2614        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2615 #else
2616         DecreaseMsgInSend(msg);
2617 #endif
              // Once no send references remain, subtract GetMempoolsize(msg)
              // from the buffered-send accounting.
2618         if(NoMsgInSend(msg))
2619             buffered_send_msg -= GetMempoolsize(msg);
2620         CmiFree(msg);
2621     };
          // Reached only via the break above, so status is the result of the
          // last GNI_CqGetEvent() call.
2622     if(status == GNI_RC_ERROR_RESOURCE)
2623     {
2624         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2625     }
2626 }
2627 #endif
2628
2629 #if REMOTE_EVENT
/**
 * Drain remote-event notifications from the completion queue rx_cqh.
 *
 * Each event's instance id is split with GET_INDEX()/GET_TYPE():
 *   - type 0 (ACK): the index selects a message in ackPool; its in-send
 *     counter is decremented (mempool builds), buffered_send_msg is adjusted,
 *     the message is freed and the pool slot is released.
 *   - type 1 (PERSISTENT, CMK_PERSISTENT_COMM only): the index selects a
 *     PersistentReceivesTable slot in persistPool; the message at
 *     destBuf[addrIndex].destAddress is referenced and delivered with
 *     handleOneRecvedMsg().
 *   - anything else aborts.
 *
 * The loop ends at the first GNI_CqGetEvent() result that is not
 * GNI_RC_SUCCESS, or, with CMI_PUMPREMOTETRANSACTIONS_CAP, once pump_count
 * exceeds PumpRemoteTransactions_cap.
 *
 * @param rx_cqh  completion queue to poll
 */
2630 static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
2631 {
2632     gni_cq_entry_t          ev;
2633     gni_return_t            status;
2634     void                    *msg;   
2635     int                     inst_id, index, type, size;
2636
2637 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2638     int                     pump_count = 0;
2639 #endif
2640     while(1) {
2641 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2642         if (pump_count > PumpRemoteTransactions_cap) break;
2643 #endif
              // The queue is polled while holding global_gni_lock.
2644         CMI_GNI_LOCK(global_gni_lock)
2645 //        CMI_GNI_LOCK(rdma_tx_cq_lock)
2646         status = GNI_CqGetEvent(rx_cqh, &ev);
2647 //        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2648         CMI_GNI_UNLOCK(global_gni_lock)
2649
2650         if(status != GNI_RC_SUCCESS) break;
2651
2652 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2653         pump_count ++;
2654 #endif
2655
              // The instance id carries both a pool index and an event type.
2656         inst_id = GNI_CQ_GET_INST_ID(ev);
2657         index = GET_INDEX(inst_id);
2658         type = GET_TYPE(inst_id);
2659         switch (type) {
2660         case 0:    // ACK
2661             CmiAssert(index>=0 && index<ackPool.size);
                  // Look up the acknowledged message under the pool lock.
2662             CMI_GNI_LOCK(ackPool.lock);
2663             CmiAssert(GetIndexType(ackPool, index) == 1);
2664             msg = GetIndexAddress(ackPool, index);
2665             CMI_GNI_UNLOCK(ackPool.lock);
2666 #if PRINT_SYH
2667             printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2668 #endif
2669 #if ! USE_LRTS_MEMPOOL
2670            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2671 #else
2672             DecreaseMsgInSend(msg);
2673 #endif
2674             if(NoMsgInSend(msg))
2675                 buffered_send_msg -= GetMempoolsize(msg);
2676             CmiFree(msg);
                  // Release the ackPool slot that named this message.
2677             IndexPool_freeslot(&ackPool, index);
2678 #if CMI_EXERT_SEND_LARGE_CAP
2679             SEND_large_pending--;
2680 #endif
2681             break;
2682 #if CMK_PERSISTENT_COMM
2683         case 1:  {    // PERSISTENT
2684             CmiLock(persistPool.lock);
2685             CmiAssert(GetIndexType(persistPool, index) == 2);
2686             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2687             CmiUnlock(persistPool.lock);
2688             START_EVENT();
                  // The received message is in the slot's current buffer.
2689             msg = slot->destBuf[slot->addrIndex].destAddress;
2690             size = CmiGetMsgSize(msg);
2691             CmiReference(msg);
2692             CMI_CHECK_CHECKSUM(msg, size);
2693             TRACE_COMM_CREATION(EVENT_TIME(), msg);
2694             handleOneRecvedMsg(size, msg); 
2695             break;
2696             }
2697 #endif
2698         default:
2699             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2700             CmiAbort("PumpRemoteTransactions: unknown type");
2701         }
2702     }
          // status here is the last GNI_CqGetEvent() result, unless the
          // capped form left the loop on pump_count.
2703     if(status == GNI_RC_ERROR_RESOURCE)
2704     {
2705         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2706     }
2707 }
2708 #endif
2709
/**
 * Process local completion events for posted FMA/RDMA transactions.
 *
 * Events are read from my_tx_cqh with my_cq_lock held around each GNI call.
 * Only events of type GNI_CQ_EVENT_TYPE_POST are acted on.  For each one the
 * completed post descriptor is fetched with GNI_GetCompleted() and handled
 * according to its type:
 *   - PUT (built only with CMK_PERSISTENT_COMM or CMK_DIRECT): either a
 *     CMK_DIRECT "done" message is prepared (amo_cmd == 1), or the local
 *     buffer is freed and the remote side is notified in the way selected by
 *     REMOTE_EVENT / CQWRITE / neither.
 *   - GET: the reply to send is chosen (ACK_TAG, or BIG_MSG_TAG when
 *     cqwrite_value > 0), it is sent with send_smsg_message(), and the
 *     received data is delivered with handleOneRecvedMsg() once complete.
 *   - GNI_POST_CQWRITE: the descriptor is simply freed.
 *   - anything else aborts.
 *
 * The loop ends at the first GNI_CqGetEvent() result that is not
 * GNI_RC_SUCCESS, or, with CMI_PUMPLOCALTRANSACTIONS_CAP, after
 * PumpLocalTransactions_cap iterations.
 *
 * @param my_tx_cqh   completion queue to poll
 * @param my_cq_lock  lock taken around GNI_CqGetEvent()/GNI_GetCompleted()
 */
2710 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2711 {
2712     gni_cq_entry_t          ev;
2713     gni_return_t            status;
2714     uint64_t                type, inst_id;
2715     gni_post_descriptor_t   *tmp_pd;
2716     MSG_LIST                *ptr;
2717     CONTROL_MSG             *ack_msg_tmp;
2718     ACK_MSG                 *ack_msg;
2719     uint8_t                 msg_tag;
2720 #if CMK_DIRECT
2721     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2722 #endif
2723     SMSG_QUEUE         *queue = &smsg_queue;
2724 #if CMI_PUMPLOCALTRANSACTIONS_CAP
2725     int         pump_count = 0;
2726     while(pump_count < PumpLocalTransactions_cap) {
2727         pump_count++;
2728 #else
2729     while(1) {
2730 #endif
2731         CMI_GNI_LOCK(my_cq_lock) 
2732         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2733         CMI_GNI_UNLOCK(my_cq_lock)
2734         if(status != GNI_RC_SUCCESS) break;
2735         
2736         type = GNI_CQ_GET_TYPE(ev);
              // Events of any other type are skipped (there is no else branch).
2737         if (type == GNI_CQ_EVENT_TYPE_POST)
2738         {
2739
2740 #if CMI_EXERT_RECV_RDMA_CAP
2741             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2742             RDMA_pending--;
2743 #endif
2744             inst_id     = GNI_CQ_GET_INST_ID(ev);
2745 #if PRINT_SYH
2746             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2747 #endif
                  // NOTE(review): the result of GNI_GetCompleted() is stored in
                  // status but not checked before tmp_pd is dereferenced below.
2748             CMI_GNI_LOCK(my_cq_lock)
2749             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2750             CMI_GNI_UNLOCK(my_cq_lock)
2751
2752             switch (tmp_pd->type) {
2753 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2754             case GNI_POST_RDMA_PUT:
2755 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2756                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2757 #endif
                      // No break: the RDMA_PUT case continues into FMA_PUT.
2758             case GNI_POST_FMA_PUT:
2759                 if(tmp_pd->amo_cmd == 1) {
2760 #if CMK_DIRECT
2761                     //sender ACK to receiver to trigger it is done
2762                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2763                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2764                     msg_tag = DIRECT_PUT_DONE_TAG;
2765 #endif
2766                 }
2767                 else {
                          // The local source buffer of the PUT is released here.
2768                     CmiFree((void *)tmp_pd->local_addr);
2769 #if REMOTE_EVENT
                          // Nothing is sent from here in this configuration.
2770                     FreePostDesc(tmp_pd);
2771                     continue;
2772 #elif CQWRITE
2773                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2774                     FreePostDesc(tmp_pd);
2775                     continue;
2776 #else
                          // Fallback: build a PUT_DONE control message; it is
                          // sent after the switch.
2777                     MallocControlMsg(ack_msg_tmp);
2778                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2779                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2780                     ack_msg_tmp->length  = tmp_pd->length;
2781                     msg_tag = PUT_DONE_TAG;
2782 #endif
2783                 }
2784                 break;
2785 #endif
2786             case GNI_POST_RDMA_GET:
2787             case GNI_POST_FMA_GET:  {
2788 #if  ! USE_LRTS_MEMPOOL
                      // NOTE(review): ack_msg_tmp is allocated here, but the
                      // ACK_TAG branch after the switch sends/frees ack_msg,
                      // not ack_msg_tmp -- confirm this path is still correct.
2789                 MallocControlMsg(ack_msg_tmp);
2790                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2791                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2792                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2793                 msg_tag = ACK_TAG;  
2794 #else
2795 #if CMK_WITH_STATS
2796                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2797 #endif
                      // cqwrite_value holds the segment sequence id of the GET;
                      // a positive value marks one segment of a big message.
2798                 int seq_id = tmp_pd->cqwrite_value;
2799                 if(seq_id > 0)      // BIG_MSG
2800                 {
2801                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2802                     MallocControlMsg(ack_msg_tmp);
2803                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2804                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2805                     ack_msg_tmp->seq_id = seq_id;
                          // Both addresses are moved back by (seq_id-1) segments,
                          // i.e. to the start of the whole message.
2806                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2807                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2808                     ack_msg_tmp->length = tmp_pd->length;
2809                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2810                     msg_tag = BIG_MSG_TAG; 
2811                 } 
2812                 else
2813                 {
2814                     msg_tag = ACK_TAG; 
2815 #if  !REMOTE_EVENT && !CQWRITE
2816                     MallocAckMsg(ack_msg);
2817                     ack_msg->source_addr = tmp_pd->remote_addr;
2818 #endif
2819                 }
2820 #endif
2821                 break;
2822             }
2823             case  GNI_POST_CQWRITE:
2824                    FreePostDesc(tmp_pd);
2825                    continue;
2826             default:
2827                 CmiPrintf("type=%d\n", tmp_pd->type);
2828                 CmiAbort("PumpLocalTransactions: unknown type!");
2829             }      /* end of switch */
2830
                  // Send whatever notification the switch prepared.  On success
                  // the message object is freed here; a non-success result is
                  // not handled further in this function.
2831 #if CMK_DIRECT
2832             if (tmp_pd->amo_cmd == 1) {
2833                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2834                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2835             }
2836             else
2837 #endif
2838             if (msg_tag == ACK_TAG) {
2839 #if !REMOTE_EVENT
2840 #if   !CQWRITE
2841                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2842                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2843 #else
2844                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2845 #endif
2846 #endif
2847             }
2848             else {
2849                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2850                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2851             }
                  // Delivery of received data.  With CMK_PERSISTENT_COMM this is
                  // limited to GET descriptors; otherwise the block always runs.
2852 #if CMK_PERSISTENT_COMM
2853             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2854 #endif
2855             {
2856                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2857 #if PRINT_SYH
2858                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2859 #endif
2860                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2861                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2862
2863                     START_EVENT();
2864                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2865                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2866 #if MACHINE_DEBUG_LOG
2867                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2868                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2869                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2870 #endif
2871                     TRACE_COMM_CREATION(EVENT_TIME(), (void*)tmp_pd->local_addr);
2872                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2873                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2874                 }else if(msg_tag == BIG_MSG_TAG){
                          // msg is the start of the whole message: the segment
                          // address moved back by (cqwrite_value-1) segments.
2875                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
                          // The message's Seq field is incremented once per
                          // completed segment.
2876                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
                          // Deliver once the counted segments cover the total
                          // length kept in first_operand.
2877                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2878                         START_EVENT();
2879 #if PRINT_SYH
2880                         printf("Pipeline msg done [%d]\n", myrank);
2881 #endif
2882 #if     CMK_SMP_TRACE_COMMTHREAD
2883                         if( tmp_pd->cqwrite_value == 1)
2884                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2885 #endif
2886                         TRACE_COMM_CREATION(EVENT_TIME(), msg);
2887                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2888                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2889                     }
2890                 }
2891             }
2892             FreePostDesc(tmp_pd);
2893         }
2894     } //end while
          // NOTE(review): status is also overwritten by GNI_GetCompleted() and
          // send_smsg_message() inside the loop.  When the capped loop form
          // ends on pump_count, this check sees that last value rather than a
          // GNI_CqGetEvent() result -- confirm this is acceptable.
2895     if(status == GNI_RC_ERROR_RESOURCE)
2896     {
2897         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2898         GNI_RC_CHECK("Smsg_tx_cq full", status);
2899     }
2900 }
2901
/**
 * Retry buffered RDMA/FMA requests from sendqueue.
 *
 * At most as many requests as the queue held on entry are popped.  For each:
 *   1. registerMessage() is called for the local buffer.  On failure the
 *      request is pushed back onto sendqueue and the loop continues.
 *   2. With REMOTE_EVENT, the endpoint's event data is set for requests whose
 *      cqwrite_value is 0 (ACK event) or, with CMK_PERSISTENT_COMM,
 *      PERSIST_SEQ (persistent event).
 *   3. The descriptor is posted with GNI_PostRdma() or GNI_PostFma() while
 *      holding the matching tx CQ lock.  On success the bookkeeping is
 *      updated and the RDMA_REQUEST is freed; on failure the request is
 *      re-queued and the loop stops.
 *
 * With CMI_EXERT_RECV_RDMA_CAP the loop also stops once RDMA_pending reaches
 * RDMA_cap.
 *
 * @param sendqueue  queue of RDMA_REQUEST entries to retry
 */
2902 static void  SendRdmaMsg( BufferList sendqueue)
2903 {
2904     gni_return_t            status = GNI_RC_SUCCESS;
2905     gni_mem_handle_t        msg_mem_hndl;
2906     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2907     RDMA_REQUEST            *pre = 0;
2908     uint64_t                register_size = 0;
2909     void                    *msg;
2910     int                     i;
2911
          // The length is sampled once, so entries pushed back during this
          // call are not popped again in the same call.
2912     int len = PCQueueLength(sendqueue);
2913     for (i=0; i<len; i++)
2914     {
2915 #if CMI_EXERT_RECV_RDMA_CAP
2916         if( RDMA_pending >= RDMA_cap) break;
2917 #endif
2918         CMI_PCQUEUEPOP_LOCK( sendqueue)
2919         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
2920         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
2921         if (ptr == NULL) break;
2922         
2923         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2924         gni_post_descriptor_t *pd = ptr->pd;
2925         
2926         msg = (void*)(pd->local_addr);
2927         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
              // register_size is only read under MACHINE_DEBUG_LOG below.
2928         register_size = 0;
2929         if(pd->cqwrite_value == 0) {
2930             if(NoMsgInRecv(msg))
2931                 register_size = GetMempoolsize(msg);
2932         }
2933
2934         if(status == GNI_RC_SUCCESS)        //mem register good
2935         {
2936             int destNode = ptr->destNode;
                  // RDMA posts use rdma_tx_cq_lock, FMA posts default_tx_cq_lock.
2937             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2938             CMI_GNI_LOCK(lock);
2939 #if REMOTE_EVENT
2940             if( pd->cqwrite_value == 0) {
2941                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2942                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
2943                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2944             }
2945 #if CMK_PERSISTENT_COMM
2946             else if (pd->cqwrite_value == PERSIST_SEQ) {
2947                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2948                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
2949                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2950             }
2951 #endif
2952 #endif
2953 #if CMK_WITH_STATS
2954             RDMA_TRY_SEND(pd->type)
2955 #endif
2956 #if CMK_SMP_TRACE_COMMTHREAD
2957             if(IS_PUT(pd->type))
2958             {
2959                  START_EVENT();
2960                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
2961             }
2962 #endif
2963
2964             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2965             {
2966                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
2967             }
2968             else
2969             {
2970                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
2971             }
2972             CMI_GNI_UNLOCK(lock);
2973             
2974             if(status == GNI_RC_SUCCESS)    //post good
2975             {
2976 #if CMI_EXERT_RECV_RDMA_CAP
2977                 RDMA_pending ++;
2978 #endif
2979                 if(pd->cqwrite_value == 0)
2980                 {
2981 #if CMK_SMP_TRACE_COMMTHREAD 
2982                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2983 #endif
2984                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2985                 }
2986 #if  CMK_WITH_STATS
2987                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2988                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2989 #endif
2990 #if MACHINE_DEBUG_LOG
2991                 buffered_recv_msg += register_size;
2992                 MACHSTATE(8, "GO request from buffered\n"); 
2993 #endif
2994 #if PRINT_SYH
2995                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
2996 #endif
                      // The queue entry is freed here; pd itself is not.
2997                 FreeRdmaRequest(ptr);
2998             }else           // cannot post
2999             {
                      // NOTE(review): the request is pushed onto the global
                      // sendRdmaBuf, not onto the sendqueue argument it was
                      // popped from (the registration-failure branch below uses
                      // sendqueue) -- confirm this is intended when the caller
                      // passes a different queue.
3000                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3001 #if PRINT_SYH
3002                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3003 #endif
                      // A failed post ends this pass.
3004                 break;
3005             }
3006         } else          //memory registration fails
3007         {
3008             PCQueuePush(sendqueue, (char*)ptr);
3009         }
3010     } //end for
3011 }
3012
/**
 * Try to send one previously buffered SMSG entry.
 *
 * If dynamic SMSG is in use and the connection flag for the destination is
 * not 2, nothing is sent and GNI_RC_ERROR_RESOURCE is returned (in CMK_SMP
 * builds connect_to() is called first when the flag is 0).  Otherwise the
 * entry is dispatched on its tag: large-message init tags go through
 * send_large_messages(), all others through send_smsg_message().  When a
 * send_smsg_message() call succeeds, the payload is released with the free
 * routine matching its tag.  An unknown tag aborts.
 *
 * @param queue  SMSG queue passed through to the send routines
 * @param ptr    buffered entry (destination, payload, size, tag)
 * @return       status of the send attempt; GNI_RC_ERROR_RESOURCE if no send
 *               was attempted
 */
3013 static 
3014 inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
3015 {
3016     CONTROL_MSG         *control_msg_tmp;
          // Default result for the "not connected yet" path below.
3017     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
3018
3019     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3020     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
3021             /* connection not exists yet */
3022 #if CMK_SMP
3023             /* non-smp case, connect is issued in send_smsg_message */
3024         if (smsg_connected_flag[ptr->destNode] == 0)
3025             connect_to(ptr->destNode); 
3026 #endif
3027     }
3028     else
3029     switch(ptr->tag)
3030     {
3031     case SMALL_DATA_TAG:
3032         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3033         if(status == GNI_RC_SUCCESS)
3034         {
3035             CmiFree(ptr->msg);
3036         }
3037         break;
3038     case LMSG_INIT_TAG:
3039     case LMSG_OOB_INIT_TAG:
              // The payload is not freed here for large-message init tags.
3040         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3041         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
3042         break;
3043 #if !REMOTE_EVENT && !CQWRITE
3044     case ACK_TAG:
3045         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3046         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3047         break;
3048 #endif
3049     case BIG_MSG_TAG:
3050         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3051         if(status == GNI_RC_SUCCESS)
3052         {
3053             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3054         }
3055         break;
3056 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3057     case PUT_DONE_TAG:
3058         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3059         if(status == GNI_RC_SUCCESS)
3060         {
3061             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3062         }
3063         break;
3064 #endif
3065 #if CMK_DIRECT
3066     case DIRECT_PUT_DONE_TAG:
              // Size is fixed to the header size rather than taken from ptr->size.
3067         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3068         if(status == GNI_RC_SUCCESS)
3069         {
3070             free((CMK_DIRECT_HEADER*)ptr->msg);
3071         }
3072         break;
3073 #endif
3074     default:
3075         printf("Weird tag\n");
3076         CmiAbort("should not happen\n");
3077     }       // end switch
3078     return status;
3079 }
3080
3081 // return 1 if all messages are sent
3082
3083 #if ONE_SEND_QUEUE
3084
/**
 * Single-queue variant (ONE_SEND_QUEUE): retry the messages buffered in
 * queue->sendMsgBuf.
 *
 * At most as many entries as the queue held on entry are popped.  Each is
 * passed to _sendOneBufferedSmsg(); on success the list node is freed, on
 * failure it is pushed back onto the queue.  When a send returns
 * GNI_RC_ERROR_RESOURCE the destination is flagged in destpe_avail, and later
 * entries for it are pushed back without a send attempt in this call.
 *
 * @param queue       queue whose sendMsgBuf is drained
 * @param prio_queue  not referenced in this variant
 * @return            1 if no send attempt failed, 0 otherwise
 */
3085 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3086 {
3087     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3088     CONTROL_MSG         *control_msg_tmp;
3089     gni_return_t        status;
3090     int                 done = 1;
3091     uint64_t            register_size;
3092     void                *register_addr;
3093     int                 index_previous = -1;
3094 #if     CMI_SENDBUFFERSMSG_CAP
3095     int                 sent_length = 0;
3096 #endif
3097     int          index = 0;
          // Reset the per-destination "blocked" flags for this call.
3098     memset(destpe_avail, 0, mysize * sizeof(char));
          // This outer loop runs exactly once (index < 1).
3099     for (index=0; index<1; index++)
3100     {
3101         int i, len = PCQueueLength(queue->sendMsgBuf);
3102         for (i=0; i<len; i++) 
3103         {
3104             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3105             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3106             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3107             if(ptr == NULL) break;
3108             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3109                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3110                 continue;
3111             }
3112             status = _sendOneBufferedSmsg(queue, ptr);
3113 #if CMI_SENDBUFFERSMSG_CAP
                  // Counted here, but not compared against a cap in this variant.
3114             sent_length++;
3115 #endif
3116             if(status == GNI_RC_SUCCESS)
3117             {
3118 #if PRINT_SYH
3119                 buffered_smsg_counter--;
3120                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3121 #endif
3122                 FreeMsgList(ptr);
3123             }else {
3124                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3125                 done = 0;
3126                 if(status == GNI_RC_ERROR_RESOURCE)
3127                 {
                          // Stop trying this destination for the rest of the call.
3128                     destpe_avail[ptr->destNode] = 1;
3129                 }
3130             } 
3131         } //end for i
3132     }   // end of the single outer pass
3133     return done;
3134 }
3135
3136 #else   /*  ! ONE_SEND_QUEUE  */
3137
/**
 * Per-destination variant (!ONE_SEND_QUEUE): retry buffered messages, one
 * destination queue at a time.
 *
 * With SMP_LOCKS, destination lists are popped from queue->nonEmptyQueues;
 * a list that still holds messages afterwards is pushed back (guarded by its
 * "pushed" flag).  Without SMP_LOCKS, all mysize destinations are visited
 * round-robin; the static index makes the next call continue after the last
 * destination visited.
 *
 * A destination is skipped while prio_queue holds messages for it.  For each
 * visited destination, at most as many messages as its queue held at that
 * moment are passed to _sendOneBufferedSmsg(); a failed message is pushed
 * back, and GNI_RC_ERROR_RESOURCE ends the work on that destination.
 *
 * @param queue       queue whose per-destination buffers are drained
 * @param prio_queue  optional higher-priority queue; may be NULL
 * @return            1 if no send attempt failed and (with
 *                    CMI_SENDBUFFERSMSG_CAP) the cap was not hit, 0 otherwise
 */
3138 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3139 {
3140     MSG_LIST            *ptr;
3141     gni_return_t        status;
3142     int                 done = 1;
3143 #if     CMI_SENDBUFFERSMSG_CAP
3144     int                 sent_length = 0;
3145 #endif
3146     int idx;
3147 #if SMP_LOCKS
          // In this branch index is advanced but not otherwise used.
3148     int          index = -1;
3149     int nonempty = PCQueueLength(queue->nonEmptyQueues);
3150     for(idx =0; idx<nonempty; idx++) 
3151     {
3152         index++;  if (index >= nonempty) index = 0;
3153 #if CMI_SENDBUFFERSMSG_CAP
3154         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3155 #endif
3156         CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
3157         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
3158         CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
3159         if(current_list == NULL) break; 
              // Higher-priority traffic pending for this destination: put the
              // list back untouched and move on.
3160         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
3161             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3162             continue;
3163         }
3164         PCQueue current_queue= current_list->sendSmsgBuf;
              // Sample the length and clear "pushed" under the list lock.
3165         CmiLock(current_list->lock);
3166         int i, len = PCQueueLength(current_queue);
3167         current_list->pushed = 0;
3168         CmiUnlock(current_list->lock);
3169 #else      /* ! SMP_LOCKS */
          // static: the round-robin position is kept across calls.
3170     static int          index = -1;
3171     for(idx =0; idx<mysize; idx++) 
3172     {
3173         index++;  if (index == mysize) index = 0;
3174 #if CMI_SENDBUFFERSMSG_CAP
3175         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3176 #endif
3177         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
3178         //if (index == myrank) continue;
3179         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3180         int i, len = PCQueueLength(current_queue);
3181 #endif
              // Shared by both branches: try the messages of this destination.
3182         for (i=0; i<len; i++)  {
3183             CMI_PCQUEUEPOP_LOCK(current_queue)
3184             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3185             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3186             if (ptr == 0) break;
3187
3188             status = _sendOneBufferedSmsg(queue, ptr);
3189 #if CMI_SENDBUFFERSMSG_CAP
3190             sent_length++;
3191 #endif
3192             if(status == GNI_RC_SUCCESS)
3193             {
3194 #if PRINT_SYH
3195                 buffered_smsg_counter--;
3196                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3197 #endif
3198                 FreeMsgList(ptr);
3199             }else {
3200                 PCQueuePush(current_queue, (char*)ptr);
3201                 done = 0;
3202                 if(status == GNI_RC_ERROR_RESOURCE)
3203                 {
                          // Give up on this destination for the current call.
3204                     break;
3205                 }
3206             } 
3207         } //end for i
3208 #if SMP_LOCKS
              // Re-register the list as non-empty unless "pushed" is already set.
3209         CmiLock(current_list->lock);
3210         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3211         {
3212             current_list->pushed = 1;
3213             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3214         }
3215         CmiUnlock(current_list->lock); 
3216 #endif
3217     }   // end of loop over destinations
3218     return done;
3219 }
3220
3221 #endif
3222
3223 static void ProcessDeadlock();
/**
 * Run one pass of the machine layer's communication progress loop.
 *
 * In order, the function:
 *   1. calls PumpDatagramConnection(), but only when useDynamicSMSG is set
 *      and whileidle is non-zero;
 *   2. calls PumpNetworkSmsg();
 *   3. calls SendBufferMsg() on smsg_queue (the second argument is
 *      &smsg_oob_queue when CMK_USE_OOB is set, NULL otherwise);
 *   4. calls PumpLocalTransactions() on default_tx_cqh, and additionally on
 *      rdma_tx_cqh when MULTI_THREAD_SEND is set;
 *   5. calls PumpCqWriteTransactions() when CQWRITE is set and
 *      PumpRemoteTransactions(rdma_rx_cqh) when REMOTE_EVENT is set;
 *   6. calls SendRdmaMsg(sendRdmaBuf).
 *
 * Before each of steps 2-6 the macro group SEND_OOB_SMSG,
 * PUMP_REMOTE_HIGHPRIORITY, PUMP_LOCAL_HIGHPRIORITY, POST_HIGHPRIORITY_RDMA
 * is invoked. These macros are defined elsewhere in the file; judging by
 * their names they service high-priority traffic between the regular steps
 * (TODO confirm against their definitions).
 *
 * When CMK_SMP_TRACE_COMMTHREAD is set, every step is bracketed with
 * CmiWallTimer() and reported through traceUserBracketEvent() if it took at
 * least TRACE_THRESHOLD.
 *
 * In CMK_SMP builds without LARGEPAGE, ProcessDeadlock() is called at the
 * end when _detected_hang is set.
 *
 * @param whileidle  Only consulted in this function to gate step 1.
 */
3224 void LrtsAdvanceCommunication(int whileidle)
3225 {
         /* NOTE(review): 'count' is not referenced anywhere in the visible body;
          * it may be used by one of the macros below or be dead -- confirm
          * before removing. */
3226     static int count = 0;
3227     /* Step 1: connection setup (dynamic SMSG only, and only while idle) */
3228 #if CMK_SMP_TRACE_COMMTHREAD
3229     double startT, endT;
3230 #endif
3231     if (useDynamicSMSG && whileidle)
3232     {
3233 #if CMK_SMP_TRACE_COMMTHREAD
3234         startT = CmiWallTimer();
3235 #endif
3236         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3237 #if CMK_SMP_TRACE_COMMTHREAD
3238         endT = CmiWallTimer();
3239         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3240 #endif
3241     }
3242
         /* Macro group repeated between the regular steps (no trailing
          * semicolons: the macros are written to stand alone as statements). */
3243     SEND_OOB_SMSG(smsg_oob_queue)
3244     PUMP_REMOTE_HIGHPRIORITY
3245     PUMP_LOCAL_HIGHPRIORITY
3246     POST_HIGHPRIORITY_RDMA
3247     // Step 2: receive small messages and persistent messages
3248 #if CMK_SMP_TRACE_COMMTHREAD
3249     startT = CmiWallTimer();
3250 #endif
3251     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3252 #if CMK_SMP_TRACE_COMMTHREAD
3253     endT = CmiWallTimer();
3254     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3255 #endif
3256
3257     SEND_OOB_SMSG(smsg_oob_queue)
3258     PUMP_REMOTE_HIGHPRIORITY
3259     PUMP_LOCAL_HIGHPRIORITY
3260     POST_HIGHPRIORITY_RDMA
3261     
3262     // Step 3: send buffered messages
3263 #if CMK_SMP_TRACE_COMMTHREAD
3264     startT = CmiWallTimer();
3265 #endif
3266 #if CMK_USE_OOB
3267     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
3268 #else
3269     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
3270 #endif
3271 #if CMK_SMP_TRACE_COMMTHREAD
3272     endT = CmiWallTimer();
3273     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3274 #endif
3275
3276     SEND_OOB_SMSG(smsg_oob_queue)
3277     PUMP_REMOTE_HIGHPRIORITY
3278     PUMP_LOCAL_HIGHPRIORITY
3279     POST_HIGHPRIORITY_RDMA
3280
3281     // Step 4: pump local completions of GET / PUT transactions
3282 #if CMK_SMP_TRACE_COMMTHREAD
3283     startT = CmiWallTimer();
3284 #endif
3285     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3286 #if MULTI_THREAD_SEND
3287     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3288 #endif
3289 #if CMK_SMP_TRACE_COMMTHREAD
3290     endT = CmiWallTimer();
3291     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3292 #endif
3293     
3294     SEND_OOB_SMSG(smsg_oob_queue)
3295     PUMP_REMOTE_HIGHPRIORITY
3296     PUMP_LOCAL_HIGHPRIORITY
3297     POST_HIGHPRIORITY_RDMA
3298     // Step 5: pump remote events
3299 #if CMK_SMP_TRACE_COMMTHREAD
3300     startT = CmiWallTimer();
3301 #endif
3302 #if CQWRITE
3303     PumpCqWriteTransactions();
3304 #endif
3305 #if REMOTE_EVENT
3306     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
3307 #endif
3308 #if CMK_SMP_TRACE_COMMTHREAD
3309     endT = CmiWallTimer();
3310     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3311 #endif
3312
3313     SEND_OOB_SMSG(smsg_oob_queue)
3314     PUMP_REMOTE_HIGHPRIORITY
3315     PUMP_LOCAL_HIGHPRIORITY
3316     POST_HIGHPRIORITY_RDMA
3317
         /* Step 6: issue the RDMA requests queued in sendRdmaBuf */
3318 #if CMK_SMP_TRACE_COMMTHREAD
3319     startT = CmiWallTimer();
3320 #endif
3321     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3322 #if CMK_SMP_TRACE_COMMTHREAD
3323     endT = CmiWallTimer();
3324     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3325 #endif
3326
         /* Hang handling is only compiled for SMP builds without LARGEPAGE. */
3327 #if CMK_SMP && ! LARGEPAGE
3328     if (_detected_hang)  ProcessDeadlock();
3329 #endif
3330 }
3331
3332 static void set_smsg_max()
3333 {
3334     char *env;
3335
3336     if(mysize <=512)
3337     {
3338         SMSG_MAX_MSG = 1024;
3339     }else if (mysize <= 4096)
3340     {
3341         SMSG_MAX_MSG = 1024;
3342     }else if (mysize <= 16384)
3343     {
3344         SMSG_MAX_MSG = 512;
3345     }else {
3346         SMSG_MAX_MSG = 256;
3347     }
3348
3349     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3350     if (env) SMSG_MAX_MSG = atoi(env);
3351     CmiAssert(SMSG_MAX_MSG > 0);
3352 }    
3353
3354 /* useDynamicSMSG */
3355 static void _init_dynamic_smsg()
3356 {
3357     gni_return_t status;
3358     uint32_t     vmdh_index = -1;
3359     int i;
3360
3361     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3362     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3363     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3364     for(i=0; i<mysize; i++) {
3365         smsg_connected_flag[i] = 0;
3366         smsg_attr_vector_local[i] = NULL;
3367         smsg_attr_vector_remote[i] = NULL;
3368     }
3369
3370     set_smsg_max();
3371
3372     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3373     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3374     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3375     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3376     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3377
3378     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3379     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3380     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3381     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3382     mailbox_list->offset = 0;
3383     mailbox_list->next = 0;
3384     
3385     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3386         mailbox_list->size, smsg_rx_cqh,
3387         GNI_MEM_READWRITE,   
3388         vmdh_index,
3389         &(mailbox_list->mem_hndl));
3390     GNI_RC_CHECK("MEMORY registration for smsg", status);
3391
3392     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3393     GNI_RC_CHECK("Unbound EP", status);
3394     
3395     alloc_smsg_attr(&send_smsg_attr);
3396
3397     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3398     GNI_RC_CHECK("post unbound datagram", status);
3399
3400       /* always pre-connect to proc 0 */
3401     //if (myrank != 0) connect_to(0);
3402
3403     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3404     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3405 }
3406
3407 static void _init_static_smsg()
3408 {
3409     gni_smsg_attr_t      *smsg_attr;
3410     gni_smsg_attr_t      remote_smsg_attr;
3411     gni_smsg_attr_t      *smsg_attr_vec;
3412     gni_mem_handle_t     my_smsg_mdh_mailbox;
3413     int      ret, i;
3414     gni_return_t status;
3415     uint32_t              vmdh_index = -1;
3416     mdh_addr_t            base_infor;
3417     mdh_addr_t            *base_addr_vec;
3418
3419     set_smsg_max();
3420     
3421     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3422     
3423     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3424     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3425     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3426     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3427     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3428     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3429     CmiAssert(ret == 0);
3430     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3431     
3432     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3433             smsg_memlen*(mysize), smsg_rx_cqh,
3434             GNI_MEM_READWRITE,   
3435             vmdh_index,
3436             &my_smsg_mdh_mailbox);
3437     register_memory_size += smsg_memlen*(mysize);
3438     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3439
3440     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3441     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3442
3443     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3444     base_infor.mdh =  my_smsg_mdh_mailbox;
3445     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3446
3447     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3448  
3449     for(i=0; i<mysize; i++)
3450     {
3451         if(i==myrank)
3452             continue;
3453         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3454         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3455         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3456         smsg_attr[i].mbox_offset = i*smsg_memlen;
3457         smsg_attr[i].buff_size = smsg_memlen;
3458         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3459         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3460     }
3461
3462     for(i=0; i<mysize; i++)
3463     {
3464         if (myrank == i) continue;
3465
<