fix bug in node level persistent
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, to prevent the mempool from
18     # using too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
/* Compile-time configuration switches for this machine layer. */
50 #define     LARGEPAGE              0
51
/* MULTI_THREAD_SEND is 0, so in SMP builds COMM_THREAD_SEND (its complement)
   is 1. In non-SMP builds neither is defined, and both evaluate to 0 in #if. */
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     0
59 #endif
60
/* REMOTE_EVENT / CQWRITE select which MEMORY_REGISTER variant and which
   LRTS_GNI_RDMA_THRESHOLD value are compiled in further below. */
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
/* Optional caps on pending large sends, receives and RDMA operations. All are
   0 here, so the cap/pending counters below are compiled out. */
64 #define CMI_EXERT_SEND_CAP      0
65 #define CMI_EXERT_RECV_CAP      0
66 #define CMI_EXERT_RDMA_CAP      0
67
68 #if CMI_EXERT_SEND_CAP
69 int SEND_large_cap = 100;
70 int SEND_large_pending = 0;
71 #endif
72
73 #if CMI_EXERT_RECV_CAP
74 #define RECV_CAP  4                  /* cap <= 2 sometimes hang */
75 #endif
76
77 #if CMI_EXERT_RDMA_CAP
78 int   RDMA_cap =   100;
79 int   RDMA_pending = 0;
80 #endif
81
/* USE_LRTS_MEMPOOL guards the mempool size/limit variables below;
   PRINT_SYH guards the lrts_* debug counters. */
82 #define USE_LRTS_MEMPOOL                  1
83
84 #define PRINT_SYH                         0
85
86 // Trace communication thread
87 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
88 #define TRACE_THRESHOLD     0.00005
89 #define CMI_MPI_TRACE_MOREDETAILED 0
90 #undef CMI_MPI_TRACE_USEREVENTS
91 #define CMI_MPI_TRACE_USEREVENTS 1
92 #else
93 #undef CMK_SMP_TRACE_COMMTHREAD
94 #define CMK_SMP_TRACE_COMMTHREAD 0
95 #endif
96
/* NOTE(review): CMK_TRACE_COMMOVERHEAD is forced to 0 on the next line, so the
   #if after it is always false and the #else just redefines it to 0 again. */
97 #define CMK_TRACE_COMMOVERHEAD 0
98 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
99 #undef CMI_MPI_TRACE_USEREVENTS
100 #define CMI_MPI_TRACE_USEREVENTS 1
101 #else
102 #undef CMK_TRACE_COMMOVERHEAD
103 #define CMK_TRACE_COMMOVERHEAD 0
104 #endif
105
/* START_EVENT() stores the current wall time; END_EVENT(x) records user event
   x spanning from that time to now. Without user-event tracing both expand to
   nothing. The enabled expansions already end in ';'. */
106 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
107 CpvStaticDeclare(double, projTraceStart);
108 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
109 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
110 #else
111 #define  START_EVENT()
112 #define  END_EVENT(x)
113 #endif
114
/* Mempool sizes and flow-control limits, in bytes (oneMB/oneGB are long long).
   NOTE(review): these defaults are presumably overridden from the CHARM_UGNI_*
   environment variables listed in the file header; that code is not in this
   chunk. */
115 #if USE_LRTS_MEMPOOL
116
117 #define oneMB (1024ll*1024)
118 #define oneGB (1024ll*1024*1024)
119
120 static CmiInt8 _mempool_size = 8*oneMB;
121 static CmiInt8 _expand_mem =  4*oneMB;
122 static CmiInt8 _mempool_size_limit = 0;
123
124 static CmiInt8 _totalmem = 0.8*oneGB;
125
126 #if LARGEPAGE
127 static CmiInt8 BIG_MSG  =  16*oneMB;
128 static CmiInt8 ONE_SEG  =  4*oneMB;
129 #else
130 static CmiInt8 BIG_MSG  =  4*oneMB;
131 static CmiInt8 ONE_SEG  =  2*oneMB;
132 #endif
133 #if MULTI_THREAD_SEND
134 static int BIG_MSG_PIPELINE = 1;
135 #else
136 static int BIG_MSG_PIPELINE = 4;
137 #endif
138
139 // dynamic flow control
/* register_memory_size is the running total of bytes registered through
   MEMORY_REGISTER. That macro refuses any registration that would bring the
   total to MAX_REG_MEM or beyond. */
140 static CmiInt8 buffered_send_msg = 0;
141 static CmiInt8 register_memory_size = 0;
142
143 #if LARGEPAGE
144 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
145 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
146 static CmiInt8  register_count = 0;
147 #else
148 #if CMK_SMP && COMM_THREAD_SEND 
149 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
150 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
151 #else
152 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
153 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
154 #endif
155
156
157 #endif
158
159 #endif     /* end USE_LRTS_MEMPOOL */
160
/* Locking wrappers. They lock only when MULTI_THREAD_SEND is set; otherwise
   they expand to nothing and CMI_GNI_TRYLOCK(x) is the constant 0.
   NOTE: the locking expansions already end with ';'. */
161 #if MULTI_THREAD_SEND 
162 #define     CMI_GNI_LOCK(x)       CmiLock(x);
163 #define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
164 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
165 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
166 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
167 #else
168 #define     CMI_GNI_LOCK(x)
169 #define     CMI_GNI_TRYLOCK(x)         (0)
170 #define     CMI_GNI_UNLOCK(x)
171 #define     CMI_PCQUEUEPOP_LOCK(Q)   
172 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
173 #endif
174
/* Page size used by ALIGNHUGEPAGE to round sizes up. */
175 static int _tlbpagesize = 4096;
176
177 //static int _smpd_count  = 0;
178
179 static int   user_set_flag  = 0;
180
/* Deadlock detection switch and state. NOTE(review): _detected_hang is
   presumably set by the checking code, which is not in this chunk. */
181 static int _checkProgress = 1;             /* check deadlock */
182 static int _detected_hang = 0;
183
184 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
185
186 // dynamic SMSG
187 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
188
189 static int avg_smsg_connection = 32;
190 static int                 *smsg_connected_flag= 0;
191 static gni_smsg_attr_t     **smsg_attr_vector_local;
192 static gni_smsg_attr_t     **smsg_attr_vector_remote;
193 static gni_ep_handle_t     ep_hndl_unbound;
194 static gni_smsg_attr_t     send_smsg_attr;
195 static gni_smsg_attr_t     recv_smsg_attr;
196
/* Mailbox memory region for dynamic SMSG: base address, size, current offset,
   GNI memory handle, and a next link (mailbox_list presumably heads this
   list). */
197 typedef struct _dynamic_smsg_mailbox{
198    void     *mailbox_base;
199    int      size;
200    int      offset;
201    gni_mem_handle_t  mem_hndl;
202    struct      _dynamic_smsg_mailbox  *next;
203 }dynamic_smsg_mailbox_t;
204
205 static dynamic_smsg_mailbox_t  *mailbox_list;
206
207 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
208 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
209
/* Debug counters, compiled only when PRINT_SYH is nonzero (it is 0 above). */
210 #if PRINT_SYH
211 int         lrts_send_msg_id = 0;
212 int         lrts_local_done_msg = 0;
213 int         lrts_send_rdma_success = 0;
214 #endif
215
216 #include "machine.h"
217
218 #include "pcqueue.h"
219
220 #include "mempool.h"
221
222 #if CMK_PERSISTENT_COMM
223 #include "machine-persistent.h"
224 #endif
225
//#define  USE_ONESIDED 1
#ifdef USE_ONESIDED
/* NOTE(review): this branch is unmaintained. Its MEMORY_REGISTER and
 * MEMORY_DEREGISTER take fewer arguments than the GNI variants below, and
 * MEMORY_REGISTER's expansion starts with a stray "omdh." token. */
//onesided implementation is wrong, since no place to restore omdh
#include "onesided.h"
onesided_hnd_t   onesided_hnd;
onesided_md_t    omdh;
#define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 

#define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)

#else
/* Stand-ins for the onesided handles; the GNI macros below ignore their
 * handler/myomdh arguments. */
uint8_t   onesided_hnd, omdh;

/*
 * MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status)
 *
 * Register [msg, msg+size) with GNI for read/write access and store the GNI
 * return code in status. If the registration would bring register_memory_size
 * to MAX_REG_MEM or beyond, GNI is not called and status becomes
 * GNI_RC_ERROR_NOMEM. On success register_memory_size grows by size.
 * With REMOTE_EVENT or CQWRITE the region is attached to completion queue cqh;
 * otherwise no CQ is attached and cqh is ignored.
 *
 * The macro expands to one do { } while (0) statement, so it needs a trailing
 * ';' and is safe as the body of an unbraced if/else. Every argument is
 * parenthesized so expression arguments (e.g. "n + sizeof(int)") work.
 */
#if REMOTE_EVENT || CQWRITE
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    do { \
        if (register_memory_size + (size) >= MAX_REG_MEM) { \
            (status) = GNI_RC_ERROR_NOMEM; \
        } else { \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), \
                                       (cqh), GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
        } \
    } while (0)
#else
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    do { \
        if (register_memory_size + (size) >= MAX_REG_MEM) { \
            (status) = GNI_RC_ERROR_NOMEM; \
        } else { \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), \
                                       NULL, GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
        } \
    } while (0)
#endif

/*
 * MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)
 *
 * Release a GNI registration and subtract size from register_memory_size.
 * Aborts if GNI_MemDeregister fails.
 */
#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
    do { if (GNI_MemDeregister((nic_hndl), (mem_hndl)) == GNI_RC_SUCCESS) \
             register_memory_size -= (size); \
         else CmiAbort("MEM_DEregister");  \
    } while (0)
#endif
259
/*
 * Accessors for the mempool header of a pool-allocated buffer x, located via
 * MEMPOOL_GetMempoolHeader(x, ALIGNBUF). The message counts track sends and
 * receives in flight that use the buffer's block.
 */
#define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
/* Predicates are parenthesized as a whole so they compose correctly inside
 * larger expressions (arithmetic, comparisons, ...). */
#define   NoMsgInSend(x)          (MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0)
#define   NoMsgInRecv(x)          (MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0)
#define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
/* A GNI memory handle of all zeros means "not registered". */
#define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
#define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
#define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))

#define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
#define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)

/* Size/sequence fields of the extended Converse message header. m is
 * parenthesized so a pointer expression (e.g. p + off) is cast as a whole. */
#define CmiGetMsgSize(m)     (((CmiMsgHeaderExt*)(m))->size)
#define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)(m))->size)=(s))
#define CmiGetMsgSeq(m)      (((CmiMsgHeaderExt*)(m))->seq)
#define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)(m))->seq) = (s))

#define ALIGNBUF                64
284
285 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
286 /* If SMSG is not used */
287
288 #define FMA_PER_CORE  1024
289 #define FMA_BUFFER_SIZE 1024
290
291 /* If SMSG is used */
292 static int  SMSG_MAX_MSG = 1024;
293 #define SMSG_MAX_CREDIT    72
294
295 #define MSGQ_MAXSIZE       2048
296
297 /* large message transfer with FMA or BTE */
/* Without REMOTE_EVENT the threshold is 1024; with it, every transfer uses
   this path (threshold 0). */
298 #if ! REMOTE_EVENT
299 #define LRTS_GNI_RDMA_THRESHOLD  1024 
300 #else
301    /* remote events only work with RDMA */
302 #define LRTS_GNI_RDMA_THRESHOLD  0 
303 #endif
304
/* Queue entry counts, larger in SMP builds. NOTE(review): presumably the GNI
   completion-queue depths; the CQ creation code is not in this chunk. */
305 #if CMK_SMP
306 static int  REMOTE_QUEUE_ENTRIES=163840; 
307 static int LOCAL_QUEUE_ENTRIES=163840; 
308 #else
309 static int  REMOTE_QUEUE_ENTRIES=20480;
310 static int LOCAL_QUEUE_ENTRIES=20480; 
311 #endif
312
/* Tags carried by SMSG messages to identify their kind (the CMK_WITH_STATS
   SMSG_SENT_DONE macro maps each tag to a counter). */
313 #define BIG_MSG_TAG             0x26
314 #define PUT_DONE_TAG            0x28
315 #define DIRECT_PUT_DONE_TAG     0x29
316 #define ACK_TAG                 0x30
317 /* SMSG is data message */
318 #define SMALL_DATA_TAG          0x31
319 /* SMSG is a control message to initialize a BTE */
320 #define LMSG_INIT_TAG           0x39 
321
/* DEBUG is defined unconditionally, so GNI_RC_CHECK is always active. When rc
   is not GNI_RC_SUCCESS it prints the PE, msg and gni_err_str[rc], then
   aborts. NOTE(review): rc is evaluated more than once; pass a variable
   rather than a call. */
322 #define DEBUG
323 #ifdef GNI_RC_CHECK
324 #undef GNI_RC_CHECK
325 #endif
326 #ifdef DEBUG
327 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
328 #else
329 #define GNI_RC_CHECK(msg,rc)
330 #endif
331
/* Round x up to a multiple of 64, or of _tlbpagesize (assumed to be a power
   of two), as a size_t. */
332 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
333 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
334 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
335
/* NOTE(review): mysize/myrank are presumably the number of processes and this
   process's rank. IndexPool_init checks mysize against the remote-event rank
   field, and ACK_EVENT/PERSIST_EVENT encode myrank. */
336 static int useStaticMSGQ = 0;
337 static int useStaticFMA = 0;
338 static int mysize, myrank;
339 static gni_nic_handle_t   nic_hndl;
340
/* A GNI memory handle paired with an address. */
341 typedef struct {
342     gni_mem_handle_t mdh;
343     uint64_t addr;
344 } mdh_addr_t ;
345 // this is related to dynamic SMSG
346
/* Singly linked list of (memory handle, address) pairs. */
347 typedef struct mdh_addr_list{
348     gni_mem_handle_t mdh;
349    void *addr;
350     struct mdh_addr_list *next;
351 }mdh_addr_list_t;
352
353 static unsigned int         smsg_memlen;
354 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
355 mdh_addr_t          setup_mem;
356 mdh_addr_t          *smsg_connection_vec = 0;
357 gni_mem_handle_t    smsg_connection_memhndl;
358 static int          smsg_expand_slots = 10;
359 static int          smsg_available_slot = 0;
360 static void         *smsg_mailbox_mempool = 0;
361 mdh_addr_list_t     *smsg_dynamic_list = 0;
362
/* GNI MSGQ state. NOTE(review): presumably used when useStaticMSGQ is set;
   the setup code is not in this chunk. */
363 static void             *smsg_mailbox_base;
364 gni_msgq_attr_t         msgq_attrs;
365 gni_msgq_handle_t       msgq_handle;
366 gni_msgq_ep_attr_t      msgq_ep_attrs;
367 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
368
369 /* =====Beginning of Declarations of Machine Specific Variables===== */
370 static int cookie;
371 static int modes = 0;
/* GNI completion-queue handles (SMSG receive, default transmit, RDMA
   transmit/receive, post transmit) and the endpoint handle array
   (NOTE(review): presumably one endpoint per node). */
