01f84e35e9ee4b93a9b7284100fd240f2608180c
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #define     LARGEPAGE              0
51
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     0
59 #endif
60
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
64 #define CMI_EXERT_SEND_CAP      0
65 #define CMI_EXERT_RECV_CAP      0
66 #define CMI_EXERT_RDMA_CAP      0
67
68 #if CMI_EXERT_SEND_CAP
69 int SEND_large_cap = 100;
70 int SEND_large_pending = 0;
71 #endif
72
73 #if CMI_EXERT_RECV_CAP
74 #define RECV_CAP  4                  /* cap <= 2 sometimes hang */
75 #endif
76
77 #if CMI_EXERT_RDMA_CAP
78 int   RDMA_cap =   100;
79 int   RDMA_pending = 0;
80 #endif
81
82 #define USE_LRTS_MEMPOOL                  1
83
84 #define PRINT_SYH                         0
85
86 // Trace communication thread
87 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
88 #define TRACE_THRESHOLD     0.00005
89 #define CMI_MPI_TRACE_MOREDETAILED 0
90 #undef CMI_MPI_TRACE_USEREVENTS
91 #define CMI_MPI_TRACE_USEREVENTS 1
92 #else
93 #undef CMK_SMP_TRACE_COMMTHREAD
94 #define CMK_SMP_TRACE_COMMTHREAD 0
95 #endif
96
97 #define CMK_TRACE_COMMOVERHEAD 0
98 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
99 #undef CMI_MPI_TRACE_USEREVENTS
100 #define CMI_MPI_TRACE_USEREVENTS 1
101 #else
102 #undef CMK_TRACE_COMMOVERHEAD
103 #define CMK_TRACE_COMMOVERHEAD 0
104 #endif
105
106 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
107 CpvStaticDeclare(double, projTraceStart);
108 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
109 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
110 #else
111 #define  START_EVENT()
112 #define  END_EVENT(x)
113 #endif
114
115 #if USE_LRTS_MEMPOOL
116
117 #define oneMB (1024ll*1024)
118 #define oneGB (1024ll*1024*1024)
119
120 static CmiInt8 _mempool_size = 8*oneMB;
121 static CmiInt8 _expand_mem =  4*oneMB;
122 static CmiInt8 _mempool_size_limit = 0;
123
124 static CmiInt8 _totalmem = 0.8*oneGB;
125
126 #if LARGEPAGE
127 static CmiInt8 BIG_MSG  =  16*oneMB;
128 static CmiInt8 ONE_SEG  =  4*oneMB;
129 #else
130 static CmiInt8 BIG_MSG  =  4*oneMB;
131 static CmiInt8 ONE_SEG  =  2*oneMB;
132 #endif
133 #if MULTI_THREAD_SEND
134 static int BIG_MSG_PIPELINE = 1;
135 #else
136 static int BIG_MSG_PIPELINE = 4;
137 #endif
138
139 // dynamic flow control
140 static CmiInt8 buffered_send_msg = 0;
141 static CmiInt8 register_memory_size = 0;
142
143 #if LARGEPAGE
144 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
145 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
146 static CmiInt8  register_count = 0;
147 #else
148 #if CMK_SMP && COMM_THREAD_SEND 
149 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
150 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
151 #else
152 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
153 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
154 #endif
155
156
157 #endif
158
159 #endif     /* end USE_LRTS_MEMPOOL */
160
161 #if MULTI_THREAD_SEND 
162 #define     CMI_GNI_LOCK(x)       CmiLock(x);
163 #define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
164 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
165 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
166 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
167 #else
168 #define     CMI_GNI_LOCK(x)
169 #define     CMI_GNI_TRYLOCK(x)         (0)
170 #define     CMI_GNI_UNLOCK(x)
171 #define     CMI_PCQUEUEPOP_LOCK(Q)   
172 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
173 #endif
174
175 static int _tlbpagesize = 4096;
176
177 //static int _smpd_count  = 0;
178
179 static int   user_set_flag  = 0;
180
181 static int _checkProgress = 1;             /* check deadlock */
182 static int _detected_hang = 0;
183
184 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
185
186 // dynamic SMSG
187 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
188
189 static int avg_smsg_connection = 32;
190 static int                 *smsg_connected_flag= 0;
191 static gni_smsg_attr_t     **smsg_attr_vector_local;
192 static gni_smsg_attr_t     **smsg_attr_vector_remote;
193 static gni_ep_handle_t     ep_hndl_unbound;
194 static gni_smsg_attr_t     send_smsg_attr;
195 static gni_smsg_attr_t     recv_smsg_attr;
196
197 typedef struct _dynamic_smsg_mailbox{
198    void     *mailbox_base;
199    int      size;
200    int      offset;
201    gni_mem_handle_t  mem_hndl;
202    struct      _dynamic_smsg_mailbox  *next;
203 }dynamic_smsg_mailbox_t;
204
205 static dynamic_smsg_mailbox_t  *mailbox_list;
206
207 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
208 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
209
210 #if PRINT_SYH
211 int         lrts_send_msg_id = 0;
212 int         lrts_local_done_msg = 0;
213 int         lrts_send_rdma_success = 0;
214 #endif
215
216 #include "machine.h"
217
218 #include "pcqueue.h"
219
220 #include "mempool.h"
221
222 #if CMK_PERSISTENT_COMM
223 #include "machine-persistent.h"
224 #endif
225
226 //#define  USE_ONESIDED 1
227 #ifdef USE_ONESIDED
228 //onesided implementation is wrong, since no place to restore omdh
229 #include "onesided.h"
230 onesided_hnd_t   onesided_hnd;
231 onesided_md_t    omdh;
232 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
233
234 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
235
236 #else
237 uint8_t   onesided_hnd, omdh;
238
239 #if REMOTE_EVENT || CQWRITE 
240 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
241     if(register_memory_size+size>= MAX_REG_MEM) { \
242         status = GNI_RC_ERROR_NOMEM;} \
243     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
244         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
245 #else
246 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
247         if (register_memory_size + size >= MAX_REG_MEM) { \
248             status = GNI_RC_ERROR_NOMEM; \
249         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
250             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
251 #endif
252
253 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
254     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
255              register_memory_size -= size; \
256          else CmiAbort("MEM_DEregister");  \
257     } while (0)
258 #endif
259
260 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
261 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
262 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
263 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
264 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
265 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
266 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
267 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
268 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
269 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
270 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
271 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
272 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
273 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
274
275 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
276 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
277
278 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
279 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
280 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
281 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
282
283 #define ALIGNBUF                64
284
285 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
286 /* If SMSG is not used */
287
288 #define FMA_PER_CORE  1024
289 #define FMA_BUFFER_SIZE 1024
290
291 /* If SMSG is used */
292 static int  SMSG_MAX_MSG = 1024;
293 #define SMSG_MAX_CREDIT    72
294
295 #define MSGQ_MAXSIZE       2048
296
297 /* large message transfer with FMA or BTE */
298 #if ! REMOTE_EVENT
299 #define LRTS_GNI_RDMA_THRESHOLD  1024 
300 #else
301    /* remote events only work with RDMA */
302 #define LRTS_GNI_RDMA_THRESHOLD  0 
303 #endif
304
305 #if CMK_SMP
306 static int  REMOTE_QUEUE_ENTRIES=163840; 
307 static int LOCAL_QUEUE_ENTRIES=163840; 
308 #else
309 static int  REMOTE_QUEUE_ENTRIES=20480;
310 static int LOCAL_QUEUE_ENTRIES=20480; 
311 #endif
312
313 #define BIG_MSG_TAG             0x26
314 #define PUT_DONE_TAG            0x28
315 #define DIRECT_PUT_DONE_TAG     0x29
316 #define ACK_TAG                 0x30
317 /* SMSG is data message */
318 #define SMALL_DATA_TAG          0x31
319 /* SMSG is a control message to initialize a BTE */
320 #define LMSG_INIT_TAG           0x39 
321
322 #define DEBUG
323 #ifdef GNI_RC_CHECK
324 #undef GNI_RC_CHECK
325 #endif
326 #ifdef DEBUG
327 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
328 #else
329 #define GNI_RC_CHECK(msg,rc)
330 #endif
331
332 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
333 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
334 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
335
336 static int useStaticMSGQ = 0;
337 static int useStaticFMA = 0;
338 static int mysize, myrank;
339 static gni_nic_handle_t   nic_hndl;
340
341 typedef struct {
342     gni_mem_handle_t mdh;
343     uint64_t addr;
344 } mdh_addr_t ;
345 // this is related to dynamic SMSG
346
347 typedef struct mdh_addr_list{
348     gni_mem_handle_t mdh;
349    void *addr;
350     struct mdh_addr_list *next;
351 }mdh_addr_list_t;
352
353 static unsigned int         smsg_memlen;
354 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
355 mdh_addr_t          setup_mem;
356 mdh_addr_t          *smsg_connection_vec = 0;
357 gni_mem_handle_t    smsg_connection_memhndl;
358 static int          smsg_expand_slots = 10;
359 static int          smsg_available_slot = 0;
360 static void         *smsg_mailbox_mempool = 0;
361 mdh_addr_list_t     *smsg_dynamic_list = 0;
362
363 static void             *smsg_mailbox_base;
364 gni_msgq_attr_t         msgq_attrs;
365 gni_msgq_handle_t       msgq_handle;
366 gni_msgq_ep_attr_t      msgq_ep_attrs;
367 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
368
369 /* =====Beginning of Declarations of Machine Specific Variables===== */
370 static int cookie;
371 static int modes = 0;
372 static gni_cq_handle_t       smsg_rx_cqh = NULL;
373 static gni_cq_handle_t       default_tx_cqh = NULL;
374 static gni_cq_handle_t       rdma_tx_cqh = NULL;
375 static gni_cq_handle_t       rdma_rx_cqh = NULL;
376 static gni_cq_handle_t       post_tx_cqh = NULL;
377 static gni_ep_handle_t       *ep_hndl_array;
378
379 static CmiNodeLock           *ep_lock_array;
380 static CmiNodeLock           default_tx_cq_lock; 
381 static CmiNodeLock           rdma_tx_cq_lock; 
382 static CmiNodeLock           global_gni_lock; 
383 static CmiNodeLock           rx_cq_lock;
384 static CmiNodeLock           smsg_mailbox_lock;
385 static CmiNodeLock           smsg_rx_cq_lock;
386 static CmiNodeLock           *mempool_lock;
387 //#define     CMK_WITH_STATS      1
388 typedef struct msg_list
389 {
390     uint32_t destNode;
391     uint32_t size;
392     void *msg;
393     uint8_t tag;
394 #if !CMK_SMP
395     struct msg_list *next;
396 #endif
397 #if CMK_WITH_STATS
398     double  creation_time;
399 #endif
400 }MSG_LIST;
401
402
403 typedef struct control_msg
404 {
405     uint64_t            source_addr;    /* address from the start of buffer  */
406     uint64_t            dest_addr;      /* address from the start of buffer */
407     int                 total_length;   /* total length */
408     int                 length;         /* length of this packet */
409 #if REMOTE_EVENT
410     int                 ack_index;      /* index from integer to address */
411 #endif
412     uint8_t             seq_id;         //big message   0 meaning single message
413     gni_mem_handle_t    source_mem_hndl;
414     struct control_msg *next;
415 } CONTROL_MSG;
416
417 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
418
419 typedef struct ack_msg
420 {
421     uint64_t            source_addr;    /* address from the start of buffer  */
422 #if ! USE_LRTS_MEMPOOL
423     gni_mem_handle_t    source_mem_hndl;
424     int                 length;          /* total length */
425 #endif
426     struct ack_msg     *next;
427 } ACK_MSG;
428
429 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
430
431 #if CMK_DIRECT
432 typedef struct{
433     uint64_t    handler_addr;
434 }CMK_DIRECT_HEADER;
435
436 typedef struct {
437     char core[CmiMsgHeaderSizeBytes];
438     uint64_t handler;
439 }cmidirectMsg;
440
441 //SYH
442 CpvDeclare(int, CmiHandleDirectIdx);
443 void CmiHandleDirectMsg(cmidirectMsg* msg)
444 {
445
446     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
447    (*(_handle->callbackFnPtr))(_handle->callbackData);
448    CmiFree(msg);
449 }
450
451 void CmiDirectInit()
452 {
453     CpvInitialize(int,  CmiHandleDirectIdx);
454     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
455 }
456
457 #endif
458 typedef struct  rmda_msg
459 {
460     int                   destNode;
461 #if REMOTE_EVENT
462     int                   ack_index;
463 #endif
464     gni_post_descriptor_t *pd;
465 #if !CMK_SMP
466     struct  rmda_msg      *next;
467 #endif
468 }RDMA_REQUEST;
469
470
471 #if CMK_SMP
472 #define SMP_LOCKS                       0
473 #define ONE_SEND_QUEUE                  0
474 PCQueue sendRdmaBuf;
475 typedef struct  msg_list_index
476 {
477     PCQueue     sendSmsgBuf;
478     int         pushed;
479     CmiNodeLock   lock;
480 } MSG_LIST_INDEX;
481 char                *destpe_avail;
482 #if  !ONE_SEND_QUEUE && SMP_LOCKS
483     PCQueue     nonEmptyQueues;
484 #endif
485 #else         /* non-smp */
486
487 static RDMA_REQUEST        *sendRdmaBuf = 0;
488 static RDMA_REQUEST        *sendRdmaTail = 0;
489 typedef struct  msg_list_index
490 {
491     int         next;
492     MSG_LIST    *sendSmsgBuf;
493     MSG_LIST    *tail;
494 } MSG_LIST_INDEX;
495
496 #endif
497
498 // buffered send queue
499 #if ! ONE_SEND_QUEUE
500 typedef struct smsg_queue
501 {
502     MSG_LIST_INDEX   *smsg_msglist_index;
503     int               smsg_head_index;
504 } SMSG_QUEUE;
505 #else
506 typedef struct smsg_queue
507 {
508     PCQueue       sendMsgBuf;
509 }  SMSG_QUEUE;
510 #endif
511
512 SMSG_QUEUE                  smsg_queue;
513 #if CMK_USE_OOB
514 SMSG_QUEUE                  smsg_oob_queue;
515 #endif
516
517 #if CMK_SMP
518
519 #define FreeMsgList(d)   free(d);
520 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
521
522 #else
523
524 static MSG_LIST       *msglist_freelist=0;
525
526 #define FreeMsgList(d)  \
527   do { \
528   (d)->next = msglist_freelist;\
529   msglist_freelist = d; \
530   } while (0)
531
532 #define MallocMsgList(d) \
533   do {  \
534   d = msglist_freelist;\
535   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
536              _MEMCHECK(d);\
537   } else msglist_freelist = d->next; \
538   d->next =0;  \
539   } while (0)
540
541 #endif
542
543 #if CMK_SMP
544
545 #define FreeControlMsg(d)      free(d);
546 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
547
548 #else
549
550 static CONTROL_MSG    *control_freelist=0;
551
552 #define FreeControlMsg(d)       \
553   do { \
554   (d)->next = control_freelist;\
555   control_freelist = d; \
556   } while (0);
557
558 #define MallocControlMsg(d) \
559   do {  \
560   d = control_freelist;\
561   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
562              _MEMCHECK(d);\
563   } else control_freelist = d->next;  \
564   } while (0);
565
566 #endif
567
568 #if CMK_SMP
569
570 #define FreeAckMsg(d)      free(d);
571 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
572
573 #else
574
575 static ACK_MSG        *ack_freelist=0;
576
577 #define FreeAckMsg(d)       \
578   do { \
579   (d)->next = ack_freelist;\
580   ack_freelist = d; \
581   } while (0)
582
583 #define MallocAckMsg(d) \
584   do { \
585   d = ack_freelist;\
586   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
587              _MEMCHECK(d);\
588   } else ack_freelist = d->next; \
589   } while (0)
590
591 #endif
592
593
594 # if CMK_SMP
595 #define FreeRdmaRequest(d)       free(d);
596 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
597 #else
598
599 static RDMA_REQUEST         *rdma_freelist = NULL;
600
601 #define FreeRdmaRequest(d)       \
602   do {  \
603   (d)->next = rdma_freelist;\
604   rdma_freelist = d;    \
605   } while (0)
606
607 #define MallocRdmaRequest(d) \
608   do {   \
609   d = rdma_freelist;\
610   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
611              _MEMCHECK(d);\
612   } else rdma_freelist = d->next; \
613   d->next =0;   \
614   } while (0)
615 #endif
616
617 /* reuse gni_post_descriptor_t */
618 static gni_post_descriptor_t *post_freelist=0;
619
620 #if  !CMK_SMP
621 #define FreePostDesc(d)       \
622     (d)->next_descr = post_freelist;\
623     post_freelist = d;
624
625 #define MallocPostDesc(d) \
626   d = post_freelist;\
627   if (d==0) { \
628      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
629      d->next_descr = 0;\
630       _MEMCHECK(d);\
631   } else post_freelist = d->next_descr;
632 #else
633
634 #define FreePostDesc(d)     free(d);
635 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
636
637 #endif
638
639
640 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
641 static int      buffered_smsg_counter = 0;
642
643 /* SmsgSend return success but message sent is not confirmed by remote side */
644 static MSG_LIST *buffered_fma_head = 0;
645 static MSG_LIST *buffered_fma_tail = 0;
646
647 /* functions  */
648 #define IsFree(a,ind)  !( a& (1<<(ind) ))
649 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
650 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
651
652 CpvDeclare(mempool_type*, mempool);
653
654 #if REMOTE_EVENT
655 /* ack pool for remote events */
656
657 static int  SHIFT   =           18;
658 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
659 #define RANK_MASK               ((1<<SHIFT) - 1)
660 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
661
662 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
663 #define GET_RANK(evt)           ((evt) & RANK_MASK)
664 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
665
666 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
667
668 #if CMK_SMP
669 #define INIT_SIZE                4096
670 #else
671 #define INIT_SIZE                1024
672 #endif
673
674 struct IndexStruct {
675 void *addr;
676 int next;
677 int type;     // 1: ACK   2: Persistent
678 };
679
680 typedef struct IndexPool {
681     struct IndexStruct   *indexes;
682     int                   size;
683     int                   freehead;
684     CmiNodeLock           lock;
685 } IndexPool;
686
687 static IndexPool  ackPool;
688 #if CMK_PERSISTENT_COMM
689 static IndexPool  persistPool;
690 #endif
691
692 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
693 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
694
695 static void IndexPool_init(IndexPool *pool)
696 {
697     int i;
698     if ((1<<SHIFT) < mysize) 
699         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
700     pool->size = INIT_SIZE;
701     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
702     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
703     for (i=0; i<pool->size-1; i++) {
704         pool->indexes[i].next = i+1;
705         pool->indexes[i].type = 0;
706     }
707     pool->indexes[i].next = -1;
708     pool->freehead = 0;
709 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
710     pool->lock  = CmiCreateLock();
711 #else
712     pool->lock  = 0;
713 #endif
714 }
715
716 static
717 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
718 {
719     int s, i;
720 #if MULTI_THREAD_SEND  
721     CmiLock(pool->lock);
722 #endif
723     s = pool->freehead;
724     if (s == -1) {
725         int newsize = pool->size * 2;
726         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
727         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
728         struct IndexStruct *old_ackpool = pool->indexes;
729         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
730         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
731         for (i=pool->size; i<newsize-1; i++) {
732             pool->indexes[i].next = i+1;
733             pool->indexes[i].type = 0;
734         }
735         pool->indexes[i].next = -1;
736         pool->indexes[i].type = 0;
737         pool->freehead = pool->size;
738         s = pool->size;
739         pool->size = newsize;
740         free(old_ackpool);
741     }
742     pool->freehead = pool->indexes[s].next;
743     pool->indexes[s].addr = addr;
744     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
745     pool->indexes[s].type = type;
746 #if MULTI_THREAD_SEND
747     CmiUnlock(pool->lock);
748 #endif
749     return s;
750 }
751
752 static
753 inline  void IndexPool_freeslot(IndexPool *pool, int s)
754 {
755     CmiAssert(s>=0 && s<pool->size);
756 #if MULTI_THREAD_SEND
757     CmiLock(pool->lock);
758 #endif
759     pool->indexes[s].next = pool->freehead;
760     pool->indexes[s].type = 0;
761     pool->freehead = s;
762 #if MULTI_THREAD_SEND
763     CmiUnlock(pool->lock);
764 #endif
765 }
766
767
768 #endif
769
770 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
771 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
772 #define CHARM_MAGIC_NUMBER               126
773
774 #if CMK_ERROR_CHECKING
775 extern unsigned char computeCheckSum(unsigned char *data, int len);
776 static int checksum_flag = 0;
777 #define CMI_SET_CHECKSUM(msg, len)      \
778         if (checksum_flag)  {   \
779           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
780           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
781         }
782 #define CMI_CHECK_CHECKSUM(msg, len)    \
783         if (checksum_flag)      \
784           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
785             CmiAbort("Fatal error: checksum doesn't agree!\n");
786 #else
787 #define CMI_SET_CHECKSUM(msg, len)
788 #define CMI_CHECK_CHECKSUM(msg, len)
789 #endif
790 /* =====End of Definitions of Message-Corruption Related Macros=====*/
791
792 static int print_stats = 0;
793 static int stats_off = 0;
794 void CmiTurnOnStats()
795 {
796     stats_off = 0;
797     //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
798 }
799
800 void CmiTurnOffStats()
801 {
802     stats_off = 1;
803 }
804
805 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
806
807 #if CMK_WITH_STATS
808 FILE *counterLog = NULL;
809 typedef struct comm_thread_stats
810 {
811     uint64_t  smsg_data_count;
812     uint64_t  lmsg_init_count;
813     uint64_t  ack_count;
814     uint64_t  big_msg_ack_count;
815     uint64_t  smsg_count;
816     uint64_t  direct_put_done_count;
817     uint64_t  put_done_count;
818     //times of calling SmsgSend
819     uint64_t  try_smsg_data_count;
820     uint64_t  try_lmsg_init_count;
821     uint64_t  try_ack_count;
822     uint64_t  try_big_msg_ack_count;
823     uint64_t  try_direct_put_done_count;
824     uint64_t  try_put_done_count;
825     uint64_t  try_smsg_count;
826     
827     double    max_time_in_send_buffered_smsg;
828     double    all_time_in_send_buffered_smsg;
829
830     uint64_t  rdma_get_count, rdma_put_count;
831     uint64_t  try_rdma_get_count, try_rdma_put_count;
832     double    max_time_from_control_to_rdma_init;
833     double    all_time_from_control_to_rdma_init;
834
835     double    max_time_from_rdma_init_to_rdma_done;
836     double    all_time_from_rdma_init_to_rdma_done;
837
838     int      count_in_PumpNetwork;
839     double   time_in_PumpNetwork;
840     double   max_time_in_PumpNetwork;
841     int      count_in_SendBufferMsg_smsg;
842     double   time_in_SendBufferMsg_smsg;
843     double   max_time_in_SendBufferMsg_smsg;
844     int      count_in_SendRdmaMsg;
845     double   time_in_SendRdmaMsg;
846     double   max_time_in_SendRdmaMsg;
847     int      count_in_PumpRemoteTransactions;
848     double   time_in_PumpRemoteTransactions;
849     double   max_time_in_PumpRemoteTransactions;
850     int      count_in_PumpLocalTransactions_rdma;
851     double   time_in_PumpLocalTransactions_rdma;
852     double   max_time_in_PumpLocalTransactions_rdma;
853     int      count_in_PumpDatagramConnection;
854     double   time_in_PumpDatagramConnection;
855     double   max_time_in_PumpDatagramConnection;
856 } Comm_Thread_Stats;
857
858 static Comm_Thread_Stats   comm_stats;
859
860 static char *counters_dirname = "counters";
861
862 static void init_comm_stats()
863 {
864   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
865   if (print_stats){
866       char ln[200];
867       int code = mkdir(counters_dirname, 00777); 
868       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
869       counterLog=fopen(ln,"w");
870       if (counterLog == NULL) CmiAbort("Counter files open failed");
871   }
872 }
873
874 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
875
876 #define SMSG_SENT_DONE(creation_time, tag)  \
877         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
878             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
879             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
880             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
881             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
882             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
883             comm_stats.smsg_count++; \
884             double inbuff_time = CmiWallTimer() - creation_time;   \
885             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
886             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
887         }
888
889 #define SMSG_TRY_SEND(tag)  \
890         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
891             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
892             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
893             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
894             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
895             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
896             comm_stats.try_smsg_count++; \
897         }
898
899 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
900
901 #define  RDMA_TRANS_DONE(x)      \
902          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
903              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
904              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
905          }
906
907 #define  RDMA_TRANS_INIT(type, x)      \
908          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
909              double rdma_trans_time = CmiWallTimer() - x ; \
910              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
911              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
912          }
913
914 #define STATS_PUMPNETWORK_TIME(x)   \
915         { double t = CmiWallTimer(); \
916           x;        \
917           t = CmiWallTimer() - t;          \
918           comm_stats.count_in_PumpNetwork++;        \
919           comm_stats.time_in_PumpNetwork += t;   \
920           if (t>comm_stats.max_time_in_PumpNetwork)      \
921               comm_stats.max_time_in_PumpNetwork = t;    \
922         }
923
924 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
925         { double t = CmiWallTimer(); \
926           x;        \
927           t = CmiWallTimer() - t;          \
928           comm_stats.count_in_PumpRemoteTransactions ++;        \
929           comm_stats.time_in_PumpRemoteTransactions += t;   \
930           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
931               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
932         }
933
934 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
935         { double t = CmiWallTimer(); \
936           x;        \
937           t = CmiWallTimer() - t;          \
938           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
939           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
940           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
941               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
942         }
943
944 #define STATS_SEND_SMSGS_TIME(x)   \
945         { double t = CmiWallTimer(); \
946           x;        \
947           t = CmiWallTimer() - t;          \
948           comm_stats.count_in_SendBufferMsg_smsg ++;        \
949           comm_stats.time_in_SendBufferMsg_smsg += t;   \
950           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
951               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
952         }
953
954 #define STATS_SENDRDMAMSG_TIME(x)   \
955         { double t = CmiWallTimer(); \
956           x;        \
957           t = CmiWallTimer() - t;          \
958           comm_stats.count_in_SendRdmaMsg ++;        \
959           comm_stats.time_in_SendRdmaMsg += t;   \
960           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
961               comm_stats.max_time_in_SendRdmaMsg = t;    \
962         }
963
964 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
965         { double t = CmiWallTimer(); \
966           x;        \
967           t = CmiWallTimer() - t;          \
968           comm_stats.count_in_PumpDatagramConnection ++;        \
969           comm_stats.time_in_PumpDatagramConnection += t;   \
970           if (t>comm_stats.max_time_in_PumpDatagramConnection)      \
971               comm_stats.max_time_in_PumpDatagramConnection = t;    \
972         }
973
974 static void print_comm_stats()
975 {
976     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
977     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
978             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
979             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
980     
981     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
982             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
983             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
984
985     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
986     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
987     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
988
989
990     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
991     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
992     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
993     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
994     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
995     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
996     if (useDynamicSMSG)
997     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
998
999     fclose(counterLog);
1000 }
1001
1002 #else
1003 #define STATS_PUMPNETWORK_TIME(x)                  x
1004 #define STATS_SEND_SMSGS_TIME(x)                   x
1005 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
1006 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
1007 #define STATS_SENDRDMAMSG_TIME(x)                  x
1008 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
1009 #endif
1010
1011 static void
1012 allgather(void *in,void *out, int len)
1013 {
1014     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1015     int i,rc;
1016     int my_rank;
1017     char *tmp_buf,*out_ptr;
1018
1019     if(!already_called) {
1020
1021         rc = PMI_Get_size(&job_size);
1022         CmiAssert(rc == PMI_SUCCESS);
1023         rc = PMI_Get_rank(&my_rank);
1024         CmiAssert(rc == PMI_SUCCESS);
1025
1026         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1027         CmiAssert(ivec_ptr != NULL);
1028
1029         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1030         CmiAssert(rc == PMI_SUCCESS);
1031
1032         already_called = 1;
1033
1034     }
1035
1036     tmp_buf = (char *)malloc(job_size * len);
1037     CmiAssert(tmp_buf);
1038
1039     rc = PMI_Allgather(in,tmp_buf,len);
1040     CmiAssert(rc == PMI_SUCCESS);
1041
1042     out_ptr = out;
1043
1044     for(i=0;i<job_size;i++) {
1045
1046         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1047
1048     }
1049
1050     free(tmp_buf);
1051 }
1052
1053 static void
1054 allgather_2(void *in,void *out, int len)
1055 {
1056     //PMI_Allgather is out of order
1057     int i,rc, extend_len;
1058     int  rank_index;
1059     char *out_ptr, *out_ref;
1060     char *in2;
1061
1062     extend_len = sizeof(int) + len;
1063     in2 = (char*)malloc(extend_len);
1064
1065     memcpy(in2, &myrank, sizeof(int));
1066     memcpy(in2+sizeof(int), in, len);
1067
1068     out_ptr = (char*)malloc(mysize*extend_len);
1069
1070     rc = PMI_Allgather(in2, out_ptr, extend_len);
1071     GNI_RC_CHECK("allgather", rc);
1072
1073     out_ref = out;
1074
1075     for(i=0;i<mysize;i++) {
1076         //rank index 
1077         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1078         //copy to the rank index slot
1079         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1080     }
1081
1082     free(out_ptr);
1083     free(in2);
1084
1085 }
1086
1087 static unsigned int get_gni_nic_address(int device_id)
1088 {
1089     unsigned int address, cpu_id;
1090     gni_return_t status;
1091     int i, alps_dev_id=-1,alps_address=-1;
1092     char *token, *p_ptr;
1093
1094     p_ptr = getenv("PMI_GNI_DEV_ID");
1095     if (!p_ptr) {
1096         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1097        
1098         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1099     } else {
1100         while ((token = strtok(p_ptr,":")) != NULL) {
1101             alps_dev_id = atoi(token);
1102             if (alps_dev_id == device_id) {
1103                 break;
1104             }
1105             p_ptr = NULL;
1106         }
1107         CmiAssert(alps_dev_id != -1);
1108         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1109         CmiAssert(p_ptr != NULL);
1110         i = 0;
1111         while ((token = strtok(p_ptr,":")) != NULL) {
1112             if (i == alps_dev_id) {
1113                 alps_address = atoi(token);
1114                 break;
1115             }
1116             p_ptr = NULL;
1117             ++i;
1118         }
1119         CmiAssert(alps_address != -1);
1120         address = alps_address;
1121     }
1122     return address;
1123 }
1124
1125 static uint8_t get_ptag(void)
1126 {
1127     char *p_ptr, *token;
1128     uint8_t ptag;
1129
1130     p_ptr = getenv("PMI_GNI_PTAG");
1131     CmiAssert(p_ptr != NULL);
1132     token = strtok(p_ptr, ":");
1133     ptag = (uint8_t)atoi(token);
1134     return ptag;
1135         
1136 }
1137
1138 static uint32_t get_cookie(void)
1139 {
1140     uint32_t cookie;
1141     char *p_ptr, *token;
1142
1143     p_ptr = getenv("PMI_GNI_COOKIE");
1144     CmiAssert(p_ptr != NULL);
1145     token = strtok(p_ptr, ":");
1146     cookie = (uint32_t)atoi(token);
1147
1148     return cookie;
1149 }
1150
1151 #if LARGEPAGE
1152
1153 /* directly mmap memory from hugetlbfs for large pages */
1154
1155 #include <sys/stat.h>
1156 #include <fcntl.h>
1157 #include <sys/mman.h>
1158 #include <hugetlbfs.h>
1159
1160 // size must be _tlbpagesize aligned
1161 void *my_get_huge_pages(size_t size)
1162 {
1163     char filename[512];
1164     int fd;
1165     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1166     void *ptr = NULL;
1167
1168     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1169     fd = open(filename, O_RDWR | O_CREAT, mode);
1170     if (fd == -1) {
1171         CmiAbort("my_get_huge_pages: open filed");
1172     }
1173     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1174     if (ptr == MAP_FAILED) ptr = NULL;
1175 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1176     close(fd);
1177     unlink(filename);
1178     return ptr;
1179 }
1180
1181 void my_free_huge_pages(void *ptr, int size)
1182 {
1183 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1184     int ret = munmap(ptr, size);
1185     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1186 }
1187
1188 #endif
1189
1190 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1191 /* TODO: add any that are related */
1192 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1193
1194
1195 #include "machine-lrts.h"
1196 #include "machine-common-core.c"
1197
1198 /* Network progress function is used to poll the network when for
1199    messages. This flushes receive buffers on some  implementations*/
1200 #if CMK_MACHINE_PROGRESS_DEFINED
1201 void CmiMachineProgressImpl() {
1202 }
1203 #endif
1204
1205 static int SendBufferMsg(SMSG_QUEUE *queue);
1206 static void SendRdmaMsg();
1207 static void PumpNetworkSmsg();
1208 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1209 #if CQWRITE
1210 static void PumpCqWriteTransactions();
1211 #endif
1212 #if REMOTE_EVENT
1213 static void PumpRemoteTransactions();
1214 #endif
1215
1216 #if MACHINE_DEBUG_LOG
1217 FILE *debugLog = NULL;
1218 static CmiInt8 buffered_recv_msg = 0;
1219 int         lrts_smsg_success = 0;
1220 int         lrts_received_msg = 0;
1221 #endif
1222
1223 static void sweep_mempool(mempool_type *mptr)
1224 {
1225     int n = 0;
1226     block_header *current = &(mptr->block_head);
1227
1228     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1229     while( current!= NULL) {
1230         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1231         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1232     }
1233     printf("[n %d] sweep_mempool slot END.\n", myrank);
1234 }
1235
1236 inline
1237 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1238 {
1239     block_header *current = *from;
1240
1241     //while(register_memory_size>= MAX_REG_MEM)
1242     //{
1243         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1244             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1245
1246         *from = current;
1247         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1248         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1249         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1250     //}
1251     return GNI_RC_SUCCESS;
1252 }
1253
1254 inline 
1255 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1256 {
1257     gni_return_t status = GNI_RC_SUCCESS;
1258     //int size = GetMempoolsize(msg);
1259     //void *blockaddr = GetMempoolBlockPtr(msg);
1260     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1261    
1262     block_header *current = &(mptr->block_head);
1263     while(register_memory_size>= MAX_REG_MEM)
1264     {
1265         status = deregisterMemory(mptr, &current);
1266         if (status != GNI_RC_SUCCESS) break;
1267     }
1268     if(register_memory_size>= MAX_REG_MEM) return status;
1269
1270     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1271     while(1)
1272     {
1273         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1274         if(status == GNI_RC_SUCCESS)
1275         {
1276             break;
1277         }
1278         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1279         {
1280             GNI_RC_CHECK("registerFromMempool", status);
1281         }
1282         else
1283         {
1284             status = deregisterMemory(mptr, &current);
1285             if (status != GNI_RC_SUCCESS) break;
1286         }
1287     }; 
1288     return status;
1289 }
1290
1291 inline 
1292 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1293 {
1294     static int rank = -1;
1295     int i;
1296     gni_return_t status;
1297     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1298     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1299     mempool_type *mptr;
1300
1301     status = registerFromMempool(mptr1, msg, size, t, cqh);
1302     if (status == GNI_RC_SUCCESS) return status;
1303 #if CMK_SMP 
1304     for (i=0; i<CmiMyNodeSize()+1; i++) {
1305       rank = (rank+1)%(CmiMyNodeSize()+1);
1306       mptr = CpvAccessOther(mempool, rank);
1307       if (mptr == mptr1) continue;
1308       status = registerFromMempool(mptr, msg, size, t, cqh);
1309       if (status == GNI_RC_SUCCESS) return status;
1310     }
1311 #endif
1312     return  GNI_RC_ERROR_RESOURCE;
1313 }
1314
1315 inline
1316 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1317 {
1318     MSG_LIST        *msg_tmp;
1319     MallocMsgList(msg_tmp);
1320     msg_tmp->destNode = destNode;
1321     msg_tmp->size   = size;
1322     msg_tmp->msg    = msg;
1323     msg_tmp->tag    = tag;
1324 #if CMK_WITH_STATS
1325     SMSG_CREATION(msg_tmp)
1326 #endif
1327 #if !CMK_SMP
1328     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1329         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1330         queue->smsg_head_index = destNode;
1331         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1332     }else
1333     {
1334         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1335         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1336     }
1337 #else
1338 #if ONE_SEND_QUEUE
1339     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1340 #else
1341 #if SMP_LOCKS
1342     CmiLock(queue->smsg_msglist_index[destNode].lock);
1343     if(queue->smsg_msglist_index[destNode].pushed == 0)
1344     {
1345         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1346     }
1347     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1348     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1349 #else
1350     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1351 #endif
1352 #endif
1353 #endif
1354 #if PRINT_SYH
1355     buffered_smsg_counter++;
1356 #endif
1357 }
1358
1359 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1360 {
1361     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1362 }
1363
1364 inline
1365 static void setup_smsg_connection(int destNode)
1366 {
1367     mdh_addr_list_t  *new_entry = 0;
1368     gni_post_descriptor_t *pd;
1369     gni_smsg_attr_t      *smsg_attr;
1370     gni_return_t status = GNI_RC_NOT_DONE;
1371     RDMA_REQUEST        *rdma_request_msg;
1372     
1373     if(smsg_available_slot == smsg_expand_slots)
1374     {
1375         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1376         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1377         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1378
1379         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1380             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1381             GNI_MEM_READWRITE,   
1382             -1,
1383             &(new_entry->mdh));
1384         smsg_available_slot = 0; 
1385         new_entry->next = smsg_dynamic_list;
1386         smsg_dynamic_list = new_entry;
1387     }
1388     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1389     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1390     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1391     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1392     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1393     smsg_attr->buff_size = smsg_memlen;
1394     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1395     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1396     smsg_local_attr_vec[destNode] = smsg_attr;
1397     smsg_available_slot++;
1398     MallocPostDesc(pd);
1399     pd->type            = GNI_POST_FMA_PUT;
1400     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1401     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1402     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1403     pd->length          = sizeof(gni_smsg_attr_t);
1404     pd->local_addr      = (uint64_t) smsg_attr;
1405     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1406     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1407     pd->src_cq_hndl     = rdma_tx_cqh;
1408     pd->rdma_mode       = 0;
1409     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1410     print_smsg_attr(smsg_attr);
1411     if(status == GNI_RC_ERROR_RESOURCE )
1412     {
1413         MallocRdmaRequest(rdma_request_msg);
1414         rdma_request_msg->destNode = destNode;
1415         rdma_request_msg->pd = pd;
1416         /* buffer this request */
1417     }
1418 #if PRINT_SYH
1419     if(status != GNI_RC_SUCCESS)
1420        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1421     else
1422         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1423 #endif
1424 }
1425
1426 /* useDynamicSMSG */
1427 inline 
1428 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1429 {
1430     gni_return_t status = GNI_RC_NOT_DONE;
1431
1432     if(mailbox_list->offset == mailbox_list->size)
1433     {
1434         dynamic_smsg_mailbox_t *new_mailbox_entry;
1435         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1436         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1437         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1438         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1439         new_mailbox_entry->offset = 0;
1440         
1441         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1442             new_mailbox_entry->size, smsg_rx_cqh,
1443             GNI_MEM_READWRITE,   
1444             -1,
1445             &(new_mailbox_entry->mem_hndl));
1446
1447         GNI_RC_CHECK("register", status);
1448         new_mailbox_entry->next = mailbox_list;
1449         mailbox_list = new_mailbox_entry;
1450     }
1451     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1452     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1453     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1454     local_smsg_attr->mbox_offset = mailbox_list->offset;
1455     mailbox_list->offset += smsg_memlen;
1456     local_smsg_attr->buff_size = smsg_memlen;
1457     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1458     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1459 }
1460
1461 /* useDynamicSMSG */
1462 inline 
1463 static int connect_to(int destNode)
1464 {
1465     gni_return_t status = GNI_RC_NOT_DONE;
1466     CmiAssert(smsg_connected_flag[destNode] == 0);
1467     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1468     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1469     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1470     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1471     
1472     CMI_GNI_LOCK(global_gni_lock)
1473     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1474     CMI_GNI_UNLOCK(global_gni_lock)
1475     if (status == GNI_RC_ERROR_RESOURCE) {
1476       /* possibly destNode is making connection at the same time */
1477       free(smsg_attr_vector_local[destNode]);
1478       smsg_attr_vector_local[destNode] = NULL;
1479       free(smsg_attr_vector_remote[destNode]);
1480       smsg_attr_vector_remote[destNode] = NULL;
1481       mailbox_list->offset -= smsg_memlen;
1482 #if PRINT_SYH
1483     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1484 #endif
1485       return 0;
1486     }
1487     GNI_RC_CHECK("GNI_Post", status);
1488     smsg_connected_flag[destNode] = 1;
1489 #if PRINT_SYH
1490     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1491 #endif
1492     return 1;
1493 }
1494
1495 inline 
1496 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1497 {
1498     unsigned int          remote_address;
1499     uint32_t              remote_id;
1500     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1501     gni_smsg_attr_t       *smsg_attr;
1502     gni_post_descriptor_t *pd;
1503     gni_post_state_t      post_state;
1504     char                  *real_data; 
1505
1506     if (useDynamicSMSG) {
1507         switch (smsg_connected_flag[destNode]) {
1508         case 0: 
1509             connect_to(destNode);         /* continue to case 1 */
1510         case 1:                           /* pending connection, do nothing */
1511             status = GNI_RC_NOT_DONE;
1512             if(inbuff ==0)
1513                 buffer_small_msgs(queue, msg, size, destNode, tag);
1514             return status;
1515         }
1516     }
1517 #if CMK_SMP
1518 #if ! ONE_SEND_QUEUE
1519     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1520 #endif
1521     {
1522 #else
1523     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1524     {
1525 #endif
1526         //CMI_GNI_LOCK(smsg_mailbox_lock)
1527         CMI_GNI_LOCK(default_tx_cq_lock)
1528 #if CMK_SMP_TRACE_COMMTHREAD
1529         int oldpe = -1;
1530         int oldeventid = -1;
1531         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1532         { 
1533             START_EVENT();
1534             if ( tag == SMALL_DATA_TAG)
1535                 real_data = (char*)msg; 
1536             else 
1537                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1538             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1539             TRACE_COMM_SET_COMM_MSGID(real_data);
1540         }
1541 #endif
1542 #if REMOTE_EVENT
1543         if (tag == LMSG_INIT_TAG) {
1544             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1545             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1546                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1547         }
1548         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1549 #endif
1550 #if     CMK_WITH_STATS
1551         SMSG_TRY_SEND(tag)
1552 #endif
1553 #if CMK_WITH_STATS
1554     double              creation_time;
1555     if (ptr == NULL)
1556         creation_time = CmiWallTimer();
1557     else
1558         creation_time = ptr->creation_time;
1559 #endif
1560
1561     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1562 #if CMK_SMP_TRACE_COMMTHREAD
1563         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1564 #endif
1565         CMI_GNI_UNLOCK(default_tx_cq_lock)
1566         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1567         if(status == GNI_RC_SUCCESS)
1568         {
1569 #if     CMK_WITH_STATS
1570             SMSG_SENT_DONE(creation_time,tag) 
1571 #endif
1572 #if CMK_SMP_TRACE_COMMTHREAD
1573             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1574             { 
1575                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1576             }
1577 #endif
1578         }else
1579             status = GNI_RC_ERROR_RESOURCE;
1580     }
1581     if(status != GNI_RC_SUCCESS && inbuff ==0)
1582         buffer_small_msgs(queue, msg, size, destNode, tag);
1583     return status;
1584 }
1585
1586 inline 
1587 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1588 {
1589     /* construct a control message and send */
1590     CONTROL_MSG         *control_msg_tmp;
1591     MallocControlMsg(control_msg_tmp);
1592     control_msg_tmp->source_addr = (uint64_t)msg;
1593     control_msg_tmp->seq_id    = seqno;
1594     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1595 #if REMOTE_EVENT
1596     control_msg_tmp->ack_index    =  -1;
1597 #endif
1598 #if     USE_LRTS_MEMPOOL
1599     if(size < BIG_MSG)
1600     {
1601         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1602     }
1603     else
1604     {
1605         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1606         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1607         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1608     }
1609 #else
1610     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1611 #endif
1612     return control_msg_tmp;
1613 }
1614
1615 #define BLOCKING_SEND_CONTROL    0
1616
1617 // Large message, send control to receiver, receiver register memory and do a GET, 
1618 // return 1 - send no success
1619 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1620 {
1621     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1622     uint32_t            vmdh_index  = -1;
1623     int                 size;
1624     int                 offset = 0;
1625     uint64_t            source_addr;
1626     int                 register_size; 
1627     void                *msg;
1628
1629     size    =   control_msg_tmp->total_length;
1630     source_addr = control_msg_tmp->source_addr;
1631     register_size = control_msg_tmp->length;
1632
1633 #if  USE_LRTS_MEMPOOL
1634     if( control_msg_tmp->seq_id == 0 ){
1635 #if BLOCKING_SEND_CONTROL
1636         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1637             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1638                 LrtsAdvanceCommunication(0);
1639         }
1640 #endif
1641         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1642         {
1643             msg = (void*)source_addr;
1644             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1645             {
1646                 if(!inbuff)
1647                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1648                 return GNI_RC_ERROR_NOMEM;
1649             }
1650             //register the corresponding mempool
1651             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1652             if(status == GNI_RC_SUCCESS)
1653             {
1654                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1655             }
1656         }else
1657         {
1658             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1659             status = GNI_RC_SUCCESS;
1660         }
1661         if(NoMsgInSend(source_addr))
1662             register_size = GetMempoolsize((void*)(source_addr));
1663         else
1664             register_size = 0;
1665     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1666     {
1667         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1668         source_addr += offset;
1669         size = control_msg_tmp->length;
1670 #if BLOCKING_SEND_CONTROL
1671         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1672             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1673                 LrtsAdvanceCommunication(0);
1674         }
1675 #endif
1676         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1677             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1678             {
1679                 if(!inbuff)
1680                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1681                 return GNI_RC_ERROR_NOMEM;
1682             }
1683             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1684             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1685         }
1686         else
1687         {
1688             status = GNI_RC_SUCCESS;
1689         }
1690         register_size = 0;  
1691     }
1692
1693 #if CMI_EXERT_SEND_CAP
1694     if(SEND_large_pending >= SEND_large_cap)
1695     {
1696         status = GNI_RC_ERROR_NOMEM;
1697     }
1698 #endif
1699  
1700     if(status == GNI_RC_SUCCESS)
1701     {
1702        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1703         if(status == GNI_RC_SUCCESS)
1704         {
1705 #if CMI_EXERT_SEND_CAP
1706             SEND_large_pending++;
1707 #endif
1708             buffered_send_msg += register_size;
1709             if(control_msg_tmp->seq_id == 0)
1710             {
1711                 IncreaseMsgInSend(source_addr);
1712             }
1713             FreeControlMsg(control_msg_tmp);
1714             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1715         }else
1716             status = GNI_RC_ERROR_RESOURCE;
1717
1718     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1719     {
1720         CmiAbort("Memory registor for large msg\n");
1721     }else 
1722     {
1723         status = GNI_RC_ERROR_NOMEM; 
1724         if(!inbuff)
1725             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1726     }
1727     return status;
1728 #else
1729     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1730     if(status == GNI_RC_SUCCESS)
1731     {
1732         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1733         if(status == GNI_RC_SUCCESS)
1734         {
1735             FreeControlMsg(control_msg_tmp);
1736         }
1737     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1738     {
1739         CmiAbort("Memory registor for large msg\n");
1740     }else 
1741     {
1742         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1743     }
1744     return status;
1745 #endif
1746 }
1747
1748 inline void LrtsPrepareEnvelope(char *msg, int size)
1749 {
1750     CmiSetMsgSize(msg, size);
1751     CMI_SET_CHECKSUM(msg, size);
1752 }
1753
1754 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1755 {
1756     gni_return_t        status  =   GNI_RC_SUCCESS;
1757     uint8_t tag;
1758     CONTROL_MSG         *control_msg_tmp;
1759     int                 oob = ( mode & OUT_OF_BAND);
1760     SMSG_QUEUE          *queue;
1761
1762     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1763 #if CMK_USE_OOB
1764     queue = oob? &smsg_oob_queue : &smsg_queue;
1765 #else
1766     queue = &smsg_queue;
1767 #endif
1768
1769     LrtsPrepareEnvelope(msg, size);
1770
1771 #if PRINT_SYH
1772     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1773 #endif 
1774 #if CMK_SMP 
1775     if(size <= SMSG_MAX_MSG)
1776         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1777     else if (size < BIG_MSG) {
1778         control_msg_tmp =  construct_control_msg(size, msg, 0);
1779         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1780     }
1781     else {
1782           CmiSetMsgSeq(msg, 0);
1783           control_msg_tmp =  construct_control_msg(size, msg, 1);
1784           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1785     }
1786 #else   //non-smp, smp(worker sending)
1787     if(size <= SMSG_MAX_MSG)
1788     {
1789         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1790             CmiFree(msg);
1791     }
1792     else if (size < BIG_MSG) {
1793         control_msg_tmp =  construct_control_msg(size, msg, 0);
1794         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1795     }
1796     else {
1797 #if     USE_LRTS_MEMPOOL
1798         CmiSetMsgSeq(msg, 0);
1799         control_msg_tmp =  construct_control_msg(size, msg, 1);
1800         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1801 #else
1802         control_msg_tmp =  construct_control_msg(size, msg, 0);
1803         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1804 #endif
1805     }
1806 #endif
1807     return 0;
1808 }
1809
1810 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1811 {
1812   int i;
1813 #if CMK_BROADCAST_USE_CMIREFERENCE
1814   for(i=0;i<npes;i++) {
1815     if (pes[i] == CmiMyPe())
1816       CmiSyncSend(pes[i], len, msg);
1817     else {
1818       CmiReference(msg);
1819       CmiSyncSendAndFree(pes[i], len, msg);
1820     }
1821   }
1822 #else
1823   for(i=0;i<npes;i++) {
1824     CmiSyncSend(pes[i], len, msg);
1825   }
1826 #endif
1827 }
1828
1829 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1830 {
1831   /* A better asynchronous implementation may be wanted, but at least it works */
1832   CmiSyncListSendFn(npes, pes, len, msg);
1833   return (CmiCommHandle) 0;
1834 }
1835
1836 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1837 {
1838   if (npes == 1) {
1839       CmiSyncSendAndFree(pes[0], len, msg);
1840       return;
1841   }
1842 #if CMK_PERSISTENT_COMM
1843   if (CpvAccess(phs) && len > PERSIST_MIN_SIZE) {
1844       int i;
1845       for(i=0;i<npes;i++) {
1846         if (pes[i] == CmiMyPe())
1847           CmiSyncSend(pes[i], len, msg);
1848         else {
1849           CmiReference(msg);
1850           CmiSyncSendAndFree(pes[i], len, msg);
1851         }
1852       }
1853       CmiFree(msg);
1854       return;
1855   }
1856 #endif
1857   
1858 #if CMK_BROADCAST_USE_CMIREFERENCE
1859   CmiSyncListSendFn(npes, pes, len, msg);
1860   CmiFree(msg);
1861 #else
1862   int i;
1863   for(i=0;i<npes-1;i++) {
1864     CmiSyncSend(pes[i], len, msg);
1865   }
1866   if (npes>0)
1867     CmiSyncSendAndFree(pes[npes-1], len, msg);
1868   else 
1869     CmiFree(msg);
1870 #endif
1871 }
1872
1873 static void    PumpDatagramConnection();
1874 static      int         event_SetupConnect = 111;
1875 static      int         event_PumpSmsg = 222 ;
1876 static      int         event_PumpTransaction = 333;
1877 static      int         event_PumpRdmaTransaction = 444;
1878 static      int         event_SendBufferSmsg = 444;
1879 static      int         event_SendFmaRdmaMsg = 555;
1880 static      int         event_AdvanceCommunication = 666;
1881
1882 static void registerUserTraceEvents() {
1883 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1884     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1885     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1886     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1887     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1888     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1889     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1890     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1891 #endif
1892 }
1893
1894 static void ProcessDeadlock()
1895 {
1896     static CmiUInt8 *ptr = NULL;
1897     static CmiUInt8  last = 0, mysum, sum;
1898     static int count = 0;
1899     gni_return_t status;
1900     int i;
1901
1902 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1903 //sweep_mempool(CpvAccess(mempool));
1904     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1905     mysum = smsg_send_count + smsg_recv_count;
1906     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1907     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1908     GNI_RC_CHECK("PMI_Allgather", status);
1909     sum = 0;
1910     for (i=0; i<mysize; i++)  sum+= ptr[i];
1911     if (last == 0 || sum == last) 
1912         count++;
1913     else
1914         count = 0;
1915     last = sum;
1916     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1917     if (count == 2) { 
1918         /* detected twice, it is a real deadlock */
1919         if (myrank == 0)  {
1920             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1921             CmiAbort("Fatal> Deadlock detected.");
1922         }
1923
1924     }
1925     _detected_hang = 0;
1926 }
1927
1928 static void CheckProgress()
1929 {
1930     if (smsg_send_count == last_smsg_send_count &&
1931         smsg_recv_count == last_smsg_recv_count ) 
1932     {
1933         _detected_hang = 1;
1934 #if !CMK_SMP
1935         if (_detected_hang) ProcessDeadlock();
1936 #endif
1937
1938     }
1939     else {
1940         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1941         last_smsg_send_count = smsg_send_count;
1942         last_smsg_recv_count = smsg_recv_count;
1943         _detected_hang = 0;
1944     }
1945 }
1946
1947 static void set_limit()
1948 {
1949     //if (!user_set_flag && CmiMyRank() == 0) {
1950     if (CmiMyRank() == 0) {
1951         int mynode = CmiPhysicalNodeID(CmiMyPe());
1952         int numpes = CmiNumPesOnPhysicalNode(mynode);
1953         int numprocesses = numpes / CmiMyNodeSize();
1954         MAX_REG_MEM  = _totalmem / numprocesses;
1955         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1956         if (CmiMyPe() == 0)
1957            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1958         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1959         {
1960              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1961              CmiAbort("memory registration\n");
1962         }
1963     }
1964 }
1965
1966 void LrtsPostCommonInit(int everReturn)
1967 {
1968 #if CMK_DIRECT
1969     CmiDirectInit();
1970 #endif
1971 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1972     CpvInitialize(double, projTraceStart);
1973     /* only PE 0 needs to care about registration (to generate sts file). */
1974     //if (CmiMyPe() == 0) 
1975     {
1976         registerMachineUserEventsFunction(&registerUserTraceEvents);
1977     }
1978 #endif
1979
1980 #if CMK_SMP
1981     CmiIdleState *s=CmiNotifyGetState();
1982     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1983     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1984 #else
1985     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1986     if (useDynamicSMSG)
1987     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1988 #endif
1989
1990 #if ! LARGEPAGE
1991     if (_checkProgress)
1992 #if CMK_SMP
1993     if (CmiMyRank() == 0)
1994 #endif
1995     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1996 #endif
1997  
1998 #if !LARGEPAGE
1999     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2000 #endif
2001 }
2002
2003 /* this is called by worker thread */
2004 void LrtsPostNonLocal(){
2005 #if CMK_SMP_TRACE_COMMTHREAD
2006     double startT, endT;
2007 #endif
2008 #if MULTI_THREAD_SEND
2009     if(mysize == 1) return;
2010 #if CMK_SMP_TRACE_COMMTHREAD
2011     traceEndIdle();
2012 #endif
2013
2014 #if CMK_SMP_TRACE_COMMTHREAD
2015     startT = CmiWallTimer();
2016 #endif
2017
2018 #if CMK_WORKER_SINGLE_TASK
2019     if (CmiMyRank() % 6 == 0)
2020 #endif
2021     PumpNetworkSmsg();
2022
2023 #if CMK_WORKER_SINGLE_TASK
2024     if (CmiMyRank() % 6 == 1)
2025 #endif
2026     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2027
2028 #if CMK_WORKER_SINGLE_TASK
2029     if (CmiMyRank() % 6 == 2)
2030 #endif
2031     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
2032
2033 #if REMOTE_EVENT
2034 #if CMK_WORKER_SINGLE_TASK
2035     if (CmiMyRank() % 6 == 3)
2036 #endif
2037     PumpRemoteTransactions();
2038 #endif
2039
2040 #if CMK_WORKER_SINGLE_TASK
2041     if (CmiMyRank() % 6 == 4)
2042 #endif
2043 #if CMK_USE_OOB
2044     if (SendBufferMsg(&smsg_oob_queue) == 1)
2045 #endif
2046     {
2047         SendBufferMsg(&smsg_queue);
2048     }
2049
2050 #if CMK_WORKER_SINGLE_TASK
2051     if (CmiMyRank() % 6 == 5)
2052 #endif
2053     SendRdmaMsg();
2054
2055 #if CMK_SMP_TRACE_COMMTHREAD
2056     endT = CmiWallTimer();
2057     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2058 #endif
2059 #if CMK_SMP_TRACE_COMMTHREAD
2060     traceBeginIdle();
2061 #endif
2062 #endif
2063 }
2064
2065 /* useDynamicSMSG */
2066 static void    PumpDatagramConnection()
2067 {
2068     uint32_t          remote_address;
2069     uint32_t          remote_id;
2070     gni_return_t status;
2071     gni_post_state_t  post_state;
2072     uint64_t          datagram_id;
2073     int i;
2074
2075    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2076    {
2077        if (datagram_id >= mysize) {           /* bound endpoint */
2078            int pe = datagram_id - mysize;
2079            CMI_GNI_LOCK(global_gni_lock)
2080            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2081            CMI_GNI_UNLOCK(global_gni_lock)
2082            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2083            {
2084                CmiAssert(remote_id == pe);
2085                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2086                GNI_RC_CHECK("Dynamic SMSG Init", status);
2087 #if PRINT_SYH
2088                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2089 #endif
2090                CmiAssert(smsg_connected_flag[pe] == 1);
2091                smsg_connected_flag[pe] = 2;
2092            }
2093        }
2094        else {         /* unbound ep */
2095            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2096            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2097            {
2098                CmiAssert(remote_id<mysize);
2099                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2100                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2101                GNI_RC_CHECK("Dynamic SMSG Init", status);
2102 #if PRINT_SYH
2103                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2104 #endif
2105                smsg_connected_flag[remote_id] = 2;
2106
2107                alloc_smsg_attr(&send_smsg_attr);
2108                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2109                GNI_RC_CHECK("post unbound datagram", status);
2110            }
2111        }
2112    }
2113 }
2114
2115 /* pooling CQ to receive network message */
2116 static void PumpNetworkRdmaMsgs()
2117 {
2118     gni_cq_entry_t      event_data;
2119     gni_return_t        status;
2120
2121 }
2122
2123 inline 
2124 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
2125 {
2126     RDMA_REQUEST        *rdma_request_msg;
2127     MallocRdmaRequest(rdma_request_msg);
2128     rdma_request_msg->destNode = inst_id;
2129     rdma_request_msg->pd = pd;
2130 #if REMOTE_EVENT
2131     rdma_request_msg->ack_index = ack_index;
2132 #endif
2133 #if CMK_SMP
2134     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2135 #else
2136     if(sendRdmaBuf == 0)
2137     {
2138         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2139     }else{
2140         sendRdmaTail->next = rdma_request_msg;
2141         sendRdmaTail =  rdma_request_msg;
2142     }
2143 #endif
2144
2145 }
2146
2147 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2148
2149 static void PumpNetworkSmsg()
2150 {
2151     uint64_t            inst_id;
2152     int                 ret;
2153     gni_cq_entry_t      event_data;
2154     gni_return_t        status, status2;
2155     void                *header;
2156     uint8_t             msg_tag;
2157     int                 msg_nbytes;
2158     void                *msg_data;
2159     gni_mem_handle_t    msg_mem_hndl;
2160     gni_smsg_attr_t     *smsg_attr;
2161     gni_smsg_attr_t     *remote_smsg_attr;
2162     int                 init_flag;
2163     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2164     uint64_t            source_addr;
2165     SMSG_QUEUE         *queue = &smsg_queue;
2166 #if   CMK_DIRECT
2167     cmidirectMsg        *direct_msg;
2168 #endif
2169 #if CMI_EXERT_RECV_CAP
2170     int                  recv_cnt = 0;
2171 #endif
2172     while(1)
2173     {
2174         CMI_GNI_LOCK(smsg_rx_cq_lock)
2175         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2176         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2177         if(status != GNI_RC_SUCCESS)
2178             break;
2179         inst_id = GNI_CQ_GET_INST_ID(event_data);
2180 #if REMOTE_EVENT
2181         inst_id = GET_RANK(inst_id);      /* important */
2182 #endif
2183         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2184 #if PRINT_SYH
2185         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2186 #endif
2187         if (useDynamicSMSG) {
2188             /* subtle: smsg may come before connection is setup */
2189             while (smsg_connected_flag[inst_id] != 2) 
2190                PumpDatagramConnection();
2191         }
2192         msg_tag = GNI_SMSG_ANY_TAG;
2193         while(1) {
2194             CMI_GNI_LOCK(smsg_mailbox_lock)
2195             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2196             if (status != GNI_RC_SUCCESS)
2197             {
2198                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2199                 break;
2200             }
2201 #if PRINT_SYH
2202             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2203 #endif
2204             /* copy msg out and then put into queue (small message) */
2205             switch (msg_tag) {
2206             case SMALL_DATA_TAG:
2207             {
2208                 START_EVENT();
2209                 msg_nbytes = CmiGetMsgSize(header);
2210                 msg_data    = CmiAlloc(msg_nbytes);
2211                 memcpy(msg_data, (char*)header, msg_nbytes);
2212                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2213                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2214                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
2215                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2216                 handleOneRecvedMsg(msg_nbytes, msg_data);
2217                 break;
2218             }
2219             case LMSG_INIT_TAG:
2220             {
2221 #if MULTI_THREAD_SEND
2222                 MallocControlMsg(control_msg_tmp);
2223                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2224                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2225                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2226                 getLargeMsgRequest(control_msg_tmp, inst_id);
2227                 FreeControlMsg(control_msg_tmp);
2228 #else
2229                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2230                 getLargeMsgRequest(header, inst_id);
2231                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2232 #endif
2233                 break;
2234             }
2235 #if !REMOTE_EVENT && !CQWRITE
2236             case ACK_TAG:   //msg fit into mempool
2237             {
2238                 /* Get is done, release message . Now put is not used yet*/
2239                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2240                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2241                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2242 #if ! USE_LRTS_MEMPOOL
2243                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2244 #else
2245                 DecreaseMsgInSend(msg);
2246 #endif
2247                 if(NoMsgInSend(msg))
2248                     buffered_send_msg -= GetMempoolsize(msg);
2249                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2250                 CmiFree(msg);
2251 #if CMI_EXERT_SEND_CAP
2252                 SEND_large_pending--;
2253 #endif
2254                 break;
2255             }
2256 #endif
2257             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2258             {
2259 #if MULTI_THREAD_SEND
2260                 MallocControlMsg(header_tmp);
2261                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2262                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2263 #else
2264                 header_tmp = (CONTROL_MSG *) header;
2265 #endif
2266                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2267 #if CMI_EXERT_SEND_CAP
2268                     SEND_large_pending--;
2269 #endif
2270                 void *msg = (void*)(header_tmp->source_addr);
2271                 int cur_seq = CmiGetMsgSeq(msg);
2272                 int offset = ONE_SEG*(cur_seq+1);
2273                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2274                 buffered_send_msg -= header_tmp->length;
2275                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2276                 if (remain_size < 0) remain_size = 0;
2277                 CmiSetMsgSize(msg, remain_size);
2278                 if(remain_size <= 0) //transaction done
2279                 {
2280                     CmiFree(msg);
2281                 }else if (header_tmp->total_length > offset)
2282                 {
2283                     CmiSetMsgSeq(msg, cur_seq+1);
2284                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2285                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2286                     //send next seg
2287                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2288                          // pipelining
2289                     if (header_tmp->seq_id == 1) {
2290                       int i;
2291                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2292                         int seq = cur_seq+i+2;
2293                         CmiSetMsgSeq(msg, seq-1);
2294                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2295                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2296                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2297                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2298                       }
2299                     }
2300                 }
2301 #if MULTI_THREAD_SEND
2302                 FreeControlMsg(header_tmp);
2303 #else
2304                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2305 #endif
2306                 break;
2307             }
2308 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2309             case PUT_DONE_TAG:  {   //persistent message
2310                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2311                 int size = ((CONTROL_MSG *) header)->length;
2312                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2313                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2314                 CmiReference(msg);
2315                 CMI_CHECK_CHECKSUM(msg, size);
2316                 handleOneRecvedMsg(size, msg); 
2317 #if PRINT_SYH
2318                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2319 #endif
2320                 break;
2321             }
2322 #endif
2323 #if CMK_DIRECT
2324             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2325                 //create a trigger message
2326                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2327                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2328                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2329                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2330                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2331                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2332                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2333                 break;
2334 #endif
2335             default:
2336                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2337                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2338                 printf("weird tag problem\n");
2339                 CmiAbort("Unknown tag\n");
2340             }               // end switch
2341 #if PRINT_SYH
2342             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2343 #endif
2344             smsg_recv_count ++;
2345             msg_tag = GNI_SMSG_ANY_TAG;
2346 #if CMI_EXERT_RECV_CAP
2347             if (status == GNI_RC_SUCCESS && ++recv_cnt == RECV_CAP) return;
2348 #endif
2349         } //endwhile GNI_SmsgGetNextWTag
2350     }   //end while GetEvent
2351     if(status == GNI_RC_ERROR_RESOURCE)
2352     {
2353         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2354         GNI_RC_CHECK("Smsg_rx_cq full", status);
2355     }
2356 }
2357
2358 static void printDesc(gni_post_descriptor_t *pd)
2359 {
2360     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2361 }
2362
2363 #if CQWRITE
2364 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2365 {
2366     gni_post_descriptor_t *pd;
2367     gni_return_t        status = GNI_RC_SUCCESS;
2368     
2369     MallocPostDesc(pd);
2370     pd->type = GNI_POST_CQWRITE;
2371     pd->cq_mode = GNI_CQMODE_SILENT;
2372     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2373     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2374     pd->cqwrite_value = data;
2375     pd->remote_mem_hndl = mem_hndl;
2376     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2377     GNI_RC_CHECK("GNI_PostCqWrite", status);
2378 }
2379 #endif
2380
2381 // register memory for a message
2382 // return mem handle
2383 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2384 {
2385     gni_return_t status = GNI_RC_SUCCESS;
2386
2387     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2388
2389 #if CMK_PERSISTENT_COMM
2390       // persistent message is always registered
2391       // BIG_MSG small pieces do not have malloc chunk header
2392     if (IS_PERSISTENT_MEMORY(msg)) {
2393         *memh = GetMemHndl(msg);
2394         return GNI_RC_SUCCESS;
2395     }
2396 #endif
2397     if(seqno == 0 
2398 #if CMK_PERSISTENT_COMM
2399          || seqno == PERSIST_SEQ
2400 #endif
2401       )
2402     {
2403         if(IsMemHndlZero((GetMemHndl(msg))))
2404         {
2405             msg = (void*)(msg);
2406             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2407             if(status == GNI_RC_SUCCESS)
2408                 *memh = GetMemHndl(msg);
2409         }
2410         else {
2411             *memh = GetMemHndl(msg);
2412         }
2413     }
2414     else {
2415         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2416         status = registerMemory(msg, size, memh, NULL); 
2417     }
2418     return status;
2419 }
2420
2421 // for BIG_MSG called on receiver side for receiving control message
2422 // LMSG_INIT_TAG
2423 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2424 {
2425 #if     USE_LRTS_MEMPOOL
2426     CONTROL_MSG         *request_msg;
2427     gni_return_t        status = GNI_RC_SUCCESS;
2428     void                *msg_data;
2429     gni_post_descriptor_t *pd;
2430     gni_mem_handle_t    msg_mem_hndl;
2431     int                 size, transaction_size, offset = 0;
2432     size_t              register_size = 0;
2433
2434     // initial a get to transfer data from the sender side */
2435     request_msg = (CONTROL_MSG *) header;
2436     size = request_msg->total_length;
2437     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2438     MallocPostDesc(pd);
2439 #if CMK_WITH_STATS 
2440     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2441 #endif
2442     if(request_msg->seq_id < 2)   {
2443 #if CMK_SMP_TRACE_COMMTHREAD 
2444         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2445 #endif
2446         msg_data = CmiAlloc(size);
2447         CmiSetMsgSeq(msg_data, 0);
2448         _MEMCHECK(msg_data);
2449     }
2450     else {
2451         offset = ONE_SEG*(request_msg->seq_id-1);
2452         msg_data = (char*)request_msg->dest_addr + offset;
2453     }
2454    
2455     pd->cqwrite_value = request_msg->seq_id;
2456
2457     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2458     SetMemHndlZero(pd->local_mem_hndl);
2459     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2460     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2461         if(NoMsgInRecv( (void*)(msg_data)))
2462             register_size = GetMempoolsize((void*)(msg_data));
2463     }
2464
2465     pd->first_operand = ALIGN64(size);                   //  total length
2466
2467     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2468         pd->type            = GNI_POST_FMA_GET;
2469     else
2470         pd->type            = GNI_POST_RDMA_GET;
2471     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2472     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2473     pd->length          = transaction_size;
2474     pd->local_addr      = (uint64_t) msg_data;
2475     pd->remote_addr     = request_msg->source_addr + offset;
2476     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2477     pd->src_cq_hndl     = rdma_tx_cqh;
2478     pd->rdma_mode       = 0;
2479     pd->amo_cmd         = 0;
2480 #if CMI_EXERT_RDMA_CAP
2481     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2482 #endif
2483     //memory registration success
2484     if(status == GNI_RC_SUCCESS )
2485     {
2486         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2487         CMI_GNI_LOCK(lock)
2488 #if REMOTE_EVENT
2489         if( request_msg->seq_id == 0)
2490         {
2491             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2492             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2493             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2494         }
2495 #endif
2496
2497 #if CMK_WITH_STATS
2498         RDMA_TRY_SEND(pd->type)
2499 #endif
2500         if(pd->type == GNI_POST_RDMA_GET) 
2501         {
2502             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2503         }
2504         else
2505         {
2506             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2507         }
2508         CMI_GNI_UNLOCK(lock)
2509
2510         if(status == GNI_RC_SUCCESS )
2511         {
2512 #if CMI_EXERT_RDMA_CAP
2513             RDMA_pending++;
2514 #endif
2515             if(pd->cqwrite_value == 0)
2516             {
2517 #if MACHINE_DEBUG_LOG
2518                 buffered_recv_msg += register_size;
2519                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2520 #endif
2521                 IncreaseMsgInRecv(msg_data);
2522 #if CMK_SMP_TRACE_COMMTHREAD 
2523                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2524 #endif
2525             }
2526 #if  CMK_WITH_STATS
2527             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2528             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2529 #endif
2530         }
2531     }else
2532     {
2533         SetMemHndlZero((pd->local_mem_hndl));
2534     }
2535     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2536     {
2537 #if REMOTE_EVENT
2538         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2539 #else
2540         bufferRdmaMsg(inst_id, pd, -1); 
2541 #endif
2542     }else if (status != GNI_RC_SUCCESS) {
2543         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2544         GNI_RC_CHECK("GetLargeAFter posting", status);
2545     }
2546 #else
2547     CONTROL_MSG         *request_msg;
2548     gni_return_t        status;
2549     void                *msg_data;
2550     gni_post_descriptor_t *pd;
2551     RDMA_REQUEST        *rdma_request_msg;
2552     gni_mem_handle_t    msg_mem_hndl;
2553     //int source;
2554     // initial a get to transfer data from the sender side */
2555     request_msg = (CONTROL_MSG *) header;
2556     msg_data = CmiAlloc(request_msg->length);
2557     _MEMCHECK(msg_data);
2558
2559     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2560
2561     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2562     {
2563         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2564     }
2565
2566     MallocPostDesc(pd);
2567     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2568         pd->type            = GNI_POST_FMA_GET;
2569     else
2570         pd->type            = GNI_POST_RDMA_GET;
2571     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2572     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2573     pd->length          = ALIGN64(request_msg->length);
2574     pd->local_addr      = (uint64_t) msg_data;
2575     pd->remote_addr     = request_msg->source_addr;
2576     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2577     pd->src_cq_hndl     = rdma_tx_cqh;
2578     pd->rdma_mode       = 0;
2579     pd->amo_cmd         = 0;
2580
2581     //memory registration successful
2582     if(status == GNI_RC_SUCCESS)
2583     {
2584         pd->local_mem_hndl  = msg_mem_hndl;
2585        
2586         if(pd->type == GNI_POST_RDMA_GET) 
2587         {
2588             CMI_GNI_LOCK(rdma_tx_cq_lock)
2589             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2590             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2591         }
2592         else
2593         {
2594             CMI_GNI_LOCK(default_tx_cq_lock)
2595             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2596             CMI_GNI_UNLOCK(default_tx_cq_lock)
2597         }
2598
2599     }else
2600     {
2601         SetMemHndlZero(pd->local_mem_hndl);
2602     }
2603     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2604     {
2605         MallocRdmaRequest(rdma_request_msg);
2606         rdma_request_msg->next = 0;
2607         rdma_request_msg->destNode = inst_id;
2608         rdma_request_msg->pd = pd;
2609         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2610     }else {
2611         GNI_RC_CHECK("AFter posting", status);
2612     }
2613 #endif
2614 }
2615
2616 #if CQWRITE
2617 static void PumpCqWriteTransactions()
2618 {
2619
2620     gni_cq_entry_t          ev;
2621     gni_return_t            status;
2622     void                    *msg;  
2623     int                     msg_size;
2624     while(1) {
2625         //CMI_GNI_LOCK(my_cq_lock) 
2626         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2627         //CMI_GNI_UNLOCK(my_cq_lock)
2628         if(status != GNI_RC_SUCCESS) break;
2629         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2630 #if CMK_PERSISTENT_COMM
2631 #if PRINT_SYH
2632         printf(" %d CQ write event %p\n", myrank, msg);
2633 #endif
2634         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2635 #if PRINT_SYH
2636             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2637 #endif
2638             CmiReference(msg);
2639             msg_size = CmiGetMsgSize(msg);
2640             CMI_CHECK_CHECKSUM(msg, msg_size);
2641             handleOneRecvedMsg(msg_size, msg); 
2642             continue;
2643         }
2644 #endif
2645 #if ! USE_LRTS_MEMPOOL
2646        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2647 #else
2648         DecreaseMsgInSend(msg);
2649 #endif
2650         if(NoMsgInSend(msg))
2651             buffered_send_msg -= GetMempoolsize(msg);
2652         CmiFree(msg);
2653     };
2654     if(status == GNI_RC_ERROR_RESOURCE)
2655     {
2656         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2657     }
2658 }
2659 #endif
2660
2661 #if REMOTE_EVENT
2662 static void PumpRemoteTransactions()
2663 {
2664     gni_cq_entry_t          ev;
2665     gni_return_t            status;
2666     void                    *msg;   
2667     int                     inst_id, index, type, size;
2668
2669     while(1) {
2670         CMI_GNI_LOCK(rdma_tx_cq_lock)
2671 //        CMI_GNI_LOCK(global_gni_lock)
2672         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2673 //        CMI_GNI_UNLOCK(global_gni_lock)
2674         CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2675
2676         if(status != GNI_RC_SUCCESS) break;
2677
2678         inst_id = GNI_CQ_GET_INST_ID(ev);
2679         index = GET_INDEX(inst_id);
2680         type = GET_TYPE(inst_id);
2681         switch (type) {
2682         case 0:    // ACK
2683             CmiAssert(index>=0 && index<ackPool.size);
2684             CMI_GNI_LOCK(ackPool.lock);
2685             CmiAssert(GetIndexType(ackPool, index) == 1);
2686             msg = GetIndexAddress(ackPool, index);
2687             CMI_GNI_UNLOCK(ackPool.lock);
2688 #if PRINT_SYH
2689             printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2690 #endif
2691 #if ! USE_LRTS_MEMPOOL
2692            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2693 #else
2694             DecreaseMsgInSend(msg);
2695 #endif
2696             if(NoMsgInSend(msg))
2697                 buffered_send_msg -= GetMempoolsize(msg);
2698             CmiFree(msg);
2699             IndexPool_freeslot(&ackPool, index);
2700 #if CMI_EXERT_SEND_CAP
2701             SEND_large_pending--;
2702 #endif
2703             break;
2704 #if CMK_PERSISTENT_COMM
2705         case 1:  {    // PERSISTENT
2706             CmiLock(persistPool.lock);
2707             CmiAssert(GetIndexType(persistPool, index) == 2);
2708             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2709             CmiUnlock(persistPool.lock);
2710             START_EVENT();
2711             msg = slot->destBuf[0].destAddress;
2712             size = CmiGetMsgSize(msg);
2713             CmiReference(msg);
2714             CMI_CHECK_CHECKSUM(msg, size);
2715             TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2716             handleOneRecvedMsg(size, msg); 
2717             break;
2718             }
2719 #endif
2720         default:
2721             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2722             CmiAbort("PumpRemoteTransactions: unknown type");
2723         }
2724     }
2725     if(status == GNI_RC_ERROR_RESOURCE)
2726     {
2727         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2728     }
2729 }
2730 #endif
2731
2732 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2733 {
2734     gni_cq_entry_t          ev;
2735     gni_return_t            status;
2736     uint64_t                type, inst_id;
2737     gni_post_descriptor_t   *tmp_pd;
2738     MSG_LIST                *ptr;
2739     CONTROL_MSG             *ack_msg_tmp;
2740     ACK_MSG                 *ack_msg;
2741     uint8_t                 msg_tag;
2742 #if CMK_DIRECT
2743     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2744 #endif
2745     SMSG_QUEUE         *queue = &smsg_queue;
2746
2747     while(1) {
2748         CMI_GNI_LOCK(my_cq_lock) 
2749         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2750         CMI_GNI_UNLOCK(my_cq_lock)
2751         if(status != GNI_RC_SUCCESS) break;
2752         
2753         type = GNI_CQ_GET_TYPE(ev);
2754         if (type == GNI_CQ_EVENT_TYPE_POST)
2755         {
2756
2757 #if CMI_EXERT_RDMA_CAP
2758             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2759             RDMA_pending--;
2760 #endif
2761             inst_id     = GNI_CQ_GET_INST_ID(ev);
2762 #if PRINT_SYH
2763             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2764 #endif
2765             CMI_GNI_LOCK(my_cq_lock)
2766             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2767             CMI_GNI_UNLOCK(my_cq_lock)
2768
2769             switch (tmp_pd->type) {
2770 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2771             case GNI_POST_RDMA_PUT:
2772 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2773                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2774 #endif
2775             case GNI_POST_FMA_PUT:
2776                 if(tmp_pd->amo_cmd == 1) {
2777 #if CMK_DIRECT
2778                     //sender ACK to receiver to trigger it is done
2779                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2780                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2781                     msg_tag = DIRECT_PUT_DONE_TAG;
2782 #endif
2783                 }
2784                 else {
2785                     CmiFree((void *)tmp_pd->local_addr);
2786 #if REMOTE_EVENT
2787                     FreePostDesc(tmp_pd);
2788                     continue;
2789 #elif CQWRITE
2790                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2791                     FreePostDesc(tmp_pd);
2792                     continue;
2793 #else
2794                     MallocControlMsg(ack_msg_tmp);
2795                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2796                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2797                     ack_msg_tmp->length  = tmp_pd->length;
2798                     msg_tag = PUT_DONE_TAG;
2799 #endif
2800                 }
2801                 break;
2802 #endif
2803             case GNI_POST_RDMA_GET:
2804             case GNI_POST_FMA_GET:  {
2805 #if  ! USE_LRTS_MEMPOOL
2806                 MallocControlMsg(ack_msg_tmp);
2807                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2808                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2809                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2810                 msg_tag = ACK_TAG;  
2811 #else
2812 #if CMK_WITH_STATS
2813                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2814 #endif
2815                 int seq_id = tmp_pd->cqwrite_value;
2816                 if(seq_id > 0)      // BIG_MSG
2817                 {
2818                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2819                     MallocControlMsg(ack_msg_tmp);
2820                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2821                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2822                     ack_msg_tmp->seq_id = seq_id;
2823                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2824                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2825                     ack_msg_tmp->length = tmp_pd->length;
2826                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2827                     msg_tag = BIG_MSG_TAG; 
2828                 } 
2829                 else
2830                 {
2831                     msg_tag = ACK_TAG; 
2832 #if  !REMOTE_EVENT && !CQWRITE
2833                     MallocAckMsg(ack_msg);
2834                     ack_msg->source_addr = tmp_pd->remote_addr;
2835 #endif
2836                 }
2837 #endif
2838                 break;
2839             }
2840             case  GNI_POST_CQWRITE:
2841                    FreePostDesc(tmp_pd);
2842                    continue;
2843             default:
2844                 CmiPrintf("type=%d\n", tmp_pd->type);
2845                 CmiAbort("PumpLocalTransactions: unknown type!");
2846             }      /* end of switch */
2847
2848 #if CMK_DIRECT
2849             if (tmp_pd->amo_cmd == 1) {
2850                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2851                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2852             }
2853             else
2854 #endif
2855             if (msg_tag == ACK_TAG) {
2856 #if !REMOTE_EVENT
2857 #if   !CQWRITE
2858                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2859                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2860 #else
2861                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2862 #endif
2863 #endif
2864             }
2865             else {
2866                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2867                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2868             }
2869 #if CMK_PERSISTENT_COMM
2870             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2871 #endif
2872             {
2873                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2874 #if PRINT_SYH
2875                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2876 #endif
2877                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2878                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2879
2880                     START_EVENT();
2881                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2882                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2883 #if MACHINE_DEBUG_LOG
2884                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2885                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2886                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2887 #endif
2888                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2889                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2890                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2891                 }else if(msg_tag == BIG_MSG_TAG){
2892                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2893                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2894                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2895                         START_EVENT();
2896 #if PRINT_SYH
2897                         printf("Pipeline msg done [%d]\n", myrank);
2898 #endif
2899 #if                 CMK_SMP_TRACE_COMMTHREAD
2900                         if( tmp_pd->cqwrite_value == 1)
2901                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2902 #endif
2903                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2904                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2905                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2906                     }
2907                 }
2908             }
2909             FreePostDesc(tmp_pd);
2910         }
2911     } //end while
2912     if(status == GNI_RC_ERROR_RESOURCE)
2913     {
2914         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2915         GNI_RC_CHECK("Smsg_tx_cq full", status);
2916     }
2917 }
2918
2919 static void  SendRdmaMsg()
2920 {
2921     gni_return_t            status = GNI_RC_SUCCESS;
2922     gni_mem_handle_t        msg_mem_hndl;
2923     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2924     RDMA_REQUEST            *pre = 0;
2925     uint64_t                register_size = 0;
2926     void                    *msg;
2927     int                     i;
2928 #if CMK_SMP
2929     int len = PCQueueLength(sendRdmaBuf);
2930     for (i=0; i<len; i++)
2931     {
2932 #if CMI_EXERT_RDMA_CAP
2933         if( RDMA_pending >= RDMA_cap) break;
2934 #endif
2935         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2936         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2937         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2938         if (ptr == NULL) break;
2939 #else
2940     ptr = sendRdmaBuf;
2941     while (ptr!=0 )
2942     {
2943 #if CMI_EXERT_RDMA_CAP
2944          if( RDMA_pending >= RDMA_cap) break;
2945 #endif
2946 #endif 
2947         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2948         gni_post_descriptor_t *pd = ptr->pd;
2949         
2950         msg = (void*)(pd->local_addr);
2951         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2952         register_size = 0;
2953         if(pd->cqwrite_value == 0) {
2954             if(NoMsgInRecv(msg))
2955                 register_size = GetMempoolsize(msg);
2956         }
2957
2958         if(status == GNI_RC_SUCCESS)        //mem register good
2959         {
2960             int destNode = ptr->destNode;
2961             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2962             CMI_GNI_LOCK(lock);
2963 #if REMOTE_EVENT
2964             if( pd->cqwrite_value == 0) {
2965                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2966                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
2967                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2968             }
2969 #if CMK_PERSISTENT_COMM
2970             else if (pd->cqwrite_value == PERSIST_SEQ) {
2971                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2972                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
2973                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2974             }
2975 #endif
2976 #endif
2977 #if CMK_WITH_STATS
2978             RDMA_TRY_SEND(pd->type)
2979 #endif
2980 #if CMK_SMP_TRACE_COMMTHREAD
2981 //            int oldpe = -1;
2982 //            int oldeventid = -1;
2983 //            if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
2984 //            { 
2985 //                TRACE_COMM_GET_MSGID((void*)pd->local_addr, &oldpe, &oldeventid);
2986 //                TRACE_COMM_SET_COMM_MSGID((void*)pd->local_addr);
2987 //            }
2988               if(IS_PUT(pd->type) )
2989               { 
2990                   START_EVENT();
2991                   TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
2992               }
2993 #endif
2994
2995             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2996             {
2997                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
2998             }
2999             else
3000             {
3001                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3002             }
3003             CMI_GNI_UNLOCK(lock);
3004             
3005 #if CMK_SMP_TRACE_COMMTHREAD
3006 //            if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
3007 //            { 
3008 //                if (oldpe != -1)  TRACE_COMM_SET_MSGID((void*)pd->local_addr, oldpe, oldeventid);
3009 //            }
3010 #endif
3011             if(status == GNI_RC_SUCCESS)    //post good
3012             {
3013 #if CMI_EXERT_RDMA_CAP
3014                 RDMA_pending ++;
3015 #endif
3016 #if !CMK_SMP
3017                 tmp_ptr = ptr;
3018                 if(pre != 0) {
3019                     pre->next = ptr->next;
3020                 }
3021                 else {
3022                     sendRdmaBuf = ptr->next;
3023                 }
3024                 ptr = ptr->next;
3025                 FreeRdmaRequest(tmp_ptr);
3026 #endif
3027                 if(pd->cqwrite_value == 0)
3028                 {
3029 #if CMK_SMP_TRACE_COMMTHREAD 
3030                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3031 #endif
3032                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3033                 }
3034 #if  CMK_WITH_STATS
3035                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3036                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3037 #endif
3038 #if MACHINE_DEBUG_LOG
3039                 buffered_recv_msg += register_size;
3040                 MACHSTATE(8, "GO request from buffered\n"); 
3041 #endif
3042 #if PRINT_SYH
3043                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
3044 #endif
3045             }else           // cannot post
3046             {
3047 #if CMK_SMP
3048                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3049 #else
3050                 pre = ptr;
3051                 ptr = ptr->next;
3052 #endif
3053 #if PRINT_SYH
3054                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3055 #endif
3056                 break;
3057             }
3058         } else          //memory registration fails
3059         {
3060 #if CMK_SMP
3061             PCQueuePush(sendRdmaBuf, (char*)ptr);
3062 #else
3063             pre = ptr;
3064             ptr = ptr->next;
3065 #endif
3066         }
3067     } //end while
3068 #if ! CMK_SMP
3069     if(ptr == 0)
3070         sendRdmaTail = pre;
3071 #endif
3072 }
3073
3074 // return 1 if all messages are sent
3075 static int SendBufferMsg(SMSG_QUEUE *queue)
3076 {
3077     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3078     CONTROL_MSG         *control_msg_tmp;
3079     gni_return_t        status;
3080     int                 done = 1;
3081     uint64_t            register_size;
3082     void                *register_addr;
3083     int                 index_previous = -1;
3084
3085 #if CMK_SMP
3086     int          index = 0;
3087 #if ONE_SEND_QUEUE
3088     memset(destpe_avail, 0, mysize * sizeof(char));
3089     for (index=0; index<1; index++)
3090     {
3091         int i, len = PCQueueLength(queue->sendMsgBuf);
3092         for (i=0; i<len; i++) 
3093         {
3094             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3095             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3096             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3097             if(ptr == NULL) break;
3098             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3099                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3100                 continue;
3101             }
3102 #else
3103 #if SMP_LOCKS
3104     int nonempty = PCQueueLength(nonEmptyQueues);
3105     for(index =0; index<nonempty; index++)
3106     {
3107         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
3108         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
3109         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3110         if(current_list == NULL) break; 
3111         PCQueue current_queue= current_list-> sendSmsgBuf;
3112         CmiLock(current_list->lock);
3113         int i, len = PCQueueLength(current_queue);
3114         current_list->pushed = 0;
3115         CmiUnlock(current_list->lock);
3116 #else
3117     for(index =0; index<mysize; index++)
3118     {
3119         //if (index == myrank) continue;
3120         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3121         int i, len = PCQueueLength(current_queue);
3122 #endif
3123         for (i=0; i<len; i++) 
3124         {
3125             CMI_PCQUEUEPOP_LOCK(current_queue)
3126             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3127             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3128             if (ptr == 0) break;
3129 #endif
3130 #else
3131     int index = queue->smsg_head_index;
3132     while(index != -1)
3133     {
3134         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3135         pre = 0;
3136         while(ptr != 0)
3137         {
3138 #endif
3139             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3140             status = GNI_RC_ERROR_RESOURCE;
3141             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
3142                 /* connection not exists yet */
3143 #if CMK_SMP
3144                   /* non-smp case, connect is issued in send_smsg_message */
3145                 if (smsg_connected_flag[index] == 0)
3146                     connect_to(ptr->destNode); 
3147 #endif
3148             }
3149             else
3150             switch(ptr->tag)
3151             {
3152             case SMALL_DATA_TAG:
3153                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3154                 if(status == GNI_RC_SUCCESS)
3155                 {
3156                     CmiFree(ptr->msg);
3157                 }
3158                 break;
3159             case LMSG_INIT_TAG:
3160                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3161                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
3162                 break;
3163 #if !REMOTE_EVENT && !CQWRITE
3164             case ACK_TAG:
3165                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3166                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3167                 break;
3168 #endif
3169             case BIG_MSG_TAG:
3170                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3171                 if(status == GNI_RC_SUCCESS)
3172                 {
3173                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3174                 }
3175                 break;
3176 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3177             case PUT_DONE_TAG:
3178                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3179                 if(status == GNI_RC_SUCCESS)
3180                 {
3181                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3182                 }
3183                 break;
3184 #endif
3185 #if CMK_DIRECT
3186             case DIRECT_PUT_DONE_TAG:
3187                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3188                 if(status == GNI_RC_SUCCESS)
3189                 {
3190                     free((CMK_DIRECT_HEADER*)ptr->msg);
3191                 }
3192                 break;
3193
3194 #endif
3195             default:
3196                 printf("Weird tag\n");
3197                 CmiAbort("should not happen\n");
3198             }       // end switch
3199             if(status == GNI_RC_SUCCESS)
3200             {
3201 #if PRINT_SYH
3202                 buffered_smsg_counter--;
3203                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3204 #endif
3205 #if !CMK_SMP
3206                 tmp_ptr = ptr;
3207                 if(pre)
3208                 {
3209                     ptr = pre ->next = ptr->next;
3210                 }else
3211                 {
3212                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3213                 }
3214                 FreeMsgList(tmp_ptr);
3215 #else
3216                 FreeMsgList(ptr);
3217 #endif
3218             }else {
3219 #if CMK_SMP
3220 #if ONE_SEND_QUEUE
3221                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3222 #else
3223                 PCQueuePush(current_queue, (char*)ptr);
3224 #endif
3225 #else
3226                 pre = ptr;
3227                 ptr=ptr->next;
3228 #endif
3229                 done = 0;
3230                 if(status == GNI_RC_ERROR_RESOURCE)
3231                 {
3232 #if CMK_SMP && ONE_SEND_QUEUE 
3233                     destpe_avail[ptr->destNode] = 1;
3234 #else
3235                     break;
3236 #endif
3237                 }
3238             } 
3239         } //end while
3240 #if !CMK_SMP
3241         if(ptr == 0)
3242             queue->smsg_msglist_index[index].tail = pre;
3243         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3244         {
3245             if(index_previous != -1)
3246                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3247             else
3248                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3249         }
3250         else {
3251             index_previous = index;
3252         }
3253         index = queue->smsg_msglist_index[index].next;
3254 #else
3255 #if !ONE_SEND_QUEUE && SMP_LOCKS
3256         CmiLock(current_list->lock);
3257         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3258         {
3259             current_list->pushed = 1;
3260             PCQueuePush(nonEmptyQueues, (char*)current_list);
3261         }
3262         CmiUnlock(current_list->lock); 
3263 #endif
3264 #endif
3265
3266     }   // end pooling for all cores
3267     return done;
3268 }
3269
3270 static void ProcessDeadlock();
3271 void LrtsAdvanceCommunication(int whileidle)
3272 {
3273     static int count = 0;
3274     /*  Receive Msg first */
3275 #if CMK_SMP_TRACE_COMMTHREAD
3276     double startT, endT;
3277 #endif
3278     if (useDynamicSMSG && whileidle)
3279     {
3280 #if CMK_SMP_TRACE_COMMTHREAD
3281         startT = CmiWallTimer();
3282 #endif
3283         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3284 #if CMK_SMP_TRACE_COMMTHREAD
3285         endT = CmiWallTimer();
3286         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3287 #endif
3288     }
3289
3290 #if CMK_SMP_TRACE_COMMTHREAD
3291     startT = CmiWallTimer();
3292 #endif
3293     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3294     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3295 #if CMK_SMP_TRACE_COMMTHREAD
3296     endT = CmiWallTimer();
3297     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3298 #endif
3299
3300 #if CMK_SMP_TRACE_COMMTHREAD
3301     startT = CmiWallTimer();
3302 #endif
3303     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3304     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3305 #if CMK_SMP_TRACE_COMMTHREAD
3306     endT = CmiWallTimer();
3307     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3308 #endif
3309
3310 #if CMK_SMP_TRACE_COMMTHREAD
3311     startT = CmiWallTimer();
3312 #endif
3313     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3314
3315 #if CQWRITE
3316     PumpCqWriteTransactions();
3317 #endif
3318
3319 #if REMOTE_EVENT
3320     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
3321 #endif
3322
3323     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3324 #if CMK_SMP_TRACE_COMMTHREAD
3325     endT = CmiWallTimer();
3326     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3327 #endif
3328  
3329 #if CMK_SMP_TRACE_COMMTHREAD
3330     startT = CmiWallTimer();
3331 #endif
3332     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3333     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3334 #if CMK_SMP_TRACE_COMMTHREAD
3335     endT = CmiWallTimer();
3336     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3337 #endif
3338
3339     /* Send buffered Message */
3340 #if CMK_SMP_TRACE_COMMTHREAD
3341     startT = CmiWallTimer();
3342 #endif
3343 #if CMK_USE_OOB
3344     if (SendBufferMsg(&smsg_oob_queue) == 1)
3345 #endif
3346     {
3347         STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
3348     }
3349     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3350 #if CMK_SMP_TRACE_COMMTHREAD
3351     endT = CmiWallTimer();
3352     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3353 #endif
3354
3355 #if CMK_SMP && ! LARGEPAGE
3356     if (_detected_hang)  ProcessDeadlock();
3357 #endif
3358 }
3359
3360 static void set_smsg_max()
3361 {
3362     char *env;
3363
3364     if(mysize <=512)
3365     {
3366         SMSG_MAX_MSG = 1024;
3367     }else if (mysize <= 4096)
3368     {
3369         SMSG_MAX_MSG = 1024;
3370     }else if (mysize <= 16384)
3371     {
3372         SMSG_MAX_MSG = 512;
3373     }else {
3374         SMSG_MAX_MSG = 256;
3375     }
3376
3377     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3378     if (env) SMSG_MAX_MSG = atoi(env);
3379     CmiAssert(SMSG_MAX_MSG > 0);
3380 }    
3381
3382 /* useDynamicSMSG */
3383 static void _init_dynamic_smsg()
3384 {
3385     gni_return_t status;
3386     uint32_t     vmdh_index = -1;
3387     int i;
3388
3389     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3390     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3391     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3392     for(i=0; i<mysize; i++) {
3393         smsg_connected_flag[i] = 0;
3394         smsg_attr_vector_local[i] = NULL;
3395         smsg_attr_vector_remote[i] = NULL;
3396     }
3397
3398     set_smsg_max();
3399
3400     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3401     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3402     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3403     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3404     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3405
3406     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3407     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3408     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3409     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3410     mailbox_list->offset = 0;
3411     mailbox_list->next = 0;
3412     
3413     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3414         mailbox_list->size, smsg_rx_cqh,
3415         GNI_MEM_READWRITE,   
3416         vmdh_index,
3417         &(mailbox_list->mem_hndl));
3418     GNI_RC_CHECK("MEMORY registration for smsg", status);
3419
3420     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3421     GNI_RC_CHECK("Unbound EP", status);
3422     
3423     alloc_smsg_attr(&send_smsg_attr);
3424
3425     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3426     GNI_RC_CHECK("post unbound datagram", status);
3427
3428       /* always pre-connect to proc 0 */
3429     //if (myrank != 0) connect_to(0);
3430
3431     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3432     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3433 }
3434
3435 static void _init_static_smsg()
3436 {
3437     gni_smsg_attr_t      *smsg_attr;
3438     gni_smsg_attr_t      remote_smsg_attr;
3439     gni_smsg_attr_t      *smsg_attr_vec;
3440     gni_mem_handle_t     my_smsg_mdh_mailbox;
3441     int      ret, i;
3442     gni_return_t status;
3443     uint32_t              vmdh_index = -1;
3444     mdh_addr_t            base_infor;
3445     mdh_addr_t            *base_addr_vec;
3446
3447     set_smsg_max();
3448     
3449     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3450     
3451     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3452     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3453     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3454     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3455     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3456     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3457     CmiAssert(ret == 0);
3458     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3459     
3460     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3461             smsg_memlen*(mysize), smsg_rx_cqh,
3462             GNI_MEM_READWRITE,   
3463             vmdh_index,
3464             &my_smsg_mdh_mailbox);
3465     register_memory_size += smsg_memlen*(mysize);
3466     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3467
3468     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3469     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3470
3471     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3472     base_infor.mdh =  my_smsg_mdh_mailbox;
3473     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3474
3475     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3476  
3477     for(i=0; i<mysize; i++)
3478     {
3479         if(i==myrank)
3480             continue;
3481         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3482         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3483         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3484         smsg_attr[i].mbox_offset = i*smsg_memlen;
3485         smsg_attr[i].buff_size = smsg_memlen;
3486         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3487         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3488     }
3489
3490     for(i=0; i<mysize; i++)
3491     {
3492         if (myrank == i) continue;
3493
3494         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3495         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3496         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3497         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3498         remote_smsg_attr.buff_size = smsg_memlen;
3499         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3500         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3501
3502         /* initialize the smsg channel */
3503         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3504         GNI_RC_CHECK("SMSG Init", status);
3505     } //end initialization
3506
3507     free(base_addr_vec);
3508     free(smsg_attr);
3509
3510     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3511     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3512
3513
3514 inline
3515 static void _init_send_queue(SMSG_QUEUE *queue)
3516 {
3517      int i;
3518 #if ONE_SEND_QUEUE
3519      queue->sendMsgBuf = PCQueueCreate();
3520      destpe_avail = (char*)malloc(mysize * sizeof(char));
3521 #else
3522      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3523 #if CMK_SMP && SMP_LOCKS
3524      nonEmptyQueues = PCQueueCreate();
3525 #endif
3526      for(i =0; i<mysize; i++)
3527      {
3528 #if CMK_SMP
3529          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3530 #if SMP_LOCKS
3531          queue->smsg_msglist_index[i].pushed = 0;
3532          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3533 #endif
3534 #else
3535          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3536          queue->smsg_msglist_index[i].next = -1;
3537          queue->smsg_head_index = -1;
3538 #endif
3539         
3540      }
3541 #endif
3542 }
3543
3544 inline
3545 static void _init_smsg()
3546 {
3547     if(mysize > 1) {
3548         if (useDynamicSMSG)
3549             _init_dynamic_smsg();
3550         else
3551             _init_static_smsg();
3552     }
3553
3554     _init_send_queue(&smsg_queue);
3555 #if CMK_USE_OOB
3556     _init_send_queue(&smsg_oob_queue);
3557 #endif
3558 }
3559
3560 static void _init_static_msgq()
3561 {
3562     gni_return_t status;
3563     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3564     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3565     msgq_attrs.smsg_q_sz = 1;
3566     msgq_attrs.rcv_pool_sz = 1;
3567     msgq_attrs.num_msgq_eps = 2;
3568     msgq_attrs.nloc_insts = 8;
3569     msgq_attrs.modes = 0;
3570     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3571
3572     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3573     GNI_RC_CHECK("MSGQ Init", status);
3574
3575
3576 }
3577
3578
3579 static CmiUInt8 total_mempool_size = 0;
3580 static CmiUInt8 total_mempool_calls = 0;
3581
3582 #if USE_LRTS_MEMPOOL
3583 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3584 {
3585     void *pool;
3586     int ret;
3587     gni_return_t status = GNI_RC_SUCCESS;
3588
3589     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3590     if (*size < default_size) *size = default_size;
3591 #if LARGEPAGE
3592     // round up to be multiple of _tlbpagesize
3593     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3594     *size = ALIGNHUGEPAGE(*size);
3595 #endif
3596     total_mempool_size += *size;
3597     total_mempool_calls += 1;
3598 #if   !LARGEPAGE
3599     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3600     {
3601         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3602         CmiAbort("alloc_mempool_block");
3603     }
3604 #endif
3605 #if LARGEPAGE
3606     pool = my_get_huge_pages(*size);
3607     ret = pool==NULL;
3608 #else
3609     ret = posix_memalign(&pool, ALIGNBUF, *size);
3610 #endif
3611     if (ret != 0) {
3612 #if CMK_SMP && STEAL_MEMPOOL
3613       pool = steal_mempool_block(size, mem_hndl);
3614       if (pool != NULL) return pool;
3615 #endif
3616       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3617       if (ret == ENOMEM)
3618         CmiAbort("alloc_mempool_block: out of memory.");
3619       else
3620         CmiAbort("alloc_mempool_block: posix_memalign failed");
3621     }
3622 #if LARGEPAGE
3623     CmiMemLock();
3624     register_count++;
3625     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3626     CmiMemUnlock();
3627     if(status != GNI_RC_SUCCESS) {
3628         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3629 sweep_mempool(CpvAccess(mempool));
3630     }
3631     GNI_RC_CHECK("MEMORY_REGISTER", status);
3632 #else
3633     SetMemHndlZero((*mem_hndl));
3634 #endif
3635     return pool;
3636 }
3637
3638 // ptr is a block head pointer
3639 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3640 {
3641     if(!(IsMemHndlZero(mem_hndl)))
3642     {
3643         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3644     }
3645 #if LARGEPAGE
3646     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3647 #else
3648     free(ptr);
3649 #endif
3650 }
3651 #endif
3652
3653 void LrtsPreCommonInit(int everReturn){
3654 #if USE_LRTS_MEMPOOL
3655     CpvInitialize(mempool_type*, mempool);
3656     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3657     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3658 #endif
3659 }
3660
3661 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3662 {
3663     register int            i;
3664     int                     rc;
3665     int                     device_id = 0;
3666     unsigned int            remote_addr;
3667     gni_cdm_handle_t        cdm_hndl;
3668     gni_return_t            status = GNI_RC_SUCCESS;
3669     uint32_t                vmdh_index = -1;
3670     uint8_t                 ptag;
3671     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3672     int                     first_spawned;
3673     int                     physicalID;
3674     char                   *env;
3675
3676     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3677     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3678     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3679    
3680     status = PMI_Init(&first_spawned);
3681     GNI_RC_CHECK("PMI_Init", status);
3682
3683     status = PMI_Get_size(&mysize);
3684     GNI_RC_CHECK("PMI_Getsize", status);
3685
3686     status = PMI_Get_rank(&myrank);
3687     GNI_RC_CHECK("PMI_getrank", status);
3688
3689     //physicalID = CmiPhysicalNodeID(myrank);
3690     
3691     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3692
3693     *myNodeID = myrank;
3694     *numNodes = mysize;
3695   
3696 #if MULTI_THREAD_SEND
3697     /* Currently, we only consider the case that comm. thread will only recv msgs */
3698     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3699 #endif
3700
3701 #if CMI_EXERT_SEND_CAP
3702     CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
3703 #endif
3704
3705     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3706     
3707     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3708     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3709     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3710
3711     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3712     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3713     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3714
3715     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3716     if (env) useDynamicSMSG = 1;
3717     if (!useDynamicSMSG)
3718       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3719     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3720     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3721     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3722     
3723     if(myrank == 0)
3724     {
3725         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3726         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3727     }
3728 #ifdef USE_ONESIDED
3729     onesided_init(NULL, &onesided_hnd);
3730
3731     // this is a GNI test, so use the libonesided bypass functionality
3732     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3733     local_addr = gniGetNicAddress();
3734 #else
3735     ptag = get_ptag();
3736     cookie = get_cookie();
3737 #if 0
3738     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3739 #endif
3740     //Create and attach to the communication  domain */
3741     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3742     GNI_RC_CHECK("GNI_CdmCreate", status);
3743     //* device id The device id is the minor number for the device
3744     //that is assigned to the device by the system when the device is created.
3745     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3746     //where X is the device number 0 default 
3747     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3748     GNI_RC_CHECK("GNI_CdmAttach", status);
3749     local_addr = get_gni_nic_address(0);
3750 #endif
3751     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3752     _MEMCHECK(MPID_UGNI_AllAddr);
3753     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3754     /* create the local completion queue */
3755     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3756     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3757     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3758     
3759     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3760     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3761     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3762
3763     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3764     GNI_RC_CHECK("Create CQ (rx)", status);
3765     
3766     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3767     GNI_RC_CHECK("Create Post CQ (rx)", status);
3768     
3769     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3770     //GNI_RC_CHECK("Create BTE CQ", status);
3771
3772     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3773     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3774     _MEMCHECK(ep_hndl_array);
3775 #if MULTI_THREAD_SEND 
3776     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3777     //default_tx_cq_lock = CmiCreateLock();
3778     rdma_tx_cq_lock = CmiCreateLock();
3779     smsg_rx_cq_lock = CmiCreateLock();
3780     //global_gni_lock  = CmiCreateLock();
3781     //rx_cq_lock  = CmiCreateLock();
3782 #endif
3783     for (i=0; i<mysize; i++) {
3784         if(i == myrank) continue;
3785         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3786         GNI_RC_CHECK("GNI_EpCreate ", status);   
3787         remote_addr = MPID_UGNI_AllAddr[i];
3788         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3789         GNI_RC_CHECK("GNI_EpBind ", status);   
3790     }
3791
3792     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3793     _init_smsg();
3794     PMI_Barrier();
3795
3796 #if     USE_LRTS_MEMPOOL
3797     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3798     if (env) {
3799         _totalmem = CmiReadSize(env);
3800         if (myrank == 0)
3801             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3802     }
3803
3804     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3805     if (env) _mempool_size = CmiReadSize(env);
3806     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3807         _mempool_size = CmiReadSize(env);
3808
3809
3810     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3811     if (env) {
3812         MAX_REG_MEM = CmiReadSize(env);
3813         user_set_flag = 1;
3814     }
3815     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3816         MAX_REG_MEM = CmiReadSize(env);
3817         user_set_flag = 1;
3818     }
3819
3820     env = getenv("CHARM_UGNI_SEND_MAX");
3821     if (env) {
3822         MAX_BUFF_SEND = CmiReadSize(env);
3823         user_set_flag = 1;
3824     }
3825     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3826         MAX_BUFF_SEND = CmiReadSize(env);
3827         user_set_flag = 1;
3828     }
3829
3830     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3831     if (env) {
3832         _mempool_size_limit = CmiReadSize(env);
3833     }
3834
3835     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3836     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3837
3838     if (myrank==0) {
3839         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3840         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3841         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3842             /* memblock can expand to BIG_MSG * 2 size */
3843             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3844             CmiAbort("mempool maximum size is too small. \n");
3845         }
3846 #if MULTI_THREAD_SEND
3847         printf("Charm++> worker thread sending messages\n");
3848 #elif COMM_THREAD_SEND
3849         printf("Charm++> only comm thread send/recv messages\n");
3850 #endif
3851     }
3852
3853 #endif     /* end of USE_LRTS_MEMPOOL */
3854
3855     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3856     if (env) {
3857         BIG_MSG = CmiReadSize(env);
3858         if (BIG_MSG < ONE_SEG)
3859           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3860     }
3861     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3862     if (env) {
3863         BIG_MSG_PIPELINE = atoi(env);
3864     }
3865
3866     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3867     if (env) _checkProgress = 0;
3868     if (mysize == 1) _checkProgress = 0;
3869
3870 #if CMI_EXERT_RDMA_CAP
3871     env = getenv("CHARM_UGNI_RDMA_MAX");
3872     if (env)  {
3873         RDMA_pending = atoi(env);
3874         if (myrank == 0)
3875             printf("Charm++> Max pending RDMA set to: %d\n", RDMA_pending);
3876     }
3877 #endif
3878     
3879     /*
3880     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3881     if (env) 
3882         _tlbpagesize = CmiReadSize(env);
3883     */
3884     /* real gethugepagesize() is only available when hugetlb module linked */
3885     _tlbpagesize = gethugepagesize();
3886     if (myrank == 0) {
3887         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3888     }
3889
3890 #if LARGEPAGE
3891     if (_tlbpagesize == 4096) {
3892         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3893     }
3894 #endif
3895
3896       /* stats related arguments */
3897 #if CMK_WITH_STATS
3898     CmiGetArgStringDesc(*argv,"+gni_stats_root",&counters_dirname,"counter directory name, default counters");
3899
3900     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3901     
3902     stats_off = CmiGetArgFlag(*argv, "+stats_off");
3903
3904     init_comm_stats();
3905 #endif
3906
3907     /* init DMA buffer for medium message */
3908
3909     //_init_DMA_buffer();
3910     
3911     free(MPID_UGNI_AllAddr);
3912
3913 #if CMK_SMP
3914     sendRdmaBuf = PCQueueCreate();
3915 #else
3916     sendRdmaBuf = 0;
3917 #endif
3918
3919 #if MACHINE_DEBUG_LOG
3920     char ln[200];
3921     sprintf(ln,"debugLog.%d",myrank);
3922     debugLog=fopen(ln,"w");
3923 #endif
3924
3925 //    NTK_Init();
3926 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3927
3928 #if  REMOTE_EVENT
3929     SHIFT = 1;
3930     while (1<<SHIFT < mysize) SHIFT++;
3931     CmiAssert(SHIFT < 31);
3932     IndexPool_init(&ackPool);
3933 #if CMK_PERSISTENT_COMM
3934     IndexPool_init(&persistPool);
3935 #endif
3936 #endif
3937 }
3938
3939 void* LrtsAlloc(int n_bytes, int header)
3940 {
3941     void *ptr = NULL;
3942 #if 0
3943     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3944 #endif
3945     if(n_bytes <= SMSG_MAX_MSG)
3946     {
3947         int totalsize = n_bytes+header;
3948         ptr = malloc(totalsize);
3949     }
3950     else {
3951         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3952 #if     USE_LRTS_MEMPOOL
3953         n_bytes = ALIGN64(n_bytes);
3954         if(n_bytes < BIG_MSG)
3955         {
3956             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3957             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3958         }else 
3959         {
3960 #if LARGEPAGE
3961             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3962             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3963             char *res = my_get_huge_pages(n_bytes);
3964 #else
3965             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3966 #endif
3967             if (res) ptr = res + ALIGNBUF - header;
3968         }
3969 #else
3970         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3971         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3972         ptr = res + ALIGNBUF - header;
3973 #endif
3974     }
3975     return ptr;
3976 }
3977
3978 void  LrtsFree(void *msg)
3979 {
3980     CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3981 #if CMK_PERSISTENT_COMM
3982     if (IS_PERSISTENT_MEMORY(msg)) return;
3983 #endif
3984     if (size <= SMSG_MAX_MSG)
3985         free(msg);
3986     else {
3987         size = ALIGN64(size);
3988         if(size>=BIG_MSG)
3989         {
3990 #if LARGEPAGE
3991             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3992             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3993 #else
3994             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3995 #endif
3996         }
3997         else {
3998 #if    USE_LRTS_MEMPOOL
3999 #if CMK_SMP
4000             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
4001 #else
4002             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
4003 #endif
4004 #else
4005             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
4006 #endif
4007         }
4008     }
4009 }
4010
4011 void LrtsExit()
4012 {
4013 #if CMK_WITH_STATS
4014 #if CMK_SMP
4015     if(CmiMyRank() == CmiMyNodeSize())
4016 #endif
4017     if (print_stats) print_comm_stats();
4018 #endif
4019     /* free memory ? */
4020 #if USE_LRTS_MEMPOOL
4021     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
4022     mempool_destroy(CpvAccess(mempool));
4023 #endif
4024     PMI_Finalize();
4025     exit(0);
4026 }
4027
4028 void LrtsDrainResources()
4029 {
4030     if(mysize == 1) return;
4031     while (
4032 #if CMK_USE_OOB
4033            !SendBufferMsg(&smsg_oob_queue) ||
4034 #endif
4035            !SendBufferMsg(&smsg_queue) 
4036           )
4037     {
4038         if (useDynamicSMSG)
4039             PumpDatagramConnection();
4040         PumpNetworkSmsg();
4041         PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
4042         PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
4043 #if REMOTE_EVENT
4044         PumpRemoteTransactions();
4045 #endif
4046         SendRdmaMsg();
4047     }
4048     PMI_Barrier();
4049 }
4050
4051 void LrtsAbort(const char *message) {
4052     fprintf(stderr, "[%d] CmiAbort: %s\n", myrank, message);
4053     CmiPrintStackTrace(0);
4054     PMI_Abort(-1, message);
4055 }
4056
4057 /**************************  TIMER FUNCTIONS **************************/
4058 #if CMK_TIMER_USE_SPECIAL
4059 /* MPI calls are not threadsafe, even the timer on some machines */
4060 static CmiNodeLock  timerLock = 0;
4061 static int _absoluteTime = 0;
4062 static int _is_global = 0;
4063 static struct timespec start_ts;
4064
4065 inline int CmiTimerIsSynchronized() {
4066     return 0;
4067 }
4068
4069 inline int CmiTimerAbsolute() {
4070     return _absoluteTime;
4071 }
4072
4073 double CmiStartTimer() {
4074     return 0.0;
4075 }
4076
4077 double CmiInitTime() {
4078     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
4079 }
4080
4081</