persistent threshold use smsg_max, dynamic_smsgs works now with persistent
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #define     LARGEPAGE              0
51
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           1
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     0
59 #endif
60
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
64 #define CMI_EXERT_SEND_CAP      0
65 #define CMI_EXERT_RECV_CAP      0
66 #define CMI_EXERT_RDMA_CAP      0
67 #if CMI_EXERT_SEND_CAP
68 int SEND_large_cap = 100;
69 int SEND_large_pending = 0;
70 #endif
71
72 #if CMI_EXERT_RECV_CAP
73 #define RECV_CAP  4                  /* cap <= 2 sometimes hang */
74 #endif
75
76 #if CMI_EXERT_RDMA_CAP
77 int   RDMA_cap =   100;
78 int   RDMA_pending = 0;
79 #endif
80
81 #define USE_LRTS_MEMPOOL                  1
82
83 #define PRINT_SYH                         0
84
85 // Trace communication thread
86 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
87 #define TRACE_THRESHOLD     0.00005
88 #define CMI_MPI_TRACE_MOREDETAILED 0
89 #undef CMI_MPI_TRACE_USEREVENTS
90 #define CMI_MPI_TRACE_USEREVENTS 1
91 #else
92 #undef CMK_SMP_TRACE_COMMTHREAD
93 #define CMK_SMP_TRACE_COMMTHREAD 0
94 #endif
95
96 #define CMK_TRACE_COMMOVERHEAD 0
97 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
98 #undef CMI_MPI_TRACE_USEREVENTS
99 #define CMI_MPI_TRACE_USEREVENTS 1
100 #else
101 #undef CMK_TRACE_COMMOVERHEAD
102 #define CMK_TRACE_COMMOVERHEAD 0
103 #endif
104
105 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
106 CpvStaticDeclare(double, projTraceStart);
107 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
108 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
109 #else
110 #define  START_EVENT()
111 #define  END_EVENT(x)
112 #endif
113
114 #if USE_LRTS_MEMPOOL
115
116 #define oneMB (1024ll*1024)
117 #define oneGB (1024ll*1024*1024)
118
119 static CmiInt8 _mempool_size = 8*oneMB;
120 static CmiInt8 _expand_mem =  4*oneMB;
121 static CmiInt8 _mempool_size_limit = 0;
122
123 static CmiInt8 _totalmem = 0.8*oneGB;
124
125 #if LARGEPAGE
126 static CmiInt8 BIG_MSG  =  16*oneMB;
127 static CmiInt8 ONE_SEG  =  4*oneMB;
128 #else
129 static CmiInt8 BIG_MSG  =  4*oneMB;
130 static CmiInt8 ONE_SEG  =  2*oneMB;
131 #endif
132 #if MULTI_THREAD_SEND
133 static int BIG_MSG_PIPELINE = 1;
134 #else
135 static int BIG_MSG_PIPELINE = 4;
136 #endif
137
138 // dynamic flow control
139 static CmiInt8 buffered_send_msg = 0;
140 static CmiInt8 register_memory_size = 0;
141
142 #if LARGEPAGE
143 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
144 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
145 static CmiInt8  register_count = 0;
146 #else
147 #if CMK_SMP && COMM_THREAD_SEND 
148 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
149 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
150 #else
151 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
152 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
153 #endif
154
155
156 #endif
157
158 #endif     /* end USE_LRTS_MEMPOOL */
159
160 #if MULTI_THREAD_SEND 
161 #define     CMI_GNI_LOCK(x)       CmiLock(x);
162 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
163 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
164 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
165 #else
166 #define     CMI_GNI_LOCK(x)
167 #define     CMI_GNI_UNLOCK(x)
168 #define     CMI_PCQUEUEPOP_LOCK(Q)   
169 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
170 #endif
171
172 static int _tlbpagesize = 4096;
173
174 //static int _smpd_count  = 0;
175
176 static int   user_set_flag  = 0;
177
178 static int _checkProgress = 1;             /* check deadlock */
179 static int _detected_hang = 0;
180
181 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
182
183 // dynamic SMSG
184 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
185
186 static int avg_smsg_connection = 32;
187 static int                 *smsg_connected_flag= 0;
188 static gni_smsg_attr_t     **smsg_attr_vector_local;
189 static gni_smsg_attr_t     **smsg_attr_vector_remote;
190 static gni_ep_handle_t     ep_hndl_unbound;
191 static gni_smsg_attr_t     send_smsg_attr;
192 static gni_smsg_attr_t     recv_smsg_attr;
193
194 typedef struct _dynamic_smsg_mailbox{
195    void     *mailbox_base;
196    int      size;
197    int      offset;
198    gni_mem_handle_t  mem_hndl;
199    struct      _dynamic_smsg_mailbox  *next;
200 }dynamic_smsg_mailbox_t;
201
202 static dynamic_smsg_mailbox_t  *mailbox_list;
203
204 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
205 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
206
207 #if PRINT_SYH
208 int         lrts_send_msg_id = 0;
209 int         lrts_local_done_msg = 0;
210 int         lrts_send_rdma_success = 0;
211 #endif
212
213 #include "machine.h"
214
215 #include "pcqueue.h"
216
217 #include "mempool.h"
218
219 #if CMK_PERSISTENT_COMM
220 #include "machine-persistent.h"
221 #endif
222
223 //#define  USE_ONESIDED 1
224 #ifdef USE_ONESIDED
225 //onesided implementation is wrong, since no place to restore omdh
226 #include "onesided.h"
227 onesided_hnd_t   onesided_hnd;
228 onesided_md_t    omdh;
229 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
230
231 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
232
233 #else
234 uint8_t   onesided_hnd, omdh;
235
236 #if REMOTE_EVENT || CQWRITE 
237 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
238     if(register_memory_size+size>= MAX_REG_MEM) { \
239         status = GNI_RC_ERROR_NOMEM;} \
240     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
241         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
242 #else
243 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
244         if (register_memory_size + size >= MAX_REG_MEM) { \
245             status = GNI_RC_ERROR_NOMEM; \
246         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
247             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
248 #endif
249
250 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
251     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
252              register_memory_size -= size; \
253          else CmiAbort("MEM_DEregister");  \
254     } while (0)
255 #endif
256
257 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
258 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
259 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
260 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
261 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
262 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
263 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
264 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
265 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
266 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
267 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
268 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
269 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
270 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
271
272 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
273 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
274
275 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
276 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
277 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
278 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
279
280 #define ALIGNBUF                64
281
282 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
283 /* If SMSG is not used */
284
285 #define FMA_PER_CORE  1024
286 #define FMA_BUFFER_SIZE 1024
287
288 /* If SMSG is used */
289 static int  SMSG_MAX_MSG = 1024;
290 #define SMSG_MAX_CREDIT    72
291
292 #define MSGQ_MAXSIZE       2048
293
294 /* large message transfer with FMA or BTE */
295 #if ! REMOTE_EVENT
296 #define LRTS_GNI_RDMA_THRESHOLD  1024 
297 #else
298    /* remote events only work with RDMA */
299 #define LRTS_GNI_RDMA_THRESHOLD  0 
300 #endif
301
302 #if CMK_SMP
303 static int  REMOTE_QUEUE_ENTRIES=163840; 
304 static int LOCAL_QUEUE_ENTRIES=163840; 
305 #else
306 static int  REMOTE_QUEUE_ENTRIES=20480;
307 static int LOCAL_QUEUE_ENTRIES=20480; 
308 #endif
309
310 #define BIG_MSG_TAG             0x26
311 #define PUT_DONE_TAG            0x28
312 #define DIRECT_PUT_DONE_TAG     0x29
313 #define ACK_TAG                 0x30
314 /* SMSG is data message */
315 #define SMALL_DATA_TAG          0x31
316 /* SMSG is a control message to initialize a BTE */
317 #define LMSG_INIT_TAG           0x39 
318
319 #define DEBUG
320 #ifdef GNI_RC_CHECK
321 #undef GNI_RC_CHECK
322 #endif
323 #ifdef DEBUG
324 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
325 #else
326 #define GNI_RC_CHECK(msg,rc)
327 #endif
328
329 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
330 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
331 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
332
333 static int useStaticMSGQ = 0;
334 static int useStaticFMA = 0;
335 static int mysize, myrank;
336 static gni_nic_handle_t   nic_hndl;
337
338 typedef struct {
339     gni_mem_handle_t mdh;
340     uint64_t addr;
341 } mdh_addr_t ;
342 // this is related to dynamic SMSG
343
344 typedef struct mdh_addr_list{
345     gni_mem_handle_t mdh;
346    void *addr;
347     struct mdh_addr_list *next;
348 }mdh_addr_list_t;
349
350 static unsigned int         smsg_memlen;
351 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
352 mdh_addr_t          setup_mem;
353 mdh_addr_t          *smsg_connection_vec = 0;
354 gni_mem_handle_t    smsg_connection_memhndl;
355 static int          smsg_expand_slots = 10;
356 static int          smsg_available_slot = 0;
357 static void         *smsg_mailbox_mempool = 0;
358 mdh_addr_list_t     *smsg_dynamic_list = 0;
359
360 static void             *smsg_mailbox_base;
361 gni_msgq_attr_t         msgq_attrs;
362 gni_msgq_handle_t       msgq_handle;
363 gni_msgq_ep_attr_t      msgq_ep_attrs;
364 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
365
366 /* =====Beginning of Declarations of Machine Specific Variables===== */
367 static int cookie;
368 static int modes = 0;
369 static gni_cq_handle_t       smsg_rx_cqh = NULL;
370 static gni_cq_handle_t       default_tx_cqh = NULL;
371 static gni_cq_handle_t       rdma_tx_cqh = NULL;
372 static gni_cq_handle_t       rdma_rx_cqh = NULL;
373 static gni_cq_handle_t       post_tx_cqh = NULL;
374 static gni_ep_handle_t       *ep_hndl_array;
375
376 static CmiNodeLock           *ep_lock_array;
377 static CmiNodeLock           default_tx_cq_lock; 
378 static CmiNodeLock           rdma_tx_cq_lock; 
379 static CmiNodeLock           global_gni_lock; 
380 static CmiNodeLock           rx_cq_lock;
381 static CmiNodeLock           smsg_mailbox_lock;
382 static CmiNodeLock           smsg_rx_cq_lock;
383 static CmiNodeLock           *mempool_lock;
384 //#define     CMK_WITH_STATS      1
385 typedef struct msg_list
386 {
387     uint32_t destNode;
388     uint32_t size;
389     void *msg;
390     uint8_t tag;
391 #if !CMK_SMP
392     struct msg_list *next;
393 #endif
394 #if CMK_WITH_STATS
395     double  creation_time;
396 #endif
397 }MSG_LIST;
398
399
400 typedef struct control_msg
401 {
402     uint64_t            source_addr;    /* address from the start of buffer  */
403     uint64_t            dest_addr;      /* address from the start of buffer */
404     int                 total_length;   /* total length */
405     int                 length;         /* length of this packet */
406 #if REMOTE_EVENT
407     int                 ack_index;      /* index from integer to address */
408 #endif
409     uint8_t             seq_id;         //big message   0 meaning single message
410     gni_mem_handle_t    source_mem_hndl;
411     struct control_msg *next;
412 } CONTROL_MSG;
413
414 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
415
416 typedef struct ack_msg
417 {
418     uint64_t            source_addr;    /* address from the start of buffer  */
419 #if ! USE_LRTS_MEMPOOL
420     gni_mem_handle_t    source_mem_hndl;
421     int                 length;          /* total length */
422 #endif
423     struct ack_msg     *next;
424 } ACK_MSG;
425
426 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
427
428 #if CMK_DIRECT
429 typedef struct{
430     uint64_t    handler_addr;
431 }CMK_DIRECT_HEADER;
432
433 typedef struct {
434     char core[CmiMsgHeaderSizeBytes];
435     uint64_t handler;
436 }cmidirectMsg;
437
438 //SYH
439 CpvDeclare(int, CmiHandleDirectIdx);
440 void CmiHandleDirectMsg(cmidirectMsg* msg)
441 {
442
443     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
444    (*(_handle->callbackFnPtr))(_handle->callbackData);
445    CmiFree(msg);
446 }
447
448 void CmiDirectInit()
449 {
450     CpvInitialize(int,  CmiHandleDirectIdx);
451     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
452 }
453
454 #endif
455 typedef struct  rmda_msg
456 {
457     int                   destNode;
458 #if REMOTE_EVENT
459     int                   ack_index;
460 #endif
461     gni_post_descriptor_t *pd;
462 #if !CMK_SMP
463     struct  rmda_msg      *next;
464 #endif
465 }RDMA_REQUEST;
466
467
468 #if CMK_SMP
469 #define SMP_LOCKS               0
470 #define ONE_SEND_QUEUE                  0
471 PCQueue sendRdmaBuf;
472 typedef struct  msg_list_index
473 {
474     PCQueue     sendSmsgBuf;
475     int         pushed;
476     CmiNodeLock   lock;
477 } MSG_LIST_INDEX;
478 char                *destpe_avail;
479 #if  !ONE_SEND_QUEUE && SMP_LOCKS
480     PCQueue     nonEmptyQueues;
481 #endif
482 #else         /* non-smp */
483
484 static RDMA_REQUEST        *sendRdmaBuf = 0;
485 static RDMA_REQUEST        *sendRdmaTail = 0;
486 typedef struct  msg_list_index
487 {
488     int         next;
489     MSG_LIST    *sendSmsgBuf;
490     MSG_LIST    *tail;
491 } MSG_LIST_INDEX;
492
493 #endif
494
495 // buffered send queue
496 #if ! ONE_SEND_QUEUE
497 typedef struct smsg_queue
498 {
499     MSG_LIST_INDEX   *smsg_msglist_index;
500     int               smsg_head_index;
501 } SMSG_QUEUE;
502 #else
503 typedef struct smsg_queue
504 {
505     PCQueue       sendMsgBuf;
506 }  SMSG_QUEUE;
507 #endif
508
509 SMSG_QUEUE                  smsg_queue;
510 #if CMK_USE_OOB
511 SMSG_QUEUE                  smsg_oob_queue;
512 #endif
513
514 #if CMK_SMP
515
516 #define FreeMsgList(d)   free(d);
517 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
518
519 #else
520
521 static MSG_LIST       *msglist_freelist=0;
522
523 #define FreeMsgList(d)  \
524   do { \
525   (d)->next = msglist_freelist;\
526   msglist_freelist = d; \
527   } while (0)
528
529 #define MallocMsgList(d) \
530   do {  \
531   d = msglist_freelist;\
532   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
533              _MEMCHECK(d);\
534   } else msglist_freelist = d->next; \
535   d->next =0;  \
536   } while (0)
537
538 #endif
539
540 #if CMK_SMP
541
542 #define FreeControlMsg(d)      free(d);
543 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
544
545 #else
546
547 static CONTROL_MSG    *control_freelist=0;
548
549 #define FreeControlMsg(d)       \
550   do { \
551   (d)->next = control_freelist;\
552   control_freelist = d; \
553   } while (0);
554
555 #define MallocControlMsg(d) \
556   do {  \
557   d = control_freelist;\
558   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
559              _MEMCHECK(d);\
560   } else control_freelist = d->next;  \
561   } while (0);
562
563 #endif
564
565 #if CMK_SMP
566
567 #define FreeAckMsg(d)      free(d);
568 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
569
570 #else
571
572 static ACK_MSG        *ack_freelist=0;
573
574 #define FreeAckMsg(d)       \
575   do { \
576   (d)->next = ack_freelist;\
577   ack_freelist = d; \
578   } while (0)
579
580 #define MallocAckMsg(d) \
581   do { \
582   d = ack_freelist;\
583   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
584              _MEMCHECK(d);\
585   } else ack_freelist = d->next; \
586   } while (0)
587
588 #endif
589
590
591 # if CMK_SMP
592 #define FreeRdmaRequest(d)       free(d);
593 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
594 #else
595
596 static RDMA_REQUEST         *rdma_freelist = NULL;
597
598 #define FreeRdmaRequest(d)       \
599   do {  \
600   (d)->next = rdma_freelist;\
601   rdma_freelist = d;    \
602   } while (0)
603
604 #define MallocRdmaRequest(d) \
605   do {   \
606   d = rdma_freelist;\
607   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
608              _MEMCHECK(d);\
609   } else rdma_freelist = d->next; \
610   d->next =0;   \
611   } while (0)
612 #endif
613
614 /* reuse gni_post_descriptor_t */
615 static gni_post_descriptor_t *post_freelist=0;
616
617 #if  !CMK_SMP
618 #define FreePostDesc(d)       \
619     (d)->next_descr = post_freelist;\
620     post_freelist = d;
621
622 #define MallocPostDesc(d) \
623   d = post_freelist;\
624   if (d==0) { \
625      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
626      d->next_descr = 0;\
627       _MEMCHECK(d);\
628   } else post_freelist = d->next_descr;
629 #else
630
631 #define FreePostDesc(d)     free(d);
632 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
633
634 #endif
635
636
637 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
638 static int      buffered_smsg_counter = 0;
639
640 /* SmsgSend return success but message sent is not confirmed by remote side */
641 static MSG_LIST *buffered_fma_head = 0;
642 static MSG_LIST *buffered_fma_tail = 0;
643
644 /* functions  */
645 #define IsFree(a,ind)  !( a& (1<<(ind) ))
646 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
647 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
648
649 CpvDeclare(mempool_type*, mempool);
650
651 #if REMOTE_EVENT
652 /* ack pool for remote events */
653
654 static int  SHIFT   =           18;
655 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
656 #define RANK_MASK               ((1<<SHIFT) - 1)
657 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
658
659 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
660 #define GET_RANK(evt)           ((evt) & RANK_MASK)
661 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
662
663 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
664
665 #if CMK_SMP
666 #define INIT_SIZE                4096
667 #else
668 #define INIT_SIZE                1024
669 #endif
670
671 struct IndexStruct {
672 void *addr;
673 int next;
674 int type;     // 1: ACK   2: Persistent
675 };
676
677 typedef struct IndexPool {
678     struct IndexStruct   *indexes;
679     int                   size;
680     int                   freehead;
681     CmiNodeLock           lock;
682 } IndexPool;
683
684 static IndexPool  ackPool;
685 #if CMK_PERSISTENT_COMM
686 static IndexPool  persistPool;
687 #endif
688
689 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
690 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
691
692 static void IndexPool_init(IndexPool *pool)
693 {
694     int i;
695     if ((1<<SHIFT) < mysize) 
696         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
697     pool->size = INIT_SIZE;
698     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
699     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
700     for (i=0; i<pool->size-1; i++) {
701         pool->indexes[i].next = i+1;
702         pool->indexes[i].type = 0;
703     }
704     pool->indexes[i].next = -1;
705     pool->freehead = 0;
706 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
707     pool->lock  = CmiCreateLock();
708 #else
709     pool->lock  = 0;
710 #endif
711 }
712
713 static
714 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
715 {
716     int i;
717     int s;
718 #if MULTI_THREAD_SEND  
719     CmiLock(pool->lock);
720 #endif
721     s = pool->freehead;
722     if (s == -1) {
723         int newsize = pool->size * 2;
724         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
725         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
726         struct IndexStruct *old_ackpool = pool->indexes;
727         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
728         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
729         for (i=pool->size; i<newsize-1; i++) {
730             pool->indexes[i].next = i+1;
731             pool->indexes[i].type = 0;
732         }
733         pool->indexes[i].next = -1;
734         pool->indexes[i].type = 0;
735         pool->freehead = pool->size;
736         s = pool->size;
737         pool->size = newsize;
738         free(old_ackpool);
739     }
740     pool->freehead = pool->indexes[s].next;
741     pool->indexes[s].addr = addr;
742     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
743     pool->indexes[s].type = type;
744 #if MULTI_THREAD_SEND
745     CmiUnlock(pool->lock);
746 #endif
747     return s;
748 }
749
750 static
751 inline  void IndexPool_freeslot(IndexPool *pool, int s)
752 {
753     CmiAssert(s>=0 && s<pool->size);
754 #if MULTI_THREAD_SEND
755     CmiLock(pool->lock);
756 #endif
757     pool->indexes[s].next = pool->freehead;
758     pool->indexes[s].type = 0;
759     pool->freehead = s;
760 #if MULTI_THREAD_SEND
761     CmiUnlock(pool->lock);
762 #endif
763 }
764
765
766 #endif
767
768 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
769 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
770 #define CHARM_MAGIC_NUMBER               126
771
772 #if CMK_ERROR_CHECKING
773 extern unsigned char computeCheckSum(unsigned char *data, int len);
774 static int checksum_flag = 0;
775 #define CMI_SET_CHECKSUM(msg, len)      \
776         if (checksum_flag)  {   \
777           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
778           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
779         }
780 #define CMI_CHECK_CHECKSUM(msg, len)    \
781         if (checksum_flag)      \
782           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
783             CmiAbort("Fatal error: checksum doesn't agree!\n");
784 #else
785 #define CMI_SET_CHECKSUM(msg, len)
786 #define CMI_CHECK_CHECKSUM(msg, len)
787 #endif
788 /* =====End of Definitions of Message-Corruption Related Macros=====*/
789
790 static int print_stats = 0;
791 static int stats_off = 0;
792 void CmiTurnOnStats()
793 {
794     stats_off = 0;
795     //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
796 }
797
798 void CmiTurnOffStats()
799 {
800     stats_off = 1;
801 }
802
803 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
804
805 #if CMK_WITH_STATS
806 FILE *counterLog = NULL;
807 typedef struct comm_thread_stats
808 {
809     uint64_t  smsg_data_count;
810     uint64_t  lmsg_init_count;
811     uint64_t  ack_count;
812     uint64_t  big_msg_ack_count;
813     uint64_t  smsg_count;
814     uint64_t  direct_put_done_count;
815     uint64_t  put_done_count;
816     //times of calling SmsgSend
817     uint64_t  try_smsg_data_count;
818     uint64_t  try_lmsg_init_count;
819     uint64_t  try_ack_count;
820     uint64_t  try_big_msg_ack_count;
821     uint64_t  try_direct_put_done_count;
822     uint64_t  try_put_done_count;
823     uint64_t  try_smsg_count;
824     
825     double    max_time_in_send_buffered_smsg;
826     double    all_time_in_send_buffered_smsg;
827
828     uint64_t  rdma_get_count, rdma_put_count;
829     uint64_t  try_rdma_get_count, try_rdma_put_count;
830     double    max_time_from_control_to_rdma_init;
831     double    all_time_from_control_to_rdma_init;
832
833     double    max_time_from_rdma_init_to_rdma_done;
834     double    all_time_from_rdma_init_to_rdma_done;
835
836     int      count_in_PumpNetwork;
837     double   time_in_PumpNetwork;
838     double   max_time_in_PumpNetwork;
839     int      count_in_SendBufferMsg_smsg;
840     double   time_in_SendBufferMsg_smsg;
841     double   max_time_in_SendBufferMsg_smsg;
842     int      count_in_SendRdmaMsg;
843     double   time_in_SendRdmaMsg;
844     double   max_time_in_SendRdmaMsg;
845     int      count_in_PumpRemoteTransactions;
846     double   time_in_PumpRemoteTransactions;
847     double   max_time_in_PumpRemoteTransactions;
848     int      count_in_PumpLocalTransactions_rdma;
849     double   time_in_PumpLocalTransactions_rdma;
850     double   max_time_in_PumpLocalTransactions_rdma;
851     int      count_in_PumpDatagramConnection;
852     double   time_in_PumpDatagramConnection;
853     double   max_time_in_PumpDatagramConnection;
854 } Comm_Thread_Stats;
855
856 static Comm_Thread_Stats   comm_stats;
857
858 static char *counters_dirname = "counters";
859
860 static void init_comm_stats()
861 {
862   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
863   if (print_stats){
864       char ln[200];
865       int code = mkdir(counters_dirname, 00777); 
866       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
867       counterLog=fopen(ln,"w");
868       if (counterLog == NULL) CmiAbort("Counter files open failed");
869   }
870 }
871
872 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
873
874 #define SMSG_SENT_DONE(creation_time, tag)  \
875         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
876             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
877             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
878             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
879             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
880             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
881             comm_stats.smsg_count++; \
882             double inbuff_time = CmiWallTimer() - creation_time;   \
883             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
884             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
885         }
886
887 #define SMSG_TRY_SEND(tag)  \
888         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
889             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
890             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
891             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
892             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
893             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
894             comm_stats.try_smsg_count++; \
895         }
896
897 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
898
899 #define  RDMA_TRANS_DONE(x)      \
900          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
901              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
902              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
903          }
904
905 #define  RDMA_TRANS_INIT(type, x)      \
906          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
907              double rdma_trans_time = CmiWallTimer() - x ; \
908              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
909              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
910          }
911
912 #define STATS_PUMPNETWORK_TIME(x)   \
913         { double t = CmiWallTimer(); \
914           x;        \
915           t = CmiWallTimer() - t;          \
916           comm_stats.count_in_PumpNetwork++;        \
917           comm_stats.time_in_PumpNetwork += t;   \
918           if (t>comm_stats.max_time_in_PumpNetwork)      \
919               comm_stats.max_time_in_PumpNetwork = t;    \
920         }
921
922 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
923         { double t = CmiWallTimer(); \
924           x;        \
925           t = CmiWallTimer() - t;          \
926           comm_stats.count_in_PumpRemoteTransactions ++;        \
927           comm_stats.time_in_PumpRemoteTransactions += t;   \
928           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
929               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
930         }
931
932 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
933         { double t = CmiWallTimer(); \
934           x;        \
935           t = CmiWallTimer() - t;          \
936           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
937           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
938           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
939               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
940         }
941
942 #define STATS_SEND_SMSGS_TIME(x)   \
943         { double t = CmiWallTimer(); \
944           x;        \
945           t = CmiWallTimer() - t;          \
946           comm_stats.count_in_SendBufferMsg_smsg ++;        \
947           comm_stats.time_in_SendBufferMsg_smsg += t;   \
948           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
949               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
950         }
951
952 #define STATS_SENDRDMAMSG_TIME(x)   \
953         { double t = CmiWallTimer(); \
954           x;        \
955           t = CmiWallTimer() - t;          \
956           comm_stats.count_in_SendRdmaMsg ++;        \
957           comm_stats.time_in_SendRdmaMsg += t;   \
958           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
959               comm_stats.max_time_in_SendRdmaMsg = t;    \
960         }
961
962 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
963         { double t = CmiWallTimer(); \
964           x;        \
965           t = CmiWallTimer() - t;          \
966           comm_stats.count_in_PumpDatagramConnection ++;        \
967           comm_stats.time_in_PumpDatagramConnection += t;   \
968           if (t>comm_stats.max_time_in_PumpDatagramConnection)      \
969               comm_stats.max_time_in_PumpDatagramConnection = t;    \
970         }
971
972 static void print_comm_stats()
973 {
974     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
975     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
976             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
977             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
978     
979     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
980             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
981             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
982
983     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
984     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
985     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
986
987
988     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
989     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
990     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
991     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
992     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
993     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
994     if (useDynamicSMSG)
995     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
996
997     fclose(counterLog);
998 }
999
1000 #else
1001 #define STATS_PUMPNETWORK_TIME(x)                  x
1002 #define STATS_SEND_SMSGS_TIME(x)                   x
1003 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
1004 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
1005 #define STATS_SENDRDMAMSG_TIME(x)                  x
1006 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
1007 #endif
1008
1009 static void
1010 allgather(void *in,void *out, int len)
1011 {
1012     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1013     int i,rc;
1014     int my_rank;
1015     char *tmp_buf,*out_ptr;
1016
1017     if(!already_called) {
1018
1019         rc = PMI_Get_size(&job_size);
1020         CmiAssert(rc == PMI_SUCCESS);
1021         rc = PMI_Get_rank(&my_rank);
1022         CmiAssert(rc == PMI_SUCCESS);
1023
1024         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1025         CmiAssert(ivec_ptr != NULL);
1026
1027         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1028         CmiAssert(rc == PMI_SUCCESS);
1029
1030         already_called = 1;
1031
1032     }
1033
1034     tmp_buf = (char *)malloc(job_size * len);
1035     CmiAssert(tmp_buf);
1036
1037     rc = PMI_Allgather(in,tmp_buf,len);
1038     CmiAssert(rc == PMI_SUCCESS);
1039
1040     out_ptr = out;
1041
1042     for(i=0;i<job_size;i++) {
1043
1044         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1045
1046     }
1047
1048     free(tmp_buf);
1049 }
1050
1051 static void
1052 allgather_2(void *in,void *out, int len)
1053 {
1054     //PMI_Allgather is out of order
1055     int i,rc, extend_len;
1056     int  rank_index;
1057     char *out_ptr, *out_ref;
1058     char *in2;
1059
1060     extend_len = sizeof(int) + len;
1061     in2 = (char*)malloc(extend_len);
1062
1063     memcpy(in2, &myrank, sizeof(int));
1064     memcpy(in2+sizeof(int), in, len);
1065
1066     out_ptr = (char*)malloc(mysize*extend_len);
1067
1068     rc = PMI_Allgather(in2, out_ptr, extend_len);
1069     GNI_RC_CHECK("allgather", rc);
1070
1071     out_ref = out;
1072
1073     for(i=0;i<mysize;i++) {
1074         //rank index 
1075         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1076         //copy to the rank index slot
1077         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1078     }
1079
1080     free(out_ptr);
1081     free(in2);
1082
1083 }
1084
1085 static unsigned int get_gni_nic_address(int device_id)
1086 {
1087     unsigned int address, cpu_id;
1088     gni_return_t status;
1089     int i, alps_dev_id=-1,alps_address=-1;
1090     char *token, *p_ptr;
1091
1092     p_ptr = getenv("PMI_GNI_DEV_ID");
1093     if (!p_ptr) {
1094         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1095        
1096         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1097     } else {
1098         while ((token = strtok(p_ptr,":")) != NULL) {
1099             alps_dev_id = atoi(token);
1100             if (alps_dev_id == device_id) {
1101                 break;
1102             }
1103             p_ptr = NULL;
1104         }
1105         CmiAssert(alps_dev_id != -1);
1106         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1107         CmiAssert(p_ptr != NULL);
1108         i = 0;
1109         while ((token = strtok(p_ptr,":")) != NULL) {
1110             if (i == alps_dev_id) {
1111                 alps_address = atoi(token);
1112                 break;
1113             }
1114             p_ptr = NULL;
1115             ++i;
1116         }
1117         CmiAssert(alps_address != -1);
1118         address = alps_address;
1119     }
1120     return address;
1121 }
1122
1123 static uint8_t get_ptag(void)
1124 {
1125     char *p_ptr, *token;
1126     uint8_t ptag;
1127
1128     p_ptr = getenv("PMI_GNI_PTAG");
1129     CmiAssert(p_ptr != NULL);
1130     token = strtok(p_ptr, ":");
1131     ptag = (uint8_t)atoi(token);
1132     return ptag;
1133         
1134 }
1135
1136 static uint32_t get_cookie(void)
1137 {
1138     uint32_t cookie;
1139     char *p_ptr, *token;
1140
1141     p_ptr = getenv("PMI_GNI_COOKIE");
1142     CmiAssert(p_ptr != NULL);
1143     token = strtok(p_ptr, ":");
1144     cookie = (uint32_t)atoi(token);
1145
1146     return cookie;
1147 }
1148
1149 #if LARGEPAGE
1150
1151 /* directly mmap memory from hugetlbfs for large pages */
1152
1153 #include <sys/stat.h>
1154 #include <fcntl.h>
1155 #include <sys/mman.h>
1156 #include <hugetlbfs.h>
1157
1158 // size must be _tlbpagesize aligned
1159 void *my_get_huge_pages(size_t size)
1160 {
1161     char filename[512];
1162     int fd;
1163     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1164     void *ptr = NULL;
1165
1166     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1167     fd = open(filename, O_RDWR | O_CREAT, mode);
1168     if (fd == -1) {
1169         CmiAbort("my_get_huge_pages: open filed");
1170     }
1171     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1172     if (ptr == MAP_FAILED) ptr = NULL;
1173 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1174     close(fd);
1175     unlink(filename);
1176     return ptr;
1177 }
1178
1179 void my_free_huge_pages(void *ptr, int size)
1180 {
1181 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1182     int ret = munmap(ptr, size);
1183     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1184 }
1185
1186 #endif
1187
1188 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1189 /* TODO: add any that are related */
1190 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1191
1192
1193 #include "machine-lrts.h"
1194 #include "machine-common-core.c"
1195
1196 /* Network progress function is used to poll the network when for
1197    messages. This flushes receive buffers on some  implementations*/
1198 #if CMK_MACHINE_PROGRESS_DEFINED
1199 void CmiMachineProgressImpl() {
1200 }
1201 #endif
1202
1203 static int SendBufferMsg(SMSG_QUEUE *queue);
1204 static void SendRdmaMsg();
1205 static void PumpNetworkSmsg();
1206 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1207 #if CQWRITE
1208 static void PumpCqWriteTransactions();
1209 #endif
1210 #if REMOTE_EVENT
1211 static void PumpRemoteTransactions();
1212 #endif
1213
1214 #if MACHINE_DEBUG_LOG
1215 FILE *debugLog = NULL;
1216 static CmiInt8 buffered_recv_msg = 0;
1217 int         lrts_smsg_success = 0;
1218 int         lrts_received_msg = 0;
1219 #endif
1220
1221 static void sweep_mempool(mempool_type *mptr)
1222 {
1223     int n = 0;
1224     block_header *current = &(mptr->block_head);
1225
1226     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1227     while( current!= NULL) {
1228         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1229         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1230     }
1231     printf("[n %d] sweep_mempool slot END.\n", myrank);
1232 }
1233
1234 inline
1235 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1236 {
1237     block_header *current = *from;
1238
1239     //while(register_memory_size>= MAX_REG_MEM)
1240     //{
1241         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1242             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1243
1244         *from = current;
1245         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1246         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1247         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1248     //}
1249     return GNI_RC_SUCCESS;
1250 }
1251
1252 inline 
1253 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1254 {
1255     gni_return_t status = GNI_RC_SUCCESS;
1256     //int size = GetMempoolsize(msg);
1257     //void *blockaddr = GetMempoolBlockPtr(msg);
1258     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1259    
1260     block_header *current = &(mptr->block_head);
1261     while(register_memory_size>= MAX_REG_MEM)
1262     {
1263         status = deregisterMemory(mptr, &current);
1264         if (status != GNI_RC_SUCCESS) break;
1265     }
1266     if(register_memory_size>= MAX_REG_MEM) return status;
1267
1268     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1269     while(1)
1270     {
1271         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1272         if(status == GNI_RC_SUCCESS)
1273         {
1274             break;
1275         }
1276         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1277         {
1278             GNI_RC_CHECK("registerFromMempool", status);
1279         }
1280         else
1281         {
1282             status = deregisterMemory(mptr, &current);
1283             if (status != GNI_RC_SUCCESS) break;
1284         }
1285     }; 
1286     return status;
1287 }
1288
1289 inline 
1290 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1291 {
1292     static int rank = -1;
1293     int i;
1294     gni_return_t status;
1295     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1296     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1297     mempool_type *mptr;
1298
1299     status = registerFromMempool(mptr1, msg, size, t, cqh);
1300     if (status == GNI_RC_SUCCESS) return status;
1301 #if CMK_SMP 
1302     for (i=0; i<CmiMyNodeSize()+1; i++) {
1303       rank = (rank+1)%(CmiMyNodeSize()+1);
1304       mptr = CpvAccessOther(mempool, rank);
1305       if (mptr == mptr1) continue;
1306       status = registerFromMempool(mptr, msg, size, t, cqh);
1307       if (status == GNI_RC_SUCCESS) return status;
1308     }
1309 #endif
1310     return  GNI_RC_ERROR_RESOURCE;
1311 }
1312
1313 inline
1314 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1315 {
1316     MSG_LIST        *msg_tmp;
1317     MallocMsgList(msg_tmp);
1318     msg_tmp->destNode = destNode;
1319     msg_tmp->size   = size;
1320     msg_tmp->msg    = msg;
1321     msg_tmp->tag    = tag;
1322 #if CMK_WITH_STATS
1323     SMSG_CREATION(msg_tmp)
1324 #endif
1325 #if !CMK_SMP
1326     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1327         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1328         queue->smsg_head_index = destNode;
1329         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1330     }else
1331     {
1332         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1333         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1334     }
1335 #else
1336 #if ONE_SEND_QUEUE
1337     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1338 #else
1339 #if SMP_LOCKS
1340     CmiLock(queue->smsg_msglist_index[destNode].lock);
1341     if(queue->smsg_msglist_index[destNode].pushed == 0)
1342     {
1343         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1344     }
1345     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1346     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1347 #else
1348     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1349 #endif
1350 #endif
1351 #endif
1352 #if PRINT_SYH
1353     buffered_smsg_counter++;
1354 #endif
1355 }
1356
1357 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1358 {
1359     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1360 }
1361
1362 inline
1363 static void setup_smsg_connection(int destNode)
1364 {
1365     mdh_addr_list_t  *new_entry = 0;
1366     gni_post_descriptor_t *pd;
1367     gni_smsg_attr_t      *smsg_attr;
1368     gni_return_t status = GNI_RC_NOT_DONE;
1369     RDMA_REQUEST        *rdma_request_msg;
1370     
1371     if(smsg_available_slot == smsg_expand_slots)
1372     {
1373         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1374         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1375         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1376
1377         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1378             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1379             GNI_MEM_READWRITE,   
1380             -1,
1381             &(new_entry->mdh));
1382         smsg_available_slot = 0; 
1383         new_entry->next = smsg_dynamic_list;
1384         smsg_dynamic_list = new_entry;
1385     }
1386     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1387     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1388     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1389     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1390     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1391     smsg_attr->buff_size = smsg_memlen;
1392     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1393     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1394     smsg_local_attr_vec[destNode] = smsg_attr;
1395     smsg_available_slot++;
1396     MallocPostDesc(pd);
1397     pd->type            = GNI_POST_FMA_PUT;
1398     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1399     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1400     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1401     pd->length          = sizeof(gni_smsg_attr_t);
1402     pd->local_addr      = (uint64_t) smsg_attr;
1403     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1404     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1405     pd->src_cq_hndl     = rdma_tx_cqh;
1406     pd->rdma_mode       = 0;
1407     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1408     print_smsg_attr(smsg_attr);
1409     if(status == GNI_RC_ERROR_RESOURCE )
1410     {
1411         MallocRdmaRequest(rdma_request_msg);
1412         rdma_request_msg->destNode = destNode;
1413         rdma_request_msg->pd = pd;
1414         /* buffer this request */
1415     }
1416 #if PRINT_SYH
1417     if(status != GNI_RC_SUCCESS)
1418        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1419     else
1420         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1421 #endif
1422 }
1423
1424 /* useDynamicSMSG */
1425 inline 
1426 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1427 {
1428     gni_return_t status = GNI_RC_NOT_DONE;
1429
1430     if(mailbox_list->offset == mailbox_list->size)
1431     {
1432         dynamic_smsg_mailbox_t *new_mailbox_entry;
1433         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1434         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1435         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1436         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1437         new_mailbox_entry->offset = 0;
1438         
1439         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1440             new_mailbox_entry->size, smsg_rx_cqh,
1441             GNI_MEM_READWRITE,   
1442             -1,
1443             &(new_mailbox_entry->mem_hndl));
1444
1445         GNI_RC_CHECK("register", status);
1446         new_mailbox_entry->next = mailbox_list;
1447         mailbox_list = new_mailbox_entry;
1448     }
1449     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1450     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1451     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1452     local_smsg_attr->mbox_offset = mailbox_list->offset;
1453     mailbox_list->offset += smsg_memlen;
1454     local_smsg_attr->buff_size = smsg_memlen;
1455     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1456     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1457 }
1458
1459 /* useDynamicSMSG */
1460 inline 
1461 static int connect_to(int destNode)
1462 {
1463     gni_return_t status = GNI_RC_NOT_DONE;
1464     CmiAssert(smsg_connected_flag[destNode] == 0);
1465     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1466     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1467     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1468     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1469     
1470     CMI_GNI_LOCK(global_gni_lock)
1471     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1472     CMI_GNI_UNLOCK(global_gni_lock)
1473     if (status == GNI_RC_ERROR_RESOURCE) {
1474       /* possibly destNode is making connection at the same time */
1475       free(smsg_attr_vector_local[destNode]);
1476       smsg_attr_vector_local[destNode] = NULL;
1477       free(smsg_attr_vector_remote[destNode]);
1478       smsg_attr_vector_remote[destNode] = NULL;
1479       mailbox_list->offset -= smsg_memlen;
1480 #if PRINT_SYH
1481     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1482 #endif
1483       return 0;
1484     }
1485     GNI_RC_CHECK("GNI_Post", status);
1486     smsg_connected_flag[destNode] = 1;
1487 #if PRINT_SYH
1488     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1489 #endif
1490     return 1;
1491 }
1492
1493 inline 
1494 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1495 {
1496     unsigned int          remote_address;
1497     uint32_t              remote_id;
1498     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1499     gni_smsg_attr_t       *smsg_attr;
1500     gni_post_descriptor_t *pd;
1501     gni_post_state_t      post_state;
1502     char                  *real_data; 
1503
1504     if (useDynamicSMSG) {
1505         switch (smsg_connected_flag[destNode]) {
1506         case 0: 
1507             connect_to(destNode);         /* continue to case 1 */
1508         case 1:                           /* pending connection, do nothing */
1509             status = GNI_RC_NOT_DONE;
1510             if(inbuff ==0)
1511                 buffer_small_msgs(queue, msg, size, destNode, tag);
1512             return status;
1513         }
1514     }
1515 #if CMK_SMP
1516 #if ! ONE_SEND_QUEUE
1517     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1518 #endif
1519     {
1520 #else
1521     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1522     {
1523 #endif
1524         //CMI_GNI_LOCK(smsg_mailbox_lock)
1525         CMI_GNI_LOCK(default_tx_cq_lock)
1526 #if CMK_SMP_TRACE_COMMTHREAD
1527         int oldpe = -1;
1528         int oldeventid = -1;
1529         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1530         { 
1531             START_EVENT();
1532             if ( tag == SMALL_DATA_TAG)
1533                 real_data = (char*)msg; 
1534             else 
1535                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1536             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1537             TRACE_COMM_SET_COMM_MSGID(real_data);
1538         }
1539 #endif
1540 #if REMOTE_EVENT
1541         if (tag == LMSG_INIT_TAG) {
1542             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1543             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1544                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1545         }
1546         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1547 #endif
1548 #if     CMK_WITH_STATS
1549         SMSG_TRY_SEND(tag)
1550 #endif
1551 #if CMK_WITH_STATS
1552     double              creation_time;
1553     if (ptr == NULL)
1554         creation_time = CmiWallTimer();
1555     else
1556         creation_time = ptr->creation_time;
1557 #endif
1558
1559     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1560 #if CMK_SMP_TRACE_COMMTHREAD
1561         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1562 #endif
1563         CMI_GNI_UNLOCK(default_tx_cq_lock)
1564         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1565         if(status == GNI_RC_SUCCESS)
1566         {
1567 #if     CMK_WITH_STATS
1568             SMSG_SENT_DONE(creation_time,tag) 
1569 #endif
1570 #if CMK_SMP_TRACE_COMMTHREAD
1571             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1572             { 
1573                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1574             }
1575 #endif
1576         }else
1577             status = GNI_RC_ERROR_RESOURCE;
1578     }
1579     if(status != GNI_RC_SUCCESS && inbuff ==0)
1580         buffer_small_msgs(queue, msg, size, destNode, tag);
1581     return status;
1582 }
1583
1584 inline 
1585 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1586 {
1587     /* construct a control message and send */
1588     CONTROL_MSG         *control_msg_tmp;
1589     MallocControlMsg(control_msg_tmp);
1590     control_msg_tmp->source_addr = (uint64_t)msg;
1591     control_msg_tmp->seq_id    = seqno;
1592     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1593 #if REMOTE_EVENT
1594     control_msg_tmp->ack_index    =  -1;
1595 #endif
1596 #if     USE_LRTS_MEMPOOL
1597     if(size < BIG_MSG)
1598     {
1599         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1600     }
1601     else
1602     {
1603         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1604         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1605         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1606     }
1607 #else
1608     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1609 #endif
1610     return control_msg_tmp;
1611 }
1612
1613 #define BLOCKING_SEND_CONTROL    0
1614
1615 // Large message, send control to receiver, receiver register memory and do a GET, 
1616 // return 1 - send no success
1617 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1618 {
1619     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1620     uint32_t            vmdh_index  = -1;
1621     int                 size;
1622     int                 offset = 0;
1623     uint64_t            source_addr;
1624     int                 register_size; 
1625     void                *msg;
1626
1627     size    =   control_msg_tmp->total_length;
1628     source_addr = control_msg_tmp->source_addr;
1629     register_size = control_msg_tmp->length;
1630
1631 #if  USE_LRTS_MEMPOOL
1632     if( control_msg_tmp->seq_id == 0 ){
1633 #if BLOCKING_SEND_CONTROL
1634         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1635             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1636                 LrtsAdvanceCommunication(0);
1637         }
1638 #endif
1639         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1640         {
1641             msg = (void*)source_addr;
1642             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1643             {
1644                 if(!inbuff)
1645                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1646                 return GNI_RC_ERROR_NOMEM;
1647             }
1648             //register the corresponding mempool
1649             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1650             if(status == GNI_RC_SUCCESS)
1651             {
1652                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1653             }
1654         }else
1655         {
1656             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1657             status = GNI_RC_SUCCESS;
1658         }
1659         if(NoMsgInSend(source_addr))
1660             register_size = GetMempoolsize((void*)(source_addr));
1661         else
1662             register_size = 0;
1663     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1664     {
1665         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1666         source_addr += offset;
1667         size = control_msg_tmp->length;
1668 #if BLOCKING_SEND_CONTROL
1669         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1670             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1671                 LrtsAdvanceCommunication(0);
1672         }
1673 #endif
1674         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1675             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1676             {
1677                 if(!inbuff)
1678                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1679                 return GNI_RC_ERROR_NOMEM;
1680             }
1681             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1682             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1683         }
1684         else
1685         {
1686             status = GNI_RC_SUCCESS;
1687         }
1688         register_size = 0;  
1689     }
1690
1691 #if CMI_EXERT_SEND_CAP
1692     if(SEND_large_pending >= SEND_large_cap)
1693     {
1694         status = GNI_RC_ERROR_NOMEM;
1695     }
1696 #endif
1697  
1698     if(status == GNI_RC_SUCCESS)
1699     {
1700        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1701         if(status == GNI_RC_SUCCESS)
1702         {
1703 #if CMI_EXERT_SEND_CAP
1704             SEND_large_pending++;
1705 #endif
1706             buffered_send_msg += register_size;
1707             if(control_msg_tmp->seq_id == 0)
1708             {
1709                 IncreaseMsgInSend(source_addr);
1710             }
1711             FreeControlMsg(control_msg_tmp);
1712             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1713         }else
1714             status = GNI_RC_ERROR_RESOURCE;
1715
1716     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1717     {
1718         CmiAbort("Memory registor for large msg\n");
1719     }else 
1720     {
1721         status = GNI_RC_ERROR_NOMEM; 
1722         if(!inbuff)
1723             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1724     }
1725     return status;
1726 #else
1727     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1728     if(status == GNI_RC_SUCCESS)
1729     {
1730         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1731         if(status == GNI_RC_SUCCESS)
1732         {
1733             FreeControlMsg(control_msg_tmp);
1734         }
1735     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1736     {
1737         CmiAbort("Memory registor for large msg\n");
1738     }else 
1739     {
1740         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1741     }
1742     return status;
1743 #endif
1744 }
1745
1746 inline void LrtsPrepareEnvelope(char *msg, int size)
1747 {
1748     CmiSetMsgSize(msg, size);
1749     CMI_SET_CHECKSUM(msg, size);
1750 }
1751
1752 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1753 {
1754     gni_return_t        status  =   GNI_RC_SUCCESS;
1755     uint8_t tag;
1756     CONTROL_MSG         *control_msg_tmp;
1757     int                 oob = ( mode & OUT_OF_BAND);
1758     SMSG_QUEUE          *queue;
1759
1760     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1761 #if CMK_USE_OOB
1762     queue = oob? &smsg_oob_queue : &smsg_queue;
1763 #else
1764     queue = &smsg_queue;
1765 #endif
1766
1767     LrtsPrepareEnvelope(msg, size);
1768
1769 #if PRINT_SYH
1770     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1771 #endif 
1772 #if CMK_SMP 
1773     if(size <= SMSG_MAX_MSG)
1774         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1775     else if (size < BIG_MSG) {
1776         control_msg_tmp =  construct_control_msg(size, msg, 0);
1777         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1778     }
1779     else {
1780           CmiSetMsgSeq(msg, 0);
1781           control_msg_tmp =  construct_control_msg(size, msg, 1);
1782           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1783     }
1784 #else   //non-smp, smp(worker sending)
1785     if(size <= SMSG_MAX_MSG)
1786     {
1787         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1788             CmiFree(msg);
1789     }
1790     else if (size < BIG_MSG) {
1791         control_msg_tmp =  construct_control_msg(size, msg, 0);
1792         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1793     }
1794     else {
1795 #if     USE_LRTS_MEMPOOL
1796         CmiSetMsgSeq(msg, 0);
1797         control_msg_tmp =  construct_control_msg(size, msg, 1);
1798         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1799 #else
1800         control_msg_tmp =  construct_control_msg(size, msg, 0);
1801         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1802 #endif
1803     }
1804 #endif
1805     return 0;
1806 }
1807
1808 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1809 {
1810   int i;
1811 #if CMK_BROADCAST_USE_CMIREFERENCE
1812   for(i=0;i<npes;i++) {
1813     if (pes[i] == CmiMyPe())
1814       CmiSyncSend(pes[i], len, msg);
1815     else {
1816       CmiReference(msg);
1817       CmiSyncSendAndFree(pes[i], len, msg);
1818     }
1819   }
1820 #else
1821   for(i=0;i<npes;i++) {
1822     CmiSyncSend(pes[i], len, msg);
1823   }
1824 #endif
1825 }
1826
1827 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1828 {
1829   /* A better asynchronous implementation may be wanted, but at least it works */
1830   CmiSyncListSendFn(npes, pes, len, msg);
1831   return (CmiCommHandle) 0;
1832 }
1833
1834 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1835 {
1836   if (npes == 1) {
1837       CmiSyncSendAndFree(pes[0], len, msg);
1838       return;
1839   }
1840 #if CMK_PERSISTENT_COMM
1841   if (CpvAccess(phs) && len > 1024) {
1842       int i;
1843       for(i=0;i<npes;i++) {
1844         if (pes[i] == CmiMyPe())
1845           CmiSyncSend(pes[i], len, msg);
1846         else {
1847           CmiReference(msg);
1848           CmiSyncSendAndFree(pes[i], len, msg);
1849         }
1850       }
1851       CmiFree(msg);
1852       return;
1853   }
1854 #endif
1855   
1856 #if CMK_BROADCAST_USE_CMIREFERENCE
1857   CmiSyncListSendFn(npes, pes, len, msg);
1858   CmiFree(msg);
1859 #else
1860   int i;
1861   for(i=0;i<npes-1;i++) {
1862     CmiSyncSend(pes[i], len, msg);
1863   }
1864   if (npes>0)
1865     CmiSyncSendAndFree(pes[npes-1], len, msg);
1866   else 
1867     CmiFree(msg);
1868 #endif
1869 }
1870
1871 static void    PumpDatagramConnection();
1872 static      int         event_SetupConnect = 111;
1873 static      int         event_PumpSmsg = 222 ;
1874 static      int         event_PumpTransaction = 333;
1875 static      int         event_PumpRdmaTransaction = 444;
1876 static      int         event_SendBufferSmsg = 444;
1877 static      int         event_SendFmaRdmaMsg = 555;
1878 static      int         event_AdvanceCommunication = 666;
1879
1880 static void registerUserTraceEvents() {
1881 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1882     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1883     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1884     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1885     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1886     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1887     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1888     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1889 #endif
1890 }
1891
1892 static void ProcessDeadlock()
1893 {
1894     static CmiUInt8 *ptr = NULL;
1895     static CmiUInt8  last = 0, mysum, sum;
1896     static int count = 0;
1897     gni_return_t status;
1898     int i;
1899
1900 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1901 //sweep_mempool(CpvAccess(mempool));
1902     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1903     mysum = smsg_send_count + smsg_recv_count;
1904     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1905     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1906     GNI_RC_CHECK("PMI_Allgather", status);
1907     sum = 0;
1908     for (i=0; i<mysize; i++)  sum+= ptr[i];
1909     if (last == 0 || sum == last) 
1910         count++;
1911     else
1912         count = 0;
1913     last = sum;
1914     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1915     if (count == 2) { 
1916         /* detected twice, it is a real deadlock */
1917         if (myrank == 0)  {
1918             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1919             CmiAbort("Fatal> Deadlock detected.");
1920         }
1921
1922     }
1923     _detected_hang = 0;
1924 }
1925
1926 static void CheckProgress()
1927 {
1928     if (smsg_send_count == last_smsg_send_count &&
1929         smsg_recv_count == last_smsg_recv_count ) 
1930     {
1931         _detected_hang = 1;
1932 #if !CMK_SMP
1933         if (_detected_hang) ProcessDeadlock();
1934 #endif
1935
1936     }
1937     else {
1938         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1939         last_smsg_send_count = smsg_send_count;
1940         last_smsg_recv_count = smsg_recv_count;
1941         _detected_hang = 0;
1942     }
1943 }
1944
1945 static void set_limit()
1946 {
1947     //if (!user_set_flag && CmiMyRank() == 0) {
1948     if (CmiMyRank() == 0) {
1949         int mynode = CmiPhysicalNodeID(CmiMyPe());
1950         int numpes = CmiNumPesOnPhysicalNode(mynode);
1951         int numprocesses = numpes / CmiMyNodeSize();
1952         MAX_REG_MEM  = _totalmem / numprocesses;
1953         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1954         if (CmiMyPe() == 0)
1955            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1956         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1957         {
1958              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1959              CmiAbort("memory registration\n");
1960         }
1961     }
1962 }
1963
1964 void LrtsPostCommonInit(int everReturn)
1965 {
1966 #if CMK_DIRECT
1967     CmiDirectInit();
1968 #endif
1969 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1970     CpvInitialize(double, projTraceStart);
1971     /* only PE 0 needs to care about registration (to generate sts file). */
1972     //if (CmiMyPe() == 0) 
1973     {
1974         registerMachineUserEventsFunction(&registerUserTraceEvents);
1975     }
1976 #endif
1977
1978 #if CMK_SMP
1979     CmiIdleState *s=CmiNotifyGetState();
1980     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1981     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1982 #else
1983     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1984     if (useDynamicSMSG)
1985     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1986 #endif
1987
1988 #if ! LARGEPAGE
1989     if (_checkProgress)
1990 #if CMK_SMP
1991     if (CmiMyRank() == 0)
1992 #endif
1993     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1994 #endif
1995  
1996 #if !LARGEPAGE
1997     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1998 #endif
1999 }
2000
2001 /* this is called by worker thread */
2002 void LrtsPostNonLocal(){
2003 #if CMK_SMP_TRACE_COMMTHREAD
2004     double startT, endT;
2005 #endif
2006 #if MULTI_THREAD_SEND
2007     if(mysize == 1) return;
2008 #if CMK_SMP_TRACE_COMMTHREAD
2009     traceEndIdle();
2010 #endif
2011
2012 #if CMK_SMP_TRACE_COMMTHREAD
2013     startT = CmiWallTimer();
2014 #endif
2015
2016 #if CMK_WORKER_SINGLE_TASK
2017     if (CmiMyRank() % 6 == 0)
2018 #endif
2019     PumpNetworkSmsg();
2020
2021 #if CMK_WORKER_SINGLE_TASK
2022     if (CmiMyRank() % 6 == 1)
2023 #endif
2024     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2025
2026 #if CMK_WORKER_SINGLE_TASK
2027     if (CmiMyRank() % 6 == 2)
2028 #endif
2029     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
2030
2031 #if REMOTE_EVENT
2032 #if CMK_WORKER_SINGLE_TASK
2033     if (CmiMyRank() % 6 == 3)
2034 #endif
2035     PumpRemoteTransactions();
2036 #endif
2037
2038 #if CMK_USE_OOB
2039     if (SendBufferMsg(&smsg_oob_queue) == 1)
2040 #endif
2041     {
2042 #if CMK_WORKER_SINGLE_TASK
2043     if (CmiMyRank() % 6 == 4)
2044 #endif
2045         SendBufferMsg(&smsg_queue);
2046     }
2047
2048 #if CMK_WORKER_SINGLE_TASK
2049     if (CmiMyRank() % 6 == 5)
2050 #endif
2051     SendRdmaMsg();
2052
2053 #if CMK_SMP_TRACE_COMMTHREAD
2054     endT = CmiWallTimer();
2055     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2056 #endif
2057 #if CMK_SMP_TRACE_COMMTHREAD
2058     traceBeginIdle();
2059 #endif
2060 #endif
2061 }
2062
2063 /* useDynamicSMSG */
2064 static void    PumpDatagramConnection()
2065 {
2066     uint32_t          remote_address;
2067     uint32_t          remote_id;
2068     gni_return_t status;
2069     gni_post_state_t  post_state;
2070     uint64_t          datagram_id;
2071     int i;
2072
2073    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2074    {
2075        if (datagram_id >= mysize) {           /* bound endpoint */
2076            int pe = datagram_id - mysize;
2077            CMI_GNI_LOCK(global_gni_lock)
2078            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2079            CMI_GNI_UNLOCK(global_gni_lock)
2080            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2081            {
2082                CmiAssert(remote_id == pe);
2083                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2084                GNI_RC_CHECK("Dynamic SMSG Init", status);
2085 #if PRINT_SYH
2086                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2087 #endif
2088                CmiAssert(smsg_connected_flag[pe] == 1);
2089                smsg_connected_flag[pe] = 2;
2090            }
2091        }
2092        else {         /* unbound ep */
2093            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2094            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2095            {
2096                CmiAssert(remote_id<mysize);
2097                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2098                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2099                GNI_RC_CHECK("Dynamic SMSG Init", status);
2100 #if PRINT_SYH
2101                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2102 #endif
2103                smsg_connected_flag[remote_id] = 2;
2104
2105                alloc_smsg_attr(&send_smsg_attr);
2106                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2107                GNI_RC_CHECK("post unbound datagram", status);
2108            }
2109        }
2110    }
2111 }
2112
2113 /* pooling CQ to receive network message */
2114 static void PumpNetworkRdmaMsgs()
2115 {
2116     gni_cq_entry_t      event_data;
2117     gni_return_t        status;
2118
2119 }
2120
2121 inline 
2122 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
2123 {
2124     RDMA_REQUEST        *rdma_request_msg;
2125     MallocRdmaRequest(rdma_request_msg);
2126     rdma_request_msg->destNode = inst_id;
2127     rdma_request_msg->pd = pd;
2128 #if REMOTE_EVENT
2129     rdma_request_msg->ack_index = ack_index;
2130 #endif
2131 #if CMK_SMP
2132     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2133 #else
2134     if(sendRdmaBuf == 0)
2135     {
2136         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2137     }else{
2138         sendRdmaTail->next = rdma_request_msg;
2139         sendRdmaTail =  rdma_request_msg;
2140     }
2141 #endif
2142
2143 }
2144
2145 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2146
2147 static void PumpNetworkSmsg()
2148 {
2149     uint64_t            inst_id;
2150     int                 ret;
2151     gni_cq_entry_t      event_data;
2152     gni_return_t        status, status2;
2153     void                *header;
2154     uint8_t             msg_tag;
2155     int                 msg_nbytes;
2156     void                *msg_data;
2157     gni_mem_handle_t    msg_mem_hndl;
2158     gni_smsg_attr_t     *smsg_attr;
2159     gni_smsg_attr_t     *remote_smsg_attr;
2160     int                 init_flag;
2161     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2162     uint64_t            source_addr;
2163     SMSG_QUEUE         *queue = &smsg_queue;
2164 #if   CMK_DIRECT
2165     cmidirectMsg        *direct_msg;
2166 #endif
2167 #if CMI_EXERT_RECV_CAP
2168     int                  recv_cnt = 0;
2169 #endif
2170     while(1)
2171     {
2172         CMI_GNI_LOCK(smsg_rx_cq_lock)
2173         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2174         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2175         if(status != GNI_RC_SUCCESS)
2176             break;
2177         inst_id = GNI_CQ_GET_INST_ID(event_data);
2178 #if REMOTE_EVENT
2179         inst_id = GET_RANK(inst_id);      /* important */
2180 #endif
2181         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2182 #if PRINT_SYH
2183         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2184 #endif
2185         if (useDynamicSMSG) {
2186             /* subtle: smsg may come before connection is setup */
2187             while (smsg_connected_flag[inst_id] != 2) 
2188                PumpDatagramConnection();
2189         }
2190         msg_tag = GNI_SMSG_ANY_TAG;
2191         while(1) {
2192             CMI_GNI_LOCK(smsg_mailbox_lock)
2193             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2194             if (status != GNI_RC_SUCCESS)
2195             {
2196                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2197                 break;
2198             }
2199 #if PRINT_SYH
2200             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2201 #endif
2202             /* copy msg out and then put into queue (small message) */
2203             switch (msg_tag) {
2204             case SMALL_DATA_TAG:
2205             {
2206                 START_EVENT();
2207                 msg_nbytes = CmiGetMsgSize(header);
2208                 msg_data    = CmiAlloc(msg_nbytes);
2209                 memcpy(msg_data, (char*)header, msg_nbytes);
2210                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2211                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2212                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
2213                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2214                 handleOneRecvedMsg(msg_nbytes, msg_data);
2215                 break;
2216             }
2217             case LMSG_INIT_TAG:
2218             {
2219 #if MULTI_THREAD_SEND
2220                 MallocControlMsg(control_msg_tmp);
2221                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2222                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2223                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2224                 getLargeMsgRequest(control_msg_tmp, inst_id);
2225                 FreeControlMsg(control_msg_tmp);
2226 #else
2227                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2228                 getLargeMsgRequest(header, inst_id);
2229                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2230 #endif
2231                 break;
2232             }
2233 #if !REMOTE_EVENT && !CQWRITE
2234             case ACK_TAG:   //msg fit into mempool
2235             {
2236                 /* Get is done, release message . Now put is not used yet*/
2237                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2238                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2239                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2240 #if ! USE_LRTS_MEMPOOL
2241                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2242 #else
2243                 DecreaseMsgInSend(msg);
2244 #endif
2245                 if(NoMsgInSend(msg))
2246                     buffered_send_msg -= GetMempoolsize(msg);
2247                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2248                 CmiFree(msg);
2249 #if CMI_EXERT_SEND_CAP
2250                 SEND_large_pending--;
2251 #endif
2252                 break;
2253             }
2254 #endif
2255             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2256             {
2257 #if MULTI_THREAD_SEND
2258                 MallocControlMsg(header_tmp);
2259                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2260                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2261 #else
2262                 header_tmp = (CONTROL_MSG *) header;
2263 #endif
2264                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2265 #if CMI_EXERT_SEND_CAP
2266                     SEND_large_pending--;
2267 #endif
2268                 void *msg = (void*)(header_tmp->source_addr);
2269                 int cur_seq = CmiGetMsgSeq(msg);
2270                 int offset = ONE_SEG*(cur_seq+1);
2271                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2272                 buffered_send_msg -= header_tmp->length;
2273                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2274                 if (remain_size < 0) remain_size = 0;
2275                 CmiSetMsgSize(msg, remain_size);
2276                 if(remain_size <= 0) //transaction done
2277                 {
2278                     CmiFree(msg);
2279                 }else if (header_tmp->total_length > offset)
2280                 {
2281                     CmiSetMsgSeq(msg, cur_seq+1);
2282                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2283                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2284                     //send next seg
2285                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2286                          // pipelining
2287                     if (header_tmp->seq_id == 1) {
2288                       int i;
2289                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2290                         int seq = cur_seq+i+2;
2291                         CmiSetMsgSeq(msg, seq-1);
2292                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2293                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2294                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2295                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2296                       }
2297                     }
2298                 }
2299 #if MULTI_THREAD_SEND
2300                 FreeControlMsg(header_tmp);
2301 #else
2302                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2303 #endif
2304                 break;
2305             }
2306 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2307             case PUT_DONE_TAG:  {   //persistent message
2308                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2309                 int size = ((CONTROL_MSG *) header)->length;
2310                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2311                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2312                 CmiReference(msg);
2313                 CMI_CHECK_CHECKSUM(msg, size);
2314                 handleOneRecvedMsg(size, msg); 
2315 #if PRINT_SYH
2316                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2317 #endif
2318                 break;
2319             }
2320 #endif
2321 #if CMK_DIRECT
2322             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2323                 //create a trigger message
2324                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2325                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2326                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2327                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2328                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2329                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2330                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2331                 break;
2332 #endif
2333             default:
2334                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2335                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2336                 printf("weird tag problem\n");
2337                 CmiAbort("Unknown tag\n");
2338             }               // end switch
2339 #if PRINT_SYH
2340             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2341 #endif
2342             smsg_recv_count ++;
2343             msg_tag = GNI_SMSG_ANY_TAG;
2344 #if CMI_EXERT_RECV_CAP
2345             if (status == GNI_RC_SUCCESS && ++recv_cnt == RECV_CAP) return;
2346 #endif
2347         } //endwhile GNI_SmsgGetNextWTag
2348     }   //end while GetEvent
2349     if(status == GNI_RC_ERROR_RESOURCE)
2350     {
2351         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2352         GNI_RC_CHECK("Smsg_rx_cq full", status);
2353     }
2354 }
2355
2356 static void printDesc(gni_post_descriptor_t *pd)
2357 {
2358     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2359 }
2360
2361 #if CQWRITE
2362 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2363 {
2364     gni_post_descriptor_t *pd;
2365     gni_return_t        status = GNI_RC_SUCCESS;
2366     
2367     MallocPostDesc(pd);
2368     pd->type = GNI_POST_CQWRITE;
2369     pd->cq_mode = GNI_CQMODE_SILENT;
2370     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2371     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2372     pd->cqwrite_value = data;
2373     pd->remote_mem_hndl = mem_hndl;
2374     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2375     GNI_RC_CHECK("GNI_PostCqWrite", status);
2376 }
2377 #endif
2378
2379 // register memory for a message
2380 // return mem handle
2381 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2382 {
2383     gni_return_t status = GNI_RC_SUCCESS;
2384
2385     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2386
2387 #if CMK_PERSISTENT_COMM
2388       // persistent message is always registered
2389       // BIG_MSG small pieces do not have malloc chunk header
2390     if ((seqno <= 1 || seqno == PERSIST_SEQ) && !IsMemHndlZero(MEMHFIELD(msg))) {
2391         *memh = MEMHFIELD(msg);
2392         return GNI_RC_SUCCESS;
2393     }
2394 #endif
2395     if(seqno == 0 
2396 #if CMK_PERSISTENT_COMM
2397          || seqno == PERSIST_SEQ
2398 #endif
2399       )
2400     {
2401         if(IsMemHndlZero((GetMemHndl(msg))))
2402         {
2403             msg = (void*)(msg);
2404             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2405             if(status == GNI_RC_SUCCESS)
2406                 *memh = GetMemHndl(msg);
2407         }
2408         else {
2409             *memh = GetMemHndl(msg);
2410         }
2411     }
2412     else {
2413         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2414         status = registerMemory(msg, size, memh, NULL); 
2415     }
2416     return status;
2417 }
2418
2419 // for BIG_MSG called on receiver side for receiving control message
2420 // LMSG_INIT_TAG
2421 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2422 {
2423 #if     USE_LRTS_MEMPOOL
2424     CONTROL_MSG         *request_msg;
2425     gni_return_t        status = GNI_RC_SUCCESS;
2426     void                *msg_data;
2427     gni_post_descriptor_t *pd;
2428     gni_mem_handle_t    msg_mem_hndl;
2429     int                 size, transaction_size, offset = 0;
2430     size_t              register_size = 0;
2431
2432     // initial a get to transfer data from the sender side */
2433     request_msg = (CONTROL_MSG *) header;
2434     size = request_msg->total_length;
2435     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2436     MallocPostDesc(pd);
2437 #if CMK_WITH_STATS 
2438     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2439 #endif
2440     if(request_msg->seq_id < 2)   {
2441 #if CMK_SMP_TRACE_COMMTHREAD 
2442         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2443 #endif
2444         msg_data = CmiAlloc(size);
2445         CmiSetMsgSeq(msg_data, 0);
2446         _MEMCHECK(msg_data);
2447     }
2448     else {
2449         offset = ONE_SEG*(request_msg->seq_id-1);
2450         msg_data = (char*)request_msg->dest_addr + offset;
2451     }
2452    
2453     pd->cqwrite_value = request_msg->seq_id;
2454
2455     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2456     SetMemHndlZero(pd->local_mem_hndl);
2457     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2458     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2459         if(NoMsgInRecv( (void*)(msg_data)))
2460             register_size = GetMempoolsize((void*)(msg_data));
2461     }
2462
2463     pd->first_operand = ALIGN64(size);                   //  total length
2464
2465     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2466         pd->type            = GNI_POST_FMA_GET;
2467     else
2468         pd->type            = GNI_POST_RDMA_GET;
2469     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2470     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2471     pd->length          = transaction_size;
2472     pd->local_addr      = (uint64_t) msg_data;
2473     pd->remote_addr     = request_msg->source_addr + offset;
2474     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2475     pd->src_cq_hndl     = rdma_tx_cqh;
2476     pd->rdma_mode       = 0;
2477     pd->amo_cmd         = 0;
2478 #if CMI_EXERT_RDMA_CAP
2479     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2480 #endif
2481     //memory registration success
2482     if(status == GNI_RC_SUCCESS )
2483     {
2484         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2485         CMI_GNI_LOCK(lock)
2486 #if REMOTE_EVENT
2487         if( request_msg->seq_id == 0)
2488         {
2489             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2490             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2491             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2492         }
2493 #endif
2494
2495 #if CMK_WITH_STATS
2496         RDMA_TRY_SEND(pd->type)
2497 #endif
2498         if(pd->type == GNI_POST_RDMA_GET) 
2499         {
2500             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2501         }
2502         else
2503         {
2504             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2505         }
2506         CMI_GNI_UNLOCK(lock)
2507
2508         if(status == GNI_RC_SUCCESS )
2509         {
2510 #if CMI_EXERT_RDMA_CAP
2511             RDMA_pending++;
2512 #endif
2513             if(pd->cqwrite_value == 0)
2514             {
2515 #if MACHINE_DEBUG_LOG
2516                 buffered_recv_msg += register_size;
2517                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2518 #endif
2519                 IncreaseMsgInRecv(msg_data);
2520 #if CMK_SMP_TRACE_COMMTHREAD 
2521                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2522 #endif
2523             }
2524 #if  CMK_WITH_STATS
2525             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2526             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2527 #endif
2528         }
2529     }else
2530     {
2531         SetMemHndlZero((pd->local_mem_hndl));
2532     }
2533     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2534     {
2535 #if REMOTE_EVENT
2536         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2537 #else
2538         bufferRdmaMsg(inst_id, pd, -1); 
2539 #endif
2540     }else if (status != GNI_RC_SUCCESS) {
2541         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2542         GNI_RC_CHECK("GetLargeAFter posting", status);
2543     }
2544 #else
2545     CONTROL_MSG         *request_msg;
2546     gni_return_t        status;
2547     void                *msg_data;
2548     gni_post_descriptor_t *pd;
2549     RDMA_REQUEST        *rdma_request_msg;
2550     gni_mem_handle_t    msg_mem_hndl;
2551     //int source;
2552     // initial a get to transfer data from the sender side */
2553     request_msg = (CONTROL_MSG *) header;
2554     msg_data = CmiAlloc(request_msg->length);
2555     _MEMCHECK(msg_data);
2556
2557     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2558
2559     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2560     {
2561         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2562     }
2563
2564     MallocPostDesc(pd);
2565     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2566         pd->type            = GNI_POST_FMA_GET;
2567     else
2568         pd->type            = GNI_POST_RDMA_GET;
2569     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2570     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2571     pd->length          = ALIGN64(request_msg->length);
2572     pd->local_addr      = (uint64_t) msg_data;
2573     pd->remote_addr     = request_msg->source_addr;
2574     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2575     pd->src_cq_hndl     = rdma_tx_cqh;
2576     pd->rdma_mode       = 0;
2577     pd->amo_cmd         = 0;
2578
2579     //memory registration successful
2580     if(status == GNI_RC_SUCCESS)
2581     {
2582         pd->local_mem_hndl  = msg_mem_hndl;
2583        
2584         if(pd->type == GNI_POST_RDMA_GET) 
2585         {
2586             CMI_GNI_LOCK(rdma_tx_cq_lock)
2587             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2588             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2589         }
2590         else
2591         {
2592             CMI_GNI_LOCK(default_tx_cq_lock)
2593             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2594             CMI_GNI_UNLOCK(default_tx_cq_lock)
2595         }
2596
2597     }else
2598     {
2599         SetMemHndlZero(pd->local_mem_hndl);
2600     }
2601     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2602     {
2603         MallocRdmaRequest(rdma_request_msg);
2604         rdma_request_msg->next = 0;
2605         rdma_request_msg->destNode = inst_id;
2606         rdma_request_msg->pd = pd;
2607         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2608     }else {
2609         GNI_RC_CHECK("AFter posting", status);
2610     }
2611 #endif
2612 }
2613
2614 #if CQWRITE
2615 static void PumpCqWriteTransactions()
2616 {
2617
2618     gni_cq_entry_t          ev;
2619     gni_return_t            status;
2620     void                    *msg;  
2621     int                     msg_size;
2622     while(1) {
2623         //CMI_GNI_LOCK(my_cq_lock) 
2624         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2625         //CMI_GNI_UNLOCK(my_cq_lock)
2626         if(status != GNI_RC_SUCCESS) break;
2627         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2628 #if CMK_PERSISTENT_COMM
2629 #if PRINT_SYH
2630         printf(" %d CQ write event %p\n", myrank, msg);
2631 #endif
2632         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2633 #if PRINT_SYH
2634             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2635 #endif
2636             CmiReference(msg);
2637             msg_size = CmiGetMsgSize(msg);
2638             CMI_CHECK_CHECKSUM(msg, msg_size);
2639             handleOneRecvedMsg(msg_size, msg); 
2640             continue;
2641         }
2642 #endif
2643 #if ! USE_LRTS_MEMPOOL
2644        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2645 #else
2646         DecreaseMsgInSend(msg);
2647 #endif
2648         if(NoMsgInSend(msg))
2649             buffered_send_msg -= GetMempoolsize(msg);
2650         CmiFree(msg);
2651     };
2652     if(status == GNI_RC_ERROR_RESOURCE)
2653     {
2654         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2655     }
2656 }
2657 #endif
2658
2659 #if REMOTE_EVENT
2660 static void PumpRemoteTransactions()
2661 {
2662     gni_cq_entry_t          ev;
2663     gni_return_t            status;
2664     void                    *msg;   
2665     int                     inst_id, index, type, size;
2666
2667     while(1) {
2668         CMI_GNI_LOCK(global_gni_lock)
2669         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2670         CMI_GNI_UNLOCK(global_gni_lock)
2671
2672         if(status != GNI_RC_SUCCESS) break;
2673
2674         inst_id = GNI_CQ_GET_INST_ID(ev);
2675         index = GET_INDEX(inst_id);
2676         type = GET_TYPE(inst_id);
2677         switch (type) {
2678         case 0:    // ACK
2679             CmiAssert(index>=0 && index<ackPool.size);
2680             CmiAssert(GetIndexType(ackPool, index) == 1);
2681             msg = GetIndexAddress(ackPool, index);
2682 #if PRINT_SYH
2683         printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2684 #endif
2685 #if ! USE_LRTS_MEMPOOL
2686            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2687 #else
2688             DecreaseMsgInSend(msg);
2689 #endif
2690             if(NoMsgInSend(msg))
2691                 buffered_send_msg -= GetMempoolsize(msg);
2692             CmiFree(msg);
2693             IndexPool_freeslot(&ackPool, index);
2694 #if CMI_EXERT_SEND_CAP
2695             SEND_large_pending--;
2696 #endif
2697             break;
2698 #if CMK_PERSISTENT_COMM
2699         case 1:  {    // PERSISTENT
2700             CmiLock(persistPool.lock);
2701             CmiAssert(GetIndexType(persistPool, index) == 2);
2702             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2703             CmiUnlock(persistPool.lock);
2704             START_EVENT();
2705             msg = slot->destBuf[0].destAddress;
2706             size = CmiGetMsgSize(msg);
2707             CmiReference(msg);
2708             CMI_CHECK_CHECKSUM(msg, size);
2709             TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2710             handleOneRecvedMsg(size, msg); 
2711             break;
2712             }
2713 #endif
2714         default:
2715             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2716             CmiAbort("PumpRemoteTransactions: unknown type");
2717         }
2718     }
2719     if(status == GNI_RC_ERROR_RESOURCE)
2720     {
2721         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2722     }
2723 }
2724 #endif
2725
2726 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2727 {
2728     gni_cq_entry_t          ev;
2729     gni_return_t            status;
2730     uint64_t                type, inst_id;
2731     gni_post_descriptor_t   *tmp_pd;
2732     MSG_LIST                *ptr;
2733     CONTROL_MSG             *ack_msg_tmp;
2734     ACK_MSG                 *ack_msg;
2735     uint8_t                 msg_tag;
2736 #if CMK_DIRECT
2737     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2738 #endif
2739     SMSG_QUEUE         *queue = &smsg_queue;
2740
2741     while(1) {
2742         CMI_GNI_LOCK(my_cq_lock) 
2743         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2744         CMI_GNI_UNLOCK(my_cq_lock)
2745         if(status != GNI_RC_SUCCESS) break;
2746         
2747         type = GNI_CQ_GET_TYPE(ev);
2748         if (type == GNI_CQ_EVENT_TYPE_POST)
2749         {
2750
2751 #if CMI_EXERT_RDMA_CAP
2752             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2753             RDMA_pending--;
2754 #endif
2755             inst_id     = GNI_CQ_GET_INST_ID(ev);
2756 #if PRINT_SYH
2757             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2758 #endif
2759             CMI_GNI_LOCK(my_cq_lock)
2760             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2761             CMI_GNI_UNLOCK(my_cq_lock)
2762
2763             switch (tmp_pd->type) {
2764 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2765             case GNI_POST_RDMA_PUT:
2766 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2767                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2768 #endif
2769             case GNI_POST_FMA_PUT:
2770                 if(tmp_pd->amo_cmd == 1) {
2771 #if CMK_DIRECT
2772                     //sender ACK to receiver to trigger it is done
2773                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2774                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2775                     msg_tag = DIRECT_PUT_DONE_TAG;
2776 #endif
2777                 }
2778                 else {
2779                     CmiFree((void *)tmp_pd->local_addr);
2780 #if REMOTE_EVENT
2781                     FreePostDesc(tmp_pd);
2782                     continue;
2783 #elif CQWRITE
2784                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2785                     FreePostDesc(tmp_pd);
2786                     continue;
2787 #else
2788                     MallocControlMsg(ack_msg_tmp);
2789                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2790                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2791                     ack_msg_tmp->length  = tmp_pd->length;
2792                     msg_tag = PUT_DONE_TAG;
2793 #endif
2794                 }
2795                 break;
2796 #endif
2797             case GNI_POST_RDMA_GET:
2798             case GNI_POST_FMA_GET:  {
2799 #if  ! USE_LRTS_MEMPOOL
2800                 MallocControlMsg(ack_msg_tmp);
2801                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2802                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2803                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2804                 msg_tag = ACK_TAG;  
2805 #else
2806 #if CMK_WITH_STATS
2807                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2808 #endif
2809                 int seq_id = tmp_pd->cqwrite_value;
2810                 if(seq_id > 0)      // BIG_MSG
2811                 {
2812                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2813                     MallocControlMsg(ack_msg_tmp);
2814                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2815                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2816                     ack_msg_tmp->seq_id = seq_id;
2817                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2818                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2819                     ack_msg_tmp->length = tmp_pd->length;
2820                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2821                     msg_tag = BIG_MSG_TAG; 
2822                 } 
2823                 else
2824                 {
2825                     msg_tag = ACK_TAG; 
2826 #if  !REMOTE_EVENT && !CQWRITE
2827                     MallocAckMsg(ack_msg);
2828                     ack_msg->source_addr = tmp_pd->remote_addr;
2829 #endif
2830                 }
2831 #endif
2832                 break;
2833             }
2834             case  GNI_POST_CQWRITE:
2835                    FreePostDesc(tmp_pd);
2836                    continue;
2837             default:
2838                 CmiPrintf("type=%d\n", tmp_pd->type);
2839                 CmiAbort("PumpLocalTransactions: unknown type!");
2840             }      /* end of switch */
2841
2842 #if CMK_DIRECT
2843             if (tmp_pd->amo_cmd == 1) {
2844                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2845                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2846             }
2847             else
2848 #endif
2849             if (msg_tag == ACK_TAG) {
2850 #if !REMOTE_EVENT
2851 #if   !CQWRITE
2852                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2853                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2854 #else
2855                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2856 #endif
2857 #endif
2858             }
2859             else {
2860                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2861                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2862             }
2863 #if CMK_PERSISTENT_COMM
2864             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2865 #endif
2866             {
2867                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2868 #if PRINT_SYH
2869                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2870 #endif
2871                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2872                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2873
2874                     START_EVENT();
2875                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2876                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2877 #if MACHINE_DEBUG_LOG
2878                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2879                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2880                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2881 #endif
2882                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2883                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2884                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2885                 }else if(msg_tag == BIG_MSG_TAG){
2886                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2887                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2888                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2889                         START_EVENT();
2890 #if PRINT_SYH
2891                         printf("Pipeline msg done [%d]\n", myrank);
2892 #endif
2893 #if                 CMK_SMP_TRACE_COMMTHREAD
2894                         if( tmp_pd->cqwrite_value == 1)
2895                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2896 #endif
2897                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2898                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2899                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2900                     }
2901                 }
2902             }
2903             FreePostDesc(tmp_pd);
2904         }
2905     } //end while
2906     if(status == GNI_RC_ERROR_RESOURCE)
2907     {
2908         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2909         GNI_RC_CHECK("Smsg_tx_cq full", status);
2910     }
2911 }
2912
2913 static void  SendRdmaMsg()
2914 {
2915     gni_return_t            status = GNI_RC_SUCCESS;
2916     gni_mem_handle_t        msg_mem_hndl;
2917     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2918     RDMA_REQUEST            *pre = 0;
2919     uint64_t                register_size = 0;
2920     void                    *msg;
2921     int                     i;
2922 #if CMK_SMP
2923     int len = PCQueueLength(sendRdmaBuf);
2924     for (i=0; i<len; i++)
2925     {
2926 #if CMI_EXERT_RDMA_CAP
2927          if( RDMA_pending >= RDMA_cap) break;
2928 #endif
2929         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2930         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2931         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2932         if (ptr == NULL) break;
2933 #else
2934     ptr = sendRdmaBuf;
2935     while (ptr!=0 )
2936     {
2937 #if CMI_EXERT_RDMA_CAP
2938          if( RDMA_pending >= RDMA_cap) break;
2939 #endif
2940 #endif 
2941         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2942         gni_post_descriptor_t *pd = ptr->pd;
2943         
2944         msg = (void*)(pd->local_addr);
2945         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2946         register_size = 0;
2947         if(pd->cqwrite_value == 0) {
2948             if(NoMsgInRecv(msg))
2949                 register_size = GetMempoolsize(msg);
2950         }
2951
2952         if(status == GNI_RC_SUCCESS)        //mem register good
2953         {
2954             int destNode = ptr->destNode;
2955             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2956             CMI_GNI_LOCK(lock);
2957 #if REMOTE_EVENT
2958             if( pd->cqwrite_value == 0) {
2959                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2960                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
2961                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2962             }
2963 #if CMK_PERSISTENT_COMM
2964             else if (pd->cqwrite_value == PERSIST_SEQ) {
2965                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2966                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
2967                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2968             }
2969 #endif
2970 #endif
2971 #if CMK_WITH_STATS
2972             RDMA_TRY_SEND(pd->type)
2973 #endif
2974 #if CMK_SMP_TRACE_COMMTHREAD
2975             int oldpe = -1;
2976             int oldeventid = -1;
2977             START_EVENT();
2978             if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
2979             { 
2980                 TRACE_COMM_GET_MSGID((void*)pd->local_addr, &oldpe, &oldeventid);
2981                 TRACE_COMM_SET_COMM_MSGID((void*)pd->local_addr);
2982             }
2983 #endif
2984
2985             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2986             {
2987                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
2988             }
2989             else
2990             {
2991                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
2992             }
2993             CMI_GNI_UNLOCK(lock);
2994             
2995 #if CMK_SMP_TRACE_COMMTHREAD
2996             if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
2997             { 
2998                 if (oldpe != -1)  TRACE_COMM_SET_MSGID((void*)pd->local_addr, oldpe, oldeventid);
2999             }
3000 #endif
3001             if(status == GNI_RC_SUCCESS)    //post good
3002             {
3003 #if CMI_EXERT_RDMA_CAP
3004                 RDMA_pending ++;
3005 #endif
3006 #if !CMK_SMP
3007                 tmp_ptr = ptr;
3008                 if(pre != 0) {
3009                     pre->next = ptr->next;
3010                 }
3011                 else {
3012                     sendRdmaBuf = ptr->next;
3013                 }
3014                 ptr = ptr->next;
3015                 FreeRdmaRequest(tmp_ptr);
3016 #endif
3017                 if(pd->cqwrite_value == 0)
3018                 {
3019 #if CMK_SMP_TRACE_COMMTHREAD 
3020                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3021                     if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
3022                     { 
3023                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
3024                     }
3025 #endif
3026                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3027                 }
3028 #if  CMK_WITH_STATS
3029                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3030                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3031 #endif
3032 #if MACHINE_DEBUG_LOG
3033                 buffered_recv_msg += register_size;
3034                 MACHSTATE(8, "GO request from buffered\n"); 
3035 #endif
3036 #if PRINT_SYH
3037                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
3038 #endif
3039             }else           // cannot post
3040             {
3041 #if CMK_SMP
3042                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3043 #else
3044                 pre = ptr;
3045                 ptr = ptr->next;
3046 #endif
3047 #if PRINT_SYH
3048                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3049 #endif
3050                 break;
3051             }
3052         } else          //memory registration fails
3053         {
3054 #if CMK_SMP
3055             PCQueuePush(sendRdmaBuf, (char*)ptr);
3056 #else
3057             pre = ptr;
3058             ptr = ptr->next;
3059 #endif
3060         }
3061     } //end while
3062 #if ! CMK_SMP
3063     if(ptr == 0)
3064         sendRdmaTail = pre;
3065 #endif
3066 }
3067
3068 // return 1 if all messages are sent
3069 static int SendBufferMsg(SMSG_QUEUE *queue)
3070 {
3071     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3072     CONTROL_MSG         *control_msg_tmp;
3073     gni_return_t        status;
3074     int                 done = 1;
3075     uint64_t            register_size;
3076     void                *register_addr;
3077     int                 index_previous = -1;
3078
3079 #if CMK_SMP
3080     int          index = 0;
3081 #if ONE_SEND_QUEUE
3082     memset(destpe_avail, 0, mysize * sizeof(char));
3083     for (index=0; index<1; index++)
3084     {
3085         int i, len = PCQueueLength(queue->sendMsgBuf);
3086         for (i=0; i<len; i++) 
3087         {
3088             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3089             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3090             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3091             if(ptr == NULL) break;
3092             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3093                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3094                 continue;
3095             }
3096 #else
3097 #if SMP_LOCKS
3098     int nonempty = PCQueueLength(nonEmptyQueues);
3099     for(index =0; index<nonempty; index++)
3100     {
3101         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
3102         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
3103         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3104         if(current_list == NULL) break; 
3105         PCQueue current_queue= current_list-> sendSmsgBuf;
3106         CmiLock(current_list->lock);
3107         int i, len = PCQueueLength(current_queue);
3108         current_list->pushed = 0;
3109         CmiUnlock(current_list->lock);
3110 #else
3111     for(index =0; index<mysize; index++)
3112     {
3113         //if (index == myrank) continue;
3114         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3115         int i, len = PCQueueLength(current_queue);
3116 #endif
3117         for (i=0; i<len; i++) 
3118         {
3119             CMI_PCQUEUEPOP_LOCK(current_queue)
3120             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3121             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3122             if (ptr == 0) break;
3123 #endif
3124 #else
3125     int index = queue->smsg_head_index;
3126     while(index != -1)
3127     {
3128         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3129         pre = 0;
3130         while(ptr != 0)
3131         {
3132 #endif
3133             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3134             status = GNI_RC_ERROR_RESOURCE;
3135             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
3136                 /* connection not exists yet */
3137 #if CMK_SMP
3138                   /* non-smp case, connect is issued in send_smsg_message */
3139                 if (smsg_connected_flag[index] == 0)
3140                     connect_to(ptr->destNode); 
3141 #endif
3142             }
3143             else
3144             switch(ptr->tag)
3145             {
3146             case SMALL_DATA_TAG:
3147                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3148                 if(status == GNI_RC_SUCCESS)
3149                 {
3150                     CmiFree(ptr->msg);
3151                 }
3152                 break;
3153             case LMSG_INIT_TAG:
3154                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3155                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
3156                 break;
3157 #if !REMOTE_EVENT && !CQWRITE
3158             case ACK_TAG:
3159                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3160                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3161                 break;
3162 #endif
3163             case BIG_MSG_TAG:
3164                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3165                 if(status == GNI_RC_SUCCESS)
3166                 {
3167                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3168                 }
3169                 break;
3170 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3171             case PUT_DONE_TAG:
3172                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3173                 if(status == GNI_RC_SUCCESS)
3174                 {
3175                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3176                 }
3177                 break;
3178 #endif
3179 #if CMK_DIRECT
3180             case DIRECT_PUT_DONE_TAG:
3181                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3182                 if(status == GNI_RC_SUCCESS)
3183                 {
3184                     free((CMK_DIRECT_HEADER*)ptr->msg);
3185                 }
3186                 break;
3187
3188 #endif
3189             default:
3190                 printf("Weird tag\n");
3191                 CmiAbort("should not happen\n");
3192             }       // end switch
3193             if(status == GNI_RC_SUCCESS)
3194             {
3195 #if PRINT_SYH
3196                 buffered_smsg_counter--;
3197                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3198 #endif
3199 #if !CMK_SMP
3200                 tmp_ptr = ptr;
3201                 if(pre)
3202                 {
3203                     ptr = pre ->next = ptr->next;
3204                 }else
3205                 {
3206                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3207                 }
3208                 FreeMsgList(tmp_ptr);
3209 #else
3210                 FreeMsgList(ptr);
3211 #endif
3212             }else {
3213 #if CMK_SMP
3214 #if ONE_SEND_QUEUE
3215                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3216 #else
3217                 PCQueuePush(current_queue, (char*)ptr);
3218 #endif
3219 #else
3220                 pre = ptr;
3221                 ptr=ptr->next;
3222 #endif
3223                 done = 0;
3224                 if(status == GNI_RC_ERROR_RESOURCE)
3225                 {
3226 #if CMK_SMP && ONE_SEND_QUEUE 
3227                     destpe_avail[ptr->destNode] = 1;
3228 #else
3229                     break;
3230 #endif
3231                 }
3232             } 
3233         } //end while
3234 #if !CMK_SMP
3235         if(ptr == 0)
3236             queue->smsg_msglist_index[index].tail = pre;
3237         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3238         {
3239             if(index_previous != -1)
3240                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3241             else
3242                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3243         }
3244         else {
3245             index_previous = index;
3246         }
3247         index = queue->smsg_msglist_index[index].next;
3248 #else
3249 #if !ONE_SEND_QUEUE && SMP_LOCKS
3250         CmiLock(current_list->lock);
3251         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3252         {
3253             current_list->pushed = 1;
3254             PCQueuePush(nonEmptyQueues, current_list);
3255         }
3256         CmiUnlock(current_list->lock); 
3257 #endif
3258 #endif
3259
3260     }   // end pooling for all cores
3261     return done;
3262 }
3263
3264 static void ProcessDeadlock();
3265 void LrtsAdvanceCommunication(int whileidle)
3266 {
3267     static int count = 0;
3268     /*  Receive Msg first */
3269 #if CMK_SMP_TRACE_COMMTHREAD
3270     double startT, endT;
3271 #endif
3272     if (useDynamicSMSG && whileidle)
3273     {
3274 #if CMK_SMP_TRACE_COMMTHREAD
3275         startT = CmiWallTimer();
3276 #endif
3277         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3278 #if CMK_SMP_TRACE_COMMTHREAD
3279         endT = CmiWallTimer();
3280         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3281 #endif
3282     }
3283
3284 #if CMK_SMP_TRACE_COMMTHREAD
3285     startT = CmiWallTimer();
3286 #endif
3287     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3288     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3289 #if CMK_SMP_TRACE_COMMTHREAD
3290     endT = CmiWallTimer();
3291     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3292 #endif
3293
3294 #if CMK_SMP_TRACE_COMMTHREAD
3295     startT = CmiWallTimer();
3296 #endif
3297     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3298     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3299 #if CMK_SMP_TRACE_COMMTHREAD
3300     endT = CmiWallTimer();
3301     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3302 #endif
3303
3304 #if CMK_SMP_TRACE_COMMTHREAD
3305     startT = CmiWallTimer();
3306 #endif
3307     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3308
3309 #if CQWRITE
3310     PumpCqWriteTransactions();
3311 #endif
3312
3313 #if REMOTE_EVENT
3314     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
3315 #endif
3316
3317     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3318 #if CMK_SMP_TRACE_COMMTHREAD
3319     endT = CmiWallTimer();
3320     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3321 #endif
3322  
3323 #if CMK_SMP_TRACE_COMMTHREAD
3324     startT = CmiWallTimer();
3325 #endif
3326     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3327     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3328 #if CMK_SMP_TRACE_COMMTHREAD
3329     endT = CmiWallTimer();
3330     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3331 #endif
3332
3333     /* Send buffered Message */
3334 #if CMK_SMP_TRACE_COMMTHREAD
3335     startT = CmiWallTimer();
3336 #endif
3337 #if CMK_USE_OOB
3338     if (SendBufferMsg(&smsg_oob_queue) == 1)
3339 #endif
3340     {
3341         STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
3342     }
3343     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3344 #if CMK_SMP_TRACE_COMMTHREAD
3345     endT = CmiWallTimer();
3346     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3347 #endif
3348
3349 #if CMK_SMP && ! LARGEPAGE
3350     if (_detected_hang)  ProcessDeadlock();
3351 #endif
3352 }
3353
3354 static void set_smsg_max()
3355 {
3356     char *env;
3357
3358     if(mysize <=512)
3359     {
3360         SMSG_MAX_MSG = 1024;
3361     }else if (mysize <= 4096)
3362     {
3363         SMSG_MAX_MSG = 1024;
3364     }else if (mysize <= 16384)
3365     {
3366         SMSG_MAX_MSG = 512;
3367     }else {
3368         SMSG_MAX_MSG = 256;
3369     }
3370
3371     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3372     if (env) SMSG_MAX_MSG = atoi(env);
3373     CmiAssert(SMSG_MAX_MSG > 0);
3374 }    
3375
3376 /* useDynamicSMSG */
3377 static void _init_dynamic_smsg()
3378 {
3379     gni_return_t status;
3380     uint32_t     vmdh_index = -1;
3381     int i;
3382
3383     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3384     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3385     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3386     for(i=0; i<mysize; i++) {
3387         smsg_connected_flag[i] = 0;
3388         smsg_attr_vector_local[i] = NULL;
3389         smsg_attr_vector_remote[i] = NULL;
3390     }
3391
3392     set_smsg_max();
3393
3394     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3395     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3396     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3397     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3398     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3399
3400     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3401     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3402     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3403     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3404     mailbox_list->offset = 0;
3405     mailbox_list->next = 0;
3406     
3407     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3408         mailbox_list->size, smsg_rx_cqh,
3409         GNI_MEM_READWRITE,   
3410         vmdh_index,
3411         &(mailbox_list->mem_hndl));
3412     GNI_RC_CHECK("MEMORY registration for smsg", status);
3413
3414     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3415     GNI_RC_CHECK("Unbound EP", status);
3416     
3417     alloc_smsg_attr(&send_smsg_attr);
3418
3419     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3420     GNI_RC_CHECK("post unbound datagram", status);
3421
3422       /* always pre-connect to proc 0 */
3423     //if (myrank != 0) connect_to(0);
3424
3425     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3426     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3427 }
3428
3429 static void _init_static_smsg()
3430 {
3431     gni_smsg_attr_t      *smsg_attr;
3432     gni_smsg_attr_t      remote_smsg_attr;
3433     gni_smsg_attr_t      *smsg_attr_vec;
3434     gni_mem_handle_t     my_smsg_mdh_mailbox;
3435     int      ret, i;
3436     gni_return_t status;
3437     uint32_t              vmdh_index = -1;
3438     mdh_addr_t            base_infor;
3439     mdh_addr_t            *base_addr_vec;
3440
3441     set_smsg_max();
3442     
3443     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3444     
3445     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3446     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3447     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3448     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3449     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3450     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3451     CmiAssert(ret == 0);
3452     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3453     
3454     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3455             smsg_memlen*(mysize), smsg_rx_cqh,
3456             GNI_MEM_READWRITE,   
3457             vmdh_index,
3458             &my_smsg_mdh_mailbox);
3459     register_memory_size += smsg_memlen*(mysize);
3460     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3461
3462     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3463     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set