372 static gni_cq_handle_t       smsg_rx_cqh = NULL;
373 static gni_cq_handle_t       default_tx_cqh = NULL;
374 static gni_cq_handle_t       rdma_tx_cqh = NULL;
375 static gni_cq_handle_t       rdma_rx_cqh = NULL;
376 static gni_cq_handle_t       post_tx_cqh = NULL;
377 static gni_ep_handle_t       *ep_hndl_array;
378
/* Node locks for endpoints, completion queues, the SMSG mailbox and the
   mempools. NOTE(review): which code paths take them is outside this chunk. */
379 static CmiNodeLock           *ep_lock_array;
380 static CmiNodeLock           default_tx_cq_lock; 
381 static CmiNodeLock           rdma_tx_cq_lock; 
382 static CmiNodeLock           global_gni_lock; 
383 static CmiNodeLock           rx_cq_lock;
384 static CmiNodeLock           smsg_mailbox_lock;
385 static CmiNodeLock           smsg_rx_cq_lock;
386 static CmiNodeLock           *mempool_lock;
387 //#define     CMK_WITH_STATS      1
/* A send buffered for later delivery: destination node, payload size and
   pointer, and SMSG tag. The next link exists only in non-SMP builds, which
   chain these into linked lists (SMP builds keep them in PCQueues). The
   creation time is recorded only with CMK_WITH_STATS. */
388 typedef struct msg_list
389 {
390     uint32_t destNode;
391     uint32_t size;
392     void *msg;
393     uint8_t tag;
394 #if !CMK_SMP
395     struct msg_list *next;
396 #endif
397 #if CMK_WITH_STATS
398     double  creation_time;
399 #endif
400 }MSG_LIST;
401
402
/* Control message describing a buffer to move with RDMA (see the per-field
   comments). seq_id numbers the pieces of a big message; 0 means a single
   message. */
403 typedef struct control_msg
404 {
405     uint64_t            source_addr;    /* address from the start of buffer  */
406     uint64_t            dest_addr;      /* address from the start of buffer */
407     int                 total_length;   /* total length */
408     int                 length;         /* length of this packet */
409 #if REMOTE_EVENT
410     int                 ack_index;      /* index from integer to address */
411 #endif
412     uint8_t             seq_id;         //big message   0 meaning single message
413     gni_mem_handle_t    source_mem_hndl;
414     struct control_msg *next;
415 } CONTROL_MSG;
416
/* Size of CONTROL_MSG without its trailing next link (presumably the number of
   bytes sent). Assumes next is the last member and adds no trailing padding;
   TODO confirm. */
417 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
418
/* Acknowledgement message. With USE_LRTS_MEMPOOL only the source address is
   carried. */
419 typedef struct ack_msg
420 {
421     uint64_t            source_addr;    /* address from the start of buffer  */
422 #if ! USE_LRTS_MEMPOOL
423     gni_mem_handle_t    source_mem_hndl;
424     int                 length;          /* total length */
425 #endif
426     struct ack_msg     *next;
427 } ACK_MSG;
428
/* Size of ACK_MSG without its trailing next link (same layout assumption as
   CONTROL_MSG_SIZE). */
429 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
430
#if CMK_DIRECT
/* Holds a handler address as a 64-bit integer (its users are outside this
   chunk). */
typedef struct {
    uint64_t handler_addr;
} CMK_DIRECT_HEADER;

/* Converse message whose payload, after the standard header, is the address
   of a CmiDirectUserHandle. */
typedef struct {
    char     core[CmiMsgHeaderSizeBytes];
    uint64_t handler;
} cmidirectMsg;

/* Converse handler index under which CmiHandleDirectMsg is registered. */
CpvDeclare(int, CmiHandleDirectIdx);

/* Invoke the callback stored in the message's user handle, then free the
   message. */
void CmiHandleDirectMsg(cmidirectMsg* msg)
{
    CmiDirectUserHandle *userHandle = (CmiDirectUserHandle *)msg->handler;

    userHandle->callbackFnPtr(userHandle->callbackData);
    CmiFree(msg);
}

/* Set up CmiHandleDirectIdx and register CmiHandleDirectMsg as a Converse
   handler. */
void CmiDirectInit()
{
    CpvInitialize(int, CmiHandleDirectIdx);
    CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler((CmiHandler)CmiHandleDirectMsg);
}

#endif
/* A pending RDMA post: target node and GNI post descriptor, plus (with
   REMOTE_EVENT) an ack index (NOTE(review): presumably an ackPool slot).
   The next link exists only in non-SMP builds. */
458 typedef struct  rmda_msg
459 {
460     int                   destNode;
461 #if REMOTE_EVENT
462     int                   ack_index;
463 #endif
464     gni_post_descriptor_t *pd;
465 #if !CMK_SMP
466     struct  rmda_msg      *next;
467 #endif
468 }RDMA_REQUEST;
469
470
/* SMP builds keep pending RDMA requests and buffered sends in PCQueues
   (NOTE(review): presumably one MSG_LIST_INDEX per destination). Non-SMP
   builds use head/tail linked lists instead. */
471 #if CMK_SMP
472 #define SMP_LOCKS                       0
473 #define ONE_SEND_QUEUE                  0
474 PCQueue sendRdmaBuf;
475 typedef struct  msg_list_index
476 {
477     PCQueue     sendSmsgBuf;
478     int         pushed;
479     CmiNodeLock   lock;
480 } MSG_LIST_INDEX;
481 char                *destpe_avail;
482 #if  !ONE_SEND_QUEUE && SMP_LOCKS
483     PCQueue     nonEmptyQueues;
484 #endif
485 #else         /* non-smp */
486
487 static RDMA_REQUEST        *sendRdmaBuf = 0;
488 static RDMA_REQUEST        *sendRdmaTail = 0;
489 typedef struct  msg_list_index
490 {
491     int         next;
492     MSG_LIST    *sendSmsgBuf;
493     MSG_LIST    *tail;
494 } MSG_LIST_INDEX;
495
496 #endif
497
498 // buffered send queue
/* ONE_SEND_QUEUE is undefined (so 0 in #if) in non-SMP builds, which
   therefore always use the indexed form. */
499 #if ! ONE_SEND_QUEUE
500 typedef struct smsg_queue
501 {
502     MSG_LIST_INDEX   *smsg_msglist_index;
503     int               smsg_head_index;
504 } SMSG_QUEUE;
505 #else
506 typedef struct smsg_queue
507 {
508     PCQueue       sendMsgBuf;
509 }  SMSG_QUEUE;
510 #endif
511
512 SMSG_QUEUE                  smsg_queue;
513 #if CMK_USE_OOB
514 SMSG_QUEUE                  smsg_oob_queue;
515 #endif
516
517 #if CMK_SMP
518
519 #define FreeMsgList(d)   free(d);
520 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
521
522 #else
523
524 static MSG_LIST       *msglist_freelist=0;
525
526 #define FreeMsgList(d)  \
527   do { \
528   (d)->next = msglist_freelist;\
529   msglist_freelist = d; \
530   } while (0)
531
532 #define MallocMsgList(d) \
533   do {  \
534   d = msglist_freelist;\
535   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
536              _MEMCHECK(d);\
537   } else msglist_freelist = d->next; \
538   d->next =0;  \
539   } while (0)
540
541 #endif
542
543 #if CMK_SMP
544
545 #define FreeControlMsg(d)      free(d);
546 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
547
548 #else
549
550 static CONTROL_MSG    *control_freelist=0;
551
552 #define FreeControlMsg(d)       \
553   do { \
554   (d)->next = control_freelist;\
555   control_freelist = d; \
556   } while (0);
557
558 #define MallocControlMsg(d) \
559   do {  \
560   d = control_freelist;\
561   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
562              _MEMCHECK(d);\
563   } else control_freelist = d->next;  \
564   } while (0);
565
566 #endif
567
568 #if CMK_SMP
569
570 #define FreeAckMsg(d)      free(d);
571 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
572
573 #else
574
575 static ACK_MSG        *ack_freelist=0;
576
577 #define FreeAckMsg(d)       \
578   do { \
579   (d)->next = ack_freelist;\
580   ack_freelist = d; \
581   } while (0)
582
583 #define MallocAckMsg(d) \
584   do { \
585   d = ack_freelist;\
586   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
587              _MEMCHECK(d);\
588   } else ack_freelist = d->next; \
589   } while (0)
590
591 #endif
592
593
594 # if CMK_SMP
595 #define FreeRdmaRequest(d)       free(d);
596 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
597 #else
598
599 static RDMA_REQUEST         *rdma_freelist = NULL;
600
601 #define FreeRdmaRequest(d)       \
602   do {  \
603   (d)->next = rdma_freelist;\
604   rdma_freelist = d;    \
605   } while (0)
606
607 #define MallocRdmaRequest(d) \
608   do {   \
609   d = rdma_freelist;\
610   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
611              _MEMCHECK(d);\
612   } else rdma_freelist = d->next; \
613   d->next =0;   \
614   } while (0)
615 #endif
616
617 /* reuse gni_post_descriptor_t */
618 static gni_post_descriptor_t *post_freelist=0;
619
620 #if  !CMK_SMP
621 #define FreePostDesc(d)       \
622     (d)->next_descr = post_freelist;\
623     post_freelist = d;
624
625 #define MallocPostDesc(d) \
626   d = post_freelist;\
627   if (d==0) { \
628      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
629      d->next_descr = 0;\
630       _MEMCHECK(d);\
631   } else post_freelist = d->next_descr;
632 #else
633
634 #define FreePostDesc(d)     free(d);
635 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
636
637 #endif
638
639
640 /* A send was requested but the message could not be sent by SmsgSend because the mailbox was full or there was no credit, so it was buffered */
641 static int      buffered_smsg_counter = 0;
642
643 /* SmsgSend returned success but delivery of the message has not yet been confirmed by the remote side */
644 static MSG_LIST *buffered_fma_head = 0;
645 static MSG_LIST *buffered_fma_tail = 0;
646
/*
 * Bit-mask helpers on an integer a:
 *   IsFree(a,ind)   - true when bit ind of a is clear
 *   SET_BITS(a,ind) - sets bit ind, storing the result back into a
 *   Reset(a,ind)    - clears bit ind, storing the result back into a
 * Arguments are parenthesized so expressions such as (x|y) are used as a
 * whole, and each expansion is parenthesized so it composes in expressions.
 */
#define IsFree(a,ind)  (!((a) & (1<<(ind))))
#define SET_BITS(a,ind) ((a) = ((a) | (1<<(ind))))
#define Reset(a,ind) ((a) = ((a) & (~(1<<(ind)))))
651
652 CpvDeclare(mempool_type*, mempool);
653
654 #if REMOTE_EVENT
655 /* ack pool for remote events */
656
657 static int  SHIFT   =           18;
658 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
659 #define RANK_MASK               ((1<<SHIFT) - 1)
660 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
661
662 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
663 #define GET_RANK(evt)           ((evt) & RANK_MASK)
664 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
665
666 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
667
668 #if CMK_SMP
669 #define INIT_SIZE                4096
670 #else
671 #define INIT_SIZE                1024
672 #endif
673
674 struct IndexStruct {
675 void *addr;
676 int next;
677 int type;     // 1: ACK   2: Persistent
678 };
679
680 typedef struct IndexPool {
681     struct IndexStruct   *indexes;
682     int                   size;
683     int                   freehead;
684     CmiNodeLock           lock;
685 } IndexPool;
686
687 static IndexPool  ackPool;
688 #if CMK_PERSISTENT_COMM
689 static IndexPool  persistPool;
690 #endif
691
692 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
693 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
694
695 static void IndexPool_init(IndexPool *pool)
696 {
697     int i;
698     if ((1<<SHIFT) < mysize) 
699         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
700     pool->size = INIT_SIZE;
701     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
702     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
703     for (i=0; i<pool->size-1; i++) {
704         pool->indexes[i].next = i+1;
705         pool->indexes[i].type = 0;
706     }
707     pool->indexes[i].next = -1;
708     pool->freehead = 0;
709 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
710     pool->lock  = CmiCreateLock();
711 #else
712     pool->lock  = 0;
713 #endif
714 }
715
716 static
717 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
718 {
719     int s, i;
720 #if MULTI_THREAD_SEND  
721     CmiLock(pool->lock);
722 #endif
723     s = pool->freehead;
724     if (s == -1) {
725         int newsize = pool->size * 2;
726         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
727         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
728         struct IndexStruct *old_ackpool = pool->indexes;
729         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
730         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
731         for (i=pool->size; i<newsize-1; i++) {
732             pool->indexes[i].next = i+1;
733             pool->indexes[i].type = 0;
734         }
735         pool->indexes[i].next = -1;
736         pool->indexes[i].type = 0;
737         pool->freehead = pool->size;
738         s = pool->size;
739         pool->size = newsize;
740         free(old_ackpool);
741     }
742     pool->freehead = pool->indexes[s].next;
743     pool->indexes[s].addr = addr;
744     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
745     pool->indexes[s].type = type;
746 #if MULTI_THREAD_SEND
747     CmiUnlock(pool->lock);
748 #endif
749     return s;
750 }
751
752 static
753 inline  void IndexPool_freeslot(IndexPool *pool, int s)
754 {
755     CmiAssert(s>=0 && s<pool->size);
756 #if MULTI_THREAD_SEND
757     CmiLock(pool->lock);
758 #endif
759     pool->indexes[s].next = pool->freehead;
760     pool->indexes[s].type = 0;
761     pool->freehead = s;
762 #if MULTI_THREAD_SEND
763     CmiUnlock(pool->lock);
764 #endif
765 }
766
767
768 #endif
769
/* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
#define CMI_MAGIC(msg)                   (((CmiMsgHeaderBasic *)(msg))->magic)
#define CHARM_MAGIC_NUMBER               126

#if CMK_ERROR_CHECKING
extern unsigned char computeCheckSum(unsigned char *data, int len);
static int checksum_flag = 0;
/*
 * When checksum_flag is set, CMI_SET_CHECKSUM zeroes the header cksum field
 * and then stores computeCheckSum over len bytes of msg. CMI_CHECK_CHECKSUM
 * aborts if the checksum over msg is not 0.
 * Both are single do { } while (0) statements, so a caller's `else` cannot
 * bind to an if inside the macro.
 */
#define CMI_SET_CHECKSUM(msg, len)      \
        do { \
          if (checksum_flag) { \
            ((CmiMsgHeaderBasic *)(msg))->cksum = 0; \
            ((CmiMsgHeaderBasic *)(msg))->cksum = computeCheckSum((unsigned char*)(msg), (len)); \
          } \
        } while (0)
#define CMI_CHECK_CHECKSUM(msg, len)    \
        do { \
          if (checksum_flag && computeCheckSum((unsigned char*)(msg), (len)) != 0) \
            CmiAbort("Fatal error: checksum doesn't agree!\n"); \
        } while (0)
