Abstracting out mempool related access to mempool.h as macros
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #define     LARGEPAGE              0
51
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           1
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     0
59 #endif
60
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
64 #define CMI_EXERT_SEND_CAP      0
65 #define CMI_EXERT_RECV_CAP      0
66 #define CMI_EXERT_RDMA_CAP      0
67
68 #if CMI_EXERT_SEND_CAP
69 #define SEND_CAP  32
70 #endif
71
72 #if CMI_EXERT_RECV_CAP
73 #define RECV_CAP  4                  /* cap <= 2 sometimes hang */
74 #endif
75
76 #if CMI_EXERT_RDMA_CAP
77 int   RDMA_cap =   100;
78 int   RDMA_pending = 0;
79 #endif
80
81 #define USE_LRTS_MEMPOOL                  1
82
83 #define PRINT_SYH                         0
84
85 // Trace communication thread
86 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
87 #define TRACE_THRESHOLD     0.00005
88 #define CMI_MPI_TRACE_MOREDETAILED 0
89 #undef CMI_MPI_TRACE_USEREVENTS
90 #define CMI_MPI_TRACE_USEREVENTS 1
91 #else
92 #undef CMK_SMP_TRACE_COMMTHREAD
93 #define CMK_SMP_TRACE_COMMTHREAD 0
94 #endif
95
96 #define CMK_TRACE_COMMOVERHEAD 0
97 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
98 #undef CMI_MPI_TRACE_USEREVENTS
99 #define CMI_MPI_TRACE_USEREVENTS 1
100 #else
101 #undef CMK_TRACE_COMMOVERHEAD
102 #define CMK_TRACE_COMMOVERHEAD 0
103 #endif
104
105 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
106 CpvStaticDeclare(double, projTraceStart);
107 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
108 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
109 #else
110 #define  START_EVENT()
111 #define  END_EVENT(x)
112 #endif
113
114 #if USE_LRTS_MEMPOOL
115
116 #define oneMB (1024ll*1024)
117 #define oneGB (1024ll*1024*1024)
118
119 static CmiInt8 _mempool_size = 8*oneMB;
120 static CmiInt8 _expand_mem =  4*oneMB;
121 static CmiInt8 _mempool_size_limit = 0;
122
123 static CmiInt8 _totalmem = 0.8*oneGB;
124
125 #if LARGEPAGE
126 static CmiInt8 BIG_MSG  =  16*oneMB;
127 static CmiInt8 ONE_SEG  =  4*oneMB;
128 #else
129 static CmiInt8 BIG_MSG  =  4*oneMB;
130 static CmiInt8 ONE_SEG  =  2*oneMB;
131 #endif
132 #if MULTI_THREAD_SEND
133 static int BIG_MSG_PIPELINE = 1;
134 #else
135 static int BIG_MSG_PIPELINE = 4;
136 #endif
137
138 // dynamic flow control
139 static CmiInt8 buffered_send_msg = 0;
140 static CmiInt8 register_memory_size = 0;
141
142 #if LARGEPAGE
143 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
144 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
145 static CmiInt8  register_count = 0;
146 #else
147 #if CMK_SMP && COMM_THREAD_SEND 
148 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
149 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
150 #else
151 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
152 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
153 #endif
154
155
156 #endif
157
158 #endif     /* end USE_LRTS_MEMPOOL */
159
160 #if MULTI_THREAD_SEND 
161 #define     CMI_GNI_LOCK(x)       CmiLock(x);
162 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
163 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
164 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
165 #else
166 #define     CMI_GNI_LOCK(x)
167 #define     CMI_GNI_UNLOCK(x)
168 #define     CMI_PCQUEUEPOP_LOCK(Q)   
169 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
170 #endif
171
172 static int _tlbpagesize = 4096;
173
174 //static int _smpd_count  = 0;
175
176 static int   user_set_flag  = 0;
177
178 static int _checkProgress = 1;             /* check deadlock */
179 static int _detected_hang = 0;
180
181 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
182
183 // dynamic SMSG
184 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
185
186 static int avg_smsg_connection = 32;
187 static int                 *smsg_connected_flag= 0;
188 static gni_smsg_attr_t     **smsg_attr_vector_local;
189 static gni_smsg_attr_t     **smsg_attr_vector_remote;
190 static gni_ep_handle_t     ep_hndl_unbound;
191 static gni_smsg_attr_t     send_smsg_attr;
192 static gni_smsg_attr_t     recv_smsg_attr;
193
194 typedef struct _dynamic_smsg_mailbox{
195    void     *mailbox_base;
196    int      size;
197    int      offset;
198    gni_mem_handle_t  mem_hndl;
199    struct      _dynamic_smsg_mailbox  *next;
200 }dynamic_smsg_mailbox_t;
201
202 static dynamic_smsg_mailbox_t  *mailbox_list;
203
204 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
205 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
206
207 #if PRINT_SYH
208 int         lrts_send_msg_id = 0;
209 int         lrts_local_done_msg = 0;
210 int         lrts_send_rdma_success = 0;
211 #endif
212
213 #include "machine.h"
214
215 #include "pcqueue.h"
216
217 #include "mempool.h"
218
219 #if CMK_PERSISTENT_COMM
220 #include "machine-persistent.h"
221 #endif
222
223 //#define  USE_ONESIDED 1
224 #ifdef USE_ONESIDED
225 //onesided implementation is wrong, since no place to restore omdh
226 #include "onesided.h"
227 onesided_hnd_t   onesided_hnd;
228 onesided_md_t    omdh;
229 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
230
231 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
232
233 #else
234 uint8_t   onesided_hnd, omdh;
235
236 #if REMOTE_EVENT || CQWRITE 
237 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
238     if(register_memory_size+size>= MAX_REG_MEM) { \
239         status = GNI_RC_ERROR_NOMEM;} \
240     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
241         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
242 #else
243 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
244         if (register_memory_size + size >= MAX_REG_MEM) { \
245             status = GNI_RC_ERROR_NOMEM; \
246         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
247             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
248 #endif
249
250 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
251     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
252              register_memory_size -= size; \
253          else CmiAbort("MEM_DEregister");  \
254     } while (0)
255 #endif
256
257 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
258 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
259 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
260 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
261 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
262 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
263 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
264 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
265 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
266 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
267 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
268 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
269 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
270 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
271
272 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
273 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
274
275 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
276 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
277 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
278 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
279
280 #define ALIGNBUF                64
281
282 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
283 /* If SMSG is not used */
284
285 #define FMA_PER_CORE  1024
286 #define FMA_BUFFER_SIZE 1024
287
288 /* If SMSG is used */
289 static int  SMSG_MAX_MSG = 1024;
290 #define SMSG_MAX_CREDIT    72
291
292 #define MSGQ_MAXSIZE       2048
293
294 /* large message transfer with FMA or BTE */
295 #if ! REMOTE_EVENT
296 #define LRTS_GNI_RDMA_THRESHOLD  1024 
297 #else
298    /* remote events only work with RDMA */
299 #define LRTS_GNI_RDMA_THRESHOLD  0 
300 #endif
301
302 #if CMK_SMP
303 static int  REMOTE_QUEUE_ENTRIES=163840; 
304 static int LOCAL_QUEUE_ENTRIES=163840; 
305 #else
306 static int  REMOTE_QUEUE_ENTRIES=20480;
307 static int LOCAL_QUEUE_ENTRIES=20480; 
308 #endif
309
310 #define BIG_MSG_TAG             0x26
311 #define PUT_DONE_TAG            0x28
312 #define DIRECT_PUT_DONE_TAG     0x29
313 #define ACK_TAG                 0x30
314 /* SMSG is data message */
315 #define SMALL_DATA_TAG          0x31
316 /* SMSG is a control message to initialize a BTE */
317 #define LMSG_INIT_TAG           0x39 
318
319 #define DEBUG
320 #ifdef GNI_RC_CHECK
321 #undef GNI_RC_CHECK
322 #endif
323 #ifdef DEBUG
324 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
325 #else
326 #define GNI_RC_CHECK(msg,rc)
327 #endif
328
329 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
330 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
331 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
332
333 static int useStaticMSGQ = 0;
334 static int useStaticFMA = 0;
335 static int mysize, myrank;
336 static gni_nic_handle_t   nic_hndl;
337
338 typedef struct {
339     gni_mem_handle_t mdh;
340     uint64_t addr;
341 } mdh_addr_t ;
342 // this is related to dynamic SMSG
343
344 typedef struct mdh_addr_list{
345     gni_mem_handle_t mdh;
346    void *addr;
347     struct mdh_addr_list *next;
348 }mdh_addr_list_t;
349
350 static unsigned int         smsg_memlen;
351 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
352 mdh_addr_t          setup_mem;
353 mdh_addr_t          *smsg_connection_vec = 0;
354 gni_mem_handle_t    smsg_connection_memhndl;
355 static int          smsg_expand_slots = 10;
356 static int          smsg_available_slot = 0;
357 static void         *smsg_mailbox_mempool = 0;
358 mdh_addr_list_t     *smsg_dynamic_list = 0;
359
360 static void             *smsg_mailbox_base;
361 gni_msgq_attr_t         msgq_attrs;
362 gni_msgq_handle_t       msgq_handle;
363 gni_msgq_ep_attr_t      msgq_ep_attrs;
364 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
365
366 /* =====Beginning of Declarations of Machine Specific Variables===== */
367 static int cookie;
368 static int modes = 0;
369 static gni_cq_handle_t       smsg_rx_cqh = NULL;
370 static gni_cq_handle_t       default_tx_cqh = NULL;
371 static gni_cq_handle_t       rdma_tx_cqh = NULL;
372 static gni_cq_handle_t       rdma_rx_cqh = NULL;
373 static gni_cq_handle_t       post_tx_cqh = NULL;
374 static gni_ep_handle_t       *ep_hndl_array;
375
376 static CmiNodeLock           *ep_lock_array;
377 static CmiNodeLock           default_tx_cq_lock; 
378 static CmiNodeLock           rdma_tx_cq_lock; 
379 static CmiNodeLock           global_gni_lock; 
380 static CmiNodeLock           rx_cq_lock;
381 static CmiNodeLock           smsg_mailbox_lock;
382 static CmiNodeLock           smsg_rx_cq_lock;
383 static CmiNodeLock           *mempool_lock;
384 //#define     CMK_WITH_STATS      1
385 typedef struct msg_list
386 {
387     uint32_t destNode;
388     uint32_t size;
389     void *msg;
390     uint8_t tag;
391 #if !CMK_SMP
392     struct msg_list *next;
393 #endif
394 #if CMK_WITH_STATS
395     double  creation_time;
396 #endif
397 }MSG_LIST;
398
399
400 typedef struct control_msg
401 {
402     uint64_t            source_addr;    /* address from the start of buffer  */
403     uint64_t            dest_addr;      /* address from the start of buffer */
404     int                 total_length;   /* total length */
405     int                 length;         /* length of this packet */
406 #if REMOTE_EVENT
407     int                 ack_index;      /* index from integer to address */
408 #endif
409     uint8_t             seq_id;         //big message   0 meaning single message
410     gni_mem_handle_t    source_mem_hndl;
411     struct control_msg *next;
412 } CONTROL_MSG;
413
414 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
415
416 typedef struct ack_msg
417 {
418     uint64_t            source_addr;    /* address from the start of buffer  */
419 #if ! USE_LRTS_MEMPOOL
420     gni_mem_handle_t    source_mem_hndl;
421     int                 length;          /* total length */
422 #endif
423     struct ack_msg     *next;
424 } ACK_MSG;
425
426 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
427
428 #if CMK_DIRECT
429 typedef struct{
430     uint64_t    handler_addr;
431 }CMK_DIRECT_HEADER;
432
433 typedef struct {
434     char core[CmiMsgHeaderSizeBytes];
435     uint64_t handler;
436 }cmidirectMsg;
437
438 //SYH
439 CpvDeclare(int, CmiHandleDirectIdx);
440 void CmiHandleDirectMsg(cmidirectMsg* msg)
441 {
442
443     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
444    (*(_handle->callbackFnPtr))(_handle->callbackData);
445    CmiFree(msg);
446 }
447
448 void CmiDirectInit()
449 {
450     CpvInitialize(int,  CmiHandleDirectIdx);
451     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
452 }
453
454 #endif
455 typedef struct  rmda_msg
456 {
457     int                   destNode;
458 #if REMOTE_EVENT
459     int                   ack_index;
460 #endif
461     gni_post_descriptor_t *pd;
462 #if !CMK_SMP
463     struct  rmda_msg      *next;
464 #endif
465 }RDMA_REQUEST;
466
467
468 #if CMK_SMP
469 #define SMP_LOCKS               0
470 #define ONE_SEND_QUEUE                  0
471 PCQueue sendRdmaBuf;
472 typedef struct  msg_list_index
473 {
474     PCQueue     sendSmsgBuf;
475     int         pushed;
476     CmiNodeLock   lock;
477 } MSG_LIST_INDEX;
478 char                *destpe_avail;
479 #if  !ONE_SEND_QUEUE && SMP_LOCKS
480     PCQueue     nonEmptyQueues;
481 #endif
482 #else         /* non-smp */
483
484 static RDMA_REQUEST        *sendRdmaBuf = 0;
485 static RDMA_REQUEST        *sendRdmaTail = 0;
486 typedef struct  msg_list_index
487 {
488     int         next;
489     MSG_LIST    *sendSmsgBuf;
490     MSG_LIST    *tail;
491 } MSG_LIST_INDEX;
492
493 #endif
494
495 // buffered send queue
496 #if ! ONE_SEND_QUEUE
497 typedef struct smsg_queue
498 {
499     MSG_LIST_INDEX   *smsg_msglist_index;
500     int               smsg_head_index;
501 } SMSG_QUEUE;
502 #else
503 typedef struct smsg_queue
504 {
505     PCQueue       sendMsgBuf;
506 }  SMSG_QUEUE;
507 #endif
508
509 SMSG_QUEUE                  smsg_queue;
510 #if CMK_USE_OOB
511 SMSG_QUEUE                  smsg_oob_queue;
512 #endif
513
514 #if CMK_SMP
515
516 #define FreeMsgList(d)   free(d);
517 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
518
519 #else
520
521 static MSG_LIST       *msglist_freelist=0;
522
523 #define FreeMsgList(d)  \
524   do { \
525   (d)->next = msglist_freelist;\
526   msglist_freelist = d; \
527   } while (0)
528
529 #define MallocMsgList(d) \
530   do {  \
531   d = msglist_freelist;\
532   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
533              _MEMCHECK(d);\
534   } else msglist_freelist = d->next; \
535   d->next =0;  \
536   } while (0)
537
538 #endif
539
540 #if CMK_SMP
541
542 #define FreeControlMsg(d)      free(d);
543 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
544
545 #else
546
547 static CONTROL_MSG    *control_freelist=0;
548
549 #define FreeControlMsg(d)       \
550   do { \
551   (d)->next = control_freelist;\
552   control_freelist = d; \
553   } while (0);
554
555 #define MallocControlMsg(d) \
556   do {  \
557   d = control_freelist;\
558   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
559              _MEMCHECK(d);\
560   } else control_freelist = d->next;  \
561   } while (0);
562
563 #endif
564
565 #if CMK_SMP
566
567 #define FreeAckMsg(d)      free(d);
568 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
569
570 #else
571
572 static ACK_MSG        *ack_freelist=0;
573
574 #define FreeAckMsg(d)       \
575   do { \
576   (d)->next = ack_freelist;\
577   ack_freelist = d; \
578   } while (0)
579
580 #define MallocAckMsg(d) \
581   do { \
582   d = ack_freelist;\
583   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
584              _MEMCHECK(d);\
585   } else ack_freelist = d->next; \
586   } while (0)
587
588 #endif
589
590
591 # if CMK_SMP
592 #define FreeRdmaRequest(d)       free(d);
593 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
594 #else
595
596 static RDMA_REQUEST         *rdma_freelist = NULL;
597
598 #define FreeRdmaRequest(d)       \
599   do {  \
600   (d)->next = rdma_freelist;\
601   rdma_freelist = d;    \
602   } while (0)
603
604 #define MallocRdmaRequest(d) \
605   do {   \
606   d = rdma_freelist;\
607   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
608              _MEMCHECK(d);\
609   } else rdma_freelist = d->next; \
610   d->next =0;   \
611   } while (0)
612 #endif
613
614 /* reuse gni_post_descriptor_t */
615 static gni_post_descriptor_t *post_freelist=0;
616
617 #if  !CMK_SMP
618 #define FreePostDesc(d)       \
619     (d)->next_descr = post_freelist;\
620     post_freelist = d;
621
622 #define MallocPostDesc(d) \
623   d = post_freelist;\
624   if (d==0) { \
625      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
626      d->next_descr = 0;\
627       _MEMCHECK(d);\
628   } else post_freelist = d->next_descr;
629 #else
630
631 #define FreePostDesc(d)     free(d);
632 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
633
634 #endif
635
636
637 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
638 static int      buffered_smsg_counter = 0;
639
640 /* SmsgSend return success but message sent is not confirmed by remote side */
641 static MSG_LIST *buffered_fma_head = 0;
642 static MSG_LIST *buffered_fma_tail = 0;
643
644 /* functions  */
645 #define IsFree(a,ind)  !( a& (1<<(ind) ))
646 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
647 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
648
649 CpvDeclare(mempool_type*, mempool);
650
651 #if REMOTE_EVENT
652 /* ack pool for remote events */
653
654 static int  SHIFT   =           18;
655 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
656 #define RANK_MASK               ((1<<SHIFT) - 1)
657 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
658
659 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
660 #define GET_RANK(evt)           ((evt) & RANK_MASK)
661 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
662
663 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
664
665 #if CMK_SMP
666 #define INIT_SIZE                4096
667 #else
668 #define INIT_SIZE                1024
669 #endif
670
671 struct IndexStruct {
672 void *addr;
673 int next;
674 int type;     // 1: ACK   2: Persistent
675 };
676
677 typedef struct IndexPool {
678     struct IndexStruct   *indexes;
679     int                   size;
680     int                   freehead;
681     CmiNodeLock           lock;
682 } IndexPool;
683
684 static IndexPool  ackPool;
685 #if CMK_PERSISTENT_COMM
686 static IndexPool  persistPool;
687 #endif
688
689 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
690 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
691
692 static void IndexPool_init(IndexPool *pool)
693 {
694     int i;
695     if ((1<<SHIFT) < mysize) 
696         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
697     pool->size = INIT_SIZE;
698     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
699     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
700     for (i=0; i<pool->size-1; i++) {
701         pool->indexes[i].next = i+1;
702         pool->indexes[i].type = 0;
703     }
704     pool->indexes[i].next = -1;
705     pool->freehead = 0;
706 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
707     pool->lock  = CmiCreateLock();
708 #else
709     pool->lock  = 0;
710 #endif
711 }
712
713 static
714 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
715 {
716     int i;
717     int s;
718 #if MULTI_THREAD_SEND  
719     CmiLock(pool->lock);
720 #endif
721     s = pool->freehead;
722     if (s == -1) {
723         int newsize = pool->size * 2;
724         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
725         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
726         struct IndexStruct *old_ackpool = pool->indexes;
727         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
728         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
729         for (i=pool->size; i<newsize-1; i++) {
730             pool->indexes[i].next = i+1;
731             pool->indexes[i].type = 0;
732         }
733         pool->indexes[i].next = -1;
734         pool->indexes[i].type = 0;
735         pool->freehead = pool->size;
736         s = pool->size;
737         pool->size = newsize;
738         free(old_ackpool);
739     }
740     pool->freehead = pool->indexes[s].next;
741     pool->indexes[s].addr = addr;
742     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
743     pool->indexes[s].type = type;
744 #if MULTI_THREAD_SEND
745     CmiUnlock(pool->lock);
746 #endif
747     return s;
748 }
749
750 static
751 inline  void IndexPool_freeslot(IndexPool *pool, int s)
752 {
753     CmiAssert(s>=0 && s<pool->size);
754 #if MULTI_THREAD_SEND
755     CmiLock(pool->lock);
756 #endif
757     pool->indexes[s].next = pool->freehead;
758     pool->indexes[s].type = 0;
759     pool->freehead = s;
760 #if MULTI_THREAD_SEND
761     CmiUnlock(pool->lock);
762 #endif
763 }
764
765
766 #endif
767
768 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
769 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
770 #define CHARM_MAGIC_NUMBER               126
771
772 #if CMK_ERROR_CHECKING
773 extern unsigned char computeCheckSum(unsigned char *data, int len);
774 static int checksum_flag = 0;
775 #define CMI_SET_CHECKSUM(msg, len)      \
776         if (checksum_flag)  {   \
777           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
778           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
779         }
780 #define CMI_CHECK_CHECKSUM(msg, len)    \
781         if (checksum_flag)      \
782           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
783             CmiAbort("Fatal error: checksum doesn't agree!\n");
784 #else
785 #define CMI_SET_CHECKSUM(msg, len)
786 #define CMI_CHECK_CHECKSUM(msg, len)
787 #endif
788 /* =====End of Definitions of Message-Corruption Related Macros=====*/
789
790 static int print_stats = 0;
791 static int stats_off = 0;
792 void CmiTurnOnStats()
793 {
794     stats_off = 0;
795 }
796
797 void CmiTurnOffStats()
798 {
799     stats_off = 1;
800 }
801
802 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
803
804 #if CMK_WITH_STATS
805 FILE *counterLog = NULL;
806 typedef struct comm_thread_stats
807 {
808     uint64_t  smsg_data_count;
809     uint64_t  lmsg_init_count;
810     uint64_t  ack_count;
811     uint64_t  big_msg_ack_count;
812     uint64_t  smsg_count;
813     uint64_t  direct_put_done_count;
814     uint64_t  put_done_count;
815     //times of calling SmsgSend
816     uint64_t  try_smsg_data_count;
817     uint64_t  try_lmsg_init_count;
818     uint64_t  try_ack_count;
819     uint64_t  try_big_msg_ack_count;
820     uint64_t  try_direct_put_done_count;
821     uint64_t  try_put_done_count;
822     uint64_t  try_smsg_count;
823     
824     double    max_time_in_send_buffered_smsg;
825     double    all_time_in_send_buffered_smsg;
826
827     uint64_t  rdma_get_count, rdma_put_count;
828     uint64_t  try_rdma_get_count, try_rdma_put_count;
829     double    max_time_from_control_to_rdma_init;
830     double    all_time_from_control_to_rdma_init;
831
832     double    max_time_from_rdma_init_to_rdma_done;
833     double    all_time_from_rdma_init_to_rdma_done;
834
835     int      count_in_PumpNetwork;
836     double   time_in_PumpNetwork;
837     double   max_time_in_PumpNetwork;
838     int      count_in_SendBufferMsg_smsg;
839     double   time_in_SendBufferMsg_smsg;
840     double   max_time_in_SendBufferMsg_smsg;
841     int      count_in_SendRdmaMsg;
842     double   time_in_SendRdmaMsg;
843     double   max_time_in_SendRdmaMsg;
844     int      count_in_PumpRemoteTransactions;
845     double   time_in_PumpRemoteTransactions;
846     double   max_time_in_PumpRemoteTransactions;
847     int      count_in_PumpLocalTransactions_rdma;
848     double   time_in_PumpLocalTransactions_rdma;
849     double   max_time_in_PumpLocalTransactions_rdma;
850 } Comm_Thread_Stats;
851
852 static Comm_Thread_Stats   comm_stats;
853
854 static void init_comm_stats()
855 {
856   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
857   if (print_stats){
858       char ln[200];
859       int code = mkdir("counters", 00777); 
860       sprintf(ln,"counters/statistics.%d.%d", mysize, myrank);
861       counterLog=fopen(ln,"w");
862   }
863 }
864
865 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
866
867 #define SMSG_SENT_DONE(creation_time, tag)  \
868         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
869             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
870             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
871             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
872             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
873             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
874             comm_stats.smsg_count++; \
875             double inbuff_time = CmiWallTimer() - creation_time;   \
876             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
877             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
878         }
879
880 #define SMSG_TRY_SEND(tag)  \
881         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
882             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
883             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
884             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
885             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
886             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
887             comm_stats.try_smsg_count++; \
888         }
889
890 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
891
892 #define  RDMA_TRANS_DONE(x)      \
893          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
894              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
895              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
896          }
897
898 #define  RDMA_TRANS_INIT(type, x)      \
899          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
900              double rdma_trans_time = CmiWallTimer() - x ; \
901              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
902              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
903          }
904
905 #define STATS_PUMPNETWORK_TIME(x)   \
906         { double t = CmiWallTimer(); \
907           x;        \
908           t = CmiWallTimer() - t;          \
909           comm_stats.count_in_PumpNetwork++;        \
910           comm_stats.time_in_PumpNetwork += t;   \
911           if (t>comm_stats.max_time_in_PumpNetwork)      \
912               comm_stats.max_time_in_PumpNetwork = t;    \
913         }
914
915 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
916         { double t = CmiWallTimer(); \
917           x;        \
918           t = CmiWallTimer() - t;          \
919           comm_stats.count_in_PumpRemoteTransactions ++;        \
920           comm_stats.time_in_PumpRemoteTransactions += t;   \
921           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
922               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
923         }
924
925 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
926         { double t = CmiWallTimer(); \
927           x;        \
928           t = CmiWallTimer() - t;          \
929           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
930           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
931           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
932               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
933         }
934
935 #define STATS_SEND_SMSGS_TIME(x)   \
936         { double t = CmiWallTimer(); \
937           x;        \
938           t = CmiWallTimer() - t;          \
939           comm_stats.count_in_SendBufferMsg_smsg ++;        \
940           comm_stats.time_in_SendBufferMsg_smsg += t;   \
941           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
942               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
943         }
944
945 #define STATS_SENDRDMAMSG_TIME(x)   \
946         { double t = CmiWallTimer(); \
947           x;        \
948           t = CmiWallTimer() - t;          \
949           comm_stats.count_in_SendRdmaMsg ++;        \
950           comm_stats.time_in_SendRdmaMsg += t;   \
951           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
952               comm_stats.max_time_in_SendRdmaMsg = t;    \
953         }
954
955 static void print_comm_stats()
956 {
957     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
958     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
959             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
960             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
961     
962     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
963             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
964             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
965
966     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
967     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
968     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
969
970
971     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
972     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
973     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
974     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
975     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
976     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
977 }
978
979 #else
980 #define STATS_PUMPNETWORK_TIME(x)                  x
981 #define STATS_SEND_SMSGS_TIME(x)                   x
982 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
983 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
984 #define STATS_SENDRDMAMSG_TIME(x)                  x
985 #endif
986
987 static void
988 allgather(void *in,void *out, int len)
989 {
990     static int *ivec_ptr=NULL,already_called=0,job_size=0;
991     int i,rc;
992     int my_rank;
993     char *tmp_buf,*out_ptr;
994
995     if(!already_called) {
996
997         rc = PMI_Get_size(&job_size);
998         CmiAssert(rc == PMI_SUCCESS);
999         rc = PMI_Get_rank(&my_rank);
1000         CmiAssert(rc == PMI_SUCCESS);
1001
1002         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1003         CmiAssert(ivec_ptr != NULL);
1004
1005         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1006         CmiAssert(rc == PMI_SUCCESS);
1007
1008         already_called = 1;
1009
1010     }
1011
1012     tmp_buf = (char *)malloc(job_size * len);
1013     CmiAssert(tmp_buf);
1014
1015     rc = PMI_Allgather(in,tmp_buf,len);
1016     CmiAssert(rc == PMI_SUCCESS);
1017
1018     out_ptr = out;
1019
1020     for(i=0;i<job_size;i++) {
1021
1022         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1023
1024     }
1025
1026     free(tmp_buf);
1027 }
1028
1029 static void
1030 allgather_2(void *in,void *out, int len)
1031 {
1032     //PMI_Allgather is out of order
1033     int i,rc, extend_len;
1034     int  rank_index;
1035     char *out_ptr, *out_ref;
1036     char *in2;
1037
1038     extend_len = sizeof(int) + len;
1039     in2 = (char*)malloc(extend_len);
1040
1041     memcpy(in2, &myrank, sizeof(int));
1042     memcpy(in2+sizeof(int), in, len);
1043
1044     out_ptr = (char*)malloc(mysize*extend_len);
1045
1046     rc = PMI_Allgather(in2, out_ptr, extend_len);
1047     GNI_RC_CHECK("allgather", rc);
1048
1049     out_ref = out;
1050
1051     for(i=0;i<mysize;i++) {
1052         //rank index 
1053         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1054         //copy to the rank index slot
1055         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1056     }
1057
1058     free(out_ptr);
1059     free(in2);
1060
1061 }
1062
1063 static unsigned int get_gni_nic_address(int device_id)
1064 {
1065     unsigned int address, cpu_id;
1066     gni_return_t status;
1067     int i, alps_dev_id=-1,alps_address=-1;
1068     char *token, *p_ptr;
1069
1070     p_ptr = getenv("PMI_GNI_DEV_ID");
1071     if (!p_ptr) {
1072         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1073        
1074         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1075     } else {
1076         while ((token = strtok(p_ptr,":")) != NULL) {
1077             alps_dev_id = atoi(token);
1078             if (alps_dev_id == device_id) {
1079                 break;
1080             }
1081             p_ptr = NULL;
1082         }
1083         CmiAssert(alps_dev_id != -1);
1084         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1085         CmiAssert(p_ptr != NULL);
1086         i = 0;
1087         while ((token = strtok(p_ptr,":")) != NULL) {
1088             if (i == alps_dev_id) {
1089                 alps_address = atoi(token);
1090                 break;
1091             }
1092             p_ptr = NULL;
1093             ++i;
1094         }
1095         CmiAssert(alps_address != -1);
1096         address = alps_address;
1097     }
1098     return address;
1099 }
1100
1101 static uint8_t get_ptag(void)
1102 {
1103     char *p_ptr, *token;
1104     uint8_t ptag;
1105
1106     p_ptr = getenv("PMI_GNI_PTAG");
1107     CmiAssert(p_ptr != NULL);
1108     token = strtok(p_ptr, ":");
1109     ptag = (uint8_t)atoi(token);
1110     return ptag;
1111         
1112 }
1113
1114 static uint32_t get_cookie(void)
1115 {
1116     uint32_t cookie;
1117     char *p_ptr, *token;
1118
1119     p_ptr = getenv("PMI_GNI_COOKIE");
1120     CmiAssert(p_ptr != NULL);
1121     token = strtok(p_ptr, ":");
1122     cookie = (uint32_t)atoi(token);
1123
1124     return cookie;
1125 }
1126
1127 #if LARGEPAGE
1128
1129 /* directly mmap memory from hugetlbfs for large pages */
1130
1131 #include <sys/stat.h>
1132 #include <fcntl.h>
1133 #include <sys/mman.h>
1134 #include <hugetlbfs.h>
1135
1136 // size must be _tlbpagesize aligned
1137 void *my_get_huge_pages(size_t size)
1138 {
1139     char filename[512];
1140     int fd;
1141     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1142     void *ptr = NULL;
1143
1144     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1145     fd = open(filename, O_RDWR | O_CREAT, mode);
1146     if (fd == -1) {
1147         CmiAbort("my_get_huge_pages: open filed");
1148     }
1149     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1150     if (ptr == MAP_FAILED) ptr = NULL;
1151 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1152     close(fd);
1153     unlink(filename);
1154     return ptr;
1155 }
1156
1157 void my_free_huge_pages(void *ptr, int size)
1158 {
1159 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1160     int ret = munmap(ptr, size);
1161     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1162 }
1163
1164 #endif
1165
1166 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1167 /* TODO: add any that are related */
1168 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1169
1170
1171 #include "machine-lrts.h"
1172 #include "machine-common-core.c"
1173
1174 /* Network progress function is used to poll the network when for
1175    messages. This flushes receive buffers on some  implementations*/
1176 #if CMK_MACHINE_PROGRESS_DEFINED
1177 void CmiMachineProgressImpl() {
1178 }
1179 #endif
1180
1181 static int SendBufferMsg(SMSG_QUEUE *queue);
1182 static void SendRdmaMsg();
1183 static void PumpNetworkSmsg();
1184 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1185 #if CQWRITE
1186 static void PumpCqWriteTransactions();
1187 #endif
1188 #if REMOTE_EVENT
1189 static void PumpRemoteTransactions();
1190 #endif
1191
1192 #if MACHINE_DEBUG_LOG
1193 FILE *debugLog = NULL;
1194 static CmiInt8 buffered_recv_msg = 0;
1195 int         lrts_smsg_success = 0;
1196 int         lrts_received_msg = 0;
1197 #endif
1198
1199 static void sweep_mempool(mempool_type *mptr)
1200 {
1201     int n = 0;
1202     block_header *current = &(mptr->block_head);
1203
1204     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1205     while( current!= NULL) {
1206         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1207         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1208     }
1209     printf("[n %d] sweep_mempool slot END.\n", myrank);
1210 }
1211
1212 inline
1213 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1214 {
1215     block_header *current = *from;
1216
1217     //while(register_memory_size>= MAX_REG_MEM)
1218     //{
1219         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1220             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1221
1222         *from = current;
1223         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1224         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1225         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1226     //}
1227     return GNI_RC_SUCCESS;
1228 }
1229
1230 inline 
1231 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1232 {
1233     gni_return_t status = GNI_RC_SUCCESS;
1234     //int size = GetMempoolsize(msg);
1235     //void *blockaddr = GetMempoolBlockPtr(msg);
1236     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1237    
1238     block_header *current = &(mptr->block_head);
1239     while(register_memory_size>= MAX_REG_MEM)
1240     {
1241         status = deregisterMemory(mptr, &current);
1242         if (status != GNI_RC_SUCCESS) break;
1243     }
1244     if(register_memory_size>= MAX_REG_MEM) return status;
1245
1246     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1247     while(1)
1248     {
1249         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1250         if(status == GNI_RC_SUCCESS)
1251         {
1252             break;
1253         }
1254         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1255         {
1256             CmiAbort("Memory registor for mempool fails\n");
1257         }
1258         else
1259         {
1260             status = deregisterMemory(mptr, &current);
1261             if (status != GNI_RC_SUCCESS) break;
1262         }
1263     }; 
1264     return status;
1265 }
1266
1267 inline 
1268 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1269 {
1270     static int rank = -1;
1271     int i;
1272     gni_return_t status;
1273     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1274     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1275     mempool_type *mptr;
1276
1277     status = registerFromMempool(mptr1, msg, size, t, cqh);
1278     if (status == GNI_RC_SUCCESS) return status;
1279 #if CMK_SMP 
1280     for (i=0; i<CmiMyNodeSize()+1; i++) {
1281       rank = (rank+1)%(CmiMyNodeSize()+1);
1282       mptr = CpvAccessOther(mempool, rank);
1283       if (mptr == mptr1) continue;
1284       status = registerFromMempool(mptr, msg, size, t, cqh);
1285       if (status == GNI_RC_SUCCESS) return status;
1286     }
1287 #endif
1288     return  GNI_RC_ERROR_RESOURCE;
1289 }
1290
1291 inline
1292 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1293 {
1294     MSG_LIST        *msg_tmp;
1295     MallocMsgList(msg_tmp);
1296     msg_tmp->destNode = destNode;
1297     msg_tmp->size   = size;
1298     msg_tmp->msg    = msg;
1299     msg_tmp->tag    = tag;
1300 #if CMK_WITH_STATS
1301     SMSG_CREATION(msg_tmp)
1302 #endif
1303 #if !CMK_SMP
1304     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1305         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1306         queue->smsg_head_index = destNode;
1307         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1308     }else
1309     {
1310         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1311         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1312     }
1313 #else
1314 #if ONE_SEND_QUEUE
1315     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1316 #else
1317 #if SMP_LOCKS
1318     CmiLock(queue->smsg_msglist_index[destNode].lock);
1319     if(queue->smsg_msglist_index[destNode].pushed == 0)
1320     {
1321         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1322     }
1323     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1324     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1325 #else
1326     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1327 #endif
1328 #endif
1329 #endif
1330 #if PRINT_SYH
1331     buffered_smsg_counter++;
1332 #endif
1333 }
1334
1335 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1336 {
1337     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1338 }
1339
1340 inline
1341 static void setup_smsg_connection(int destNode)
1342 {
1343     mdh_addr_list_t  *new_entry = 0;
1344     gni_post_descriptor_t *pd;
1345     gni_smsg_attr_t      *smsg_attr;
1346     gni_return_t status = GNI_RC_NOT_DONE;
1347     RDMA_REQUEST        *rdma_request_msg;
1348     
1349     if(smsg_available_slot == smsg_expand_slots)
1350     {
1351         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1352         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1353         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1354
1355         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1356             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1357             GNI_MEM_READWRITE,   
1358             -1,
1359             &(new_entry->mdh));
1360         smsg_available_slot = 0; 
1361         new_entry->next = smsg_dynamic_list;
1362         smsg_dynamic_list = new_entry;
1363     }
1364     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1365     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1366     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1367     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1368     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1369     smsg_attr->buff_size = smsg_memlen;
1370     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1371     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1372     smsg_local_attr_vec[destNode] = smsg_attr;
1373     smsg_available_slot++;
1374     MallocPostDesc(pd);
1375     pd->type            = GNI_POST_FMA_PUT;
1376     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1377     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1378     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1379     pd->length          = sizeof(gni_smsg_attr_t);
1380     pd->local_addr      = (uint64_t) smsg_attr;
1381     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1382     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1383     pd->src_cq_hndl     = rdma_tx_cqh;
1384     pd->rdma_mode       = 0;
1385     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1386     print_smsg_attr(smsg_attr);
1387     if(status == GNI_RC_ERROR_RESOURCE )
1388     {
1389         MallocRdmaRequest(rdma_request_msg);
1390         rdma_request_msg->destNode = destNode;
1391         rdma_request_msg->pd = pd;
1392         /* buffer this request */
1393     }
1394 #if PRINT_SYH
1395     if(status != GNI_RC_SUCCESS)
1396        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1397     else
1398         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1399 #endif
1400 }
1401
1402 /* useDynamicSMSG */
1403 inline 
1404 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1405 {
1406     gni_return_t status = GNI_RC_NOT_DONE;
1407
1408     if(mailbox_list->offset == mailbox_list->size)
1409     {
1410         dynamic_smsg_mailbox_t *new_mailbox_entry;
1411         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1412         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1413         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1414         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1415         new_mailbox_entry->offset = 0;
1416         
1417         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1418             new_mailbox_entry->size, smsg_rx_cqh,
1419             GNI_MEM_READWRITE,   
1420             -1,
1421             &(new_mailbox_entry->mem_hndl));
1422
1423         GNI_RC_CHECK("register", status);
1424         new_mailbox_entry->next = mailbox_list;
1425         mailbox_list = new_mailbox_entry;
1426     }
1427     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1428     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1429     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1430     local_smsg_attr->mbox_offset = mailbox_list->offset;
1431     mailbox_list->offset += smsg_memlen;
1432     local_smsg_attr->buff_size = smsg_memlen;
1433     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1434     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1435 }
1436
1437 /* useDynamicSMSG */
1438 inline 
1439 static int connect_to(int destNode)
1440 {
1441     gni_return_t status = GNI_RC_NOT_DONE;
1442     CmiAssert(smsg_connected_flag[destNode] == 0);
1443     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1444     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1445     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1446     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1447     
1448     CMI_GNI_LOCK(global_gni_lock)
1449     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1450     CMI_GNI_UNLOCK(global_gni_lock)
1451     if (status == GNI_RC_ERROR_RESOURCE) {
1452       /* possibly destNode is making connection at the same time */
1453       free(smsg_attr_vector_local[destNode]);
1454       smsg_attr_vector_local[destNode] = NULL;
1455       free(smsg_attr_vector_remote[destNode]);
1456       smsg_attr_vector_remote[destNode] = NULL;
1457       mailbox_list->offset -= smsg_memlen;
1458       return 0;
1459     }
1460     GNI_RC_CHECK("GNI_Post", status);
1461     smsg_connected_flag[destNode] = 1;
1462     return 1;
1463 }
1464
1465 inline 
1466 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1467 {
1468     unsigned int          remote_address;
1469     uint32_t              remote_id;
1470     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1471     gni_smsg_attr_t       *smsg_attr;
1472     gni_post_descriptor_t *pd;
1473     gni_post_state_t      post_state;
1474     char                  *real_data; 
1475
1476     if (useDynamicSMSG) {
1477         switch (smsg_connected_flag[destNode]) {
1478         case 0: 
1479             connect_to(destNode);         /* continue to case 1 */
1480         case 1:                           /* pending connection, do nothing */
1481             status = GNI_RC_NOT_DONE;
1482             if(inbuff ==0)
1483                 buffer_small_msgs(queue, msg, size, destNode, tag);
1484             return status;
1485         }
1486     }
1487 #if CMK_SMP
1488 #if ! ONE_SEND_QUEUE
1489     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1490 #endif
1491     {
1492 #else
1493     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1494     {
1495 #endif
1496         //CMI_GNI_LOCK(smsg_mailbox_lock)
1497         CMI_GNI_LOCK(default_tx_cq_lock)
1498 #if CMK_SMP_TRACE_COMMTHREAD
1499         int oldpe = -1;
1500         int oldeventid = -1;
1501         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1502         { 
1503             START_EVENT();
1504             if ( tag == SMALL_DATA_TAG)
1505                 real_data = (char*)msg; 
1506             else 
1507                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1508             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1509             TRACE_COMM_SET_COMM_MSGID(real_data);
1510         }
1511 #endif
1512 #if REMOTE_EVENT
1513         if (tag == LMSG_INIT_TAG) {
1514             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1515             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1516                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1517         }
1518         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1519 #endif
1520 #if     CMK_WITH_STATS
1521         SMSG_TRY_SEND(tag)
1522 #endif
1523 #if CMK_WITH_STATS
1524     double              creation_time;
1525     if (ptr == NULL)
1526         creation_time = CmiWallTimer();
1527     else
1528         creation_time = ptr->creation_time;
1529 #endif
1530
1531     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1532 #if CMK_SMP_TRACE_COMMTHREAD
1533         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1534 #endif
1535         CMI_GNI_UNLOCK(default_tx_cq_lock)
1536         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1537         if(status == GNI_RC_SUCCESS)
1538         {
1539 #if     CMK_WITH_STATS
1540             SMSG_SENT_DONE(creation_time,tag) 
1541 #endif
1542 #if CMK_SMP_TRACE_COMMTHREAD
1543             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1544             { 
1545                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1546             }
1547 #endif
1548         }else
1549             status = GNI_RC_ERROR_RESOURCE;
1550     }
1551     if(status != GNI_RC_SUCCESS && inbuff ==0)
1552         buffer_small_msgs(queue, msg, size, destNode, tag);
1553     return status;
1554 }
1555
1556 inline 
1557 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1558 {
1559     /* construct a control message and send */
1560     CONTROL_MSG         *control_msg_tmp;
1561     MallocControlMsg(control_msg_tmp);
1562     control_msg_tmp->source_addr = (uint64_t)msg;
1563     control_msg_tmp->seq_id    = seqno;
1564     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1565 #if REMOTE_EVENT
1566     control_msg_tmp->ack_index    =  -1;
1567 #endif
1568 #if     USE_LRTS_MEMPOOL
1569     if(size < BIG_MSG)
1570     {
1571         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1572     }
1573     else
1574     {
1575         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1576         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1577         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1578     }
1579 #else
1580     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1581 #endif
1582     return control_msg_tmp;
1583 }
1584
1585 #define BLOCKING_SEND_CONTROL    0
1586
1587 // Large message, send control to receiver, receiver register memory and do a GET, 
1588 // return 1 - send no success
1589 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1590 {
1591     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1592     uint32_t            vmdh_index  = -1;
1593     int                 size;
1594     int                 offset = 0;
1595     uint64_t            source_addr;
1596     int                 register_size; 
1597     void                *msg;
1598
1599     size    =   control_msg_tmp->total_length;
1600     source_addr = control_msg_tmp->source_addr;
1601     register_size = control_msg_tmp->length;
1602
1603 #if  USE_LRTS_MEMPOOL
1604     if( control_msg_tmp->seq_id == 0 ){
1605 #if BLOCKING_SEND_CONTROL
1606         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1607             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1608                 LrtsAdvanceCommunication(0);
1609         }
1610 #endif
1611         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1612         {
1613             msg = (void*)source_addr;
1614             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1615             {
1616                 if(!inbuff)
1617                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1618                 return GNI_RC_ERROR_NOMEM;
1619             }
1620             //register the corresponding mempool
1621             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1622             if(status == GNI_RC_SUCCESS)
1623             {
1624                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1625             }
1626         }else
1627         {
1628             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1629             status = GNI_RC_SUCCESS;
1630         }
1631         if(NoMsgInSend(source_addr))
1632             register_size = GetMempoolsize((void*)(source_addr));
1633         else
1634             register_size = 0;
1635     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1636     {
1637         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1638         source_addr += offset;
1639         size = control_msg_tmp->length;
1640 #if BLOCKING_SEND_CONTROL
1641         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1642             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1643                 LrtsAdvanceCommunication(0);
1644         }
1645 #endif
1646         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1647             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1648             {
1649                 if(!inbuff)
1650                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1651                 return GNI_RC_ERROR_NOMEM;
1652             }
1653             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1654             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1655         }
1656         else
1657         {
1658             status = GNI_RC_SUCCESS;
1659         }
1660         register_size = 0;  
1661     }
1662
1663     if(status == GNI_RC_SUCCESS)
1664     {
1665         status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1666         if(status == GNI_RC_SUCCESS)
1667         {
1668             buffered_send_msg += register_size;
1669             if(control_msg_tmp->seq_id == 0)
1670             {
1671                 IncreaseMsgInSend(source_addr);
1672             }
1673             FreeControlMsg(control_msg_tmp);
1674             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1675         }else
1676             status = GNI_RC_ERROR_RESOURCE;
1677
1678     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1679     {
1680         CmiAbort("Memory registor for large msg\n");
1681     }else 
1682     {
1683         status = GNI_RC_ERROR_NOMEM; 
1684         if(!inbuff)
1685             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1686     }
1687     return status;
1688 #else
1689     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1690     if(status == GNI_RC_SUCCESS)
1691     {
1692         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1693         if(status == GNI_RC_SUCCESS)
1694         {
1695             FreeControlMsg(control_msg_tmp);
1696         }
1697     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1698     {
1699         CmiAbort("Memory registor for large msg\n");
1700     }else 
1701     {
1702         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1703     }
1704     return status;
1705 #endif
1706 }
1707
1708 inline void LrtsPrepareEnvelope(char *msg, int size)
1709 {
1710     CmiSetMsgSize(msg, size);
1711     CMI_SET_CHECKSUM(msg, size);
1712 }
1713
1714 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1715 {
1716     gni_return_t        status  =   GNI_RC_SUCCESS;
1717     uint8_t tag;
1718     CONTROL_MSG         *control_msg_tmp;
1719     int                 oob = ( mode & OUT_OF_BAND);
1720     SMSG_QUEUE          *queue;
1721
1722     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1723 #if CMK_USE_OOB
1724     queue = oob? &smsg_oob_queue : &smsg_queue;
1725 #else
1726     queue = &smsg_queue;
1727 #endif
1728
1729     LrtsPrepareEnvelope(msg, size);
1730
1731 #if PRINT_SYH
1732     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1733 #endif 
1734 #if CMK_SMP 
1735     if(size <= SMSG_MAX_MSG)
1736         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1737     else if (size < BIG_MSG) {
1738         control_msg_tmp =  construct_control_msg(size, msg, 0);
1739         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1740     }
1741     else {
1742           CmiSetMsgSeq(msg, 0);
1743           control_msg_tmp =  construct_control_msg(size, msg, 1);
1744           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1745     }
1746 #else   //non-smp, smp(worker sending)
1747     if(size <= SMSG_MAX_MSG)
1748     {
1749         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1750             CmiFree(msg);
1751     }
1752     else if (size < BIG_MSG) {
1753         control_msg_tmp =  construct_control_msg(size, msg, 0);
1754         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1755     }
1756     else {
1757 #if     USE_LRTS_MEMPOOL
1758         CmiSetMsgSeq(msg, 0);
1759         control_msg_tmp =  construct_control_msg(size, msg, 1);
1760         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1761 #else
1762         control_msg_tmp =  construct_control_msg(size, msg, 0);
1763         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1764 #endif
1765     }
1766 #endif
1767     return 0;
1768 }
1769
1770 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1771 {
1772   int i;
1773 #if CMK_BROADCAST_USE_CMIREFERENCE
1774   for(i=0;i<npes;i++) {
1775     if (pes[i] == CmiMyPe())
1776       CmiSyncSend(pes[i], len, msg);
1777     else {
1778       CmiReference(msg);
1779       CmiSyncSendAndFree(pes[i], len, msg);
1780     }
1781   }
1782 #else
1783   for(i=0;i<npes;i++) {
1784     CmiSyncSend(pes[i], len, msg);
1785   }
1786 #endif
1787 }
1788
1789 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1790 {
1791   /* A better asynchronous implementation may be wanted, but at least it works */
1792   CmiSyncListSendFn(npes, pes, len, msg);
1793   return (CmiCommHandle) 0;
1794 }
1795
1796 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1797 {
1798   if (npes == 1) {
1799       CmiSyncSendAndFree(pes[0], len, msg);
1800       return;
1801   }
1802 #if CMK_PERSISTENT_COMM
1803   if (CpvAccess(phs) && len > 1024) {
1804       int i;
1805       for(i=0;i<npes;i++) {
1806         if (pes[i] == CmiMyPe())
1807           CmiSyncSend(pes[i], len, msg);
1808         else {
1809           CmiReference(msg);
1810           CmiSyncSendAndFree(pes[i], len, msg);
1811         }
1812       }
1813       CmiFree(msg);
1814       return;
1815   }
1816 #endif
1817   
1818 #if CMK_BROADCAST_USE_CMIREFERENCE
1819   if (npes == 1) {
1820     CmiSyncSendAndFree(pes[0], len, msg);
1821     return;
1822   }
1823   CmiSyncListSendFn(npes, pes, len, msg);
1824   CmiFree(msg);
1825 #else
1826   int i;
1827   for(i=0;i<npes-1;i++) {
1828     CmiSyncSend(pes[i], len, msg);
1829   }
1830   if (npes>0)
1831     CmiSyncSendAndFree(pes[npes-1], len, msg);
1832   else 
1833     CmiFree(msg);
1834 #endif
1835 }
1836
1837 static void    PumpDatagramConnection();
1838 static      int         event_SetupConnect = 111;
1839 static      int         event_PumpSmsg = 222 ;
1840 static      int         event_PumpTransaction = 333;
1841 static      int         event_PumpRdmaTransaction = 444;
1842 static      int         event_SendBufferSmsg = 444;
1843 static      int         event_SendFmaRdmaMsg = 555;
1844 static      int         event_AdvanceCommunication = 666;
1845
1846 static void registerUserTraceEvents() {
1847 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1848     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1849     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1850     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1851     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1852     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1853     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1854     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1855 #endif
1856 }
1857
1858 static void ProcessDeadlock()
1859 {
1860     static CmiUInt8 *ptr = NULL;
1861     static CmiUInt8  last = 0, mysum, sum;
1862     static int count = 0;
1863     gni_return_t status;
1864     int i;
1865
1866 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1867 //sweep_mempool(CpvAccess(mempool));
1868     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1869     mysum = smsg_send_count + smsg_recv_count;
1870     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1871     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1872     GNI_RC_CHECK("PMI_Allgather", status);
1873     sum = 0;
1874     for (i=0; i<mysize; i++)  sum+= ptr[i];
1875     if (last == 0 || sum == last) 
1876         count++;
1877     else
1878         count = 0;
1879     last = sum;
1880     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1881     if (count == 2) { 
1882         /* detected twice, it is a real deadlock */
1883         if (myrank == 0)  {
1884             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1885             CmiAbort("Fatal> Deadlock detected.");
1886         }
1887
1888     }
1889     _detected_hang = 0;
1890 }
1891
1892 static void CheckProgress()
1893 {
1894     if (smsg_send_count == last_smsg_send_count &&
1895         smsg_recv_count == last_smsg_recv_count ) 
1896     {
1897         _detected_hang = 1;
1898 #if !CMK_SMP
1899         if (_detected_hang) ProcessDeadlock();
1900 #endif
1901
1902     }
1903     else {
1904         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1905         last_smsg_send_count = smsg_send_count;
1906         last_smsg_recv_count = smsg_recv_count;
1907         _detected_hang = 0;
1908     }
1909 }
1910
1911 static void set_limit()
1912 {
1913     //if (!user_set_flag && CmiMyRank() == 0) {
1914     if (CmiMyRank() == 0) {
1915         int mynode = CmiPhysicalNodeID(CmiMyPe());
1916         int numpes = CmiNumPesOnPhysicalNode(mynode);
1917         int numprocesses = numpes / CmiMyNodeSize();
1918         MAX_REG_MEM  = _totalmem / numprocesses;
1919         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1920         if (CmiMyPe() == 0)
1921            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1922         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1923         {
1924              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1925              CmiAbort("memory registration\n");
1926         }
1927     }
1928 }
1929
1930 void LrtsPostCommonInit(int everReturn)
1931 {
1932 #if CMK_DIRECT
1933     CmiDirectInit();
1934 #endif
1935 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1936     CpvInitialize(double, projTraceStart);
1937     /* only PE 0 needs to care about registration (to generate sts file). */
1938     //if (CmiMyPe() == 0) 
1939     {
1940         registerMachineUserEventsFunction(&registerUserTraceEvents);
1941     }
1942 #endif
1943
1944 #if CMK_SMP
1945     CmiIdleState *s=CmiNotifyGetState();
1946     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1947     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1948 #else
1949     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1950     if (useDynamicSMSG)
1951     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1952 #endif
1953
1954 #if ! LARGEPAGE
1955     if (_checkProgress)
1956 #if CMK_SMP
1957     if (CmiMyRank() == 0)
1958 #endif
1959     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1960 #endif
1961  
1962 #if !LARGEPAGE
1963     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1964 #endif
1965 }
1966
1967 /* this is called by worker thread */
1968 void LrtsPostNonLocal(){
1969 #if CMK_SMP_TRACE_COMMTHREAD
1970     double startT, endT;
1971 #endif
1972 #if MULTI_THREAD_SEND
1973     if(mysize == 1) return;
1974 #if CMK_SMP_TRACE_COMMTHREAD
1975     traceEndIdle();
1976 #endif
1977
1978 #if CMK_SMP_TRACE_COMMTHREAD
1979     startT = CmiWallTimer();
1980 #endif
1981
1982 #if CMK_WORKER_SINGLE_TASK
1983     if (CmiMyRank() % 6 == 0)
1984 #endif
1985     PumpNetworkSmsg();
1986
1987 #if CMK_WORKER_SINGLE_TASK
1988     if (CmiMyRank() % 6 == 1)
1989 #endif
1990     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1991
1992 #if CMK_WORKER_SINGLE_TASK
1993     if (CmiMyRank() % 6 == 2)
1994 #endif
1995     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1996
1997 #if REMOTE_EVENT
1998 #if CMK_WORKER_SINGLE_TASK
1999     if (CmiMyRank() % 6 == 3)
2000 #endif
2001     PumpRemoteTransactions();
2002 #endif
2003
2004 #if CMK_USE_OOB
2005     if (SendBufferMsg(&smsg_oob_queue) == 1)
2006 #endif
2007     {
2008 #if CMK_WORKER_SINGLE_TASK
2009     if (CmiMyRank() % 6 == 4)
2010 #endif
2011         SendBufferMsg(&smsg_queue);
2012     }
2013
2014 #if CMK_WORKER_SINGLE_TASK
2015     if (CmiMyRank() % 6 == 5)
2016 #endif
2017     SendRdmaMsg();
2018
2019 #if CMK_SMP_TRACE_COMMTHREAD
2020     endT = CmiWallTimer();
2021     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2022 #endif
2023 #if CMK_SMP_TRACE_COMMTHREAD
2024     traceBeginIdle();
2025 #endif
2026 #endif
2027 }
2028
2029 /* useDynamicSMSG */
2030 static void    PumpDatagramConnection()
2031 {
2032     uint32_t          remote_address;
2033     uint32_t          remote_id;
2034     gni_return_t status;
2035     gni_post_state_t  post_state;
2036     uint64_t          datagram_id;
2037     int i;
2038
2039    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2040    {
2041        if (datagram_id >= mysize) {           /* bound endpoint */
2042            int pe = datagram_id - mysize;
2043            CMI_GNI_LOCK(global_gni_lock)
2044            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2045            CMI_GNI_UNLOCK(global_gni_lock)
2046            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2047            {
2048                CmiAssert(remote_id == pe);
2049                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2050                GNI_RC_CHECK("Dynamic SMSG Init", status);
2051 #if PRINT_SYH
2052                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
2053 #endif
2054                CmiAssert(smsg_connected_flag[pe] == 1);
2055                smsg_connected_flag[pe] = 2;
2056            }
2057        }
2058        else {         /* unbound ep */
2059            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2060            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2061            {
2062                CmiAssert(remote_id<mysize);
2063                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2064                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2065                GNI_RC_CHECK("Dynamic SMSG Init", status);
2066 #if PRINT_SYH
2067                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
2068 #endif
2069                smsg_connected_flag[remote_id] = 2;
2070
2071                alloc_smsg_attr(&send_smsg_attr);
2072                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2073                GNI_RC_CHECK("post unbound datagram", status);
2074            }
2075        }
2076    }
2077 }
2078
2079 /* pooling CQ to receive network message */
2080 static void PumpNetworkRdmaMsgs()
2081 {
2082     gni_cq_entry_t      event_data;
2083     gni_return_t        status;
2084
2085 }
2086
2087 inline 
2088 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
2089 {
2090     RDMA_REQUEST        *rdma_request_msg;
2091     MallocRdmaRequest(rdma_request_msg);
2092     rdma_request_msg->destNode = inst_id;
2093     rdma_request_msg->pd = pd;
2094 #if REMOTE_EVENT
2095     rdma_request_msg->ack_index = ack_index;
2096 #endif
2097 #if CMK_SMP
2098     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2099 #else
2100     if(sendRdmaBuf == 0)
2101     {
2102         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2103     }else{
2104         sendRdmaTail->next = rdma_request_msg;
2105         sendRdmaTail =  rdma_request_msg;
2106     }
2107 #endif
2108
2109 }
2110
2111 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2112
2113 static void PumpNetworkSmsg()
2114 {
2115     uint64_t            inst_id;
2116     int                 ret;
2117     gni_cq_entry_t      event_data;
2118     gni_return_t        status, status2;
2119     void                *header;
2120     uint8_t             msg_tag;
2121     int                 msg_nbytes;
2122     void                *msg_data;
2123     gni_mem_handle_t    msg_mem_hndl;
2124     gni_smsg_attr_t     *smsg_attr;
2125     gni_smsg_attr_t     *remote_smsg_attr;
2126     int                 init_flag;
2127     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2128     uint64_t            source_addr;
2129     SMSG_QUEUE         *queue = &smsg_queue;
2130 #if   CMK_DIRECT
2131     cmidirectMsg        *direct_msg;
2132 #endif
2133 #if CMI_EXERT_RECV_CAP
2134     int                  recv_cnt = 0;
2135 #endif
2136     while(1)
2137     {
2138         CMI_GNI_LOCK(smsg_rx_cq_lock)
2139         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2140         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2141         if(status != GNI_RC_SUCCESS)
2142             break;
2143         inst_id = GNI_CQ_GET_INST_ID(event_data);
2144 #if REMOTE_EVENT
2145         inst_id = GET_RANK(inst_id);      /* important */
2146 #endif
2147         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2148 #if PRINT_SYH
2149         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2150 #endif
2151         if (useDynamicSMSG) {
2152             /* subtle: smsg may come before connection is setup */
2153             while (smsg_connected_flag[inst_id] != 2) 
2154                PumpDatagramConnection();
2155         }
2156         msg_tag = GNI_SMSG_ANY_TAG;
2157         while(1) {
2158             CMI_GNI_LOCK(smsg_mailbox_lock)
2159             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2160             if (status != GNI_RC_SUCCESS)
2161             {
2162                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2163                 break;
2164             }
2165 #if PRINT_SYH
2166             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2167 #endif
2168             /* copy msg out and then put into queue (small message) */
2169             switch (msg_tag) {
2170             case SMALL_DATA_TAG:
2171             {
2172                 START_EVENT();
2173                 msg_nbytes = CmiGetMsgSize(header);
2174                 msg_data    = CmiAlloc(msg_nbytes);
2175                 memcpy(msg_data, (char*)header, msg_nbytes);
2176                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2177                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2178                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
2179                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2180                 handleOneRecvedMsg(msg_nbytes, msg_data);
2181                 break;
2182             }
2183             case LMSG_INIT_TAG:
2184             {
2185 #if MULTI_THREAD_SEND
2186                 MallocControlMsg(control_msg_tmp);
2187                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2188                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2189                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2190                 getLargeMsgRequest(control_msg_tmp, inst_id);
2191                 FreeControlMsg(control_msg_tmp);
2192 #else
2193                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2194                 getLargeMsgRequest(header, inst_id);
2195                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2196 #endif
2197                 break;
2198             }
2199 #if !REMOTE_EVENT && !CQWRITE
2200             case ACK_TAG:   //msg fit into mempool
2201             {
2202                 /* Get is done, release message . Now put is not used yet*/
2203                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2204                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2205                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2206 #if ! USE_LRTS_MEMPOOL
2207                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2208 #else
2209                 DecreaseMsgInSend(msg);
2210 #endif
2211                 if(NoMsgInSend(msg))
2212                     buffered_send_msg -= GetMempoolsize(msg);
2213                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2214                 CmiFree(msg);
2215                 break;
2216             }
2217 #endif
2218             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2219             {
2220 #if MULTI_THREAD_SEND
2221                 MallocControlMsg(header_tmp);
2222                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2223                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2224 #else
2225                 header_tmp = (CONTROL_MSG *) header;
2226 #endif
2227                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2228                 void *msg = (void*)(header_tmp->source_addr);
2229                 int cur_seq = CmiGetMsgSeq(msg);
2230                 int offset = ONE_SEG*(cur_seq+1);
2231                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2232                 buffered_send_msg -= header_tmp->length;
2233                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2234                 if (remain_size < 0) remain_size = 0;
2235                 CmiSetMsgSize(msg, remain_size);
2236                 if(remain_size <= 0) //transaction done
2237                 {
2238                     CmiFree(msg);
2239                 }else if (header_tmp->total_length > offset)
2240                 {
2241                     CmiSetMsgSeq(msg, cur_seq+1);
2242                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2243                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2244                     //send next seg
2245                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2246                          // pipelining
2247                     if (header_tmp->seq_id == 1) {
2248                       int i;
2249                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2250                         int seq = cur_seq+i+2;
2251                         CmiSetMsgSeq(msg, seq-1);
2252                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2253                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2254                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2255                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2256                       }
2257                     }
2258                 }
2259 #if MULTI_THREAD_SEND
2260                 FreeControlMsg(header_tmp);
2261 #else
2262                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2263 #endif
2264                 break;
2265             }
2266 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2267             case PUT_DONE_TAG:  {   //persistent message
2268                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2269                 int size = ((CONTROL_MSG *) header)->length;
2270                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2271                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2272                 CmiReference(msg);
2273                 CMI_CHECK_CHECKSUM(msg, size);
2274                 handleOneRecvedMsg(size, msg); 
2275 #if PRINT_SYH
2276                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2277 #endif
2278                 break;
2279             }
2280 #endif
2281 #if CMK_DIRECT
2282             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2283                 //create a trigger message
2284                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2285                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2286                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2287                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2288                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2289                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2290                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2291                 break;
2292 #endif
2293             default:
2294                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2295                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2296                 printf("weird tag problem\n");
2297                 CmiAbort("Unknown tag\n");
2298             }               // end switch
2299 #if PRINT_SYH
2300             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2301 #endif
2302             smsg_recv_count ++;
2303             msg_tag = GNI_SMSG_ANY_TAG;
2304 #if CMI_EXERT_RECV_CAP
2305             if (status == GNI_RC_SUCCESS && ++recv_cnt == RECV_CAP) return;
2306 #endif
2307         } //endwhile GNI_SmsgGetNextWTag
2308     }   //end while GetEvent
2309     if(status == GNI_RC_ERROR_RESOURCE)
2310     {
2311         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2312         GNI_RC_CHECK("Smsg_rx_cq full", status);
2313     }
2314 }
2315
2316 static void printDesc(gni_post_descriptor_t *pd)
2317 {
2318     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2319 }
2320
2321 #if CQWRITE
2322 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2323 {
2324     gni_post_descriptor_t *pd;
2325     gni_return_t        status = GNI_RC_SUCCESS;
2326     
2327     MallocPostDesc(pd);
2328     pd->type = GNI_POST_CQWRITE;
2329     pd->cq_mode = GNI_CQMODE_SILENT;
2330     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2331     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2332     pd->cqwrite_value = data;
2333     pd->remote_mem_hndl = mem_hndl;
2334     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2335     GNI_RC_CHECK("GNI_PostCqWrite", status);
2336 }
2337 #endif
2338
2339 // register memory for a message
2340 // return mem handle
2341 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2342 {
2343     gni_return_t status = GNI_RC_SUCCESS;
2344
2345     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2346
2347 #if CMK_PERSISTENT_COMM
2348       // persistent message is always registered
2349       // BIG_MSG small pieces do not have malloc chunk header
2350     if ((seqno <= 1 || seqno == PERSIST_SEQ) && !IsMemHndlZero(MEMHFIELD(msg))) {
2351         *memh = MEMHFIELD(msg);
2352         return GNI_RC_SUCCESS;
2353     }
2354 #endif
2355     if(seqno == 0 
2356 #if CMK_PERSISTENT_COMM
2357          || seqno == PERSIST_SEQ
2358 #endif
2359       )
2360     {
2361         if(IsMemHndlZero((GetMemHndl(msg))))
2362         {
2363             msg = (void*)(msg);
2364             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2365             if(status == GNI_RC_SUCCESS)
2366                 *memh = GetMemHndl(msg);
2367         }
2368         else {
2369             *memh = GetMemHndl(msg);
2370         }
2371     }
2372     else {
2373         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2374         status = registerMemory(msg, size, memh, NULL); 
2375     }
2376     return status;
2377 }
2378
2379 // for BIG_MSG called on receiver side for receiving control message
2380 // LMSG_INIT_TAG
2381 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2382 {
2383 #if     USE_LRTS_MEMPOOL
2384     CONTROL_MSG         *request_msg;
2385     gni_return_t        status = GNI_RC_SUCCESS;
2386     void                *msg_data;
2387     gni_post_descriptor_t *pd;
2388     gni_mem_handle_t    msg_mem_hndl;
2389     int                 size, transaction_size, offset = 0;
2390     size_t              register_size = 0;
2391
2392     // initial a get to transfer data from the sender side */
2393     request_msg = (CONTROL_MSG *) header;
2394     size = request_msg->total_length;
2395     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2396     MallocPostDesc(pd);
2397 #if CMK_WITH_STATS 
2398     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2399 #endif
2400     if(request_msg->seq_id < 2)   {
2401 #if CMK_SMP_TRACE_COMMTHREAD 
2402         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2403 #endif
2404         msg_data = CmiAlloc(size);
2405         CmiSetMsgSeq(msg_data, 0);
2406         _MEMCHECK(msg_data);
2407     }
2408     else {
2409         offset = ONE_SEG*(request_msg->seq_id-1);
2410         msg_data = (char*)request_msg->dest_addr + offset;
2411     }
2412    
2413     pd->cqwrite_value = request_msg->seq_id;
2414
2415     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2416     SetMemHndlZero(pd->local_mem_hndl);
2417     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2418     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2419         if(NoMsgInRecv( (void*)(msg_data)))
2420             register_size = GetMempoolsize((void*)(msg_data));
2421     }
2422
2423     pd->first_operand = ALIGN64(size);                   //  total length
2424
2425     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2426         pd->type            = GNI_POST_FMA_GET;
2427     else
2428         pd->type            = GNI_POST_RDMA_GET;
2429     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2430     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2431     pd->length          = transaction_size;
2432     pd->local_addr      = (uint64_t) msg_data;
2433     pd->remote_addr     = request_msg->source_addr + offset;
2434     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2435     pd->src_cq_hndl     = rdma_tx_cqh;
2436     pd->rdma_mode       = 0;
2437     pd->amo_cmd         = 0;
2438 #if CMI_EXERT_RDMA_CAP
2439     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2440 #endif
2441     //memory registration success
2442     if(status == GNI_RC_SUCCESS )
2443     {
2444         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2445         CMI_GNI_LOCK(lock)
2446 #if REMOTE_EVENT
2447         if( request_msg->seq_id == 0)
2448         {
2449             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2450             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2451             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2452         }
2453 #endif
2454
2455 #if CMK_WITH_STATS
2456         RDMA_TRY_SEND(pd->type)
2457 #endif
2458         if(pd->type == GNI_POST_RDMA_GET) 
2459         {
2460             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2461         }
2462         else
2463         {
2464             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2465         }
2466         CMI_GNI_UNLOCK(lock)
2467
2468         if(status == GNI_RC_SUCCESS )
2469         {
2470 #if CMI_EXERT_RDMA_CAP
2471             RDMA_pending++;
2472 #endif
2473             if(pd->cqwrite_value == 0)
2474             {
2475 #if MACHINE_DEBUG_LOG
2476                 buffered_recv_msg += register_size;
2477                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2478 #endif
2479                 IncreaseMsgInRecv(msg_data);
2480 #if CMK_SMP_TRACE_COMMTHREAD 
2481                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2482 #endif
2483             }
2484 #if  CMK_WITH_STATS
2485             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2486             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2487 #endif
2488         }
2489     }else
2490     {
2491         SetMemHndlZero((pd->local_mem_hndl));
2492     }
2493     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2494     {
2495 #if REMOTE_EVENT
2496         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2497 #else
2498         bufferRdmaMsg(inst_id, pd, -1); 
2499 #endif
2500     }else if (status != GNI_RC_SUCCESS) {
2501         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2502         GNI_RC_CHECK("GetLargeAFter posting", status);
2503     }
2504 #else
2505     CONTROL_MSG         *request_msg;
2506     gni_return_t        status;
2507     void                *msg_data;
2508     gni_post_descriptor_t *pd;
2509     RDMA_REQUEST        *rdma_request_msg;
2510     gni_mem_handle_t    msg_mem_hndl;
2511     //int source;
2512     // initial a get to transfer data from the sender side */
2513     request_msg = (CONTROL_MSG *) header;
2514     msg_data = CmiAlloc(request_msg->length);
2515     _MEMCHECK(msg_data);
2516
2517     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2518
2519     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2520     {
2521         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2522     }
2523
2524     MallocPostDesc(pd);
2525     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2526         pd->type            = GNI_POST_FMA_GET;
2527     else
2528         pd->type            = GNI_POST_RDMA_GET;
2529     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2530     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2531     pd->length          = ALIGN64(request_msg->length);
2532     pd->local_addr      = (uint64_t) msg_data;
2533     pd->remote_addr     = request_msg->source_addr;
2534     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2535     pd->src_cq_hndl     = rdma_tx_cqh;
2536     pd->rdma_mode       = 0;
2537     pd->amo_cmd         = 0;
2538
2539     //memory registration successful
2540     if(status == GNI_RC_SUCCESS)
2541     {
2542         pd->local_mem_hndl  = msg_mem_hndl;
2543        
2544         if(pd->type == GNI_POST_RDMA_GET) 
2545         {
2546             CMI_GNI_LOCK(rdma_tx_cq_lock)
2547             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2548             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2549         }
2550         else
2551         {
2552             CMI_GNI_LOCK(default_tx_cq_lock)
2553             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2554             CMI_GNI_UNLOCK(default_tx_cq_lock)
2555         }
2556
2557     }else
2558     {
2559         SetMemHndlZero(pd->local_mem_hndl);
2560     }
2561     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2562     {
2563         MallocRdmaRequest(rdma_request_msg);
2564         rdma_request_msg->next = 0;
2565         rdma_request_msg->destNode = inst_id;
2566         rdma_request_msg->pd = pd;
2567         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2568     }else {
2569         GNI_RC_CHECK("AFter posting", status);
2570     }
2571 #endif
2572 }
2573
2574 #if CQWRITE
2575 static void PumpCqWriteTransactions()
2576 {
2577
2578     gni_cq_entry_t          ev;
2579     gni_return_t            status;
2580     void                    *msg;  
2581     int                     msg_size;
2582     while(1) {
2583         //CMI_GNI_LOCK(my_cq_lock) 
2584         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2585         //CMI_GNI_UNLOCK(my_cq_lock)
2586         if(status != GNI_RC_SUCCESS) break;
2587         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2588 #if CMK_PERSISTENT_COMM
2589 #if PRINT_SYH
2590         printf(" %d CQ write event %p\n", myrank, msg);
2591 #endif
2592         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2593 #if PRINT_SYH
2594             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2595 #endif
2596             CmiReference(msg);
2597             msg_size = CmiGetMsgSize(msg);
2598             CMI_CHECK_CHECKSUM(msg, msg_size);
2599             handleOneRecvedMsg(msg_size, msg); 
2600             continue;
2601         }
2602 #endif
2603 #if ! USE_LRTS_MEMPOOL
2604        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2605 #else
2606         DecreaseMsgInSend(msg);
2607 #endif
2608         if(NoMsgInSend(msg))
2609             buffered_send_msg -= GetMempoolsize(msg);
2610         CmiFree(msg);
2611     };
2612     if(status == GNI_RC_ERROR_RESOURCE)
2613     {
2614         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2615     }
2616 }
2617 #endif
2618
2619 #if REMOTE_EVENT
2620 static void PumpRemoteTransactions()
2621 {
2622     gni_cq_entry_t          ev;
2623     gni_return_t            status;
2624     void                    *msg;   
2625     int                     inst_id, index, type, size;
2626
2627     while(1) {
2628         CMI_GNI_LOCK(global_gni_lock)
2629         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2630         CMI_GNI_UNLOCK(global_gni_lock)
2631
2632         if(status != GNI_RC_SUCCESS) break;
2633
2634         inst_id = GNI_CQ_GET_INST_ID(ev);
2635         index = GET_INDEX(inst_id);
2636         type = GET_TYPE(inst_id);
2637         switch (type) {
2638         case 0:    // ACK
2639             CmiAssert(index>=0 && index<ackPool.size);
2640             CmiAssert(GetIndexType(ackPool, index) == 1);
2641             msg = GetIndexAddress(ackPool, index);
2642 #if PRINT_SYH
2643         printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2644 #endif
2645 #if ! USE_LRTS_MEMPOOL
2646            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2647 #else
2648             DecreaseMsgInSend(msg);
2649 #endif
2650             if(NoMsgInSend(msg))
2651                 buffered_send_msg -= GetMempoolsize(msg);
2652             CmiFree(msg);
2653             IndexPool_freeslot(&ackPool, index);
2654             break;
2655 #if CMK_PERSISTENT_COMM
2656         case 1:  {    // PERSISTENT
2657             CmiLock(persistPool.lock);
2658             CmiAssert(GetIndexType(persistPool, index) == 2);
2659             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2660             CmiUnlock(persistPool.lock);
2661             START_EVENT();
2662             msg = slot->destBuf[0].destAddress;
2663             size = CmiGetMsgSize(msg);
2664             CmiReference(msg);
2665             CMI_CHECK_CHECKSUM(msg, size);
2666             TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2667             handleOneRecvedMsg(size, msg); 
2668             break;
2669             }
2670 #endif
2671         default:
2672             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2673             CmiAbort("PumpRemoteTransactions: unknown type");
2674         }
2675     }
2676     if(status == GNI_RC_ERROR_RESOURCE)
2677     {
2678         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2679     }
2680 }
2681 #endif
2682
2683 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2684 {
2685     gni_cq_entry_t          ev;
2686     gni_return_t            status;
2687     uint64_t                type, inst_id;
2688     gni_post_descriptor_t   *tmp_pd;
2689     MSG_LIST                *ptr;
2690     CONTROL_MSG             *ack_msg_tmp;
2691     ACK_MSG                 *ack_msg;
2692     uint8_t                 msg_tag;
2693 #if CMK_DIRECT
2694     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2695 #endif
2696     SMSG_QUEUE         *queue = &smsg_queue;
2697
2698     while(1) {
2699         CMI_GNI_LOCK(my_cq_lock) 
2700         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2701         CMI_GNI_UNLOCK(my_cq_lock)
2702         if(status != GNI_RC_SUCCESS) break;
2703         
2704         type = GNI_CQ_GET_TYPE(ev);
2705         if (type == GNI_CQ_EVENT_TYPE_POST)
2706         {
2707
2708 #if CMI_EXERT_RDMA_CAP
2709             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2710             RDMA_pending--;
2711 #endif
2712             inst_id     = GNI_CQ_GET_INST_ID(ev);
2713 #if PRINT_SYH
2714             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2715 #endif
2716             CMI_GNI_LOCK(my_cq_lock)
2717             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2718             CMI_GNI_UNLOCK(my_cq_lock)
2719
2720             switch (tmp_pd->type) {
2721 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2722             case GNI_POST_RDMA_PUT:
2723 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2724                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2725 #endif
2726             case GNI_POST_FMA_PUT:
2727                 if(tmp_pd->amo_cmd == 1) {
2728 #if CMK_DIRECT
2729                     //sender ACK to receiver to trigger it is done
2730                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2731                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2732                     msg_tag = DIRECT_PUT_DONE_TAG;
2733 #endif
2734                 }
2735                 else {
2736                     CmiFree((void *)tmp_pd->local_addr);
2737 #if REMOTE_EVENT
2738                     FreePostDesc(tmp_pd);
2739                     continue;
2740 #elif CQWRITE
2741                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2742                     FreePostDesc(tmp_pd);
2743                     continue;
2744 #else
2745                     MallocControlMsg(ack_msg_tmp);
2746                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2747                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2748                     ack_msg_tmp->length  = tmp_pd->length;
2749                     msg_tag = PUT_DONE_TAG;
2750 #endif
2751                 }
2752                 break;
2753 #endif
2754             case GNI_POST_RDMA_GET:
2755             case GNI_POST_FMA_GET:  {
2756 #if  ! USE_LRTS_MEMPOOL
2757                 MallocControlMsg(ack_msg_tmp);
2758                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2759                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2760                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2761                 msg_tag = ACK_TAG;  
2762 #else
2763 #if CMK_WITH_STATS
2764                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2765 #endif
2766                 int seq_id = tmp_pd->cqwrite_value;
2767                 if(seq_id > 0)      // BIG_MSG
2768                 {
2769                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2770                     MallocControlMsg(ack_msg_tmp);
2771                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2772                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2773                     ack_msg_tmp->seq_id = seq_id;
2774                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2775                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2776                     ack_msg_tmp->length = tmp_pd->length;
2777                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2778                     msg_tag = BIG_MSG_TAG; 
2779                 } 
2780                 else
2781                 {
2782                     msg_tag = ACK_TAG; 
2783 #if  !REMOTE_EVENT && !CQWRITE
2784                     MallocAckMsg(ack_msg);
2785                     ack_msg->source_addr = tmp_pd->remote_addr;
2786 #endif
2787                 }
2788 #endif
2789                 break;
2790             }
2791             case  GNI_POST_CQWRITE:
2792                    FreePostDesc(tmp_pd);
2793                    continue;
2794             default:
2795                 CmiPrintf("type=%d\n", tmp_pd->type);
2796                 CmiAbort("PumpLocalTransactions: unknown type!");
2797             }      /* end of switch */
2798
2799 #if CMK_DIRECT
2800             if (tmp_pd->amo_cmd == 1) {
2801                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2802                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2803             }
2804             else
2805 #endif
2806             if (msg_tag == ACK_TAG) {
2807 #if !REMOTE_EVENT
2808 #if   !CQWRITE
2809                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2810                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2811 #else
2812                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2813 #endif
2814 #endif
2815             }
2816             else {
2817                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2818                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2819             }
2820 #if CMK_PERSISTENT_COMM
2821             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2822 #endif
2823             {
2824                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2825 #if PRINT_SYH
2826                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2827 #endif
2828                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2829                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2830
2831                     START_EVENT();
2832                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2833                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2834 #if MACHINE_DEBUG_LOG
2835                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2836                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2837                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2838 #endif
2839                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2840                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2841                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2842                 }else if(msg_tag == BIG_MSG_TAG){
2843                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2844                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2845                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2846                         START_EVENT();
2847 #if PRINT_SYH
2848                         printf("Pipeline msg done [%d]\n", myrank);
2849 #endif
2850 #if                 CMK_SMP_TRACE_COMMTHREAD
2851                         if( tmp_pd->cqwrite_value == 1)
2852                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2853 #endif
2854                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2855                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2856                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2857                     }
2858                 }
2859             }
2860             FreePostDesc(tmp_pd);
2861         }
2862     } //end while
2863     if(status == GNI_RC_ERROR_RESOURCE)
2864     {
2865         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2866         GNI_RC_CHECK("Smsg_tx_cq full", status);
2867     }
2868 }
2869
2870 static void  SendRdmaMsg()
2871 {
2872     gni_return_t            status = GNI_RC_SUCCESS;
2873     gni_mem_handle_t        msg_mem_hndl;
2874     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2875     RDMA_REQUEST            *pre = 0;
2876     uint64_t                register_size = 0;
2877     void                    *msg;
2878     int                     i;
2879 #if CMK_SMP
2880     int len = PCQueueLength(sendRdmaBuf);
2881     for (i=0; i<len; i++)
2882     {
2883 #if CMI_EXERT_RDMA_CAP
2884          if( RDMA_pending >= RDMA_cap) break;
2885 #endif
2886         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2887         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2888         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2889         if (ptr == NULL) break;
2890 #else
2891     ptr = sendRdmaBuf;
2892     while (ptr!=0 )
2893     {
2894 #if CMI_EXERT_RDMA_CAP
2895          if( RDMA_pending >= RDMA_cap) break;
2896 #endif
2897 #endif 
2898         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2899         gni_post_descriptor_t *pd = ptr->pd;
2900         
2901         msg = (void*)(pd->local_addr);
2902         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2903         register_size = 0;
2904         if(pd->cqwrite_value == 0) {
2905             if(NoMsgInRecv(msg))
2906                 register_size = GetMempoolsize(msg);
2907         }
2908
2909         if(status == GNI_RC_SUCCESS)        //mem register good
2910         {
2911             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2912             CMI_GNI_LOCK(lock);
2913 #if REMOTE_EVENT
2914             if( pd->cqwrite_value == 0) {
2915                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2916                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2917                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2918             }
2919 #if CMK_PERSISTENT_COMM
2920             else if (pd->cqwrite_value == PERSIST_SEQ) {
2921                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2922                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, PERSIST_EVENT(ptr->ack_index));
2923                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2924             }
2925 #endif
2926 #endif
2927 #if CMK_WITH_STATS
2928             RDMA_TRY_SEND(pd->type)
2929 #endif
2930             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2931             {
2932                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2933             }
2934             else
2935             {
2936                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2937             }
2938             CMI_GNI_UNLOCK(lock);
2939             
2940             if(status == GNI_RC_SUCCESS)    //post good
2941             {
2942 #if CMI_EXERT_RDMA_CAP
2943                 RDMA_pending ++;
2944 #endif
2945 #if !CMK_SMP
2946                 tmp_ptr = ptr;
2947                 if(pre != 0) {
2948                     pre->next = ptr->next;
2949                 }
2950                 else {
2951                     sendRdmaBuf = ptr->next;
2952                 }
2953                 ptr = ptr->next;
2954                 FreeRdmaRequest(tmp_ptr);
2955 #endif
2956                 if(pd->cqwrite_value == 0)
2957                 {
2958 #if CMK_SMP_TRACE_COMMTHREAD 
2959                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2960 #endif
2961                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2962                 }
2963 #if  CMK_WITH_STATS
2964                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2965                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2966 #endif
2967 #if MACHINE_DEBUG_LOG
2968                 buffered_recv_msg += register_size;
2969                 MACHSTATE(8, "GO request from buffered\n"); 
2970 #endif
2971 #if PRINT_SYH
2972                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
2973 #endif
2974             }else           // cannot post
2975             {
2976 #if CMK_SMP
2977                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2978 #else
2979                 pre = ptr;
2980                 ptr = ptr->next;
2981 #endif
2982 #if PRINT_SYH
2983                 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank, pd->cqwrite_value);
2984 #endif
2985                 break;
2986             }
2987         } else          //memory registration fails
2988         {
2989 #if CMK_SMP
2990             PCQueuePush(sendRdmaBuf, (char*)ptr);
2991 #else
2992             pre = ptr;
2993             ptr = ptr->next;
2994 #endif
2995         }
2996     } //end while
2997 #if ! CMK_SMP
2998     if(ptr == 0)
2999         sendRdmaTail = pre;
3000 #endif
3001 }
3002
3003 // return 1 if all messages are sent
3004 static int SendBufferMsg(SMSG_QUEUE *queue)
3005 {
3006     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3007     CONTROL_MSG         *control_msg_tmp;
3008     gni_return_t        status;
3009     int                 done = 1;
3010     uint64_t            register_size;
3011     void                *register_addr;
3012     int                 index_previous = -1;
3013 #if CMI_EXERT_SEND_CAP
3014     int                 sent_cnt = 0;
3015 #endif
3016
3017 #if CMK_SMP
3018     int          index = 0;
3019 #if ONE_SEND_QUEUE
3020     memset(destpe_avail, 0, mysize * sizeof(char));
3021     for (index=0; index<1; index++)
3022     {
3023         int i, len = PCQueueLength(queue->sendMsgBuf);
3024         for (i=0; i<len; i++) 
3025         {
3026             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3027             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3028             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3029             if(ptr == NULL) break;
3030             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3031                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3032                 continue;
3033             }
3034 #else
3035 #if SMP_LOCKS
3036     int nonempty = PCQueueLength(nonEmptyQueues);
3037     for(index =0; index<nonempty; index++)
3038     {
3039         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
3040         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
3041         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3042         if(current_list == NULL) break; 
3043         PCQueue current_queue= current_list-> sendSmsgBuf;
3044         CmiLock(current_list->lock);
3045         int i, len = PCQueueLength(current_queue);
3046         current_list->pushed = 0;
3047         CmiUnlock(current_list->lock);
3048 #else
3049     for(index =0; index<mysize; index++)
3050     {
3051         //if (index == myrank) continue;
3052         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3053         int i, len = PCQueueLength(current_queue);
3054 #endif
3055         for (i=0; i<len; i++) 
3056         {
3057             CMI_PCQUEUEPOP_LOCK(current_queue)
3058             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3059             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3060             if (ptr == 0) break;
3061 #endif
3062 #else
3063     int index = queue->smsg_head_index;
3064     while(index != -1)
3065     {
3066         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3067         pre = 0;
3068         while(ptr != 0)
3069         {
3070 #endif
3071             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3072             status = GNI_RC_ERROR_RESOURCE;
3073             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
3074                 /* connection not exists yet */
3075             }
3076             else
3077             switch(ptr->tag)
3078             {
3079             case SMALL_DATA_TAG:
3080                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3081                 if(status == GNI_RC_SUCCESS)
3082                 {
3083                     CmiFree(ptr->msg);
3084                 }
3085                 break;
3086             case LMSG_INIT_TAG:
3087                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3088                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
3089                 break;
3090 #if !REMOTE_EVENT && !CQWRITE
3091             case ACK_TAG:
3092                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3093                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3094                 break;
3095 #endif
3096             case BIG_MSG_TAG:
3097                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3098                 if(status == GNI_RC_SUCCESS)
3099                 {
3100                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3101                 }
3102                 break;
3103 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3104             case PUT_DONE_TAG:
3105                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3106                 if(status == GNI_RC_SUCCESS)
3107                 {
3108                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3109                 }
3110                 break;
3111 #endif
3112 #if CMK_DIRECT
3113             case DIRECT_PUT_DONE_TAG:
3114                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3115                 if(status == GNI_RC_SUCCESS)
3116                 {
3117                     free((CMK_DIRECT_HEADER*)ptr->msg);
3118                 }
3119                 break;
3120
3121 #endif
3122             default:
3123                 printf("Weird tag\n");
3124                 CmiAbort("should not happen\n");
3125             }       // end switch
3126             if(status == GNI_RC_SUCCESS)
3127             {
3128 #if PRINT_SYH
3129                 buffered_smsg_counter--;
3130                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3131 #endif
3132 #if !CMK_SMP
3133                 tmp_ptr = ptr;
3134                 if(pre)
3135                 {
3136                     ptr = pre ->next = ptr->next;
3137                 }else
3138                 {
3139                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3140                 }
3141                 FreeMsgList(tmp_ptr);
3142 #else
3143                 FreeMsgList(ptr);
3144 #endif
3145 #if CMI_EXERT_SEND_CAP
3146                 if(++sent_cnt == SEND_CAP) break;
3147 #endif
3148             }else {
3149 #if CMK_SMP
3150 #if ONE_SEND_QUEUE
3151                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3152 #else
3153                 PCQueuePush(current_queue, (char*)ptr);
3154 #endif
3155 #else
3156                 pre = ptr;
3157                 ptr=ptr->next;
3158 #endif
3159                 done = 0;
3160                 if(status == GNI_RC_ERROR_RESOURCE)
3161                 {
3162 #if CMK_SMP && ONE_SEND_QUEUE 
3163                     destpe_avail[ptr->destNode] = 1;
3164 #else
3165                     break;
3166 #endif
3167                 }
3168             } 
3169         } //end while
3170 #if !CMK_SMP
3171         if(ptr == 0)
3172             queue->smsg_msglist_index[index].tail = pre;
3173         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3174         {
3175             if(index_previous != -1)
3176                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3177             else
3178                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3179         }
3180         else {
3181             index_previous = index;
3182         }
3183         index = queue->smsg_msglist_index[index].next;
3184 #else
3185 #if !ONE_SEND_QUEUE && SMP_LOCKS
3186         CmiLock(current_list->lock);
3187         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3188         {
3189             current_list->pushed = 1;
3190             PCQueuePush(nonEmptyQueues, current_list);
3191         }
3192         CmiUnlock(current_list->lock); 
3193 #endif
3194 #endif
3195
3196 #if CMI_EXERT_SEND_CAP
3197         if(sent_cnt == SEND_CAP) {
3198             done = 0;
3199             break;
3200         }
3201 #endif
3202     }   // end pooling for all cores
3203     return done;
3204 }
3205
3206 static void ProcessDeadlock();
3207 void LrtsAdvanceCommunication(int whileidle)
3208 {
3209     static int count = 0;
3210     /*  Receive Msg first */
3211 #if CMK_SMP_TRACE_COMMTHREAD
3212     double startT, endT;
3213 #endif
3214     if (useDynamicSMSG && whileidle)
3215     {
3216 #if CMK_SMP_TRACE_COMMTHREAD
3217         startT = CmiWallTimer();
3218 #endif
3219         PumpDatagramConnection();
3220 #if CMK_SMP_TRACE_COMMTHREAD
3221         endT = CmiWallTimer();
3222         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3223 #endif
3224     }
3225
3226 #if CMK_SMP_TRACE_COMMTHREAD
3227     startT = CmiWallTimer();
3228 #endif
3229     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3230     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3231 #if CMK_SMP_TRACE_COMMTHREAD
3232     endT = CmiWallTimer();
3233     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3234 #endif
3235
3236 #if CMK_SMP_TRACE_COMMTHREAD
3237     startT = CmiWallTimer();
3238 #endif
3239     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3240     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3241 #if CMK_SMP_TRACE_COMMTHREAD
3242     endT = CmiWallTimer();
3243     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3244 #endif
3245
3246 #if CMK_SMP_TRACE_COMMTHREAD
3247     startT = CmiWallTimer();
3248 #endif
3249     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3250
3251 #if CQWRITE
3252     PumpCqWriteTransactions();
3253 #endif
3254
3255 #if REMOTE_EVENT
3256     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
3257 #endif
3258
3259     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3260 #if CMK_SMP_TRACE_COMMTHREAD
3261     endT = CmiWallTimer();
3262     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3263 #endif
3264  
3265 #if CMK_SMP_TRACE_COMMTHREAD
3266     startT = CmiWallTimer();
3267 #endif
3268     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3269     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3270 #if CMK_SMP_TRACE_COMMTHREAD
3271     endT = CmiWallTimer();
3272     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3273 #endif
3274
3275     /* Send buffered Message */
3276 #if CMK_SMP_TRACE_COMMTHREAD
3277     startT = CmiWallTimer();
3278 #endif
3279 #if CMK_USE_OOB
3280     if (SendBufferMsg(&smsg_oob_queue) == 1)
3281 #endif
3282     {
3283         STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
3284     }
3285     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3286 #if CMK_SMP_TRACE_COMMTHREAD
3287     endT = CmiWallTimer();
3288     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3289 #endif
3290
3291 #if CMK_SMP && ! LARGEPAGE
3292     if (_detected_hang)  ProcessDeadlock();
3293 #endif
3294 }
3295
3296 /* useDynamicSMSG */
3297 static void _init_dynamic_smsg()
3298 {
3299     gni_return_t status;
3300     uint32_t     vmdh_index = -1;
3301     int i;
3302
3303     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3304     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3305     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3306     for(i=0; i<mysize; i++) {
3307         smsg_connected_flag[i] = 0;
3308         smsg_attr_vector_local[i] = NULL;
3309         smsg_attr_vector_remote[i] = NULL;
3310     }
3311     if(mysize <=512)
3312     {
3313         SMSG_MAX_MSG = 4096;
3314     }else if (mysize <= 4096)
3315     {
3316         SMSG_MAX_MSG = 4096/mysize * 1024;
3317     }else if (mysize <= 16384)
3318     {
3319         SMSG_MAX_MSG = 512;
3320     }else {
3321         SMSG_MAX_MSG = 256;
3322     }
3323
3324     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3325     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3326     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3327     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3328     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3329
3330     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3331     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3332     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3333     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3334     mailbox_list->offset = 0;
3335     mailbox_list->next = 0;
3336     
3337     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3338         mailbox_list->size, smsg_rx_cqh,
3339         GNI_MEM_READWRITE,   
3340         vmdh_index,
3341         &(mailbox_list->mem_hndl));
3342     GNI_RC_CHECK("MEMORY registration for smsg", status);
3343
3344     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3345     GNI_RC_CHECK("Unbound EP", status);
3346     
3347     alloc_smsg_attr(&send_smsg_attr);
3348
3349     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3350     GNI_RC_CHECK("post unbound datagram", status);
3351
3352       /* always pre-connect to proc 0 */
3353     //if (myrank != 0) connect_to(0);
3354 }
3355
3356 static void _init_static_smsg()
3357 {
3358     gni_smsg_attr_t      *smsg_attr;
3359     gni_smsg_attr_t      remote_smsg_attr;
3360     gni_smsg_attr_t      *smsg_attr_vec;
3361     gni_mem_handle_t     my_smsg_mdh_mailbox;
3362     int      ret, i;
3363     gni_return_t status;
3364     uint32_t              vmdh_index = -1;
3365     mdh_addr_t            base_infor;
3366     mdh_addr_t            *base_addr_vec;
3367     char *env;
3368
3369     if(mysize <=512)
3370     {
3371         SMSG_MAX_MSG = 1024;
3372     }else if (mysize <= 4096)
3373     {
3374         SMSG_MAX_MSG = 1024;
3375     }else if (mysize <= 16384)
3376     {
3377         SMSG_MAX_MSG = 512;
3378     }else {
3379         SMSG_MAX_MSG = 256;
3380     }
3381     
3382     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3383     if (env) SMSG_MAX_MSG = atoi(env);
3384     CmiAssert(SMSG_MAX_MSG > 0);
3385
3386     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3387     
3388     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3389     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3390     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3391     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3392     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3393     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3394     CmiAssert(ret == 0);
3395     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3396     
3397     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3398             smsg_memlen*(mysize), smsg_rx_cqh,
3399             GNI_MEM_READWRITE,   
3400             vmdh_index,
3401             &my_smsg_mdh_mailbox);
3402     register_memory_size += smsg_memlen*(mysize);
3403     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3404
3405     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3406     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3407
3408     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3409     base_infor.mdh =  my_smsg_mdh_mailbox;
3410     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3411
3412     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3413  
3414     for(i=0; i<mysize; i++)
3415     {
3416         if(i==myrank)
3417             continue;
3418         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3419         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3420         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3421         smsg_attr[i].mbox_offset = i*smsg_memlen;
3422         smsg_attr[i].buff_size = smsg_memlen;
3423         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3424         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3425     }
3426
3427     for(i=0; i<mysize; i++)
3428     {
3429         if (myrank == i) continue;
3430
3431         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3432         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3433         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3434         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3435         remote_smsg_attr.buff_size = smsg_memlen;
3436         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3437         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3438
3439         /* initialize the smsg channel */
3440         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3441         GNI_RC_CHECK("SMSG Init", status);
3442     } //end initialization
3443
3444     free(base_addr_vec);
3445     free(smsg_attr);
3446
3447     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3448     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3449
3450
3451 inline
3452 static void _init_send_queue(SMSG_QUEUE *queue)
3453 {
3454      int i;
3455 #if ONE_SEND_QUEUE
3456      queue->sendMsgBuf = PCQueueCreate();
3457      destpe_avail = (char*)malloc(mysize * sizeof(char));
3458 #else
3459      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3460 #if CMK_SMP && SMP_LOCKS
3461      nonEmptyQueues = PCQueueCreate();
3462 #endif
3463      for(i =0; i<mysize; i++)
3464      {
3465 #if CMK_SMP
3466          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3467 #if SMP_LOCKS
3468          queue->smsg_msglist_index[i].pushed = 0;
3469          queue->smsg_msglist_index[i].lock = CmiCreateLock();