#else
#define CMI_SET_CHECKSUM(msg, len)
#define CMI_CHECK_CHECKSUM(msg, len)
#endif
/* =====End of Definitions of Message-Corruption Related Macros=====*/
791
/* Statistics switches: the stats counters update only when print_stats is set
 * and stats_off is clear. */
static int print_stats = 0;
static int stats_off = 0;

/* Resume collecting communication statistics (clears stats_off). */
void CmiTurnOnStats()
{
    stats_off = 0;
}

/* Suspend collecting communication statistics (sets stats_off). */
void CmiTurnOffStats()
{
    stats_off = 1;
}

/* True when a GNI post type is a put (FMA or RDMA). The argument is
 * parenthesized so compound expressions (e.g. ternaries) compare as a whole. */
#define IS_PUT(type)    ((type) == GNI_POST_FMA_PUT || (type) == GNI_POST_RDMA_PUT)
806
/* Communication statistics (CMK_WITH_STATS builds). init_comm_stats opens
   counterLog as counters/statistics.<mysize>.<myrank>. */
807 #if CMK_WITH_STATS
808 FILE *counterLog = NULL;
809 typedef struct comm_thread_stats
810 {
/* Completed SMSG sends, by tag (see SMSG_SENT_DONE). */
811     uint64_t  smsg_data_count;
812     uint64_t  lmsg_init_count;
813     uint64_t  ack_count;
814     uint64_t  big_msg_ack_count;
815     uint64_t  smsg_count;
816     uint64_t  direct_put_done_count;
817     uint64_t  put_done_count;
818     //times of calling SmsgSend
819     uint64_t  try_smsg_data_count;
820     uint64_t  try_lmsg_init_count;
821     uint64_t  try_ack_count;
822     uint64_t  try_big_msg_ack_count;
823     uint64_t  try_direct_put_done_count;
824     uint64_t  try_put_done_count;
825     uint64_t  try_smsg_count;
826     
/* Time between a buffered SMSG's creation and its completed send. */
827     double    max_time_in_send_buffered_smsg;
828     double    all_time_in_send_buffered_smsg;
829
/* RDMA counts, plus latencies from control message to RDMA post and from
   post to completion (see RDMA_TRANS_INIT / RDMA_TRANS_DONE). */
830     uint64_t  rdma_get_count, rdma_put_count;
831     uint64_t  try_rdma_get_count, try_rdma_put_count;
832     double    max_time_from_control_to_rdma_init;
833     double    all_time_from_control_to_rdma_init;
834
835     double    max_time_from_rdma_init_to_rdma_done;
836     double    all_time_from_rdma_init_to_rdma_done;
837
/* Call count, total and maximum wall time of the progress routines (see the
   STATS_*_TIME wrappers). */
838     int      count_in_PumpNetwork;
839     double   time_in_PumpNetwork;
840     double   max_time_in_PumpNetwork;
841     int      count_in_SendBufferMsg_smsg;
842     double   time_in_SendBufferMsg_smsg;
843     double   max_time_in_SendBufferMsg_smsg;
844     int      count_in_SendRdmaMsg;
845     double   time_in_SendRdmaMsg;
846     double   max_time_in_SendRdmaMsg;
847     int      count_in_PumpRemoteTransactions;
848     double   time_in_PumpRemoteTransactions;
849     double   max_time_in_PumpRemoteTransactions;
850     int      count_in_PumpLocalTransactions_rdma;
851     double   time_in_PumpLocalTransactions_rdma;
852     double   max_time_in_PumpLocalTransactions_rdma;
853     int      count_in_PumpDatagramConnection;
854     double   time_in_PumpDatagramConnection;
855     double   max_time_in_PumpDatagramConnection;
856 } Comm_Thread_Stats;
857
858 static Comm_Thread_Stats   comm_stats;
859
860 static char *counters_dirname = "counters";
861
862 static void init_comm_stats()
863 {
864   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
865   if (print_stats){
866       char ln[200];
867       int code = mkdir(counters_dirname, 00777); 
868       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
869       counterLog=fopen(ln,"w");
870       if (counterLog == NULL) CmiAbort("Counter files open failed");
871   }
872 }
873
/* Statistics macros. The counter updates run only when print_stats is set and
   stats_off is clear (SMSG_CREATION checks only print_stats).
   NOTE(review): these expand to bare if/{ } blocks, not do { } while (0), so
   do not use them as the body of an unbraced if/else. */
874 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
875
876 #define SMSG_SENT_DONE(creation_time, tag)  \
877         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
878             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
879             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
880             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
881             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
882             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
883             comm_stats.smsg_count++; \
884             double inbuff_time = CmiWallTimer() - creation_time;   \
885             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
886             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
887         }
888
889 #define SMSG_TRY_SEND(tag)  \
890         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
891             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
892             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
893             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
894             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
895             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
896             comm_stats.try_smsg_count++; \
897         }
898
/* RDMA_TRY_SEND counts attempted puts/gets. RDMA_TRANS_INIT and
   RDMA_TRANS_DONE count posts and record the time elapsed since timestamp x
   into the corresponding total/max latency fields. */
899 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
900
901 #define  RDMA_TRANS_DONE(x)      \
902          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
903              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
904              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
905          }
906
907 #define  RDMA_TRANS_INIT(type, x)      \
908          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
909              double rdma_trans_time = CmiWallTimer() - x ; \
910              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
911              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
912          }
913
/* STATS_*_TIME(x): execute statement x and add its wall time to the matching
   count/total/max fields. These wrappers do not check print_stats or
   stats_off. */
914 #define STATS_PUMPNETWORK_TIME(x)   \
915         { double t = CmiWallTimer(); \
916           x;        \
917           t = CmiWallTimer() - t;          \
918           comm_stats.count_in_PumpNetwork++;        \
919           comm_stats.time_in_PumpNetwork += t;   \
920           if (t>comm_stats.max_time_in_PumpNetwork)      \
921               comm_stats.max_time_in_PumpNetwork = t;    \
922         }
923
924 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
925         { double t = CmiWallTimer(); \
926           x;        \
927           t = CmiWallTimer() - t;          \
928           comm_stats.count_in_PumpRemoteTransactions ++;        \
929           comm_stats.time_in_PumpRemoteTransactions += t;   \
930           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
931               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
932         }
933
934 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
935         { double t = CmiWallTimer(); \
936           x;        \
937           t = CmiWallTimer() - t;          \
938           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
939           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
940           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
941               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
942         }
943
944 #define STATS_SEND_SMSGS_TIME(x)   \
945         { double t = CmiWallTimer(); \
946           x;        \
947           t = CmiWallTimer() - t;          \
948           comm_stats.count_in_SendBufferMsg_smsg ++;        \
949           comm_stats.time_in_SendBufferMsg_smsg += t;   \
950           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
951               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
952         }
953
954 #define STATS_SENDRDMAMSG_TIME(x)   \
955         { double t = CmiWallTimer(); \
956           x;        \
957           t = CmiWallTimer() - t;          \
958           comm_stats.count_in_SendRdmaMsg ++;        \
959           comm_stats.time_in_SendRdmaMsg += t;   \
960           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
961               comm_stats.max_time_in_SendRdmaMsg = t;    \
962         }
963
964 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
965         { double t = CmiWallTimer(); \
966           x;        \
967           t = CmiWallTimer() - t;          \
968           comm_stats.count_in_PumpDatagramConnection ++;        \
969           comm_stats.time_in_PumpDatagramConnection += t;   \
970           if (t>comm_stats.max_time_in_PumpDatagramConnection)      \
971               comm_stats.max_time_in_PumpDatagramConnection = t;    \
972         }
973
974 static void print_comm_stats()
975 {
976     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
977     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
978             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
979             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
980     
981     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
982             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
983             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
984
985     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
986     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
987     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
988
989
990     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
991     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
992     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
993     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
994     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
995     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
996     if (useDynamicSMSG)
997     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
998
999     fclose(counterLog);
1000 }
1001
1002 #else
/* Statistics disabled: every timing wrapper reduces to the wrapped
 * statement itself, with no timing or counter updates. */
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
#define STATS_PUMPNETWORK_TIME(x)                  x
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
#define STATS_SENDRDMAMSG_TIME(x)                  x
#define STATS_SEND_SMSGS_TIME(x)                   x
1009 #endif
1010
1011 static void
1012 allgather(void *in,void *out, int len)
1013 {
1014     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1015     int i,rc;
1016     int my_rank;
1017     char *tmp_buf,*out_ptr;
1018
1019     if(!already_called) {
1020
1021         rc = PMI_Get_size(&job_size);
1022         CmiAssert(rc == PMI_SUCCESS);
1023         rc = PMI_Get_rank(&my_rank);
1024         CmiAssert(rc == PMI_SUCCESS);
1025
1026         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1027         CmiAssert(ivec_ptr != NULL);
1028
1029         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1030         CmiAssert(rc == PMI_SUCCESS);
1031
1032         already_called = 1;
1033
1034     }
1035
1036     tmp_buf = (char *)malloc(job_size * len);
1037     CmiAssert(tmp_buf);
1038
1039     rc = PMI_Allgather(in,tmp_buf,len);
1040     CmiAssert(rc == PMI_SUCCESS);
1041
1042     out_ptr = out;
1043
1044     for(i=0;i<job_size;i++) {
1045
1046         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1047
1048     }
1049
1050     free(tmp_buf);
1051 }
1052
1053 static void
1054 allgather_2(void *in,void *out, int len)
1055 {
1056     //PMI_Allgather is out of order
1057     int i,rc, extend_len;
1058     int  rank_index;
1059     char *out_ptr, *out_ref;
1060     char *in2;
1061
1062     extend_len = sizeof(int) + len;
1063     in2 = (char*)malloc(extend_len);
1064
1065     memcpy(in2, &myrank, sizeof(int));
1066     memcpy(in2+sizeof(int), in, len);
1067
1068     out_ptr = (char*)malloc(mysize*extend_len);
1069
1070     rc = PMI_Allgather(in2, out_ptr, extend_len);
1071     GNI_RC_CHECK("allgather", rc);
1072
1073     out_ref = out;
1074
1075     for(i=0;i<mysize;i++) {
1076         //rank index 
1077         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1078         //copy to the rank index slot
1079         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1080     }
1081
1082     free(out_ptr);
1083     free(in2);
1084
1085 }
1086
1087 static unsigned int get_gni_nic_address(int device_id)
1088 {
1089     unsigned int address, cpu_id;
1090     gni_return_t status;
1091     int i, alps_dev_id=-1,alps_address=-1;
1092     char *token, *p_ptr;
1093
1094     p_ptr = getenv("PMI_GNI_DEV_ID");
1095     if (!p_ptr) {
1096         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1097        
1098         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1099     } else {
1100         while ((token = strtok(p_ptr,":")) != NULL) {
1101             alps_dev_id = atoi(token);
1102             if (alps_dev_id == device_id) {
1103                 break;
1104             }
1105             p_ptr = NULL;
1106         }
1107         CmiAssert(alps_dev_id != -1);
1108         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1109         CmiAssert(p_ptr != NULL);
1110         i = 0;
1111         while ((token = strtok(p_ptr,":")) != NULL) {
1112             if (i == alps_dev_id) {
1113                 alps_address = atoi(token);
1114                 break;
1115             }
1116             p_ptr = NULL;
1117             ++i;
1118         }
1119         CmiAssert(alps_address != -1);
1120         address = alps_address;
1121     }
1122     return address;
1123 }
1124
/* Parse the first ':'-separated field of environment variable `name` as an
 * unsigned decimal number.  The environment string is only read, never
 * modified; strtok() would write into it.  Leading ':' separators are
 * skipped, as strtok() did.  Aborts if the variable is unset or its first
 * field is not a number. */
static unsigned long get_pmi_env_first_field(const char *name)
{
    const char *p = getenv(name);
    char *end;
    unsigned long value;

    if (p == NULL) {
        fprintf(stderr, "environment variable %s is not set\n", name);
        CmiAbort("missing PMI_GNI environment variable");
    }
    while (*p == ':')
        p++;
    errno = 0;
    /* strtoul stops at the first ':' and, unlike atoi, is well defined for
       values above INT_MAX (relevant for 32-bit cookies). */
    value = strtoul(p, &end, 10);
    if (end == p || errno == ERANGE) {
        fprintf(stderr, "environment variable %s has a malformed value\n", name);
        CmiAbort("malformed PMI_GNI environment variable");
    }
    return value;
}

/* Protection tag: the first field of PMI_GNI_PTAG. */
static uint8_t get_ptag(void)
{
    return (uint8_t)get_pmi_env_first_field("PMI_GNI_PTAG");
}

/* Cookie: the first field of PMI_GNI_COOKIE. */
static uint32_t get_cookie(void)
{
    return (uint32_t)get_pmi_env_first_field("PMI_GNI_COOKIE");
}
1150
#if LARGEPAGE

/* directly mmap memory from hugetlbfs for large pages */

#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <hugetlbfs.h>

/* Map `size` bytes backed by huge pages.  A uniquely named file is created
 * on the hugetlbfs mount for _tlbpagesize, mapped MAP_PRIVATE, then closed
 * and unlinked, so the mapping is the only remaining reference.
 * size must be _tlbpagesize aligned.  Returns NULL if mmap fails; aborts
 * if no suitable mount exists or the file cannot be created. */
void *my_get_huge_pages(size_t size)
{
    char filename[512];
    int fd;
    mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
    void *ptr = NULL;
    const char *mount = hugetlbfs_find_path_for_size(_tlbpagesize);

    /* NULL means no hugetlbfs mount serves this page size; it must not
       reach the %s conversion below. */
    if (mount == NULL) {
        CmiAbort("my_get_huge_pages: no hugetlbfs mount for page size");
    }
    snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", mount, getpid(), rand());
    fd = open(filename, O_RDWR | O_CREAT, mode);
    if (fd == -1) {
        CmiAbort("my_get_huge_pages: open failed");
    }
    ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
    if (ptr == MAP_FAILED) ptr = NULL;
    close(fd);
    unlink(filename);
    return ptr;
}

/* Unmap a region obtained from my_get_huge_pages(); aborts on failure. */
void my_free_huge_pages(void *ptr, int size)
{
    int ret = munmap(ptr, size);
    if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
}

#endif
1189
1190 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1191 /* TODO: add any that are related */
1192 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1193
1194
1195 #include "machine-lrts.h"
1196 #include "machine-common-core.c"
1197
/* Network progress function, used to poll the network for
   messages. On some implementations this flushes receive buffers. */
#if CMK_MACHINE_PROGRESS_DEFINED
void CmiMachineProgressImpl()
{
    /* Intentionally empty in this machine layer. */
}
#endif
1204
/* Forward declarations of static send/progress routines defined later in
   this file (some exist only when CQWRITE / REMOTE_EVENT are enabled). */
static int SendBufferMsg(SMSG_QUEUE *queue);
static void SendRdmaMsg();
static void PumpNetworkSmsg();
static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
#if CQWRITE
static void PumpCqWriteTransactions();
#endif
#if REMOTE_EVENT
static void PumpRemoteTransactions();
#endif
1215
/* State used only by the debug-log build (MACHINE_DEBUG_LOG). */
#if MACHINE_DEBUG_LOG
FILE *debugLog = NULL;
/* buffered_recv_msg is reported in MACHSTATE debug traces. */
static CmiInt8 buffered_recv_msg = 0;
int         lrts_smsg_success = 0;
int         lrts_received_msg = 0;
#endif
1222
1223 static void sweep_mempool(mempool_type *mptr)
1224 {
1225     int n = 0;
1226     block_header *current = &(mptr->block_head);
1227
1228     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1229     while( current!= NULL) {
1230         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1231         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1232     }
1233     printf("[n %d] sweep_mempool slot END.\n", myrank);
1234 }
1235
1236 inline
1237 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1238 {
1239     block_header *current = *from;
1240
1241     //while(register_memory_size>= MAX_REG_MEM)
1242     //{
1243         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1244             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1245
1246         *from = current;
1247         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1248         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1249         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1250     //}
1251     return GNI_RC_SUCCESS;
1252 }
1253
1254 inline 
1255 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1256 {
1257     gni_return_t status = GNI_RC_SUCCESS;
1258     //int size = GetMempoolsize(msg);
1259     //void *blockaddr = GetMempoolBlockPtr(msg);
1260     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1261    
1262     block_header *current = &(mptr->block_head);
1263     while(register_memory_size>= MAX_REG_MEM)
1264     {
1265         status = deregisterMemory(mptr, &current);
1266         if (status != GNI_RC_SUCCESS) break;
1267     }
1268     if(register_memory_size>= MAX_REG_MEM) return status;
1269
1270     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1271     while(1)
1272     {
1273         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1274         if(status == GNI_RC_SUCCESS)
1275         {
1276             break;
1277         }
1278         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1279         {
1280             GNI_RC_CHECK("registerFromMempool", status);
1281         }
1282         else
1283         {
1284             status = deregisterMemory(mptr, &current);
1285             if (status != GNI_RC_SUCCESS) break;
1286         }
1287     }; 
1288     return status;
1289 }
1290
1291 inline 
1292 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1293 {
1294     static int rank = -1;
1295     int i;
1296     gni_return_t status;
1297     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1298     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1299     mempool_type *mptr;
1300
1301     status = registerFromMempool(mptr1, msg, size, t, cqh);
1302     if (status == GNI_RC_SUCCESS) return status;
1303 #if CMK_SMP 
1304     for (i=0; i<CmiMyNodeSize()+1; i++) {
1305       rank = (rank+1)%(CmiMyNodeSize()+1);
1306       mptr = CpvAccessOther(mempool, rank);
1307       if (mptr == mptr1) continue;
1308       status = registerFromMempool(mptr, msg, size, t, cqh);
1309       if (status == GNI_RC_SUCCESS) return status;
1310     }
1311 #endif
1312     return  GNI_RC_ERROR_RESOURCE;
1313 }
1314
/* Queue a message that was not sent immediately so it can be sent later.
 *
 * A MSG_LIST entry recording destNode, size, msg and tag is appended to
 * `queue`:
 *   - non-SMP: to destNode's singly linked list (head sendSmsgBuf, tail),
 *     in FIFO order; when that list was empty, destNode is also linked at
 *     the front of the queue's list of non-empty destinations
 *     (smsg_head_index);
 *   - SMP with ONE_SEND_QUEUE: to the single shared PCQueue sendMsgBuf;
 *   - SMP otherwise: to destNode's PCQueue (under its lock with SMP_LOCKS).
 * msg itself is not copied; only the pointer is stored. */
inline
static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
{
    MSG_LIST        *msg_tmp;
    MallocMsgList(msg_tmp);
    msg_tmp->destNode = destNode;
    msg_tmp->size   = size;
    msg_tmp->msg    = msg;
    msg_tmp->tag    = tag;
#if CMK_WITH_STATS
    SMSG_CREATION(msg_tmp)
#endif
#if !CMK_SMP
    /* sendSmsgBuf == 0 means destNode has nothing buffered yet */
    if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
        queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
        queue->smsg_head_index = destNode;
        queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
    }else
    {
        queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
        queue->smsg_msglist_index[destNode].tail = msg_tmp;
    }
#else
#if ONE_SEND_QUEUE
    PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
#else
#if SMP_LOCKS
    CmiLock(queue->smsg_msglist_index[destNode].lock);
    /* NOTE(review): `pushed` is tested but never set here, so every
       buffered message re-pushes this entry onto nonEmptyQueues unless the
       consumer maintains the flag; confirm against SendBufferMsg(). */
    if(queue->smsg_msglist_index[destNode].pushed == 0)
    {
        PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
    }
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
    CmiUnlock(queue->smsg_msglist_index[destNode].lock);
#else
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
#endif
#endif
#endif
#if PRINT_SYH
    buffered_smsg_counter++;
#endif
}
1358
1359 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1360 {
1361     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1362 }
1363
1364 inline
1365 static void setup_smsg_connection(int destNode)
1366 {
1367     mdh_addr_list_t  *new_entry = 0;
1368     gni_post_descriptor_t *pd;
1369     gni_smsg_attr_t      *smsg_attr;
1370     gni_return_t status = GNI_RC_NOT_DONE;
1371     RDMA_REQUEST        *rdma_request_msg;
1372     
1373     if(smsg_available_slot == smsg_expand_slots)
1374     {
1375         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1376         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1377         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1378
1379         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1380             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1381             GNI_MEM_READWRITE,   
1382             -1,
1383             &(new_entry->mdh));
1384         smsg_available_slot = 0; 
1385         new_entry->next = smsg_dynamic_list;
1386         smsg_dynamic_list = new_entry;
1387     }
1388     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1389     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1390     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1391     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1392     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1393     smsg_attr->buff_size = smsg_memlen;
1394     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1395     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1396     smsg_local_attr_vec[destNode] = smsg_attr;
1397     smsg_available_slot++;
1398     MallocPostDesc(pd);
1399     pd->type            = GNI_POST_FMA_PUT;
1400     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1401     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1402     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1403     pd->length          = sizeof(gni_smsg_attr_t);
1404     pd->local_addr      = (uint64_t) smsg_attr;
1405     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1406     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1407     pd->src_cq_hndl     = rdma_tx_cqh;
1408     pd->rdma_mode       = 0;
1409     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1410     print_smsg_attr(smsg_attr);
1411     if(status == GNI_RC_ERROR_RESOURCE )
1412     {
1413         MallocRdmaRequest(rdma_request_msg);
1414         rdma_request_msg->destNode = destNode;
1415         rdma_request_msg->pd = pd;
1416         /* buffer this request */
1417     }
1418 #if PRINT_SYH
1419     if(status != GNI_RC_SUCCESS)
1420        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1421     else
1422         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1423 #endif
1424 }
1425
1426 /* useDynamicSMSG */
1427 inline 
1428 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1429 {
1430     gni_return_t status = GNI_RC_NOT_DONE;
1431
1432     if(mailbox_list->offset == mailbox_list->size)
1433     {
1434         dynamic_smsg_mailbox_t *new_mailbox_entry;
1435         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1436         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1437         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1438         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1439         new_mailbox_entry->offset = 0;
1440         
1441         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1442             new_mailbox_entry->size, smsg_rx_cqh,
1443             GNI_MEM_READWRITE,   
1444             -1,
1445             &(new_mailbox_entry->mem_hndl));
1446
1447         GNI_RC_CHECK("register", status);
1448         new_mailbox_entry->next = mailbox_list;
1449         mailbox_list = new_mailbox_entry;
1450     }
1451     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1452     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1453     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1454     local_smsg_attr->mbox_offset = mailbox_list->offset;
1455     mailbox_list->offset += smsg_memlen;
1456     local_smsg_attr->buff_size = smsg_memlen;
1457     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1458     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1459 }
1460
1461 /* useDynamicSMSG */
1462 inline 
1463 static int connect_to(int destNode)
1464 {
1465     gni_return_t status = GNI_RC_NOT_DONE;
1466     CmiAssert(smsg_connected_flag[destNode] == 0);
1467     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1468     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1469     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1470     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1471     
1472     CMI_GNI_LOCK(global_gni_lock)
1473     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1474     CMI_GNI_UNLOCK(global_gni_lock)
1475     if (status == GNI_RC_ERROR_RESOURCE) {
1476       /* possibly destNode is making connection at the same time */
1477       free(smsg_attr_vector_local[destNode]);
1478       smsg_attr_vector_local[destNode] = NULL;
1479       free(smsg_attr_vector_remote[destNode]);
1480       smsg_attr_vector_remote[destNode] = NULL;
1481       mailbox_list->offset -= smsg_memlen;
1482 #if PRINT_SYH
1483     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1484 #endif
1485       return 0;
1486     }
1487     GNI_RC_CHECK("GNI_Post", status);
1488     smsg_connected_flag[destNode] = 1;
1489 #if PRINT_SYH
1490     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1491 #endif
1492     return 1;
1493 }
1494
/* Try to send `msg` (size bytes, SMSG tag `tag`) to destNode with
 * GNI_SmsgSendWTag.
 *
 * inbuff: 1 when msg is being re-sent from the send buffer, 0 for a fresh
 *   message.  A fresh message that cannot go out now is appended to `queue`
 *   via buffer_small_msgs().
 * ptr: the buffered MSG_LIST entry (or NULL).  It is only read for its
 *   creation_time when CMK_WITH_STATS is enabled.
 * Returns GNI_RC_SUCCESS if sent; GNI_RC_NOT_DONE while a dynamic SMSG
 * connection is not yet established; GNI_RC_ERROR_RESOURCE otherwise. */
inline 
static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
{
    /* NOTE(review): remote_address, remote_id, smsg_attr, pd and
       post_state appear unused in this function. */
    unsigned int          remote_address;
    uint32_t              remote_id;
    gni_return_t          status = GNI_RC_ERROR_RESOURCE;
    gni_smsg_attr_t       *smsg_attr;
    gni_post_descriptor_t *pd;
    gni_post_state_t      post_state;
    char                  *real_data; 

    /* Dynamic SMSG: flag 0 = not connected, 1 = connection pending.  In
       either case, buffer (if fresh) and return; any other value means
       connected, and control falls out of the switch to send. */
    if (useDynamicSMSG) {
        switch (smsg_connected_flag[destNode]) {
        case 0: 
            connect_to(destNode);         /* continue to case 1 */
        case 1:                           /* pending connection, do nothing */
            status = GNI_RC_NOT_DONE;
            if(inbuff ==0)
                buffer_small_msgs(queue, msg, size, destNode, tag);
            return status;
        }
    }
    /* Send directly only if nothing is already buffered for destNode (to
       keep per-destination order) or this message comes from the buffer.
       With CMK_SMP and ONE_SEND_QUEUE there is no such check. */
#if CMK_SMP
#if ! ONE_SEND_QUEUE
    if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
#endif
    {
#else
    if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
    {
#endif
        //CMI_GNI_LOCK(smsg_mailbox_lock)
        CMI_GNI_LOCK(default_tx_cq_lock)
#if CMK_SMP_TRACE_COMMTHREAD
        int oldpe = -1;
        int oldeventid = -1;
        if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
        { 
            START_EVENT();
            if ( tag == SMALL_DATA_TAG)
                real_data = (char*)msg; 
            else 
                real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
            TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
            TRACE_COMM_SET_COMM_MSGID(real_data);
        }
#endif
#if REMOTE_EVENT
        /* A seq_id 0 control message gets an ack slot in ackPool
           (keyed by its source address) the first time it is sent. */
        if (tag == LMSG_INIT_TAG) {
            CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
            if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
                control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
        }
        // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
#endif
#if     CMK_WITH_STATS
        SMSG_TRY_SEND(tag)
#endif
#if CMK_WITH_STATS
    double              creation_time;
    if (ptr == NULL)
        creation_time = CmiWallTimer();
    else
        creation_time = ptr->creation_time;
#endif

    status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
#if CMK_SMP_TRACE_COMMTHREAD
        if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
#endif
        CMI_GNI_UNLOCK(default_tx_cq_lock)
        //CMI_GNI_UNLOCK(smsg_mailbox_lock)
        if(status == GNI_RC_SUCCESS)
        {
#if     CMK_WITH_STATS
            SMSG_SENT_DONE(creation_time,tag) 
#endif
#if CMK_SMP_TRACE_COMMTHREAD
            if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
            { 
                TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
            }
#endif
        }else
            status = GNI_RC_ERROR_RESOURCE;
    }
    /* not sent (or skipped to preserve order): buffer a fresh message */
    if(status != GNI_RC_SUCCESS && inbuff ==0)
        buffer_small_msgs(queue, msg, size, destNode, tag);
    return status;
}
1585
1586 inline 
1587 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1588 {
1589     /* construct a control message and send */
1590     CONTROL_MSG         *control_msg_tmp;
1591     MallocControlMsg(control_msg_tmp);
1592     control_msg_tmp->source_addr = (uint64_t)msg;
1593     control_msg_tmp->seq_id    = seqno;
1594     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1595 #if REMOTE_EVENT
1596     control_msg_tmp->ack_index    =  -1;
1597 #endif
1598 #if     USE_LRTS_MEMPOOL
1599     if(size < BIG_MSG)
1600     {
1601         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1602     }
1603     else
1604     {
1605         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1606         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1607         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1608     }
1609 #else
1610     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1611 #endif
1612     return control_msg_tmp;
1613 }
1614
/* When nonzero, send_large_messages() first spins on
   LrtsAdvanceCommunication(0) for fresh messages (inbuff == 0) whose
   buffer is unregistered, until buffered_send_msg leaves room under
   MAX_BUFF_SEND.  Disabled (0) by default. */
#define BLOCKING_SEND_CONTROL    0
1616
1617 // Large message, send control to receiver, receiver register memory and do a GET, 
1618 // return 1 - send no success
1619 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1620 {
1621     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1622     uint32_t            vmdh_index  = -1;
1623     int                 size;
1624     int                 offset = 0;
1625     uint64_t            source_addr;
1626     int                 register_size; 
1627     void                *msg;
1628
1629     size    =   control_msg_tmp->total_length;
1630     source_addr = control_msg_tmp->source_addr;
1631     register_size = control_msg_tmp->length;
1632
1633 #if  USE_LRTS_MEMPOOL
1634     if( control_msg_tmp->seq_id == 0 ){
1635 #if BLOCKING_SEND_CONTROL
1636         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1637             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1638                 LrtsAdvanceCommunication(0);
1639         }
1640 #endif
1641         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1642         {
1643             msg = (void*)source_addr;
1644             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1645             {
1646                 if(!inbuff)
1647                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1648                 return GNI_RC_ERROR_NOMEM;
1649             }
1650             //register the corresponding mempool
1651             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1652             if(status == GNI_RC_SUCCESS)
1653             {
1654                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1655             }
1656         }else
1657         {
1658             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1659             status = GNI_RC_SUCCESS;
1660         }
1661         if(NoMsgInSend(source_addr))
1662             register_size = GetMempoolsize((void*)(source_addr));
1663         else
1664             register_size = 0;
1665     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1666     {
1667         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1668         source_addr += offset;
1669         size = control_msg_tmp->length;
1670 #if BLOCKING_SEND_CONTROL
1671         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1672             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1673                 LrtsAdvanceCommunication(0);
1674         }
1675 #endif
1676         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1677             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1678             {
1679                 if(!inbuff)
1680                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1681                 return GNI_RC_ERROR_NOMEM;
1682             }
1683             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1684             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1685         }
1686         else
1687         {
1688             status = GNI_RC_SUCCESS;
1689         }
1690         register_size = 0;  
1691     }
1692
1693 #if CMI_EXERT_SEND_CAP
1694     if(SEND_large_pending >= SEND_large_cap)
1695     {
1696         status = GNI_RC_ERROR_NOMEM;
1697     }
1698 #endif
1699  
1700     if(status == GNI_RC_SUCCESS)
1701     {
1702        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1703         if(status == GNI_RC_SUCCESS)
1704         {
1705 #if CMI_EXERT_SEND_CAP
1706             SEND_large_pending++;
1707 #endif
1708             buffered_send_msg += register_size;
1709             if(control_msg_tmp->seq_id == 0)
1710             {
1711                 IncreaseMsgInSend(source_addr);
1712             }
1713             FreeControlMsg(control_msg_tmp);
1714             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1715         }else
1716             status = GNI_RC_ERROR_RESOURCE;
1717
1718     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1719     {
1720         CmiAbort("Memory registor for large msg\n");
1721     }else 
1722     {
1723         status = GNI_RC_ERROR_NOMEM; 
1724         if(!inbuff)
1725             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1726     }
1727     return status;
1728 #else
1729     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1730     if(status == GNI_RC_SUCCESS)
1731     {
1732         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1733         if(status == GNI_RC_SUCCESS)
1734         {
1735             FreeControlMsg(control_msg_tmp);
1736         }
1737     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1738     {
1739         CmiAbort("Memory registor for large msg\n");
1740     }else 
1741     {
1742         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1743     }
1744     return status;
1745 #endif
1746 }
1747
/* Stamp the Converse envelope of an outgoing message: record its total size
 * with CmiSetMsgSize, then apply CMI_SET_CHECKSUM over the same size.
 * NOTE(review): the size is written before the checksum is taken; the
 * checksum presumably covers the header too, so keep this order — confirm.
 * CMI_SET_CHECKSUM is presumably a no-op unless checksumming is enabled.
 */
inline void LrtsPrepareEnvelope(char *msg, int size)
{
    CmiSetMsgSize(msg, size);
    CMI_SET_CHECKSUM(msg, size);
}
1753
1754 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1755 {
1756     gni_return_t        status  =   GNI_RC_SUCCESS;
1757     uint8_t tag;
1758     CONTROL_MSG         *control_msg_tmp;
1759     int                 oob = ( mode & OUT_OF_BAND);
1760     SMSG_QUEUE          *queue;
1761
1762     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1763 #if CMK_USE_OOB
1764     queue = oob? &smsg_oob_queue : &smsg_queue;
1765 #else
1766     queue = &smsg_queue;
1767 #endif
1768
1769     LrtsPrepareEnvelope(msg, size);
1770
1771 #if PRINT_SYH
1772     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1773 #endif 
1774 #if CMK_SMP 
1775     if(size <= SMSG_MAX_MSG)
1776         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1777     else if (size < BIG_MSG) {
1778         control_msg_tmp =  construct_control_msg(size, msg, 0);
1779         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1780     }
1781     else {
1782           CmiSetMsgSeq(msg, 0);
1783           control_msg_tmp =  construct_control_msg(size, msg, 1);
1784           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1785     }
1786 #else   //non-smp, smp(worker sending)
1787     if(size <= SMSG_MAX_MSG)
1788     {
1789         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1790             CmiFree(msg);
1791     }
1792     else if (size < BIG_MSG) {
1793         control_msg_tmp =  construct_control_msg(size, msg, 0);
1794         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1795     }
1796     else {
1797 #if     USE_LRTS_MEMPOOL
1798         CmiSetMsgSeq(msg, 0);
1799         control_msg_tmp =  construct_control_msg(size, msg, 1);
1800         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1801 #else
1802         control_msg_tmp =  construct_control_msg(size, msg, 0);
1803         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1804 #endif
1805     }
1806 #endif
1807     return 0;
1808 }
1809
/* Deliver `msg` to each of the `npes` PEs listed in `pes`.
 * The caller keeps its own reference to `msg`.
 * With CMK_BROADCAST_USE_CMIREFERENCE, remote PEs share the buffer: an extra
 * reference is taken for each, and that reference is consumed by
 * CmiSyncSendAndFree. The local PE always goes through CmiSyncSend.
 */
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
    int k;

    for (k = 0; k < npes; k++) {
#if CMK_BROADCAST_USE_CMIREFERENCE
        if (pes[k] != CmiMyPe()) {
            CmiReference(msg);
            CmiSyncSendAndFree(pes[k], len, msg);
            continue;
        }
#endif
        CmiSyncSend(pes[k], len, msg);
    }
}
1828
1829 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1830 {
1831   /* A better asynchronous implementation may be wanted, but at least it works */
1832   CmiSyncListSendFn(npes, pes, len, msg);
1833   return (CmiCommHandle) 0;
1834 }
1835
/* Send `msg` to every PE in `pes`; the caller's reference to `msg` is given up.
 *
 * - One target: the buffer is handed straight to CmiSyncSendAndFree.
 * - Persistent comm active and len > PERSIST_MIN_SIZE: each remote PE gets
 *   its own reference (via CmiReference + CmiSyncSendAndFree), the local PE
 *   goes through CmiSyncSend, and the caller's reference is freed at the end.
 * - Otherwise the list send is used (CMK_BROADCAST_USE_CMIREFERENCE), or all
 *   but the last target get CmiSyncSend and the last one inherits the buffer.
 */
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
    if (npes == 1) {
        CmiSyncSendAndFree(pes[0], len, msg);
        return;
    }

#if CMK_PERSISTENT_COMM
    if (CpvAccess(phs) && len > PERSIST_MIN_SIZE) {
        int k;
        for (k = 0; k < npes; k++) {
            if (pes[k] != CmiMyPe()) {
                CmiReference(msg);
                CmiSyncSendAndFree(pes[k], len, msg);
            } else {
                CmiSyncSend(pes[k], len, msg);
            }
        }
        CmiFree(msg);
        return;
    }
#endif

#if CMK_BROADCAST_USE_CMIREFERENCE
    CmiSyncListSendFn(npes, pes, len, msg);
    CmiFree(msg);
#else
    if (npes <= 0) {
        /* nobody to send to: just release the buffer */
        CmiFree(msg);
        return;
    }
    {
        int last = npes - 1;
        int k;
        for (k = 0; k < last; k++)
            CmiSyncSend(pes[k], len, msg);
        /* the final target takes over the caller's buffer */
        CmiSyncSendAndFree(pes[last], len, msg);
    }
#endif
}
1872
static void    PumpDatagramConnection();

/* Placeholder user-event ids for the comm-layer trace brackets.
 * registerUserTraceEvents() overwrites them with registered ids only when
 * user-event tracing is compiled in.
 * They can still be used unregistered (e.g. event_AdvanceCommunication under
 * CMK_SMP_TRACE_COMMTHREAD), so every placeholder must be distinct to keep
 * the events apart in trace output.
 */
static      int         event_SetupConnect = 111;
static      int         event_PumpSmsg = 222 ;
static      int         event_PumpTransaction = 333;
static      int         event_PumpRdmaTransaction = 444;
static      int         event_SendBufferSmsg = 777;
static      int         event_SendFmaRdmaMsg = 555;
static      int         event_AdvanceCommunication = 666;
1881
/* Register the comm-layer user events with the tracing module and store the
 * returned ids in the event_* globals.
 * This only happens when user-event tracing is compiled in; otherwise the
 * globals keep their placeholder values.
 * Entries are registered in table order, which matches the historical order
 * in case the tracing module assigns ids sequentially.
 */
static void registerUserTraceEvents() {
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    static const struct {
        int        *slot;
        const char *label;
    } table[] = {
        { &event_SetupConnect,         "setting up connections" },
        { &event_PumpSmsg,             "Pump network small msgs" },
        { &event_PumpTransaction,      "Pump FMA local transaction" },
        { &event_PumpRdmaTransaction,  "Pump RDMA local transaction" },
        { &event_SendBufferSmsg,       "Sending buffered small msgs" },
        { &event_SendFmaRdmaMsg,       "Sending buffered fma/rdma transactions" },
        { &event_AdvanceCommunication, "Worker thread in sending/receiving" },
    };
    size_t n;

    for (n = 0; n < sizeof(table) / sizeof(table[0]); n++)
        *table[n].slot = traceRegisterUserEvent(table[n].label, -1);
#endif
}
1893
1894 static void ProcessDeadlock()
1895 {
1896     static CmiUInt8 *ptr = NULL;
1897     static CmiUInt8  last = 0, mysum, sum;
1898     static int count = 0;
1899     gni_return_t status;
1900     int i;
1901
1902 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1903 //sweep_mempool(CpvAccess(mempool));
1904     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1905     mysum = smsg_send_count + smsg_recv_count;
1906     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1907     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1908     GNI_RC_CHECK("PMI_Allgather", status);
1909     sum = 0;
1910     for (i=0; i<mysize; i++)  sum+= ptr[i];
1911     if (last == 0 || sum == last) 
1912         count++;
1913     else
1914         count = 0;
1915     last = sum;
1916     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1917     if (count == 2) { 
1918         /* detected twice, it is a real deadlock */
1919         if (myrank == 0)  {
1920             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1921             CmiAbort("Fatal> Deadlock detected.");
1922         }
1923
1924     }
1925     _detected_hang = 0;
1926 }
1927
1928 static void CheckProgress()
1929 {
1930     if (smsg_send_count == last_smsg_send_count &&
1931         smsg_recv_count == last_smsg_recv_count ) 
1932     {
1933         _detected_hang = 1;
1934 #if !CMK_SMP
1935         if (_detected_hang) ProcessDeadlock();
1936 #endif
1937
1938     }
1939     else {
1940         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1941         last_smsg_send_count = smsg_send_count;
1942         last_smsg_recv_count = smsg_recv_count;
1943         _detected_hang = 0;
1944     }
1945 }
1946
1947 static void set_limit()
1948 {
1949     //if (!user_set_flag && CmiMyRank() == 0) {
1950     if (CmiMyRank() == 0) {
1951         int mynode = CmiPhysicalNodeID(CmiMyPe());
1952         int numpes = CmiNumPesOnPhysicalNode(mynode);
1953         int numprocesses = numpes / CmiMyNodeSize();
1954         MAX_REG_MEM  = _totalmem / numprocesses;
1955         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1956         if (CmiMyPe() == 0)
1957            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1958         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1959         {
1960              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1961              CmiAbort("memory registration\n");
1962         }
1963     }
1964 }
1965
1966 void LrtsPostCommonInit(int everReturn)
1967 {
1968 #if CMK_DIRECT
1969     CmiDirectInit();
1970 #endif
1971 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1972     CpvInitialize(double, projTraceStart);
1973     /* only PE 0 needs to care about registration (to generate sts file). */
1974     //if (CmiMyPe() == 0) 
1975     {
1976         registerMachineUserEventsFunction(&registerUserTraceEvents);
1977     }
1978 #endif
1979
1980 #if CMK_SMP
1981     CmiIdleState *s=CmiNotifyGetState();
1982     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1983     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1984 #else
1985     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1986     if (useDynamicSMSG)
1987     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1988 #endif
1989
1990 #if ! LARGEPAGE
1991     if (_checkProgress)
1992 #if CMK_SMP
1993     if (CmiMyRank() == 0)
1994 #endif
1995     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1996 #endif
1997  
1998 #if !LARGEPAGE
1999     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2000 #endif
2001 }
2002
/* this is called by worker thread */
/* Worker-side network progress.
 *
 * It does nothing unless MULTI_THREAD_SEND is enabled. In that case, when
 * there is more than one process, the calling worker runs one pass of every
 * progress task:
 *   - poll SMSG receives
 *   - poll the FMA and RDMA local-completion CQs
 *   - poll remote events (REMOTE_EVENT builds)
 *   - flush buffered small messages
 *   - flush buffered RDMA requests
 *
 * With CMK_WORKER_SINGLE_TASK, each worker runs only the task selected by
 * CmiMyRank() % 6.
 * With CMK_SMP_TRACE_COMMTHREAD, the pass is bracketed as a user event and
 * the idle state is ended/restarted around it.
 */
void LrtsPostNonLocal(){
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif
#if MULTI_THREAD_SEND
    if(mysize == 1) return;
#if CMK_SMP_TRACE_COMMTHREAD
    traceEndIdle();
#endif

#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif

    /* task 0: drain SMSG mailboxes */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 0)
#endif
    PumpNetworkSmsg();

    /* task 1: FMA local completions */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 1)
#endif
    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);

    /* task 2: RDMA local completions */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 2)
#endif
    PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);

    /* task 3: remote-event completions */
#if REMOTE_EVENT
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 3)
#endif
    PumpRemoteTransactions();
#endif

    /* task 4: flush buffered small messages. With CMK_USE_OOB, the regular
       queue is flushed only when SendBufferMsg on the OOB queue returns 1
       (presumably "OOB queue drained" — confirm). */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 4)
#endif
#if CMK_USE_OOB
    if (SendBufferMsg(&smsg_oob_queue) == 1)
#endif
    {
        SendBufferMsg(&smsg_queue);
    }

    /* task 5: retry buffered RDMA requests */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 5)
#endif
    SendRdmaMsg();

#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
#endif
#if CMK_SMP_TRACE_COMMTHREAD
    traceBeginIdle();
#endif
#endif
}
2064
2065 /* useDynamicSMSG */
2066 static void    PumpDatagramConnection()
2067 {
2068     uint32_t          remote_address;
2069     uint32_t          remote_id;
2070     gni_return_t status;
2071     gni_post_state_t  post_state;
2072     uint64_t          datagram_id;
2073     int i;
2074
2075    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2076    {
2077        if (datagram_id >= mysize) {           /* bound endpoint */
2078            int pe = datagram_id - mysize;
2079            CMI_GNI_LOCK(global_gni_lock)
2080            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2081            CMI_GNI_UNLOCK(global_gni_lock)
2082            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2083            {
2084                CmiAssert(remote_id == pe);
2085                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2086                GNI_RC_CHECK("Dynamic SMSG Init", status);
2087 #if PRINT_SYH
2088                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2089 #endif
2090                CmiAssert(smsg_connected_flag[pe] == 1);
2091                smsg_connected_flag[pe] = 2;
2092            }
2093        }
2094        else {         /* unbound ep */
2095            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2096            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2097            {
2098                CmiAssert(remote_id<mysize);
2099                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2100                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2101                GNI_RC_CHECK("Dynamic SMSG Init", status);
2102 #if PRINT_SYH
2103                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2104 #endif
2105                smsg_connected_flag[remote_id] = 2;
2106
2107                alloc_smsg_attr(&send_smsg_attr);
2108                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2109                GNI_RC_CHECK("post unbound datagram", status);
2110            }
2111        }
2112    }
2113 }
2114
/* polling CQ to receive network message */
/* Stub: intentionally empty; no RDMA receive CQ is polled here yet. */
static void PumpNetworkRdmaMsgs()
{
}
2122
2123 inline 
2124 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
2125 {
2126     RDMA_REQUEST        *rdma_request_msg;
2127     MallocRdmaRequest(rdma_request_msg);
2128     rdma_request_msg->destNode = inst_id;
2129     rdma_request_msg->pd = pd;
2130 #if REMOTE_EVENT
2131     rdma_request_msg->ack_index = ack_index;
2132 #endif
2133 #if CMK_SMP
2134     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2135 #else
2136     if(sendRdmaBuf == 0)
2137     {
2138         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2139     }else{
2140         sendRdmaTail->next = rdma_request_msg;
2141         sendRdmaTail =  rdma_request_msg;
2142     }
2143 #endif
2144
2145 }
2146
2147 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2148
2149 static void PumpNetworkSmsg()
2150 {
2151     uint64_t            inst_id;
2152     int                 ret;
2153     gni_cq_entry_t      event_data;
2154     gni_return_t        status, status2;
2155     void                *header;
2156     uint8_t             msg_tag;
2157     int                 msg_nbytes;
2158     void                *msg_data;
2159     gni_mem_handle_t    msg_mem_hndl;
2160     gni_smsg_attr_t     *smsg_attr;
2161     gni_smsg_attr_t     *remote_smsg_attr;
2162     int                 init_flag;
2163     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2164     uint64_t            source_addr;
2165     SMSG_QUEUE         *queue = &smsg_queue;
2166 #if   CMK_DIRECT
2167     cmidirectMsg        *direct_msg;
2168 #endif
2169 #if CMI_EXERT_RECV_CAP
2170     int                  recv_cnt = 0;
2171 #endif
2172     while(1)
2173     {
2174         CMI_GNI_LOCK(smsg_rx_cq_lock)
2175         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2176         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2177         if(status != GNI_RC_SUCCESS)
2178             break;
2179         inst_id = GNI_CQ_GET_INST_ID(event_data);
2180 #if REMOTE_EVENT
2181         inst_id = GET_RANK(inst_id);      /* important */
2182 #endif
2183         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2184 #if PRINT_SYH
2185         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2186 #endif
2187         if (useDynamicSMSG) {
2188             /* subtle: smsg may come before connection is setup */
2189             while (smsg_connected_flag[inst_id] != 2) 
2190                PumpDatagramConnection();
2191         }
2192         msg_tag = GNI_SMSG_ANY_TAG;
2193         while(1) {
2194             CMI_GNI_LOCK(smsg_mailbox_lock)
2195             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2196             if (status != GNI_RC_SUCCESS)
2197             {
2198                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2199                 break;
2200             }
2201 #if PRINT_SYH
2202             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2203 #endif
2204             /* copy msg out and then put into queue (small message) */
2205             switch (msg_tag) {
2206             case SMALL_DATA_TAG:
2207             {
2208                 START_EVENT();
2209                 msg_nbytes = CmiGetMsgSize(header);
2210                 msg_data    = CmiAlloc(msg_nbytes);
2211                 memcpy(msg_data, (char*)header, msg_nbytes);
2212                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2213                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2214                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
2215                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2216                 handleOneRecvedMsg(msg_nbytes, msg_data);
2217                 break;
2218             }
2219             case LMSG_INIT_TAG:
2220             {
2221 #if MULTI_THREAD_SEND
2222                 MallocControlMsg(control_msg_tmp);
2223                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2224                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2225                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2226                 getLargeMsgRequest(control_msg_tmp, inst_id);
2227                 FreeControlMsg(control_msg_tmp);
2228 #else
2229                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2230                 getLargeMsgRequest(header, inst_id);
2231                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2232 #endif
2233                 break;
2234             }
2235 #if !REMOTE_EVENT && !CQWRITE
2236             case ACK_TAG:   //msg fit into mempool
2237             {
2238                 /* Get is done, release message . Now put is not used yet*/
2239                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2240                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2241                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2242 #if ! USE_LRTS_MEMPOOL
2243                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2244 #else
2245                 DecreaseMsgInSend(msg);
2246 #endif
2247                 if(NoMsgInSend(msg))
2248                     buffered_send_msg -= GetMempoolsize(msg);
2249                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2250                 CmiFree(msg);
2251 #if CMI_EXERT_SEND_CAP
2252                 SEND_large_pending--;
2253 #endif
2254                 break;
2255             }
2256 #endif
2257             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2258             {
2259 #if MULTI_THREAD_SEND
2260                 MallocControlMsg(header_tmp);
2261                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2262                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2263 #else
2264                 header_tmp = (CONTROL_MSG *) header;
2265 #endif
2266                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2267 #if CMI_EXERT_SEND_CAP
2268                     SEND_large_pending--;
2269 #endif
2270                 void *msg = (void*)(header_tmp->source_addr);
2271                 int cur_seq = CmiGetMsgSeq(msg);
2272                 int offset = ONE_SEG*(cur_seq+1);
2273                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2274                 buffered_send_msg -= header_tmp->length;
2275                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2276                 if (remain_size < 0) remain_size = 0;
2277                 CmiSetMsgSize(msg, remain_size);
2278                 if(remain_size <= 0) //transaction done
2279                 {
2280                     CmiFree(msg);
2281                 }else if (header_tmp->total_length > offset)
2282                 {
2283                     CmiSetMsgSeq(msg, cur_seq+1);
2284                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2285                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2286                     //send next seg
2287                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2288                          // pipelining
2289                     if (header_tmp->seq_id == 1) {
2290                       int i;
2291                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2292                         int seq = cur_seq+i+2;
2293                         CmiSetMsgSeq(msg, seq-1);
2294                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2295                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2296                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2297                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2298                       }
2299                     }
2300                 }
2301 #if MULTI_THREAD_SEND
2302                 FreeControlMsg(header_tmp);
2303 #else
2304                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2305 #endif
2306                 break;
2307             }
2308 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2309             case PUT_DONE_TAG:  {   //persistent message
2310                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2311                 int size = ((CONTROL_MSG *) header)->length;
2312                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2313                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2314                 CmiReference(msg);
2315                 CMI_CHECK_CHECKSUM(msg, size);
2316                 handleOneRecvedMsg(size, msg); 
2317 #if PRINT_SYH
2318                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2319 #endif
2320                 break;
2321             }
2322 #endif
2323 #if CMK_DIRECT
2324             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2325                 //create a trigger message
2326                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2327                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2328                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2329                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2330                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2331                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2332                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2333                 break;
2334 #endif
2335             default:
2336                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2337                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2338                 printf("weird tag problem\n");
2339                 CmiAbort("Unknown tag\n");
2340             }               // end switch
2341 #if PRINT_SYH
2342             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2343 #endif
2344             smsg_recv_count ++;
2345             msg_tag = GNI_SMSG_ANY_TAG;
2346 #if CMI_EXERT_RECV_CAP
2347             if (status == GNI_RC_SUCCESS && ++recv_cnt == RECV_CAP) return;
2348 #endif
2349         } //endwhile GNI_SmsgGetNextWTag
2350     }   //end while GetEvent
2351     if(status == GNI_RC_ERROR_RESOURCE)
2352     {
2353         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2354         GNI_RC_CHECK("Smsg_rx_cq full", status);
2355     }
2356 }
2357
2358 static void printDesc(gni_post_descriptor_t *pd)
2359 {
2360     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2361 }
2362
#if CQWRITE
/* Post a silent CQ-write of `data` to `destNode`, targeting the remote
 * memory handle `mem_hndl`.
 * A post failure is reported through GNI_RC_CHECK.
 * NOTE(review): with GNI_CQMODE_SILENT, presumably no local completion event
 * is generated, so this descriptor may never be returned to the free list —
 * confirm.
 */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_post_descriptor_t *desc;
    gni_return_t        status = GNI_RC_SUCCESS;

    MallocPostDesc(desc);
    desc->type            = GNI_POST_CQWRITE;
    desc->cq_mode         = GNI_CQMODE_SILENT;
    desc->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    desc->cqwrite_value   = data;
    desc->remote_mem_hndl = mem_hndl;

    status = GNI_PostCqWrite(ep_hndl_array[destNode], desc);
    GNI_RC_CHECK("GNI_PostCqWrite", status);
}
#endif
2380
2381 // register memory for a message
2382 // return mem handle
2383 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2384 {
2385     gni_return_t status = GNI_RC_SUCCESS;
2386
2387     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2388
2389 #if CMK_PERSISTENT_COMM
2390       // persistent message is always registered
2391       // BIG_MSG small pieces do not have malloc chunk header
2392     if ((seqno <= 1 || seqno == PERSIST_SEQ) && !IsMemHndlZero(MEMHFIELD(msg))) {
2393         *memh = MEMHFIELD(msg);
2394         return GNI_RC_SUCCESS;
2395     }
2396 #endif
2397     if(seqno == 0 
2398 #if CMK_PERSISTENT_COMM
2399          || seqno == PERSIST_SEQ
2400 #endif
2401       )
2402     {
2403         if(IsMemHndlZero((GetMemHndl(msg))))
2404         {
2405             msg = (void*)(msg);
2406             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2407             if(status == GNI_RC_SUCCESS)
2408                 *memh = GetMemHndl(msg);
2409         }
2410         else {
2411             *memh = GetMemHndl(msg);
2412         }
2413     }
2414     else {
2415         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2416         status = registerMemory(msg, size, memh, NULL); 
2417     }
2418     return status;
2419 }
2420
2421 // for BIG_MSG called on receiver side for receiving control message
2422 // LMSG_INIT_TAG
2423 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2424 {
2425 #if     USE_LRTS_MEMPOOL
2426     CONTROL_MSG         *request_msg;
2427     gni_return_t        status = GNI_RC_SUCCESS;
2428     void                *msg_data;
2429     gni_post_descriptor_t *pd;
2430     gni_mem_handle_t    msg_mem_hndl;
2431     int                 size, transaction_size, offset = 0;
2432     size_t              register_size = 0;
2433
2434     // initial a get to transfer data from the sender side */
2435     request_msg = (CONTROL_MSG *) header;
2436     size = request_msg->total_length;
2437     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2438     MallocPostDesc(pd);
2439 #if CMK_WITH_STATS 
2440     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2441 #endif
2442     if(request_msg->seq_id < 2)   {
2443 #if CMK_SMP_TRACE_COMMTHREAD 
2444         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2445 #endif
2446         msg_data = CmiAlloc(size);
2447         CmiSetMsgSeq(msg_data, 0);
2448         _MEMCHECK(msg_data);
2449     }
2450     else {
2451         offset = ONE_SEG*(request_msg->seq_id-1);
2452         msg_data = (char*)request_msg->dest_addr + offset;
2453     }
2454    
2455     pd->cqwrite_value = request_msg->seq_id;
2456
2457     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2458     SetMemHndlZero(pd->local_mem_hndl);
2459     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2460     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2461         if(NoMsgInRecv( (void*)(msg_data)))
2462             register_size = GetMempoolsize((void*)(msg_data));
2463     }
2464
2465     pd->first_operand = ALIGN64(size);                   //  total length
2466
2467     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2468         pd->type            = GNI_POST_FMA_GET;
2469     else
2470         pd->type            = GNI_POST_RDMA_GET;
2471     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2472     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2473     pd->length          = transaction_size;
2474     pd->local_addr      = (uint64_t) msg_data;
2475     pd->remote_addr     = request_msg->source_addr + offset;
2476     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2477     pd->src_cq_hndl     = rdma_tx_cqh;
2478     pd->rdma_mode       = 0;
2479     pd->amo_cmd         = 0;
2480 #if CMI_EXERT_RDMA_CAP
2481     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2482 #endif
2483     //memory registration success
2484     if(status == GNI_RC_SUCCESS )
2485     {
2486         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2487         CMI_GNI_LOCK(lock)
2488 #if REMOTE_EVENT
2489         if( request_msg->seq_id == 0)
2490         {
2491             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2492             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2493             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2494         }
2495 #endif
2496
2497 #if CMK_WITH_STATS
2498         RDMA_TRY_SEND(pd->type)
2499 #endif
2500         if(pd->type == GNI_POST_RDMA_GET) 
2501         {
2502             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2503         }
2504         else
2505         {
2506             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2507         }
2508         CMI_GNI_UNLOCK(lock)
2509
2510         if(status == GNI_RC_SUCCESS )
2511         {
2512 #if CMI_EXERT_RDMA_CAP
2513             RDMA_pending++;
2514 #endif
2515             if(pd->cqwrite_value == 0)
2516             {
2517 #if MACHINE_DEBUG_LOG
2518                 buffered_recv_msg += register_size;
2519                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2520 #endif
2521                 IncreaseMsgInRecv(msg_data);
2522 #if CMK_SMP_TRACE_COMMTHREAD 
2523                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2524 #endif
2525             }
2526 #if  CMK_WITH_STATS
2527             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2528             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2529 #endif
2530         }
2531     }else
2532     {
2533         SetMemHndlZero((pd->local_mem_hndl));
2534     }
2535     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2536     {
2537 #if REMOTE_EVENT
2538         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2539 #else
2540         bufferRdmaMsg(inst_id, pd, -1); 
2541 #endif
2542     }else if (status != GNI_RC_SUCCESS) {
2543         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2544         GNI_RC_CHECK("GetLargeAFter posting", status);
2545     }
2546 #else
2547     CONTROL_MSG         *request_msg;
2548     gni_return_t        status;
2549     void                *msg_data;
2550     gni_post_descriptor_t *pd;
2551     RDMA_REQUEST        *rdma_request_msg;
2552     gni_mem_handle_t    msg_mem_hndl;
2553     //int source;
2554     // initial a get to transfer data from the sender side */
2555     request_msg = (CONTROL_MSG *) header;
2556     msg_data = CmiAlloc(request_msg->length);
2557     _MEMCHECK(msg_data);
2558
2559     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2560
2561     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2562     {
2563         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2564     }
2565
2566     MallocPostDesc(pd);
2567     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2568         pd->type            = GNI_POST_FMA_GET;
2569     else
2570         pd->type            = GNI_POST_RDMA_GET;
2571     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2572     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2573     pd->length          = ALIGN64(request_msg->length);
2574     pd->local_addr      = (uint64_t) msg_data;
2575     pd->remote_addr     = request_msg->source_addr;
2576     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2577     pd->src_cq_hndl     = rdma_tx_cqh;
2578     pd->rdma_mode       = 0;
2579     pd->amo_cmd         = 0;
2580
2581     //memory registration successful
2582     if(status == GNI_RC_SUCCESS)
2583     {
2584         pd->local_mem_hndl  = msg_mem_hndl;
2585        
2586         if(pd->type == GNI_POST_RDMA_GET) 
2587         {
2588             CMI_GNI_LOCK(rdma_tx_cq_lock)
2589             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2590             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2591         }
2592         else
2593         {
2594             CMI_GNI_LOCK(default_tx_cq_lock)
2595             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2596             CMI_GNI_UNLOCK(default_tx_cq_lock)
2597         }
2598
2599     }else
2600     {
2601         SetMemHndlZero(pd->local_mem_hndl);
2602     }
2603     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2604     {
2605         MallocRdmaRequest(rdma_request_msg);
2606         rdma_request_msg->next = 0;
2607         rdma_request_msg->destNode = inst_id;
2608         rdma_request_msg->pd = pd;
2609         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2610     }else {
2611         GNI_RC_CHECK("AFter posting", status);
2612     }
2613 #endif
2614 }
2615
#if CQWRITE
/*
 * Drain rdma_rx_cqh of CQ-write completion events.
 *
 * The low 48 bits of each event's data hold the address of a message.
 * With CMK_PERSISTENT_COMM, a message whose memory handle is non-zero is a
 * persistent receive buffer: it is referenced and handed to
 * handleOneRecvedMsg without being freed.  Every other message is a send
 * buffer whose transfer has completed, so its mempool accounting is updated
 * and it is released with CmiFree.
 *
 * Aborts through GNI_RC_CHECK if the queue reports GNI_RC_ERROR_RESOURCE.
 */
static void PumpCqWriteTransactions()
{
    gni_cq_entry_t          event;
    gni_return_t            status;
    void                    *buf;

    /* Keep polling until the completion queue yields no more events. */
    while ((status = GNI_CqGetEvent(rdma_rx_cqh, &event)) == GNI_RC_SUCCESS) {
        buf = (void*)(GNI_CQ_GET_DATA(event) & 0xFFFFFFFFFFFFL);
#if CMK_PERSISTENT_COMM
#if PRINT_SYH
        printf(" %d CQ write event %p\n", myrank, buf);
#endif
        if (!IsMemHndlZero(MEMHFIELD(buf))) {
            /* Persistent receive: deliver in place, do not free. */
            int buf_size;
#if PRINT_SYH
            printf(" %d Persistent CQ write event %p\n", myrank, buf);
#endif
            CmiReference(buf);
            buf_size = CmiGetMsgSize(buf);
            CMI_CHECK_CHECKSUM(buf, buf_size);
            handleOneRecvedMsg(buf_size, buf);
            continue;
        }
#endif
#if USE_LRTS_MEMPOOL
        DecreaseMsgInSend(buf);
#endif
        if (NoMsgInSend(buf))
            buffered_send_msg -= GetMempoolsize(buf);
        CmiFree(buf);
    }

    if (status == GNI_RC_ERROR_RESOURCE) {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
    }
}
#endif
2660
#if REMOTE_EVENT
/*
 * Drain rdma_rx_cqh of remote completion events.
 *
 * Each event's instance id is split with GET_INDEX / GET_TYPE into a pool
 * slot and an event kind:
 *   kind 0 (ACK)        - the ackPool slot holds one of our send buffers
 *                         whose remote transfer has completed; the mempool
 *                         accounting is updated, the buffer is freed and
 *                         the slot is returned to ackPool.
 *   kind 1 (PERSISTENT) - only with CMK_PERSISTENT_COMM; the persistPool
 *                         slot holds a PersistentReceivesTable and the
 *                         message at destBuf[0].destAddress is handed to
 *                         handleOneRecvedMsg.
 * Any other kind aborts.  A GNI_RC_ERROR_RESOURCE from the queue also
 * aborts through GNI_RC_CHECK.
 */
static void PumpRemoteTransactions()
{
    gni_cq_entry_t          event;
    gni_return_t            status;
    void                    *msg;
    int                     event_id, slot_index, event_kind, msg_size;

    for (;;) {
        /* The receive CQ is polled while holding the RDMA tx CQ lock. */
        CMI_GNI_LOCK(rdma_tx_cq_lock)
        status = GNI_CqGetEvent(rdma_rx_cqh, &event);
        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
        if (status != GNI_RC_SUCCESS)
            break;

        event_id   = GNI_CQ_GET_INST_ID(event);
        slot_index = GET_INDEX(event_id);
        event_kind = GET_TYPE(event_id);

        if (event_kind == 0) {
            /* ACK: release the send buffer recorded in ackPool. */
            CmiAssert(slot_index>=0 && slot_index<ackPool.size);
            CMI_GNI_LOCK(ackPool.lock);
            CmiAssert(GetIndexType(ackPool, slot_index) == 1);
            msg = GetIndexAddress(ackPool, slot_index);
            CMI_GNI_UNLOCK(ackPool.lock);
#if PRINT_SYH
            printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), slot_index, event_kind);
#endif
#if USE_LRTS_MEMPOOL
            DecreaseMsgInSend(msg);
#endif
            if (NoMsgInSend(msg))
                buffered_send_msg -= GetMempoolsize(msg);
            CmiFree(msg);
            IndexPool_freeslot(&ackPool, slot_index);
#if CMI_EXERT_SEND_CAP
            SEND_large_pending--;
#endif
        }
#if CMK_PERSISTENT_COMM
        else if (event_kind == 1) {
            /* PERSISTENT: deliver the message sitting in the receive slot. */
            PersistentReceivesTable *slot;
            CmiLock(persistPool.lock);
            CmiAssert(GetIndexType(persistPool, slot_index) == 2);
            slot = GetIndexAddress(persistPool, slot_index);
            CmiUnlock(persistPool.lock);
            START_EVENT();
            msg = slot->destBuf[0].destAddress;
            msg_size = CmiGetMsgSize(msg);
            CmiReference(msg);
            CMI_CHECK_CHECKSUM(msg, msg_size);
            TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
            handleOneRecvedMsg(msg_size, msg);
        }
#endif
        else {
            fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, event_kind);
            CmiAbort("PumpRemoteTransactions: unknown type");
        }
    }

    if (status == GNI_RC_ERROR_RESOURCE) {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
    }
}
#endif
2731
// Drain one local transmit completion queue (my_tx_cqh, polled while holding
// my_cq_lock) and finish every completed FMA/RDMA post found on it.
//
// For each GNI_CQ_EVENT_TYPE_POST event the post descriptor is fetched with
// GNI_GetCompleted and handled by type:
//   - RDMA/FMA PUT (CMK_PERSISTENT_COMM or CMK_DIRECT builds only):
//       amo_cmd == 1 (CMK_DIRECT) -> send a DIRECT_PUT_DONE_TAG header to the
//       peer; otherwise the local source buffer is CmiFree'd and, depending on
//       REMOTE_EVENT / CQWRITE, either nothing more is sent, a CQ write is
//       sent, or a PUT_DONE_TAG control message is sent.
//   - RDMA/FMA GET:
//       cqwrite_value > 0 -> one segment of a pipelined big message; the
//       segment is deregistered and a BIG_MSG_TAG control message describing
//       it is sent back to the peer.  Once the total size (first_operand)
//       has been covered, the reassembled message is delivered.
//       cqwrite_value == 0 -> a whole message fit in one GET (ACK_TAG); it is
//       delivered via handleOneRecvedMsg.
//   - GNI_POST_CQWRITE: only the descriptor is freed.
// Events that are not POST events are consumed and ignored.
//
// Parameters:
//   my_tx_cqh  - the transmit CQ to drain (default or RDMA tx CQ).
//   my_cq_lock - the lock guarding GNI calls on that CQ.
//
// If GNI_CqGetEvent reports GNI_RC_ERROR_RESOURCE, a hint about
// +useSendQueue is printed and the run aborts via GNI_RC_CHECK.
static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
{
    gni_cq_entry_t          ev;
    gni_return_t            status;
    uint64_t                type, inst_id;
    gni_post_descriptor_t   *tmp_pd;
    MSG_LIST                *ptr;           // NOTE(review): unused in this function
    CONTROL_MSG             *ack_msg_tmp;
    ACK_MSG                 *ack_msg;
    uint8_t                 msg_tag;
#if CMK_DIRECT
    CMK_DIRECT_HEADER       *cmk_direct_done_msg;
#endif
    SMSG_QUEUE         *queue = &smsg_queue;

    while(1) {
        CMI_GNI_LOCK(my_cq_lock) 
        status = GNI_CqGetEvent(my_tx_cqh, &ev);
        CMI_GNI_UNLOCK(my_cq_lock)
        if(status != GNI_RC_SUCCESS) break;
        
        type = GNI_CQ_GET_TYPE(ev);
        // NOTE(review): events of any other type fall through this `if` and
        // are dropped silently.
        if (type == GNI_CQ_EVENT_TYPE_POST)
        {

#if CMI_EXERT_RDMA_CAP
            if(RDMA_pending <=0) CmiAbort(" pending error\n");
            RDMA_pending--;
#endif
            inst_id     = GNI_CQ_GET_INST_ID(ev);   // peer node the post targeted
#if PRINT_SYH
            printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
#endif
            // NOTE(review): the return value of GNI_GetCompleted is not
            // checked before tmp_pd is dereferenced below; `status` is
            // overwritten later in this iteration.
            CMI_GNI_LOCK(my_cq_lock)
            status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
            CMI_GNI_UNLOCK(my_cq_lock)

            switch (tmp_pd->type) {
#if CMK_PERSISTENT_COMM || CMK_DIRECT
            case GNI_POST_RDMA_PUT:
#if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
                MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
#endif
                /* fallthrough: RDMA and FMA PUT completions share the rest */
            case GNI_POST_FMA_PUT:
                if(tmp_pd->amo_cmd == 1) {
#if CMK_DIRECT
                    //sender ACK to receiver to trigger it is done
                    cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
                    cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
                    msg_tag = DIRECT_PUT_DONE_TAG;
#endif
                }
                else {
                    // The PUT source buffer is no longer needed locally.
                    CmiFree((void *)tmp_pd->local_addr);
#if REMOTE_EVENT
                    // With remote events nothing else is sent for this PUT.
                    FreePostDesc(tmp_pd);
                    continue;
#elif CQWRITE
                    sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
                    FreePostDesc(tmp_pd);
                    continue;
#else
                    MallocControlMsg(ack_msg_tmp);
                    ack_msg_tmp->source_addr = tmp_pd->remote_addr;
                    ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
                    ack_msg_tmp->length  = tmp_pd->length;
                    msg_tag = PUT_DONE_TAG;
#endif
                }
                break;
#endif
            case GNI_POST_RDMA_GET:
            case GNI_POST_FMA_GET:  {
#if  ! USE_LRTS_MEMPOOL
                MallocControlMsg(ack_msg_tmp);
                ack_msg_tmp->source_addr = tmp_pd->remote_addr;
                ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
                MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
                msg_tag = ACK_TAG;  
#else
#if CMK_WITH_STATS
                RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
#endif
                // cqwrite_value carries the segment number of a pipelined
                // big message (0 for a message fetched in a single GET).
                int seq_id = tmp_pd->cqwrite_value;
                if(seq_id > 0)      // BIG_MSG
                {
                    MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
                    MallocControlMsg(ack_msg_tmp);
                    ack_msg_tmp->source_addr = tmp_pd->remote_addr;
                    ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
                    ack_msg_tmp->seq_id = seq_id;
                    // Rewind both addresses from this segment to the start of
                    // the whole message (segments are ONE_SEG bytes apart).
                    ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
                    ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
                    ack_msg_tmp->length = tmp_pd->length;
                    ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
                    msg_tag = BIG_MSG_TAG; 
                } 
                else
                {
                    msg_tag = ACK_TAG; 
#if  !REMOTE_EVENT && !CQWRITE
                    MallocAckMsg(ack_msg);
                    ack_msg->source_addr = tmp_pd->remote_addr;
#endif
                }
#endif
                break;
            }
            case  GNI_POST_CQWRITE:
                   FreePostDesc(tmp_pd);
                   continue;
            default:
                CmiPrintf("type=%d\n", tmp_pd->type);
                CmiAbort("PumpLocalTransactions: unknown type!");
            }      /* end of switch */

            // Send whatever completion notice the switch prepared.  On a
            // non-success return the notice is not freed here (presumably
            // send_smsg_message keeps it buffered for a later retry).
#if CMK_DIRECT
            if (tmp_pd->amo_cmd == 1) {
                status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
                if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
            }
            else
#endif
            if (msg_tag == ACK_TAG) {
#if !REMOTE_EVENT
#if   !CQWRITE
                status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
                if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
#else
                sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
#endif
#endif
            }
            else {
                status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
                if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
            }
            // Deliver received data (GET completions only when persistent
            // communication is compiled in, since PUTs reach here too).
#if CMK_PERSISTENT_COMM
            if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
#endif
            {
                if( msg_tag == ACK_TAG){    //msg fit in mempool 
#if PRINT_SYH
                    printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
#endif
                    // NOTE(review): two trace records are emitted, one timed
                    // from sync_flag_addr and one from sync_flag_value.
                    TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
                    TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 

                    START_EVENT();
                    CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
                    DecreaseMsgInRecv((void*)tmp_pd->local_addr);
#if MACHINE_DEBUG_LOG
                    if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
                        buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
                    MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
#endif
                    TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
                    CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
                    handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
                }else if(msg_tag == BIG_MSG_TAG){
                    // Start of the whole message; its seq field counts the
                    // segments received so far.
                    void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
                    CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
                    // All segments in once their total covers first_operand
                    // (the full message size).
                    if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
                        START_EVENT();
#if PRINT_SYH
                        printf("Pipeline msg done [%d]\n", myrank);
#endif
#if                 CMK_SMP_TRACE_COMMTHREAD
                        if( tmp_pd->cqwrite_value == 1)
                            TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
#endif
                        TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
                        CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
                        handleOneRecvedMsg(tmp_pd->first_operand, msg); 
                    }
                }
            }
            FreePostDesc(tmp_pd);
        }
    } //end while
    if(status == GNI_RC_ERROR_RESOURCE)
    {
        printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
        GNI_RC_CHECK("Smsg_tx_cq full", status);
    }
}
2918
2919 static void  SendRdmaMsg()
2920 {
2921     gni_return_t            status = GNI_RC_SUCCESS;
2922     gni_mem_handle_t        msg_mem_hndl;
2923     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2924     RDMA_REQUEST            *pre = 0;
2925     uint64_t                register_size = 0;
2926     void                    *msg;
2927     int                     i;
2928 #if CMK_SMP
2929     int len = PCQueueLength(sendRdmaBuf);
2930     for (i=0; i<len; i++)
2931     {
2932 #if CMI_EXERT_RDMA_CAP
2933         if( RDMA_pending >= RDMA_cap) break;
2934 #endif
2935         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2936         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2937         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2938         if (ptr == NULL) break;
2939 #else
2940     ptr = sendRdmaBuf;
2941     while (ptr!=0 )
2942     {
2943 #if CMI_EXERT_RDMA_CAP
2944          if( RDMA_pending >= RDMA_cap) break;
2945 #endif
2946 #endif 
2947         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2948         gni_post_descriptor_t *pd = ptr->pd;
2949         
2950         msg = (void*)(pd->local_addr);
2951         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2952         register_size = 0;
2953         if(pd->cqwrite_value == 0) {
2954             if(NoMsgInRecv(msg))
2955                 register_size = GetMempoolsize(msg);
2956         }
2957
2958         if(status == GNI_RC_SUCCESS)        //mem register good
2959         {
2960             int destNode = ptr->destNode;
2961             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2962             CMI_GNI_LOCK(lock);
2963 #if REMOTE_EVENT
2964             if( pd->cqwrite_value == 0) {
2965                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2966                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
2967                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2968             }
2969 #if CMK_PERSISTENT_COMM
2970             else if (pd->cqwrite_value == PERSIST_SEQ) {
2971                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2972                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
2973                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2974             }
2975 #endif
2976 #endif
2977 #if CMK_WITH_STATS
2978             RDMA_TRY_SEND(pd->type)
2979 #endif
2980 #if CMK_SMP_TRACE_COMMTHREAD
2981 //            int oldpe = -1;
2982 //            int oldeventid = -1;
2983 //            if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
2984 //            { 
2985 //                TRACE_COMM_GET_MSGID((void*)pd->local_addr, &oldpe, &oldeventid);
2986 //                TRACE_COMM_SET_COMM_MSGID((void*)pd->local_addr);
2987 //            }
2988               if(IS_PUT(pd->type) )
2989               { 
2990                   START_EVENT();
2991                   TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
2992               }
2993 #endif
2994
2995             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2996             {
2997                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
2998             }
2999             else
3000             {
3001                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3002             }
3003             CMI_GNI_UNLOCK(lock);
3004             
3005 #if CMK_SMP_TRACE_COMMTHREAD
3006 //            if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
3007 //            { 
3008 //                if (oldpe != -1)  TRACE_COMM_SET_MSGID((void*)pd->local_addr, oldpe, oldeventid);
3009 //            }
3010 #endif
3011             if(status == GNI_RC_SUCCESS)    //post good
3012             {
3013 #if CMI_EXERT_RDMA_CAP
3014                 RDMA_pending ++;
3015 #endif
3016 #if !CMK_SMP
3017                 tmp_ptr = ptr;
3018                 if(pre != 0) {
3019                     pre->next = ptr->next;
3020                 }
3021                 else {
3022                     sendRdmaBuf = ptr->next;
3023                 }
3024                 ptr = ptr->next;
3025                 FreeRdmaRequest(tmp_ptr);
3026 #endif
3027                 if(pd->cqwrite_value == 0)
3028                 {
3029 #if CMK_SMP_TRACE_COMMTHREAD 
3030                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3031 #endif
3032                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3033                 }
3034 #if  CMK_WITH_STATS
3035                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3036                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3037 #endif
3038 #if MACHINE_DEBUG_LOG
3039                 buffered_recv_msg += register_size;
3040                 MACHSTATE(8, "GO request from buffered\n"); 
3041 #endif
3042 #if PRINT_SYH
3043                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
3044 #endif
3045             }else           // cannot post
3046             {
3047 #if CMK_SMP
3048                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3049 #else
3050                 pre = ptr;
3051                 ptr = ptr->next;
3052 #endif
3053 #if PRINT_SYH
3054                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3055 #endif
3056                 break;
3057             }
3058         } else          //memory registration fails
3059         {
3060 #if CMK_SMP
3061             PCQueuePush(sendRdmaBuf, (char*)ptr);
3062 #else
3063             pre = ptr;
3064             ptr = ptr->next;
3065 #endif
3066         }
3067     } //end while
3068 #if ! CMK_SMP
3069     if(ptr == 0)
3070         sendRdmaTail = pre;
3071 #endif
3072 }
3073
3074 // return 1 if all messages are sent
3075 static int SendBufferMsg(SMSG_QUEUE *queue)
3076 {
3077     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3078     CONTROL_MSG         *control_msg_tmp;
3079     gni_return_t        status;
3080     int                 done = 1;
3081     uint64_t            register_size;
3082     void                *register_addr;
3083     int                 index_previous = -1;
3084
3085 #if CMK_SMP
3086     int          index = 0;
3087 #if ONE_SEND_QUEUE
3088     memset(destpe_avail, 0, mysize * sizeof(char));
3089     for (index=0; index<1; index++)
3090     {
3091         int i, len = PCQueueLength(queue->sendMsgBuf);
3092         for (i=0; i<len; i++) 
3093         {
3094             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3095             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3096             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3097             if(ptr == NULL) break;
3098             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3099                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3100                 continue;
3101             }
3102 #else
3103 #if SMP_LOCKS
3104     int nonempty = PCQueueLength(nonEmptyQueues);
3105     for(index =0; index<nonempty; index++)
3106     {
3107         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
3108         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
3109         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3110         if(current_list == NULL) break; 
3111         PCQueue current_queue= current_list-> sendSmsgBuf;
3112         CmiLock(current_list->lock);
3113         int i, len = PCQueueLength(current_queue);
3114         current_list->pushed = 0;
3115         CmiUnlock(current_list->lock);
3116 #else
3117     for(index =0; index<mysize; index++)
3118     {
3119         //if (index == myrank) continue;
3120         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3121         int i, len = PCQueueLength(current_queue);
3122 #endif
3123         for (i=0; i<len; i++) 
3124         {
3125             CMI_PCQUEUEPOP_LOCK(current_queue)
3126             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3127             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3128             if (ptr == 0) break;
3129 #endif
3130 #else
3131     int index = queue->smsg_head_index;
3132     while(index != -1)
3133     {
3134         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3135         pre = 0;
3136         while(ptr != 0)
3137         {
3138 #endif
3139             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3140             status = GNI_RC_ERROR_RESOURCE;
3141             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
3142                 /* connection not exists yet */
3143 #if CMK_SMP
3144                   /* non-smp case, connect is issued in send_smsg_message */
3145                 if (smsg_connected_flag[index] == 0)
3146                     connect_to(ptr->destNode); 
3147 #endif
3148             }
3149             else
3150             switch(ptr->tag)
3151             {
3152             case SMALL_DATA_TAG:
3153                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3154                 if(status == GNI_RC_SUCCESS)
3155                 {
3156                     CmiFree(ptr->msg);
3157                 }
3158                 break;
3159             case LMSG_INIT_TAG:
3160                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3161                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
3162                 break;
3163 #if !REMOTE_EVENT && !CQWRITE
3164             case ACK_TAG:
3165                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3166                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3167                 break;
3168 #endif
3169             case BIG_MSG_TAG:
3170                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3171                 if(status == GNI_RC_SUCCESS)
3172                 {
3173                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3174                 }
3175                 break;
3176 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3177             case PUT_DONE_TAG:
3178                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3179                 if(status == GNI_RC_SUCCESS)
3180                 {
3181                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3182                 }
3183                 break;
3184 #endif
3185 #if CMK_DIRECT
3186             case DIRECT_PUT_DONE_TAG:
3187                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3188                 if(status == GNI_RC_SUCCESS)
3189                 {
3190                     free((CMK_DIRECT_HEADER*)ptr->msg);
3191                 }
3192                 break;
3193
3194 #endif
3195             default:
3196                 printf("Weird tag\n");
3197                 CmiAbort("should not happen\n");
3198             }       // end switch
3199             if(status == GNI_RC_SUCCESS)
3200             {
3201 #if PRINT_SYH
3202                 buffered_smsg_counter--;
3203                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3204 #endif
3205 #if !CMK_SMP
3206                 tmp_ptr = ptr;
3207                 if(pre)
3208                 {
3209                     ptr = pre ->next = ptr->next;
3210                 }else
3211                 {
3212                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3213                 }
3214                 FreeMsgList(tmp_ptr);
3215 #else
3216                 FreeMsgList(ptr);
3217 #endif
3218             }else {
3219 #if CMK_SMP
3220 #if ONE_SEND_QUEUE
3221                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3222 #else
3223                 PCQueuePush(current_queue, (char*)ptr);
3224 #endif
3225 #else
3226                 pre = ptr;
3227                 ptr=ptr->next;
3228 #endif
3229                 done = 0;
3230                 if(status == GNI_RC_ERROR_RESOURCE)
3231                 {
3232 #if CMK_SMP && ONE_SEND_QUEUE 
3233                     destpe_avail[ptr->destNode] = 1;
3234 #else
3235                     break;
3236 #endif
3237                 }
3238             } 
3239         } //end while
3240 #if !CMK_SMP
3241         if(ptr == 0)
3242             queue->smsg_msglist_index[index].tail = pre;
3243         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3244         {
3245             if(index_previous != -1)
3246                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3247             else
3248                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3249         }
3250         else {
3251             index_previous = index;
3252         }
3253         index = queue->smsg_msglist_index[index].next;
3254 #else
3255 #if !ONE_SEND_QUEUE && SMP_LOCKS
3256         CmiLock(current_list->lock);
3257         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3258         {
3259             current_list->pushed = 1;
3260             PCQueuePush(nonEmptyQueues, (char*)current_list);
3261         }
3262         CmiUnlock(current_list->lock); 
3263 #endif
3264 #endif
3265
3266     }   // end pooling for all cores
3267     return done;
3268 }
3269
/* Forward declaration: ProcessDeadlock is static, so it is defined elsewhere
   in this file. */
static void ProcessDeadlock();

/* Run one progress pass of the uGNI machine layer: poll for incoming traffic
 * and completions, then push out pending sends.  Stages, in order:
 *
 *   1. PumpDatagramConnection    -- only when useDynamicSMSG && whileidle
 *   2. PumpNetworkSmsg           -- incoming SMSG ("Receive Msg first")
 *   3. PumpLocalTransactions     -- on default_tx_cqh, then on rdma_tx_cqh
 *   4. PumpCqWriteTransactions   -- only when CQWRITE is enabled
 *   5. PumpRemoteTransactions    -- only when REMOTE_EVENT is enabled
 *   6. SendRdmaMsg
 *   7. SendBufferMsg             -- OOB queue first (CMK_USE_OOB), then smsg_queue
 *   8. ProcessDeadlock           -- if _detected_hang (CMK_SMP && !LARGEPAGE only)
 *
 * whileidle: gates only stage 1; presumably nonzero when the caller is idle
 *            (TODO confirm against callers).
 *
 * When CMK_SMP_TRACE_COMMTHREAD is set, each stage is timed with CmiWallTimer
 * and reported through traceUserBracketEvent if it took >= TRACE_THRESHOLD.
 * NOTE(review): the STATS_*_TIME wrappers presumably accumulate per-stage
 * timing statistics; their definitions are outside this section -- confirm.
 */
void LrtsAdvanceCommunication(int whileidle)
{
    /* NOTE(review): not referenced directly in this function; appears unused */
    static int count = 0;
    /*  Receive Msg first */
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif
    /* dynamic SMSG connection setup is only pumped when idle */
    if (useDynamicSMSG && whileidle)
    {
#if CMK_SMP_TRACE_COMMTHREAD
        startT = CmiWallTimer();
#endif
        STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
#if CMK_SMP_TRACE_COMMTHREAD
        endT = CmiWallTimer();
        if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
#endif
    }

#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
    //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
#endif

    /* completions on the default transmit CQ */
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
    //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
#endif

    /* completions on the RDMA transmit CQ.  The event_PumpRdmaTransaction
       bracket opened here also covers the CQWRITE and REMOTE_EVENT pumps
       below. */
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));

#if CQWRITE
    PumpCqWriteTransactions();
#endif

#if REMOTE_EVENT
    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
#endif

    //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
#endif
 
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
    //MACHSTATE(8, "after SendRdmaMsg\n") ; 
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
#endif

    /* Send buffered Message */
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    /* With CMK_USE_OOB, smsg_queue is flushed only when SendBufferMsg on the
       OOB queue returns 1 (presumably: OOB queue fully drained -- the
       function sets done = 0 whenever a message stays queued).  Without
       CMK_USE_OOB the braced block runs unconditionally. */
#if CMK_USE_OOB
    if (SendBufferMsg(&smsg_oob_queue) == 1)
#endif
    {
        STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
    }
    //MACHSTATE(8, "after SendBufferMsg\n") ; 
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
#endif

    /* hang handling is compiled only into SMP builds without LARGEPAGE */
#if CMK_SMP && ! LARGEPAGE
    if (_detected_hang)  ProcessDeadlock();
#endif
}
3359
3360 static void set_smsg_max()
3361 {
3362     char *env;
3363
3364     if(mysize <=512)
3365     {
3366         SMSG_MAX_MSG = 1024;
3367     }else if (mysize <= 4096)
3368     {
3369         SMSG_MAX_MSG = 1024;
3370     }else if (mysize <= 16384)
3371     {
3372         SMSG_MAX_MSG = 512;
3373     }else {
3374         SMSG_MAX_MSG = 256;
3375     }
3376
3377     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3378     if (env) SMSG_MAX_MSG = atoi(env);
3379     CmiAssert(SMSG_MAX_MSG > 0);
3380 }    
3381
3382 /* useDynamicSMSG */
3383 static void _init_dynamic_smsg()
3384 {
3385     gni_return_t status;
3386     uint32_t     vmdh_index = -1;
3387     int i;
3388
3389     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3390     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3391     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3392     for(i=0; i<mysize; i++) {
3393         smsg_connected_flag[i] = 0;
3394         smsg_attr_vector_local[i] = NULL;
3395         smsg_attr_vector_remote[i] = NULL;
3396     }
3397
3398     set_smsg_max();
3399
3400     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3401     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3402     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3403     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3404     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3405
3406     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3407     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3408     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3409     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3410     mailbox_list->offset = 0;
3411     mailbox_list->next = 0;
3412     
3413     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3414         mailbox_list->size, smsg_rx_cqh,
3415         GNI_MEM_READWRITE,   
3416         vmdh_index,
3417         &(mailbox_list->mem_hndl));
3418     GNI_RC_CHECK("MEMORY registration for smsg", status);
3419
3420     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3421     GNI_RC_CHECK("Unbound EP", status);
3422     
3423     alloc_smsg_attr(&send_smsg_attr);
3424
3425     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3426     GNI_RC_CHECK("post unbound datagram", status);
3427
3428       /* always pre-connect to proc 0 */
3429     //if (myrank != 0) connect_to(0);
3430
3431     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3432     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3433 }
3434
3435 static void _init_static_smsg()
3436 {
3437     gni_smsg_attr_t      *smsg_attr;
3438     gni_smsg_attr_t      remote_smsg_attr;
3439     gni_smsg_attr_t      *smsg_attr_vec;
3440     gni_mem_handle_t     my_smsg_mdh_mailbox;
3441     int      ret, i;
3442     gni_return_t status;
3443     uint32_t              vmdh_index = -1;
3444     mdh_addr_t            base_infor;
3445     mdh_addr_t            *base_addr_vec;
3446
3447     set_smsg_max();
3448     
3449     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3450     
3451     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3452     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3453     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3454     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3455     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3456     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3457     CmiAssert(ret == 0);
3458     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3459     
3460     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3461             smsg_memlen*(mysize), smsg_rx_cqh,
3462             GNI_MEM_READWRITE,   
3463             vmdh_index,
3464