f9685d3b1ebad2d76c165078f935c22ce15c583e
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/dir.h>
38 #include <sys/stat.h>
39 #include <gni_pub.h>
40 #include <pmi.h>
41 //#include <numatoolkit.h>
42
43 #include "converse.h"
44
45 #if CMK_DIRECT
46 #include "cmidirect.h"
47 #endif
48
49 #define     LARGEPAGE              0
50
51 #if CMK_SMP
52 #define MULTI_THREAD_SEND          0
53 #define COMM_THREAD_SEND           1
54 #endif
55
56 #if MULTI_THREAD_SEND
57 #define CMK_WORKER_SINGLE_TASK     0
58 #endif
59
60 #define REMOTE_EVENT               1
61 #define CQWRITE                    0
62
63 #define CMI_EXERT_SEND_CAP      0
64 #define CMI_EXERT_RECV_CAP      0
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP  32
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP  4                  /* cap <= 2 sometimes hang */
72 #endif
73
74 #define USE_LRTS_MEMPOOL                  1
75
76 #define PRINT_SYH                         0
77
78 // Trace communication thread
79 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
80 #define TRACE_THRESHOLD     0.00005
81 #define CMI_MPI_TRACE_MOREDETAILED 0
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
84 #else
85 #undef CMK_SMP_TRACE_COMMTHREAD
86 #define CMK_SMP_TRACE_COMMTHREAD 0
87 #endif
88
89 #define CMK_TRACE_COMMOVERHEAD 0
90 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
91 #undef CMI_MPI_TRACE_USEREVENTS
92 #define CMI_MPI_TRACE_USEREVENTS 1
93 #else
94 #undef CMK_TRACE_COMMOVERHEAD
95 #define CMK_TRACE_COMMOVERHEAD 0
96 #endif
97
98 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
99 CpvStaticDeclare(double, projTraceStart);
100 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
101 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
102 #else
103 #define  START_EVENT()
104 #define  END_EVENT(x)
105 #endif
106
107 #if USE_LRTS_MEMPOOL
108
109 #define oneMB (1024ll*1024)
110 #define oneGB (1024ll*1024*1024)
111
112 static CmiInt8 _mempool_size = 8*oneMB;
113 static CmiInt8 _expand_mem =  4*oneMB;
114 static CmiInt8 _mempool_size_limit = 0;
115
116 static CmiInt8 _totalmem = 0.8*oneGB;
117
118 #if LARGEPAGE
119 static CmiInt8 BIG_MSG  =  16*oneMB;
120 static CmiInt8 ONE_SEG  =  4*oneMB;
121 #else
122 static CmiInt8 BIG_MSG  =  4*oneMB;
123 static CmiInt8 ONE_SEG  =  2*oneMB;
124 #endif
125 #if MULTI_THREAD_SEND
126 static int BIG_MSG_PIPELINE = 1;
127 #else
128 static int BIG_MSG_PIPELINE = 4;
129 #endif
130
131 // dynamic flow control
132 static CmiInt8 buffered_send_msg = 0;
133 static CmiInt8 register_memory_size = 0;
134
135 #if LARGEPAGE
136 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
137 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
138 static CmiInt8  register_count = 0;
139 #else
140 #if CMK_SMP && COMM_THREAD_SEND 
141 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
142 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
143 #else
144 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
145 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
146 #endif
147
148
149 #endif
150
151 #endif     /* end USE_LRTS_MEMPOOL */
152
153 #if MULTI_THREAD_SEND 
154 #define     CMI_GNI_LOCK(x)       CmiLock(x);
155 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
156 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
157 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
158 #else
159 #define     CMI_GNI_LOCK(x)
160 #define     CMI_GNI_UNLOCK(x)
161 #define     CMI_PCQUEUEPOP_LOCK(Q)   
162 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
163 #endif
164
165 static int _tlbpagesize = 4096;
166
167 //static int _smpd_count  = 0;
168
169 static int   user_set_flag  = 0;
170
171 static int _checkProgress = 1;             /* check deadlock */
172 static int _detected_hang = 0;
173
174 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
175
176 // dynamic SMSG
177 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
178
179 static int avg_smsg_connection = 32;
180 static int                 *smsg_connected_flag= 0;
181 static gni_smsg_attr_t     **smsg_attr_vector_local;
182 static gni_smsg_attr_t     **smsg_attr_vector_remote;
183 static gni_ep_handle_t     ep_hndl_unbound;
184 static gni_smsg_attr_t     send_smsg_attr;
185 static gni_smsg_attr_t     recv_smsg_attr;
186
187 typedef struct _dynamic_smsg_mailbox{
188    void     *mailbox_base;
189    int      size;
190    int      offset;
191    gni_mem_handle_t  mem_hndl;
192    struct      _dynamic_smsg_mailbox  *next;
193 }dynamic_smsg_mailbox_t;
194
195 static dynamic_smsg_mailbox_t  *mailbox_list;
196
197 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
198 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
199
200 #if PRINT_SYH
201 int         lrts_send_msg_id = 0;
202 int         lrts_local_done_msg = 0;
203 int         lrts_send_rdma_success = 0;
204 #endif
205
206 #include "machine.h"
207
208 #include "pcqueue.h"
209
210 #include "mempool.h"
211
212 #if CMK_PERSISTENT_COMM
213 #include "machine-persistent.h"
214 #endif
215
216 //#define  USE_ONESIDED 1
217 #ifdef USE_ONESIDED
218 //onesided implementation is wrong, since no place to restore omdh
219 #include "onesided.h"
220 onesided_hnd_t   onesided_hnd;
221 onesided_md_t    omdh;
222 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
223
224 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
225
226 #else
227 uint8_t   onesided_hnd, omdh;
228
229 #if REMOTE_EVENT || CQWRITE 
230 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
231     if(register_memory_size+size>= MAX_REG_MEM) { \
232         status = GNI_RC_ERROR_NOMEM;} \
233     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
234         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
235 #else
236 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
237         if (register_memory_size + size >= MAX_REG_MEM) { \
238             status = GNI_RC_ERROR_NOMEM; \
239         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
240             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
241 #endif
242
243 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
244     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
245              register_memory_size -= size; \
246          else CmiAbort("MEM_DEregister");  \
247     } while (0)
248 #endif
249
250 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
251 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
252 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
253 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
254 #define   IncreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)++
255 #define   DecreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)--
256 #define   IncreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)++
257 #define   DecreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)--
258 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
259 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
260 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
261 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
262 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
263 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
264
265 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
266 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
267
268 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
269 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
270 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
271 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
272
273 #define ALIGNBUF                64
274
275 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
276 /* If SMSG is not used */
277
278 #define FMA_PER_CORE  1024
279 #define FMA_BUFFER_SIZE 1024
280
281 /* If SMSG is used */
282 static int  SMSG_MAX_MSG = 1024;
283 #define SMSG_MAX_CREDIT    72
284
285 #define MSGQ_MAXSIZE       2048
286
287 /* large message transfer with FMA or BTE */
288 #if ! REMOTE_EVENT
289 #define LRTS_GNI_RDMA_THRESHOLD  1024 
290 #else
291    /* remote events only work with RDMA */
292 #define LRTS_GNI_RDMA_THRESHOLD  0 
293 #endif
294
295 #if CMK_SMP
296 static int  REMOTE_QUEUE_ENTRIES=163840; 
297 static int LOCAL_QUEUE_ENTRIES=163840; 
298 #else
299 static int  REMOTE_QUEUE_ENTRIES=20480;
300 static int LOCAL_QUEUE_ENTRIES=20480; 
301 #endif
302
303 #define BIG_MSG_TAG             0x26
304 #define PUT_DONE_TAG            0x28
305 #define DIRECT_PUT_DONE_TAG     0x29
306 #define ACK_TAG                 0x30
307 /* SMSG is data message */
308 #define SMALL_DATA_TAG          0x31
309 /* SMSG is a control message to initialize a BTE */
310 #define LMSG_INIT_TAG           0x39 
311
312 #define DEBUG
313 #ifdef GNI_RC_CHECK
314 #undef GNI_RC_CHECK
315 #endif
316 #ifdef DEBUG
317 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
318 #else
319 #define GNI_RC_CHECK(msg,rc)
320 #endif
321
322 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
323 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
324 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
325
326 static int useStaticMSGQ = 0;
327 static int useStaticFMA = 0;
328 static int mysize, myrank;
329 static gni_nic_handle_t   nic_hndl;
330
331 typedef struct {
332     gni_mem_handle_t mdh;
333     uint64_t addr;
334 } mdh_addr_t ;
335 // this is related to dynamic SMSG
336
337 typedef struct mdh_addr_list{
338     gni_mem_handle_t mdh;
339    void *addr;
340     struct mdh_addr_list *next;
341 }mdh_addr_list_t;
342
343 static unsigned int         smsg_memlen;
344 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
345 mdh_addr_t          setup_mem;
346 mdh_addr_t          *smsg_connection_vec = 0;
347 gni_mem_handle_t    smsg_connection_memhndl;
348 static int          smsg_expand_slots = 10;
349 static int          smsg_available_slot = 0;
350 static void         *smsg_mailbox_mempool = 0;
351 mdh_addr_list_t     *smsg_dynamic_list = 0;
352
353 static void             *smsg_mailbox_base;
354 gni_msgq_attr_t         msgq_attrs;
355 gni_msgq_handle_t       msgq_handle;
356 gni_msgq_ep_attr_t      msgq_ep_attrs;
357 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
358
359 /* =====Beginning of Declarations of Machine Specific Variables===== */
360 static int cookie;
361 static int modes = 0;
362 static gni_cq_handle_t       smsg_rx_cqh = NULL;
363 static gni_cq_handle_t       default_tx_cqh = NULL;
364 static gni_cq_handle_t       rdma_tx_cqh = NULL;
365 static gni_cq_handle_t       rdma_rx_cqh = NULL;
366 static gni_cq_handle_t       post_tx_cqh = NULL;
367 static gni_ep_handle_t       *ep_hndl_array;
368
369 static CmiNodeLock           *ep_lock_array;
370 static CmiNodeLock           default_tx_cq_lock; 
371 static CmiNodeLock           rdma_tx_cq_lock; 
372 static CmiNodeLock           global_gni_lock; 
373 static CmiNodeLock           rx_cq_lock;
374 static CmiNodeLock           smsg_mailbox_lock;
375 static CmiNodeLock           smsg_rx_cq_lock;
376 static CmiNodeLock           *mempool_lock;
377 //#define     CMK_WITH_STATS      1
378 typedef struct msg_list
379 {
380     uint32_t destNode;
381     uint32_t size;
382     void *msg;
383     uint8_t tag;
384 #if !CMK_SMP
385     struct msg_list *next;
386 #endif
387 #if CMK_WITH_STATS
388     double  creation_time;
389 #endif
390 }MSG_LIST;
391
392
393 typedef struct control_msg
394 {
395     uint64_t            source_addr;    /* address from the start of buffer  */
396     uint64_t            dest_addr;      /* address from the start of buffer */
397     int                 total_length;   /* total length */
398     int                 length;         /* length of this packet */
399 #if REMOTE_EVENT
400     int                 ack_index;      /* index from integer to address */
401 #endif
402     uint8_t             seq_id;         //big message   0 meaning single message
403     gni_mem_handle_t    source_mem_hndl;
404     struct control_msg *next;
405 } CONTROL_MSG;
406
407 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
408
409 typedef struct ack_msg
410 {
411     uint64_t            source_addr;    /* address from the start of buffer  */
412 #if ! USE_LRTS_MEMPOOL
413     gni_mem_handle_t    source_mem_hndl;
414     int                 length;          /* total length */
415 #endif
416     struct ack_msg     *next;
417 } ACK_MSG;
418
419 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
420
421 #if CMK_DIRECT
422 typedef struct{
423     uint64_t    handler_addr;
424 }CMK_DIRECT_HEADER;
425
426 typedef struct {
427     char core[CmiMsgHeaderSizeBytes];
428     uint64_t handler;
429 }cmidirectMsg;
430
431 //SYH
432 CpvDeclare(int, CmiHandleDirectIdx);
433 void CmiHandleDirectMsg(cmidirectMsg* msg)
434 {
435
436     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
437    (*(_handle->callbackFnPtr))(_handle->callbackData);
438    CmiFree(msg);
439 }
440
441 void CmiDirectInit()
442 {
443     CpvInitialize(int,  CmiHandleDirectIdx);
444     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
445 }
446
447 #endif
448 typedef struct  rmda_msg
449 {
450     int                   destNode;
451 #if REMOTE_EVENT
452     int                   ack_index;
453 #endif
454     gni_post_descriptor_t *pd;
455 #if !CMK_SMP
456     struct  rmda_msg      *next;
457 #endif
458 }RDMA_REQUEST;
459
460
461 #if CMK_SMP
462 #define SMP_LOCKS               0
463 #define ONE_SEND_QUEUE                  0
464 PCQueue sendRdmaBuf;
465 typedef struct  msg_list_index
466 {
467     PCQueue     sendSmsgBuf;
468     int         pushed;
469     CmiNodeLock   lock;
470 } MSG_LIST_INDEX;
471 char                *destpe_avail;
472 #if  !ONE_SEND_QUEUE && SMP_LOCKS
473     PCQueue     nonEmptyQueues;
474 #endif
475 #else         /* non-smp */
476
477 static RDMA_REQUEST        *sendRdmaBuf = 0;
478 static RDMA_REQUEST        *sendRdmaTail = 0;
479 typedef struct  msg_list_index
480 {
481     int         next;
482     MSG_LIST    *sendSmsgBuf;
483     MSG_LIST    *tail;
484 } MSG_LIST_INDEX;
485
486 #endif
487
488 // buffered send queue
489 #if ! ONE_SEND_QUEUE
490 typedef struct smsg_queue
491 {
492     MSG_LIST_INDEX   *smsg_msglist_index;
493     int               smsg_head_index;
494 } SMSG_QUEUE;
495 #else
496 typedef struct smsg_queue
497 {
498     PCQueue       sendMsgBuf;
499 }  SMSG_QUEUE;
500 #endif
501
502 SMSG_QUEUE                  smsg_queue;
503 #if CMK_USE_OOB
504 SMSG_QUEUE                  smsg_oob_queue;
505 #endif
506
507 #if CMK_SMP
508
509 #define FreeMsgList(d)   free(d);
510 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
511
512 #else
513
514 static MSG_LIST       *msglist_freelist=0;
515
516 #define FreeMsgList(d)  \
517   do { \
518   (d)->next = msglist_freelist;\
519   msglist_freelist = d; \
520   } while (0)
521
522 #define MallocMsgList(d) \
523   do {  \
524   d = msglist_freelist;\
525   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
526              _MEMCHECK(d);\
527   } else msglist_freelist = d->next; \
528   d->next =0;  \
529   } while (0)
530
531 #endif
532
533 #if CMK_SMP
534
535 #define FreeControlMsg(d)      free(d);
536 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
537
538 #else
539
540 static CONTROL_MSG    *control_freelist=0;
541
542 #define FreeControlMsg(d)       \
543   do { \
544   (d)->next = control_freelist;\
545   control_freelist = d; \
546   } while (0);
547
548 #define MallocControlMsg(d) \
549   do {  \
550   d = control_freelist;\
551   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
552              _MEMCHECK(d);\
553   } else control_freelist = d->next;  \
554   } while (0);
555
556 #endif
557
558 #if CMK_SMP
559
560 #define FreeAckMsg(d)      free(d);
561 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
562
563 #else
564
565 static ACK_MSG        *ack_freelist=0;
566
567 #define FreeAckMsg(d)       \
568   do { \
569   (d)->next = ack_freelist;\
570   ack_freelist = d; \
571   } while (0)
572
573 #define MallocAckMsg(d) \
574   do { \
575   d = ack_freelist;\
576   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
577              _MEMCHECK(d);\
578   } else ack_freelist = d->next; \
579   } while (0)
580
581 #endif
582
583
584 # if CMK_SMP
585 #define FreeRdmaRequest(d)       free(d);
586 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
587 #else
588
589 static RDMA_REQUEST         *rdma_freelist = NULL;
590
591 #define FreeRdmaRequest(d)       \
592   do {  \
593   (d)->next = rdma_freelist;\
594   rdma_freelist = d;    \
595   } while (0)
596
597 #define MallocRdmaRequest(d) \
598   do {   \
599   d = rdma_freelist;\
600   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
601              _MEMCHECK(d);\
602   } else rdma_freelist = d->next; \
603   d->next =0;   \
604   } while (0)
605 #endif
606
607 /* reuse gni_post_descriptor_t */
608 static gni_post_descriptor_t *post_freelist=0;
609
610 #if  !CMK_SMP
611 #define FreePostDesc(d)       \
612     (d)->next_descr = post_freelist;\
613     post_freelist = d;
614
615 #define MallocPostDesc(d) \
616   d = post_freelist;\
617   if (d==0) { \
618      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
619      d->next_descr = 0;\
620       _MEMCHECK(d);\
621   } else post_freelist = d->next_descr;
622 #else
623
624 #define FreePostDesc(d)     free(d);
625 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
626
627 #endif
628
629
630 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
631 static int      buffered_smsg_counter = 0;
632
633 /* SmsgSend return success but message sent is not confirmed by remote side */
634 static MSG_LIST *buffered_fma_head = 0;
635 static MSG_LIST *buffered_fma_tail = 0;
636
637 /* functions  */
638 #define IsFree(a,ind)  !( a& (1<<(ind) ))
639 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
640 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
641
642 CpvDeclare(mempool_type*, mempool);
643
644 #if REMOTE_EVENT
645 /* ack pool for remote events */
646
647 #define SHIFT                   18
648 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
649 #define RANK_MASK               ((1<<SHIFT) - 1)
650 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
651
652 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
653 #define GET_RANK(evt)           ((evt) & RANK_MASK)
654 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
655
656 #define PERSIST_EVENT(idx)      (1<<31 | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
657
658 struct IndexStruct {
659 void *addr;
660 int next;
661 int type;     // 1: ACK   2: Persistent
662 };
663
664 typedef struct IndexPool {
665     struct IndexStruct   *indexes;
666     int                   size;
667     int                   freehead;
668     CmiNodeLock           lock;
669 } IndexPool;
670
671 static IndexPool  ackPool;
672 #if CMK_PERSISTENT_COMM
673 static IndexPool  persistPool;
674 #endif
675
676 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
677 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
678
679 static void IndexPool_init(IndexPool *pool)
680 {
681     int i;
682     if ((1<<SHIFT) < mysize) 
683         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
684     pool->size = 1024;
685     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
686     for (i=0; i<pool->size-1; i++) {
687         pool->indexes[i].next = i+1;
688         pool->indexes[i].type = -1;
689     }
690     pool->indexes[i].next = -1;
691     pool->freehead = 0;
692 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
693     pool->lock  = CmiCreateLock();
694 #else
695     pool->lock  = NULL;
696 #endif
697 }
698
699 static
700 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
701 {
702     int i;
703     int s;
704 #if MULTI_THREAD_SEND
705     CmiLock(pool->lock);
706 #endif
707     s = pool->freehead;
708     if (s == -1) {
709         int newsize = pool->size * 2;
710         printf("[%d] IndexPool_getslot expand to: %d\n", myrank, newsize);
711         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
712         struct IndexStruct *old_ackpool = pool->indexes;
713         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
714         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
715         for (i=pool->size; i<newsize-1; i++) {
716             pool->indexes[i].next = i+1;
717             pool->indexes[i].type = -1;
718         }
719         pool->indexes[i].next = -1;
720         pool->freehead = pool->size;
721         s = pool->size;
722         pool->size = newsize;
723         free(old_ackpool);
724     }
725     pool->freehead = pool->indexes[s].next;
726     pool->indexes[s].addr = addr;
727     pool->indexes[s].type = type;
728 #if MULTI_THREAD_SEND
729     CmiUnlock(pool->lock);
730 #endif
731     return s;
732 }
733
734 static
735 inline  void IndexPool_freeslot(IndexPool *pool, int s)
736 {
737     CmiAssert(s>=0 && s<pool->size);
738 #if MULTI_THREAD_SEND
739     CmiLock(pool->lock);
740 #endif
741     pool->indexes[s].next = pool->freehead;
742     pool->indexes[s].type = -1;
743     pool->freehead = s;
744 #if MULTI_THREAD_SEND
745     CmiUnlock(pool->lock);
746 #endif
747 }
748
749
750 #endif
751
752 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
753 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
754 #define CHARM_MAGIC_NUMBER               126
755
756 #if CMK_ERROR_CHECKING
757 extern unsigned char computeCheckSum(unsigned char *data, int len);
758 static int checksum_flag = 0;
759 #define CMI_SET_CHECKSUM(msg, len)      \
760         if (checksum_flag)  {   \
761           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
762           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
763         }
764 #define CMI_CHECK_CHECKSUM(msg, len)    \
765         if (checksum_flag)      \
766           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
767             CmiAbort("Fatal error: checksum doesn't agree!\n");
768 #else
769 #define CMI_SET_CHECKSUM(msg, len)
770 #define CMI_CHECK_CHECKSUM(msg, len)
771 #endif
772 /* =====End of Definitions of Message-Corruption Related Macros=====*/
773
774 static int print_stats = 0;
775 static int stats_off = 0;
776 void CmiTurnOnStats()
777 {
778     stats_off = 0;
779 }
780
781 void CmiTurnOffStats()
782 {
783     stats_off = 1;
784 }
785
786 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
787
788 #if CMK_WITH_STATS
789 FILE *counterLog = NULL;
790 typedef struct comm_thread_stats
791 {
792     uint64_t  smsg_data_count;
793     uint64_t  lmsg_init_count;
794     uint64_t  ack_count;
795     uint64_t  big_msg_ack_count;
796     uint64_t  smsg_count;
797     uint64_t  direct_put_done_count;
798     uint64_t  put_done_count;
799     //times of calling SmsgSend
800     uint64_t  try_smsg_data_count;
801     uint64_t  try_lmsg_init_count;
802     uint64_t  try_ack_count;
803     uint64_t  try_big_msg_ack_count;
804     uint64_t  try_direct_put_done_count;
805     uint64_t  try_put_done_count;
806     uint64_t  try_smsg_count;
807     
808     double    max_time_in_send_buffered_smsg;
809     double    all_time_in_send_buffered_smsg;
810
811     uint64_t  rdma_get_count, rdma_put_count;
812     uint64_t  try_rdma_get_count, try_rdma_put_count;
813     double    max_time_from_control_to_rdma_init;
814     double    all_time_from_control_to_rdma_init;
815
816     double    max_time_from_rdma_init_to_rdma_done;
817     double    all_time_from_rdma_init_to_rdma_done;
818
819     int      count_in_PumpNetwork;
820     double   time_in_PumpNetwork;
821     double   max_time_in_PumpNetwork;
822     int      count_in_SendBufferMsg_smsg;
823     double   time_in_SendBufferMsg_smsg;
824     double   max_time_in_SendBufferMsg_smsg;
825     int      count_in_SendRdmaMsg;
826     double   time_in_SendRdmaMsg;
827     double   max_time_in_SendRdmaMsg;
828     int      count_in_PumpRemoteTransactions;
829     double   time_in_PumpRemoteTransactions;
830     double   max_time_in_PumpRemoteTransactions;
831     int      count_in_PumpLocalTransactions_rdma;
832     double   time_in_PumpLocalTransactions_rdma;
833     double   max_time_in_PumpLocalTransactions_rdma;
834 } Comm_Thread_Stats;
835
836 static Comm_Thread_Stats   comm_stats;
837
838 static void init_comm_stats()
839 {
840   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
841   if (print_stats){
842       char ln[200];
843       int code = mkdir("counters", 00777); 
844       sprintf(ln,"counters/statistics.%d.%d", mysize, myrank);
845       counterLog=fopen(ln,"w");
846   }
847 }
848
849 #define SMSG_CREATION( x ) if(print_stats && !stats_off) { x->creation_time = CmiWallTimer(); }
850
851 #define SMSG_SENT_DONE(creation_time, tag)  \
852         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
853             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
854             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
855             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
856             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
857             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
858             comm_stats.smsg_count++; \
859             double inbuff_time = CmiWallTimer() - creation_time;   \
860             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
861             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
862         }
863
864 #define SMSG_TRY_SEND(tag)  \
865         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
866             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
867             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
868             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
869             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
870             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
871             comm_stats.try_smsg_count++; \
872         }
873
874 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
875
876 #define  RDMA_TRANS_DONE(x)      \
877          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
878              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
879              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
880          }
881
882 #define  RDMA_TRANS_INIT(type, x)      \
883          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
884              double rdma_trans_time = CmiWallTimer() - x ; \
885              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
886              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
887          }
888
889 #define STATS_PUMPNETWORK_TIME(x)   \
890         { double t = CmiWallTimer(); \
891           x;        \
892           t = CmiWallTimer() - t;          \
893           comm_stats.count_in_PumpNetwork++;        \
894           comm_stats.time_in_PumpNetwork += t;   \
895           if (t>comm_stats.max_time_in_PumpNetwork)      \
896               comm_stats.max_time_in_PumpNetwork = t;    \
897         }
898
899 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
900         { double t = CmiWallTimer(); \
901           x;        \
902           t = CmiWallTimer() - t;          \
903           comm_stats.count_in_PumpRemoteTransactions ++;        \
904           comm_stats.time_in_PumpRemoteTransactions += t;   \
905           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
906               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
907         }
908
909 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
910         { double t = CmiWallTimer(); \
911           x;        \
912           t = CmiWallTimer() - t;          \
913           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
914           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
915           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
916               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
917         }
918
919 #define STATS_SEND_SMSGS_TIME(x)   \
920         { double t = CmiWallTimer(); \
921           x;        \
922           t = CmiWallTimer() - t;          \
923           comm_stats.count_in_SendBufferMsg_smsg ++;        \
924           comm_stats.time_in_SendBufferMsg_smsg += t;   \
925           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
926               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
927         }
928
929 #define STATS_SENDRDMAMSG_TIME(x)   \
930         { double t = CmiWallTimer(); \
931           x;        \
932           t = CmiWallTimer() - t;          \
933           comm_stats.count_in_SendRdmaMsg ++;        \
934           comm_stats.time_in_SendRdmaMsg += t;   \
935           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
936               comm_stats.max_time_in_SendRdmaMsg = t;    \
937         }
938
939 static void print_comm_stats()
940 {
941     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[max:%f\tAverage:%f](milisecond)\n", myrank, 1000*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
942     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
943             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
944             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
945     
946     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
947             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
948             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
949
950     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
951     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [MAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
952     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [MAX:%f\t Average:%f](milisecond)\n\n", myrank, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
953
954
955     fprintf(counterLog, "                             count\ttotal_time\tmax \n", myrank);
956     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork);
957     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions);
958     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma);
959     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg);
960     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg);
961 }
962
963 #else
964 #define STATS_PUMPNETWORK_TIME(x)                  x
965 #define STATS_SEND_SMSGS_TIME(x)                   x
966 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
967 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
968 #define STATS_SENDRDMAMSG_TIME(x)                  x
969 #endif
970
971 static void
972 allgather(void *in,void *out, int len)
973 {
974     static int *ivec_ptr=NULL,already_called=0,job_size=0;
975     int i,rc;
976     int my_rank;
977     char *tmp_buf,*out_ptr;
978
979     if(!already_called) {
980
981         rc = PMI_Get_size(&job_size);
982         CmiAssert(rc == PMI_SUCCESS);
983         rc = PMI_Get_rank(&my_rank);
984         CmiAssert(rc == PMI_SUCCESS);
985
986         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
987         CmiAssert(ivec_ptr != NULL);
988
989         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
990         CmiAssert(rc == PMI_SUCCESS);
991
992         already_called = 1;
993
994     }
995
996     tmp_buf = (char *)malloc(job_size * len);
997     CmiAssert(tmp_buf);
998
999     rc = PMI_Allgather(in,tmp_buf,len);
1000     CmiAssert(rc == PMI_SUCCESS);
1001
1002     out_ptr = out;
1003
1004     for(i=0;i<job_size;i++) {
1005
1006         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1007
1008     }
1009
1010     free(tmp_buf);
1011 }
1012
1013 static void
1014 allgather_2(void *in,void *out, int len)
1015 {
1016     //PMI_Allgather is out of order
1017     int i,rc, extend_len;
1018     int  rank_index;
1019     char *out_ptr, *out_ref;
1020     char *in2;
1021
1022     extend_len = sizeof(int) + len;
1023     in2 = (char*)malloc(extend_len);
1024
1025     memcpy(in2, &myrank, sizeof(int));
1026     memcpy(in2+sizeof(int), in, len);
1027
1028     out_ptr = (char*)malloc(mysize*extend_len);
1029
1030     rc = PMI_Allgather(in2, out_ptr, extend_len);
1031     GNI_RC_CHECK("allgather", rc);
1032
1033     out_ref = out;
1034
1035     for(i=0;i<mysize;i++) {
1036         //rank index 
1037         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1038         //copy to the rank index slot
1039         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1040     }
1041
1042     free(out_ptr);
1043     free(in2);
1044
1045 }
1046
1047 static unsigned int get_gni_nic_address(int device_id)
1048 {
1049     unsigned int address, cpu_id;
1050     gni_return_t status;
1051     int i, alps_dev_id=-1,alps_address=-1;
1052     char *token, *p_ptr;
1053
1054     p_ptr = getenv("PMI_GNI_DEV_ID");
1055     if (!p_ptr) {
1056         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1057        
1058         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1059     } else {
1060         while ((token = strtok(p_ptr,":")) != NULL) {
1061             alps_dev_id = atoi(token);
1062             if (alps_dev_id == device_id) {
1063                 break;
1064             }
1065             p_ptr = NULL;
1066         }
1067         CmiAssert(alps_dev_id != -1);
1068         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1069         CmiAssert(p_ptr != NULL);
1070         i = 0;
1071         while ((token = strtok(p_ptr,":")) != NULL) {
1072             if (i == alps_dev_id) {
1073                 alps_address = atoi(token);
1074                 break;
1075             }
1076             p_ptr = NULL;
1077             ++i;
1078         }
1079         CmiAssert(alps_address != -1);
1080         address = alps_address;
1081     }
1082     return address;
1083 }
1084
1085 static uint8_t get_ptag(void)
1086 {
1087     char *p_ptr, *token;
1088     uint8_t ptag;
1089
1090     p_ptr = getenv("PMI_GNI_PTAG");
1091     CmiAssert(p_ptr != NULL);
1092     token = strtok(p_ptr, ":");
1093     ptag = (uint8_t)atoi(token);
1094     return ptag;
1095         
1096 }
1097
1098 static uint32_t get_cookie(void)
1099 {
1100     uint32_t cookie;
1101     char *p_ptr, *token;
1102
1103     p_ptr = getenv("PMI_GNI_COOKIE");
1104     CmiAssert(p_ptr != NULL);
1105     token = strtok(p_ptr, ":");
1106     cookie = (uint32_t)atoi(token);
1107
1108     return cookie;
1109 }
1110
1111 #if LARGEPAGE
1112
1113 /* directly mmap memory from hugetlbfs for large pages */
1114
1115 #include <sys/stat.h>
1116 #include <fcntl.h>
1117 #include <sys/mman.h>
1118 #include <hugetlbfs.h>
1119
1120 // size must be _tlbpagesize aligned
1121 void *my_get_huge_pages(size_t size)
1122 {
1123     char filename[512];
1124     int fd;
1125     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1126     void *ptr = NULL;
1127
1128     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1129     fd = open(filename, O_RDWR | O_CREAT, mode);
1130     if (fd == -1) {
1131         CmiAbort("my_get_huge_pages: open filed");
1132     }
1133     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1134     if (ptr == MAP_FAILED) ptr = NULL;
1135 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1136     close(fd);
1137     unlink(filename);
1138     return ptr;
1139 }
1140
1141 void my_free_huge_pages(void *ptr, int size)
1142 {
1143 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1144     int ret = munmap(ptr, size);
1145     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1146 }
1147
1148 #endif
1149
1150 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1151 /* TODO: add any that are related */
1152 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1153
1154
1155 #include "machine-lrts.h"
1156 #include "machine-common-core.c"
1157
1158 /* Network progress function is used to poll the network when for
1159    messages. This flushes receive buffers on some  implementations*/
1160 #if CMK_MACHINE_PROGRESS_DEFINED
1161 void CmiMachineProgressImpl() {
1162 }
1163 #endif
1164
1165 static int SendBufferMsg(SMSG_QUEUE *queue);
1166 static void SendRdmaMsg();
1167 static void PumpNetworkSmsg();
1168 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1169 #if CQWRITE
1170 static void PumpCqWriteTransactions();
1171 #endif
1172 #if REMOTE_EVENT
1173 static void PumpRemoteTransactions();
1174 #endif
1175
1176 #if MACHINE_DEBUG_LOG
1177 FILE *debugLog = NULL;
1178 static CmiInt8 buffered_recv_msg = 0;
1179 int         lrts_smsg_success = 0;
1180 int         lrts_received_msg = 0;
1181 #endif
1182
1183 static void sweep_mempool(mempool_type *mptr)
1184 {
1185     int n = 0;
1186     block_header *current = &(mptr->block_head);
1187
1188     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1189     while( current!= NULL) {
1190         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1191         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1192     }
1193     printf("[n %d] sweep_mempool slot END.\n", myrank);
1194 }
1195
1196 inline
1197 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1198 {
1199     block_header *current = *from;
1200
1201     //while(register_memory_size>= MAX_REG_MEM)
1202     //{
1203         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1204             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1205
1206         *from = current;
1207         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1208         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1209         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1210     //}
1211     return GNI_RC_SUCCESS;
1212 }
1213
1214 inline 
1215 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1216 {
1217     gni_return_t status = GNI_RC_SUCCESS;
1218     //int size = GetMempoolsize(msg);
1219     //void *blockaddr = GetMempoolBlockPtr(msg);
1220     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1221    
1222     block_header *current = &(mptr->block_head);
1223     while(register_memory_size>= MAX_REG_MEM)
1224     {
1225         status = deregisterMemory(mptr, &current);
1226         if (status != GNI_RC_SUCCESS) break;
1227     }
1228     if(register_memory_size>= MAX_REG_MEM) return status;
1229
1230     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1231     while(1)
1232     {
1233         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1234         if(status == GNI_RC_SUCCESS)
1235         {
1236             break;
1237         }
1238         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1239         {
1240             CmiAbort("Memory registor for mempool fails\n");
1241         }
1242         else
1243         {
1244             status = deregisterMemory(mptr, &current);
1245             if (status != GNI_RC_SUCCESS) break;
1246         }
1247     }; 
1248     return status;
1249 }
1250
1251 inline 
1252 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1253 {
1254     static int rank = -1;
1255     int i;
1256     gni_return_t status;
1257     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1258     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1259     mempool_type *mptr;
1260
1261     status = registerFromMempool(mptr1, msg, size, t, cqh);
1262     if (status == GNI_RC_SUCCESS) return status;
1263 #if CMK_SMP 
1264     for (i=0; i<CmiMyNodeSize()+1; i++) {
1265       rank = (rank+1)%(CmiMyNodeSize()+1);
1266       mptr = CpvAccessOther(mempool, rank);
1267       if (mptr == mptr1) continue;
1268       status = registerFromMempool(mptr, msg, size, t, cqh);
1269       if (status == GNI_RC_SUCCESS) return status;
1270     }
1271 #endif
1272     return  GNI_RC_ERROR_RESOURCE;
1273 }
1274
1275 inline
1276 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1277 {
1278     MSG_LIST        *msg_tmp;
1279     MallocMsgList(msg_tmp);
1280     msg_tmp->destNode = destNode;
1281     msg_tmp->size   = size;
1282     msg_tmp->msg    = msg;
1283     msg_tmp->tag    = tag;
1284 #if CMK_WITH_STATS
1285     SMSG_CREATION(msg_tmp)
1286 #endif
1287 #if !CMK_SMP
1288     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1289         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1290         queue->smsg_head_index = destNode;
1291         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1292     }else
1293     {
1294         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1295         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1296     }
1297 #else
1298 #if ONE_SEND_QUEUE
1299     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1300 #else
1301 #if SMP_LOCKS
1302     CmiLock(queue->smsg_msglist_index[destNode].lock);
1303     if(queue->smsg_msglist_index[destNode].pushed == 0)
1304     {
1305         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1306     }
1307     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1308     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1309 #else
1310     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1311 #endif
1312 #endif
1313 #endif
1314 #if PRINT_SYH
1315     buffered_smsg_counter++;
1316 #endif
1317 }
1318
1319 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1320 {
1321     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1322 }
1323
1324 inline
1325 static void setup_smsg_connection(int destNode)
1326 {
1327     mdh_addr_list_t  *new_entry = 0;
1328     gni_post_descriptor_t *pd;
1329     gni_smsg_attr_t      *smsg_attr;
1330     gni_return_t status = GNI_RC_NOT_DONE;
1331     RDMA_REQUEST        *rdma_request_msg;
1332     
1333     if(smsg_available_slot == smsg_expand_slots)
1334     {
1335         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1336         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1337         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1338
1339         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1340             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1341             GNI_MEM_READWRITE,   
1342             -1,
1343             &(new_entry->mdh));
1344         smsg_available_slot = 0; 
1345         new_entry->next = smsg_dynamic_list;
1346         smsg_dynamic_list = new_entry;
1347     }
1348     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1349     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1350     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1351     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1352     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1353     smsg_attr->buff_size = smsg_memlen;
1354     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1355     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1356     smsg_local_attr_vec[destNode] = smsg_attr;
1357     smsg_available_slot++;
1358     MallocPostDesc(pd);
1359     pd->type            = GNI_POST_FMA_PUT;
1360     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1361     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1362     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1363     pd->length          = sizeof(gni_smsg_attr_t);
1364     pd->local_addr      = (uint64_t) smsg_attr;
1365     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1366     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1367     pd->src_cq_hndl     = rdma_tx_cqh;
1368     pd->rdma_mode       = 0;
1369     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1370     print_smsg_attr(smsg_attr);
1371     if(status == GNI_RC_ERROR_RESOURCE )
1372     {
1373         MallocRdmaRequest(rdma_request_msg);
1374         rdma_request_msg->destNode = destNode;
1375         rdma_request_msg->pd = pd;
1376         /* buffer this request */
1377     }
1378 #if PRINT_SYH
1379     if(status != GNI_RC_SUCCESS)
1380        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1381     else
1382         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1383 #endif
1384 }
1385
1386 /* useDynamicSMSG */
1387 inline 
1388 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1389 {
1390     gni_return_t status = GNI_RC_NOT_DONE;
1391
1392     if(mailbox_list->offset == mailbox_list->size)
1393     {
1394         dynamic_smsg_mailbox_t *new_mailbox_entry;
1395         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1396         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1397         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1398         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1399         new_mailbox_entry->offset = 0;
1400         
1401         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1402             new_mailbox_entry->size, smsg_rx_cqh,
1403             GNI_MEM_READWRITE,   
1404             -1,
1405             &(new_mailbox_entry->mem_hndl));
1406
1407         GNI_RC_CHECK("register", status);
1408         new_mailbox_entry->next = mailbox_list;
1409         mailbox_list = new_mailbox_entry;
1410     }
1411     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1412     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1413     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1414     local_smsg_attr->mbox_offset = mailbox_list->offset;
1415     mailbox_list->offset += smsg_memlen;
1416     local_smsg_attr->buff_size = smsg_memlen;
1417     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1418     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1419 }
1420
1421 /* useDynamicSMSG */
1422 inline 
1423 static int connect_to(int destNode)
1424 {
1425     gni_return_t status = GNI_RC_NOT_DONE;
1426     CmiAssert(smsg_connected_flag[destNode] == 0);
1427     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1428     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1429     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1430     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1431     
1432     CMI_GNI_LOCK(global_gni_lock)
1433     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1434     CMI_GNI_UNLOCK(global_gni_lock)
1435     if (status == GNI_RC_ERROR_RESOURCE) {
1436       /* possibly destNode is making connection at the same time */
1437       free(smsg_attr_vector_local[destNode]);
1438       smsg_attr_vector_local[destNode] = NULL;
1439       free(smsg_attr_vector_remote[destNode]);
1440       smsg_attr_vector_remote[destNode] = NULL;
1441       mailbox_list->offset -= smsg_memlen;
1442       return 0;
1443     }
1444     GNI_RC_CHECK("GNI_Post", status);
1445     smsg_connected_flag[destNode] = 1;
1446     return 1;
1447 }
1448
1449 inline 
1450 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1451 {
1452     unsigned int          remote_address;
1453     uint32_t              remote_id;
1454     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1455     gni_smsg_attr_t       *smsg_attr;
1456     gni_post_descriptor_t *pd;
1457     gni_post_state_t      post_state;
1458     char                  *real_data; 
1459
1460     if (useDynamicSMSG) {
1461         switch (smsg_connected_flag[destNode]) {
1462         case 0: 
1463             connect_to(destNode);         /* continue to case 1 */
1464         case 1:                           /* pending connection, do nothing */
1465             status = GNI_RC_NOT_DONE;
1466             if(inbuff ==0)
1467                 buffer_small_msgs(queue, msg, size, destNode, tag);
1468             return status;
1469         }
1470     }
1471 #if CMK_SMP
1472 #if ! ONE_SEND_QUEUE
1473     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1474 #endif
1475     {
1476 #else
1477     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1478     {
1479 #endif
1480         //CMI_GNI_LOCK(smsg_mailbox_lock)
1481         CMI_GNI_LOCK(default_tx_cq_lock)
1482 #if CMK_SMP_TRACE_COMMTHREAD
1483         int oldpe = -1;
1484         int oldeventid = -1;
1485         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1486         { 
1487             START_EVENT();
1488             if ( tag == SMALL_DATA_TAG)
1489                 real_data = (char*)msg; 
1490             else 
1491                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1492             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1493             TRACE_COMM_SET_COMM_MSGID(real_data);
1494         }
1495 #endif
1496 #if REMOTE_EVENT
1497         if (tag == LMSG_INIT_TAG) {
1498             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1499             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1500                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1501         }
1502         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1503 #endif
1504 #if     CMK_WITH_STATS
1505         SMSG_TRY_SEND(tag)
1506 #endif
1507 #if CMK_WITH_STATS
1508     double              creation_time;
1509     if (ptr == NULL)
1510         creation_time = CmiWallTimer();
1511     else
1512         creation_time = ptr->creation_time;
1513 #endif
1514
1515     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1516 #if CMK_SMP_TRACE_COMMTHREAD
1517         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1518 #endif
1519         CMI_GNI_UNLOCK(default_tx_cq_lock)
1520         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1521         if(status == GNI_RC_SUCCESS)
1522         {
1523 #if     CMK_WITH_STATS
1524             SMSG_SENT_DONE(creation_time,tag) 
1525 #endif
1526 #if CMK_SMP_TRACE_COMMTHREAD
1527             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1528             { 
1529                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1530             }
1531 #endif
1532         }else
1533             status = GNI_RC_ERROR_RESOURCE;
1534     }
1535     if(status != GNI_RC_SUCCESS && inbuff ==0)
1536         buffer_small_msgs(queue, msg, size, destNode, tag);
1537     return status;
1538 }
1539
1540 inline 
1541 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1542 {
1543     /* construct a control message and send */
1544     CONTROL_MSG         *control_msg_tmp;
1545     MallocControlMsg(control_msg_tmp);
1546     control_msg_tmp->source_addr = (uint64_t)msg;
1547     control_msg_tmp->seq_id    = seqno;
1548     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1549 #if REMOTE_EVENT
1550     control_msg_tmp->ack_index    =  -1;
1551 #endif
1552 #if     USE_LRTS_MEMPOOL
1553     if(size < BIG_MSG)
1554     {
1555         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1556     }
1557     else
1558     {
1559         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1560         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1561         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1562     }
1563 #else
1564     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1565 #endif
1566     return control_msg_tmp;
1567 }
1568
1569 #define BLOCKING_SEND_CONTROL    0
1570
1571 // Large message, send control to receiver, receiver register memory and do a GET, 
1572 // return 1 - send no success
1573 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1574 {
1575     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1576     uint32_t            vmdh_index  = -1;
1577     int                 size;
1578     int                 offset = 0;
1579     uint64_t            source_addr;
1580     int                 register_size; 
1581     void                *msg;
1582
1583     size    =   control_msg_tmp->total_length;
1584     source_addr = control_msg_tmp->source_addr;
1585     register_size = control_msg_tmp->length;
1586
1587 #if  USE_LRTS_MEMPOOL
1588     if( control_msg_tmp->seq_id == 0 ){
1589 #if BLOCKING_SEND_CONTROL
1590         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1591             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1592                 LrtsAdvanceCommunication(0);
1593         }
1594 #endif
1595         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1596         {
1597             msg = (void*)source_addr;
1598             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1599             {
1600                 if(!inbuff)
1601                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1602                 return GNI_RC_ERROR_NOMEM;
1603             }
1604             //register the corresponding mempool
1605             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1606             if(status == GNI_RC_SUCCESS)
1607             {
1608                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1609             }
1610         }else
1611         {
1612             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1613             status = GNI_RC_SUCCESS;
1614         }
1615         if(NoMsgInSend(source_addr))
1616             register_size = GetMempoolsize((void*)(source_addr));
1617         else
1618             register_size = 0;
1619     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1620     {
1621         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1622         source_addr += offset;
1623         size = control_msg_tmp->length;
1624 #if BLOCKING_SEND_CONTROL
1625         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1626             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1627                 LrtsAdvanceCommunication(0);
1628         }
1629 #endif
1630         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1631             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1632             {
1633                 if(!inbuff)
1634                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1635                 return GNI_RC_ERROR_NOMEM;
1636             }
1637             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1638             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1639         }
1640         else
1641         {
1642             status = GNI_RC_SUCCESS;
1643         }
1644         register_size = 0;  
1645     }
1646
1647     if(status == GNI_RC_SUCCESS)
1648     {
1649         status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1650         if(status == GNI_RC_SUCCESS)
1651         {
1652             buffered_send_msg += register_size;
1653             if(control_msg_tmp->seq_id == 0)
1654             {
1655                 IncreaseMsgInSend(source_addr);
1656             }
1657             FreeControlMsg(control_msg_tmp);
1658             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1659         }else
1660             status = GNI_RC_ERROR_RESOURCE;
1661
1662     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1663     {
1664         CmiAbort("Memory registor for large msg\n");
1665     }else 
1666     {
1667         status = GNI_RC_ERROR_NOMEM; 
1668         if(!inbuff)
1669             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1670     }
1671     return status;
1672 #else
1673     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1674     if(status == GNI_RC_SUCCESS)
1675     {
1676         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1677         if(status == GNI_RC_SUCCESS)
1678         {
1679             FreeControlMsg(control_msg_tmp);
1680         }
1681     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1682     {
1683         CmiAbort("Memory registor for large msg\n");
1684     }else 
1685     {
1686         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1687     }
1688     return status;
1689 #endif
1690 }
1691
1692 inline void LrtsPrepareEnvelope(char *msg, int size)
1693 {
1694     CmiSetMsgSize(msg, size);
1695     CMI_SET_CHECKSUM(msg, size);
1696 }
1697
1698 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1699 {
1700     gni_return_t        status  =   GNI_RC_SUCCESS;
1701     uint8_t tag;
1702     CONTROL_MSG         *control_msg_tmp;
1703     int                 oob = ( mode & OUT_OF_BAND);
1704     SMSG_QUEUE          *queue;
1705
1706     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1707 #if CMK_USE_OOB
1708     queue = oob? &smsg_oob_queue : &smsg_queue;
1709 #else
1710     queue = &smsg_queue;
1711 #endif
1712
1713     LrtsPrepareEnvelope(msg, size);
1714
1715 #if PRINT_SYH
1716     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1717 #endif 
1718 #if CMK_SMP 
1719     if(size <= SMSG_MAX_MSG)
1720         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1721     else if (size < BIG_MSG) {
1722         control_msg_tmp =  construct_control_msg(size, msg, 0);
1723         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1724     }
1725     else {
1726           CmiSetMsgSeq(msg, 0);
1727           control_msg_tmp =  construct_control_msg(size, msg, 1);
1728           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1729     }
1730 #else   //non-smp, smp(worker sending)
1731     if(size <= SMSG_MAX_MSG)
1732     {
1733         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1734             CmiFree(msg);
1735     }
1736     else if (size < BIG_MSG) {
1737         control_msg_tmp =  construct_control_msg(size, msg, 0);
1738         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1739     }
1740     else {
1741 #if     USE_LRTS_MEMPOOL
1742         CmiSetMsgSeq(msg, 0);
1743         control_msg_tmp =  construct_control_msg(size, msg, 1);
1744         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1745 #else
1746         control_msg_tmp =  construct_control_msg(size, msg, 0);
1747         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1748 #endif
1749     }
1750 #endif
1751     return 0;
1752 }
1753
1754 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1755 {
1756   int i;
1757 #if CMK_BROADCAST_USE_CMIREFERENCE
1758   for(i=0;i<npes;i++) {
1759     if (pes[i] == CmiMyPe())
1760       CmiSyncSend(pes[i], len, msg);
1761     else {
1762       CmiReference(msg);
1763       CmiSyncSendAndFree(pes[i], len, msg);
1764     }
1765   }
1766 #else
1767   for(i=0;i<npes;i++) {
1768     CmiSyncSend(pes[i], len, msg);
1769   }
1770 #endif
1771 }
1772
1773 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1774 {
1775   /* A better asynchronous implementation may be wanted, but at least it works */
1776   CmiSyncListSendFn(npes, pes, len, msg);
1777   return (CmiCommHandle) 0;
1778 }
1779
1780 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1781 {
1782   if (npes == 1) {
1783       CmiSyncSendAndFree(pes[0], len, msg);
1784       return;
1785   }
1786 #if CMK_PERSISTENT_COMM
1787   if (CpvAccess(phs) && len > 1024) {
1788       int i;
1789       for(i=0;i<npes;i++) {
1790         if (pes[i] == CmiMyPe())
1791           CmiSyncSend(pes[i], len, msg);
1792         else {
1793           CmiReference(msg);
1794           CmiSyncSendAndFree(pes[i], len, msg);
1795         }
1796       }
1797       CmiFree(msg);
1798       return;
1799   }
1800 #endif
1801   
1802 #if CMK_BROADCAST_USE_CMIREFERENCE
1803   if (npes == 1) {
1804     CmiSyncSendAndFree(pes[0], len, msg);
1805     return;
1806   }
1807   CmiSyncListSendFn(npes, pes, len, msg);
1808   CmiFree(msg);
1809 #else
1810   int i;
1811   for(i=0;i<npes-1;i++) {
1812     CmiSyncSend(pes[i], len, msg);
1813   }
1814   if (npes>0)
1815     CmiSyncSendAndFree(pes[npes-1], len, msg);
1816   else 
1817     CmiFree(msg);
1818 #endif
1819 }
1820
1821 static void    PumpDatagramConnection();
1822 static      int         event_SetupConnect = 111;
1823 static      int         event_PumpSmsg = 222 ;
1824 static      int         event_PumpTransaction = 333;
1825 static      int         event_PumpRdmaTransaction = 444;
1826 static      int         event_SendBufferSmsg = 444;
1827 static      int         event_SendFmaRdmaMsg = 555;
1828 static      int         event_AdvanceCommunication = 666;
1829
1830 static void registerUserTraceEvents() {
1831 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1832     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1833     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1834     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1835     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1836     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1837     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1838     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1839 #endif
1840 }
1841
1842 static void ProcessDeadlock()
1843 {
1844     static CmiUInt8 *ptr = NULL;
1845     static CmiUInt8  last = 0, mysum, sum;
1846     static int count = 0;
1847     gni_return_t status;
1848     int i;
1849
1850 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1851 //sweep_mempool(CpvAccess(mempool));
1852     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1853     mysum = smsg_send_count + smsg_recv_count;
1854     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1855     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1856     GNI_RC_CHECK("PMI_Allgather", status);
1857     sum = 0;
1858     for (i=0; i<mysize; i++)  sum+= ptr[i];
1859     if (last == 0 || sum == last) 
1860         count++;
1861     else
1862         count = 0;
1863     last = sum;
1864     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1865     if (count == 2) { 
1866         /* detected twice, it is a real deadlock */
1867         if (myrank == 0)  {
1868             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1869             CmiAbort("Fatal> Deadlock detected.");
1870         }
1871
1872     }
1873     _detected_hang = 0;
1874 }
1875
1876 static void CheckProgress()
1877 {
1878     if (smsg_send_count == last_smsg_send_count &&
1879         smsg_recv_count == last_smsg_recv_count ) 
1880     {
1881         _detected_hang = 1;
1882 #if !CMK_SMP
1883         if (_detected_hang) ProcessDeadlock();
1884 #endif
1885
1886     }
1887     else {
1888         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1889         last_smsg_send_count = smsg_send_count;
1890         last_smsg_recv_count = smsg_recv_count;
1891         _detected_hang = 0;
1892     }
1893 }
1894
1895 static void set_limit()
1896 {
1897     //if (!user_set_flag && CmiMyRank() == 0) {
1898     if (CmiMyRank() == 0) {
1899         int mynode = CmiPhysicalNodeID(CmiMyPe());
1900         int numpes = CmiNumPesOnPhysicalNode(mynode);
1901         int numprocesses = numpes / CmiMyNodeSize();
1902         MAX_REG_MEM  = _totalmem / numprocesses;
1903         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1904         if (CmiMyPe() == 0)
1905            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1906         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1907         {
1908              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1909              CmiAbort("memory registration\n");
1910         }
1911     }
1912 }
1913
1914 void LrtsPostCommonInit(int everReturn)
1915 {
1916 #if CMK_DIRECT
1917     CmiDirectInit();
1918 #endif
1919 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1920     CpvInitialize(double, projTraceStart);
1921     /* only PE 0 needs to care about registration (to generate sts file). */
1922     //if (CmiMyPe() == 0) 
1923     {
1924         registerMachineUserEventsFunction(&registerUserTraceEvents);
1925     }
1926 #endif
1927
1928 #if CMK_SMP
1929     CmiIdleState *s=CmiNotifyGetState();
1930     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1931     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1932 #else
1933     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1934     if (useDynamicSMSG)
1935     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1936 #endif
1937
1938 #if ! LARGEPAGE
1939     if (_checkProgress)
1940 #if CMK_SMP
1941     if (CmiMyRank() == 0)
1942 #endif
1943     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1944 #endif
1945  
1946 #if !LARGEPAGE
1947     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1948 #endif
1949 }
1950
1951 /* this is called by worker thread */
1952 void LrtsPostNonLocal(){
1953 #if CMK_SMP_TRACE_COMMTHREAD
1954     double startT, endT;
1955 #endif
1956 #if MULTI_THREAD_SEND
1957     if(mysize == 1) return;
1958 #if CMK_SMP_TRACE_COMMTHREAD
1959     traceEndIdle();
1960 #endif
1961
1962 #if CMK_SMP_TRACE_COMMTHREAD
1963     startT = CmiWallTimer();
1964 #endif
1965
1966 #if CMK_WORKER_SINGLE_TASK
1967     if (CmiMyRank() % 6 == 0)
1968 #endif
1969     PumpNetworkSmsg();
1970
1971 #if CMK_WORKER_SINGLE_TASK
1972     if (CmiMyRank() % 6 == 1)
1973 #endif
1974     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1975
1976 #if CMK_WORKER_SINGLE_TASK
1977     if (CmiMyRank() % 6 == 2)
1978 #endif
1979     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1980
1981 #if REMOTE_EVENT
1982 #if CMK_WORKER_SINGLE_TASK
1983     if (CmiMyRank() % 6 == 3)
1984 #endif
1985     PumpRemoteTransactions();
1986 #endif
1987
1988 #if CMK_USE_OOB
1989     if (SendBufferMsg(&smsg_oob_queue) == 1)
1990 #endif
1991     {
1992 #if CMK_WORKER_SINGLE_TASK
1993     if (CmiMyRank() % 6 == 4)
1994 #endif
1995         SendBufferMsg(&smsg_queue);
1996     }
1997
1998 #if CMK_WORKER_SINGLE_TASK
1999     if (CmiMyRank() % 6 == 5)
2000 #endif
2001     SendRdmaMsg();
2002
2003 #if CMK_SMP_TRACE_COMMTHREAD
2004     endT = CmiWallTimer();
2005     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2006 #endif
2007 #if CMK_SMP_TRACE_COMMTHREAD
2008     traceBeginIdle();
2009 #endif
2010 #endif
2011 }
2012
2013 /* useDynamicSMSG */
2014 static void    PumpDatagramConnection()
2015 {
2016     uint32_t          remote_address;
2017     uint32_t          remote_id;
2018     gni_return_t status;
2019     gni_post_state_t  post_state;
2020     uint64_t          datagram_id;
2021     int i;
2022
2023    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2024    {
2025        if (datagram_id >= mysize) {           /* bound endpoint */
2026            int pe = datagram_id - mysize;
2027            CMI_GNI_LOCK(global_gni_lock)
2028            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2029            CMI_GNI_UNLOCK(global_gni_lock)
2030            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2031            {
2032                CmiAssert(remote_id == pe);
2033                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2034                GNI_RC_CHECK("Dynamic SMSG Init", status);
2035 #if PRINT_SYH
2036                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
2037 #endif
2038                CmiAssert(smsg_connected_flag[pe] == 1);
2039                smsg_connected_flag[pe] = 2;
2040            }
2041        }
2042        else {         /* unbound ep */
2043            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2044            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2045            {
2046                CmiAssert(remote_id<mysize);
2047                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2048                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2049                GNI_RC_CHECK("Dynamic SMSG Init", status);
2050 #if PRINT_SYH
2051                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
2052 #endif
2053                smsg_connected_flag[remote_id] = 2;
2054
2055                alloc_smsg_attr(&send_smsg_attr);
2056                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2057                GNI_RC_CHECK("post unbound datagram", status);
2058            }
2059        }
2060    }
2061 }
2062
2063 /* pooling CQ to receive network message */
2064 static void PumpNetworkRdmaMsgs()
2065 {
2066     gni_cq_entry_t      event_data;
2067     gni_return_t        status;
2068
2069 }
2070
2071 inline 
2072 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
2073 {
2074     RDMA_REQUEST        *rdma_request_msg;
2075     MallocRdmaRequest(rdma_request_msg);
2076     rdma_request_msg->destNode = inst_id;
2077     rdma_request_msg->pd = pd;
2078 #if REMOTE_EVENT
2079     rdma_request_msg->ack_index = ack_index;
2080 #endif
2081 #if CMK_SMP
2082     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2083 #else
2084     if(sendRdmaBuf == 0)
2085     {
2086         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2087     }else{
2088         sendRdmaTail->next = rdma_request_msg;
2089         sendRdmaTail =  rdma_request_msg;
2090     }
2091 #endif
2092
2093 }
2094
2095 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2096
2097 static void PumpNetworkSmsg()
2098 {
2099     uint64_t            inst_id;
2100     int                 ret;
2101     gni_cq_entry_t      event_data;
2102     gni_return_t        status, status2;
2103     void                *header;
2104     uint8_t             msg_tag;
2105     int                 msg_nbytes;
2106     void                *msg_data;
2107     gni_mem_handle_t    msg_mem_hndl;
2108     gni_smsg_attr_t     *smsg_attr;
2109     gni_smsg_attr_t     *remote_smsg_attr;
2110     int                 init_flag;
2111     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2112     uint64_t            source_addr;
2113     SMSG_QUEUE         *queue = &smsg_queue;
2114 #if   CMK_DIRECT
2115     cmidirectMsg        *direct_msg;
2116 #endif
2117 #if CMI_EXERT_RECV_CAP
2118     int                  recv_cnt = 0;
2119 #endif
2120     while(1)
2121     {
2122         CMI_GNI_LOCK(smsg_rx_cq_lock)
2123         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2124         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2125         if(status != GNI_RC_SUCCESS)
2126             break;
2127         inst_id = GNI_CQ_GET_INST_ID(event_data);
2128 #if REMOTE_EVENT
2129         inst_id = GET_RANK(inst_id);      /* important */
2130 #endif
2131         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2132 #if PRINT_SYH
2133         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2134 #endif
2135         if (useDynamicSMSG) {
2136             /* subtle: smsg may come before connection is setup */
2137             while (smsg_connected_flag[inst_id] != 2) 
2138                PumpDatagramConnection();
2139         }
2140         msg_tag = GNI_SMSG_ANY_TAG;
2141         while(1) {
2142             CMI_GNI_LOCK(smsg_mailbox_lock)
2143             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2144             if (status != GNI_RC_SUCCESS)
2145             {
2146                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2147                 break;
2148             }
2149 #if PRINT_SYH
2150             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2151 #endif
2152             /* copy msg out and then put into queue (small message) */
2153             switch (msg_tag) {
2154             case SMALL_DATA_TAG:
2155             {
2156                 START_EVENT();
2157                 msg_nbytes = CmiGetMsgSize(header);
2158                 msg_data    = CmiAlloc(msg_nbytes);
2159                 memcpy(msg_data, (char*)header, msg_nbytes);
2160                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2161                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2162                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
2163                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2164                 handleOneRecvedMsg(msg_nbytes, msg_data);
2165                 break;
2166             }
2167             case LMSG_INIT_TAG:
2168             {
2169 #if MULTI_THREAD_SEND
2170                 MallocControlMsg(control_msg_tmp);
2171                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2172                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2173                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2174                 getLargeMsgRequest(control_msg_tmp, inst_id);
2175                 FreeControlMsg(control_msg_tmp);
2176 #else
2177                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2178                 getLargeMsgRequest(header, inst_id);
2179                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2180 #endif
2181                 break;
2182             }
2183             case ACK_TAG:   //msg fit into mempool
2184             {
2185                 /* Get is done, release message . Now put is not used yet*/
2186                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2187                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2188                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2189 #if ! USE_LRTS_MEMPOOL
2190                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2191 #else
2192                 DecreaseMsgInSend(msg);
2193 #endif
2194                 if(NoMsgInSend(msg))
2195                     buffered_send_msg -= GetMempoolsize(msg);
2196                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2197                 CmiFree(msg);
2198                 break;
2199             }
2200             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2201             {
2202 #if MULTI_THREAD_SEND
2203                 MallocControlMsg(header_tmp);
2204                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2205                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2206 #else
2207                 header_tmp = (CONTROL_MSG *) header;
2208 #endif
2209                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2210                 void *msg = (void*)(header_tmp->source_addr);
2211                 int cur_seq = CmiGetMsgSeq(msg);
2212                 int offset = ONE_SEG*(cur_seq+1);
2213                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2214                 buffered_send_msg -= header_tmp->length;
2215                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2216                 if (remain_size < 0) remain_size = 0;
2217                 CmiSetMsgSize(msg, remain_size);
2218                 if(remain_size <= 0) //transaction done
2219                 {
2220                     CmiFree(msg);
2221                 }else if (header_tmp->total_length > offset)
2222                 {
2223                     CmiSetMsgSeq(msg, cur_seq+1);
2224                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2225                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2226                     //send next seg
2227                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2228                          // pipelining
2229                     if (header_tmp->seq_id == 1) {
2230                       int i;
2231                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2232                         int seq = cur_seq+i+2;
2233                         CmiSetMsgSeq(msg, seq-1);
2234                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2235                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2236                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2237                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2238                       }
2239                     }
2240                 }
2241 #if MULTI_THREAD_SEND
2242                 FreeControlMsg(header_tmp);
2243 #else
2244                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2245 #endif
2246                 break;
2247             }
2248 #if CMK_PERSISTENT_COMM
2249             case PUT_DONE_TAG:  {   //persistent message
2250                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2251                 int size = ((CONTROL_MSG *) header)->length;
2252                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2253                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2254                 CmiReference(msg);
2255                 CMI_CHECK_CHECKSUM(msg, size);
2256                 handleOneRecvedMsg(size, msg); 
2257 #if PRINT_SYH
2258                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2259 #endif
2260                 break;
2261             }
2262 #endif
2263 #if CMK_DIRECT
2264             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2265                 //create a trigger message
2266                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2267                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2268                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2269                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2270                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2271                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2272                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2273                 break;
2274 #endif
2275             default:
2276                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2277                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2278                 printf("weird tag problem\n");
2279                 CmiAbort("Unknown tag\n");
2280             }               // end switch
2281 #if PRINT_SYH
2282             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2283 #endif
2284             smsg_recv_count ++;
2285             msg_tag = GNI_SMSG_ANY_TAG;
2286 #if CMI_EXERT_RECV_CAP
2287             if (status == GNI_RC_SUCCESS && ++recv_cnt == RECV_CAP) return;
2288 #endif
2289         } //endwhile GNI_SmsgGetNextWTag
2290     }   //end while GetEvent
2291     if(status == GNI_RC_ERROR_RESOURCE)
2292     {
2293         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2294         GNI_RC_CHECK("Smsg_rx_cq full", status);
2295     }
2296 }
2297
2298 static void printDesc(gni_post_descriptor_t *pd)
2299 {
2300     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2301 }
2302
2303 #if CQWRITE
2304 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2305 {
2306     gni_post_descriptor_t *pd;
2307     gni_return_t        status = GNI_RC_SUCCESS;
2308     
2309     MallocPostDesc(pd);
2310     pd->type = GNI_POST_CQWRITE;
2311     pd->cq_mode = GNI_CQMODE_SILENT;
2312     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2313     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2314     pd->cqwrite_value = data;
2315     pd->remote_mem_hndl = mem_hndl;
2316     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2317     GNI_RC_CHECK("GNI_PostCqWrite", status);
2318 }
2319 #endif
2320
2321 // register memory for a message
2322 // return mem handle
2323 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2324 {
2325     gni_return_t status = GNI_RC_SUCCESS;
2326
2327     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2328
2329 #if CMK_PERSISTENT_COMM
2330       // persistent message is always registered
2331       // BIG_MSG small pieces do not have malloc chunk header
2332     if ((seqno <= 1 || seqno == PERSIST_SEQ) && !IsMemHndlZero(MEMHFIELD(msg))) {
2333         *memh = MEMHFIELD(msg);
2334         return GNI_RC_SUCCESS;
2335     }
2336 #endif
2337     if(seqno == 0 
2338 #if CMK_PERSISTENT_COMM
2339          || seqno == PERSIST_SEQ
2340 #endif
2341       )
2342     {
2343         if(IsMemHndlZero((GetMemHndl(msg))))
2344         {
2345             msg = (void*)(msg);
2346             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2347             if(status == GNI_RC_SUCCESS)
2348                 *memh = GetMemHndl(msg);
2349         }
2350         else {
2351             *memh = GetMemHndl(msg);
2352         }
2353     }
2354     else {
2355         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2356         status = registerMemory(msg, size, memh, NULL); 
2357     }
2358     return status;
2359 }
2360
2361 // for BIG_MSG called on receiver side for receiving control message
2362 // LMSG_INIT_TAG
2363 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2364 {
2365 #if     USE_LRTS_MEMPOOL
2366     CONTROL_MSG         *request_msg;
2367     gni_return_t        status = GNI_RC_SUCCESS;
2368     void                *msg_data;
2369     gni_post_descriptor_t *pd;
2370     gni_mem_handle_t    msg_mem_hndl;
2371     int                 size, transaction_size, offset = 0;
2372     size_t              register_size = 0;
2373
2374     // initial a get to transfer data from the sender side */
2375     request_msg = (CONTROL_MSG *) header;
2376     size = request_msg->total_length;
2377     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2378     MallocPostDesc(pd);
2379 #if CMK_WITH_STATS 
2380     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2381 #endif
2382     if(request_msg->seq_id < 2)   {
2383 #if CMK_SMP_TRACE_COMMTHREAD 
2384         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2385 #endif
2386         msg_data = CmiAlloc(size);
2387         CmiSetMsgSeq(msg_data, 0);
2388         _MEMCHECK(msg_data);
2389     }
2390     else {
2391         offset = ONE_SEG*(request_msg->seq_id-1);
2392         msg_data = (char*)request_msg->dest_addr + offset;
2393     }
2394    
2395     pd->cqwrite_value = request_msg->seq_id;
2396
2397     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2398     SetMemHndlZero(pd->local_mem_hndl);
2399     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2400     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2401         if(NoMsgInRecv( (void*)(msg_data)))
2402             register_size = GetMempoolsize((void*)(msg_data));
2403     }
2404
2405     pd->first_operand = ALIGN64(size);                   //  total length
2406
2407     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2408         pd->type            = GNI_POST_FMA_GET;
2409     else
2410         pd->type            = GNI_POST_RDMA_GET;
2411     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2412     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2413     pd->length          = transaction_size;
2414     pd->local_addr      = (uint64_t) msg_data;
2415     pd->remote_addr     = request_msg->source_addr + offset;
2416     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2417     pd->src_cq_hndl     = rdma_tx_cqh;
2418     pd->rdma_mode       = 0;
2419     pd->amo_cmd         = 0;
2420
2421     //memory registration success
2422     if(status == GNI_RC_SUCCESS)
2423     {
2424         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2425         CMI_GNI_LOCK(lock)
2426 #if REMOTE_EVENT
2427         if( request_msg->seq_id == 0)
2428         {
2429             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2430             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2431             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2432         }
2433 #endif
2434
2435 #if CMK_WITH_STATS
2436         RDMA_TRY_SEND(pd->type)
2437 #endif
2438         if(pd->type == GNI_POST_RDMA_GET) 
2439         {
2440             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2441         }
2442         else
2443         {
2444             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2445         }
2446         CMI_GNI_UNLOCK(lock)
2447
2448         if(status == GNI_RC_SUCCESS )
2449         {
2450             if(pd->cqwrite_value == 0)
2451             {
2452 #if MACHINE_DEBUG_LOG
2453                 buffered_recv_msg += register_size;
2454                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2455 #endif
2456                 IncreaseMsgInRecv(msg_data);
2457 #if CMK_SMP_TRACE_COMMTHREAD 
2458                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2459 #endif
2460             }
2461 #if  CMK_WITH_STATS
2462             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2463             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2464 #endif
2465         }
2466     }else
2467     {
2468         SetMemHndlZero((pd->local_mem_hndl));
2469     }
2470     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2471     {
2472 #if REMOTE_EVENT
2473         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2474 #else
2475         bufferRdmaMsg(inst_id, pd, -1); 
2476 #endif
2477     }else if (status != GNI_RC_SUCCESS) {
2478         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2479         GNI_RC_CHECK("GetLargeAFter posting", status);
2480     }
2481 #else
2482     CONTROL_MSG         *request_msg;
2483     gni_return_t        status;
2484     void                *msg_data;
2485     gni_post_descriptor_t *pd;
2486     RDMA_REQUEST        *rdma_request_msg;
2487     gni_mem_handle_t    msg_mem_hndl;
2488     //int source;
2489     // initial a get to transfer data from the sender side */
2490     request_msg = (CONTROL_MSG *) header;
2491     msg_data = CmiAlloc(request_msg->length);
2492     _MEMCHECK(msg_data);
2493
2494     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2495
2496     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2497     {
2498         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2499     }
2500
2501     MallocPostDesc(pd);
2502     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2503         pd->type            = GNI_POST_FMA_GET;
2504     else
2505         pd->type            = GNI_POST_RDMA_GET;
2506     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2507     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2508     pd->length          = ALIGN64(request_msg->length);
2509     pd->local_addr      = (uint64_t) msg_data;
2510     pd->remote_addr     = request_msg->source_addr;
2511     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2512     pd->src_cq_hndl     = rdma_tx_cqh;
2513     pd->rdma_mode       = 0;
2514     pd->amo_cmd         = 0;
2515
2516     //memory registration successful
2517     if(status == GNI_RC_SUCCESS)
2518     {
2519         pd->local_mem_hndl  = msg_mem_hndl;
2520        
2521         if(pd->type == GNI_POST_RDMA_GET) 
2522         {
2523             CMI_GNI_LOCK(rdma_tx_cq_lock)
2524             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2525             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2526         }
2527         else
2528         {
2529             CMI_GNI_LOCK(default_tx_cq_lock)
2530             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2531             CMI_GNI_UNLOCK(default_tx_cq_lock)
2532         }
2533
2534     }else
2535     {
2536         SetMemHndlZero(pd->local_mem_hndl);
2537     }
2538     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2539     {
2540         MallocRdmaRequest(rdma_request_msg);
2541         rdma_request_msg->next = 0;
2542         rdma_request_msg->destNode = inst_id;
2543         rdma_request_msg->pd = pd;
2544         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2545     }else {
2546         GNI_RC_CHECK("AFter posting", status);
2547     }
2548 #endif
2549 }
2550
2551 #if CQWRITE
2552 static void PumpCqWriteTransactions()
2553 {
2554
2555     gni_cq_entry_t          ev;
2556     gni_return_t            status;
2557     void                    *msg;  
2558     int                     msg_size;
2559     while(1) {
2560         //CMI_GNI_LOCK(my_cq_lock) 
2561         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2562         //CMI_GNI_UNLOCK(my_cq_lock)
2563         if(status != GNI_RC_SUCCESS) break;
2564         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2565 #if CMK_PERSISTENT_COMM
2566 #if PRINT_SYH
2567         printf(" %d CQ write event %p\n", myrank, msg);
2568 #endif
2569         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2570 #if PRINT_SYH
2571             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2572 #endif
2573             CmiReference(msg);
2574             msg_size = CmiGetMsgSize(msg);
2575             CMI_CHECK_CHECKSUM(msg, msg_size);
2576             handleOneRecvedMsg(msg_size, msg); 
2577             continue;
2578         }
2579 #endif
2580         DecreaseMsgInSend(msg);
2581 #if ! USE_LRTS_MEMPOOL
2582        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2583 #else
2584         DecreaseMsgInSend(msg);
2585 #endif
2586         if(NoMsgInSend(msg))
2587             buffered_send_msg -= GetMempoolsize(msg);
2588         CmiFree(msg);
2589     };
2590     if(status == GNI_RC_ERROR_RESOURCE)
2591     {
2592         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2593     }
2594 }
2595 #endif
2596
2597 #if REMOTE_EVENT
2598 static void PumpRemoteTransactions()
2599 {
2600     gni_cq_entry_t          ev;
2601     gni_return_t            status;
2602     void                    *msg;   
2603     int                     inst_id, index, type, size;
2604
2605     while(1) {
2606         CMI_GNI_LOCK(global_gni_lock)
2607         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2608         CMI_GNI_UNLOCK(global_gni_lock)
2609
2610         if(status != GNI_RC_SUCCESS) break;
2611
2612         inst_id = GNI_CQ_GET_INST_ID(ev);
2613         index = GET_INDEX(inst_id);
2614         type = GET_TYPE(inst_id);
2615
2616         switch (type) {
2617         case 0:    // ACK
2618             CmiAssert(GetIndexType(ackPool, index) == 1);
2619             msg = GetIndexAddress(ackPool, index);
2620             DecreaseMsgInSend(msg);
2621 #if ! USE_LRTS_MEMPOOL
2622            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2623 #else
2624             DecreaseMsgInSend(msg);
2625 #endif
2626             if(NoMsgInSend(msg))
2627                 buffered_send_msg -= GetMempoolsize(msg);
2628             CmiFree(msg);
2629             IndexPool_freeslot(&ackPool, index);
2630             break;
2631 #if CMK_PERSISTENT_COMM
2632         case 1:  {    // PERSISTENT
2633             CmiAssert(GetIndexType(persistPool, index) == 2);
2634             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2635             msg = slot->destBuf[0].destAddress;
2636             size = CmiGetMsgSize(msg);
2637             CmiReference(msg);
2638             CMI_CHECK_CHECKSUM(msg, size);
2639             handleOneRecvedMsg(size, msg); 
2640             break;
2641             }
2642 #endif
2643         default:
2644             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2645             CmiAbort("PumpRemoteTransactions: unknown type");
2646         }
2647     }
2648     if(status == GNI_RC_ERROR_RESOURCE)
2649     {
2650         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2651     }
2652 }
2653 #endif
2654
2655 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2656 {
2657     gni_cq_entry_t          ev;
2658     gni_return_t            status;
2659     uint64_t                type, inst_id;
2660     gni_post_descriptor_t   *tmp_pd;
2661     MSG_LIST                *ptr;
2662     CONTROL_MSG             *ack_msg_tmp;
2663     ACK_MSG                 *ack_msg;
2664     uint8_t                 msg_tag;
2665 #if CMK_DIRECT
2666     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2667 #endif
2668     SMSG_QUEUE         *queue = &smsg_queue;
2669
2670     while(1) {
2671         CMI_GNI_LOCK(my_cq_lock) 
2672         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2673         CMI_GNI_UNLOCK(my_cq_lock)
2674         if(status != GNI_RC_SUCCESS) break;
2675         
2676         type = GNI_CQ_GET_TYPE(ev);
2677         if (type == GNI_CQ_EVENT_TYPE_POST)
2678         {
2679             inst_id     = GNI_CQ_GET_INST_ID(ev);
2680 #if PRINT_SYH
2681             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2682 #endif
2683             CMI_GNI_LOCK(my_cq_lock)
2684             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2685             CMI_GNI_UNLOCK(my_cq_lock)
2686
2687             switch (tmp_pd->type) {
2688 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2689             case GNI_POST_RDMA_PUT:
2690 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2691                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2692 #endif
2693             case GNI_POST_FMA_PUT:
2694                 if(tmp_pd->amo_cmd == 1) {
2695 #if CMK_DIRECT
2696                     //sender ACK to receiver to trigger it is done
2697                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2698                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2699                     msg_tag = DIRECT_PUT_DONE_TAG;
2700 #endif
2701                 }
2702                 else {
2703                     CmiFree((void *)tmp_pd->local_addr);
2704 #if REMOTE_EVENT
2705                     FreePostDesc(tmp_pd);
2706                     continue;
2707 #elif CQWRITE
2708                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2709                     FreePostDesc(tmp_pd);
2710                     continue;
2711 #else
2712                     MallocControlMsg(ack_msg_tmp);
2713                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2714                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2715                     ack_msg_tmp->length  = tmp_pd->length;
2716                     msg_tag = PUT_DONE_TAG;
2717 #endif
2718                 }
2719                 break;
2720 #endif
2721             case GNI_POST_RDMA_GET:
2722             case GNI_POST_FMA_GET:  {
2723 #if  ! USE_LRTS_MEMPOOL
2724                 MallocControlMsg(ack_msg_tmp);
2725                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2726                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2727                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2728                 msg_tag = ACK_TAG;  
2729 #else
2730 #if CMK_WITH_STATS
2731                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2732 #endif
2733                 int seq_id = tmp_pd->cqwrite_value;
2734                 if(seq_id > 0)      // BIG_MSG
2735                 {
2736                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2737                     MallocControlMsg(ack_msg_tmp);
2738                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2739                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2740                     ack_msg_tmp->seq_id = seq_id;
2741                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2742                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2743                     ack_msg_tmp->length = tmp_pd->length;
2744                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2745                     msg_tag = BIG_MSG_TAG; 
2746                 } 
2747                 else
2748                 {
2749                     msg_tag = ACK_TAG; 
2750 #if  !REMOTE_EVENT && !CQWRITE
2751                     MallocAckMsg(ack_msg);
2752                     ack_msg->source_addr = tmp_pd->remote_addr;
2753 #endif
2754                 }
2755 #endif
2756                 break;
2757             }
2758             case  GNI_POST_CQWRITE:
2759                    FreePostDesc(tmp_pd);
2760                    continue;
2761             default:
2762                 CmiPrintf("type=%d\n", tmp_pd->type);
2763                 CmiAbort("PumpLocalTransactions: unknown type!");
2764             }      /* end of switch */
2765
2766 #if CMK_DIRECT
2767             if (tmp_pd->amo_cmd == 1) {
2768                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2769                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2770             }
2771             else
2772 #endif
2773             if (msg_tag == ACK_TAG) {
2774 #if !REMOTE_EVENT
2775 #if   !CQWRITE
2776                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2777                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2778 #else
2779                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2780 #endif
2781 #endif
2782             }
2783             else {
2784                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2785                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2786             }
2787 #if CMK_PERSISTENT_COMM
2788             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2789 #endif
2790             {
2791                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2792 #if PRINT_SYH
2793                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2794 #endif
2795                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2796                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2797
2798                     START_EVENT();
2799                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2800                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2801 #if MACHINE_DEBUG_LOG
2802                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2803                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2804                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2805 #endif
2806                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2807                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2808                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2809                 }else if(msg_tag == BIG_MSG_TAG){
2810                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2811                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2812                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2813                         START_EVENT();
2814 #if PRINT_SYH
2815                         printf("Pipeline msg done [%d]\n", myrank);
2816 #endif
2817 #if                 CMK_SMP_TRACE_COMMTHREAD
2818                         if( tmp_pd->cqwrite_value == 1)
2819                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2820 #endif
2821                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2822                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2823                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2824                     }
2825                 }
2826             }
2827             FreePostDesc(tmp_pd);
2828         }
2829     } //end while
2830     if(status == GNI_RC_ERROR_RESOURCE)
2831     {
2832         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2833         GNI_RC_CHECK("Smsg_tx_cq full", status);
2834     }
2835 }
2836
2837 static void  SendRdmaMsg()
2838 {
2839     gni_return_t            status = GNI_RC_SUCCESS;
2840     gni_mem_handle_t        msg_mem_hndl;
2841     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2842     RDMA_REQUEST            *pre = 0;
2843     uint64_t                register_size = 0;
2844     void                    *msg;
2845     int                     i;
2846 #if CMK_SMP
2847     int len = PCQueueLength(sendRdmaBuf);
2848     for (i=0; i<len; i++)
2849     {
2850         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2851         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2852         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2853         if (ptr == NULL) break;
2854 #else
2855     ptr = sendRdmaBuf;
2856     while (ptr!=0)
2857     {
2858 #endif 
2859         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2860         gni_post_descriptor_t *pd = ptr->pd;
2861         
2862         msg = (void*)(pd->local_addr);
2863         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2864         register_size = 0;
2865         if(pd->cqwrite_value == 0) {
2866             if(NoMsgInRecv(msg))
2867                 register_size = GetMempoolsize(msg);
2868         }
2869
2870         if(status == GNI_RC_SUCCESS)        //mem register good
2871         {
2872             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2873             CMI_GNI_LOCK(lock);
2874 #if REMOTE_EVENT
2875             if( pd->cqwrite_value == 0) {
2876                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2877                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2878                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2879             }
2880 #if CMK_PERSISTENT_COMM
2881             else if (pd->cqwrite_value == PERSIST_SEQ) {
2882                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2883                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, PERSIST_EVENT(ptr->ack_index));
2884                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2885             }
2886 #endif
2887 #endif
2888 #if CMK_WITH_STATS
2889             RDMA_TRY_SEND(pd->type)
2890 #endif
2891             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2892             {
2893                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2894             }
2895             else
2896             {
2897                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2898             }
2899             CMI_GNI_UNLOCK(lock);
2900             
2901             if(status == GNI_RC_SUCCESS)    //post good
2902             {
2903 #if !CMK_SMP
2904                 tmp_ptr = ptr;
2905                 if(pre != 0) {
2906                     pre->next = ptr->next;
2907                 }
2908                 else {
2909                     sendRdmaBuf = ptr->next;
2910                 }
2911                 ptr = ptr->next;
2912                 FreeRdmaRequest(tmp_ptr);
2913 #endif
2914                 if(pd->cqwrite_value == 0)
2915                 {
2916 #if CMK_SMP_TRACE_COMMTHREAD 
2917                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2918 #endif
2919                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2920                 }
2921 #if  CMK_WITH_STATS
2922                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2923                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2924 #endif
2925 #if MACHINE_DEBUG_LOG
2926                 buffered_recv_msg += register_size;
2927                 MACHSTATE(8, "GO request from buffered\n"); 
2928 #endif
2929 #if PRINT_SYH
2930                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
2931 #endif
2932             }else           // cannot post
2933             {
2934 #if CMK_SMP
2935                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2936 #else
2937                 pre = ptr;
2938                 ptr = ptr->next;
2939 #endif
2940 #if PRINT_SYH
2941                 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank, pd->cqwrite_value);
2942 #endif
2943                 break;
2944             }
2945         } else          //memory registration fails
2946         {
2947 #if CMK_SMP
2948             PCQueuePush(sendRdmaBuf, (char*)ptr);
2949 #else
2950             pre = ptr;
2951             ptr = ptr->next;
2952 #endif
2953         }
2954     } //end while
2955 #if ! CMK_SMP
2956     if(ptr == 0)
2957         sendRdmaTail = pre;
2958 #endif
2959 }
2960
2961 // return 1 if all messages are sent
2962 static int SendBufferMsg(SMSG_QUEUE *queue)
2963 {
2964     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2965     CONTROL_MSG         *control_msg_tmp;
2966     gni_return_t        status;
2967     int                 done = 1;
2968     uint64_t            register_size;
2969     void                *register_addr;
2970     int                 index_previous = -1;
2971 #if CMI_EXERT_SEND_CAP
2972     int                 sent_cnt = 0;
2973 #endif
2974
2975 #if CMK_SMP
2976     int          index = 0;
2977 #if ONE_SEND_QUEUE
2978     memset(destpe_avail, 0, mysize * sizeof(char));
2979     for (index=0; index<1; index++)
2980     {
2981         int i, len = PCQueueLength(queue->sendMsgBuf);
2982         for (i=0; i<len; i++) 
2983         {
2984             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2985             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2986             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2987             if(ptr == NULL) break;
2988             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2989                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2990                 continue;
2991             }
2992 #else
2993 #if SMP_LOCKS
2994     int nonempty = PCQueueLength(nonEmptyQueues);
2995     for(index =0; index<nonempty; index++)
2996     {
2997         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2998         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2999         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3000         if(current_list == NULL) break; 
3001         PCQueue current_queue= current_list-> sendSmsgBuf;
3002         CmiLock(current_list->lock);
3003         int i, len = PCQueueLength(current_queue);
3004         current_list->pushed = 0;
3005         CmiUnlock(current_list->lock);
3006 #else
3007     for(index =0; index<mysize; index++)
3008     {
3009         //if (index == myrank) continue;
3010         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3011         int i, len = PCQueueLength(current_queue);
3012 #endif
3013         for (i=0; i<len; i++) 
3014         {
3015             CMI_PCQUEUEPOP_LOCK(current_queue)
3016             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3017             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3018             if (ptr == 0) break;
3019 #endif
3020 #else
3021     int index = queue->smsg_head_index;
3022     while(index != -1)
3023     {
3024         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3025         pre = 0;
3026         while(ptr != 0)
3027         {
3028 #endif
3029             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3030             status = GNI_RC_ERROR_RESOURCE;
3031             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
3032                 /* connection not exists yet */
3033             }
3034             else
3035             switch(ptr->tag)
3036             {
3037             case SMALL_DATA_TAG:
3038                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3039                 if(status == GNI_RC_SUCCESS)
3040                 {
3041                     CmiFree(ptr->msg);
3042                 }
3043                 break;
3044             case LMSG_INIT_TAG:
3045                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3046                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
3047                 break;
3048             case ACK_TAG:
3049                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3050                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3051                 break;
3052             case BIG_MSG_TAG:
3053                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3054                 if(status == GNI_RC_SUCCESS)
3055                 {
3056                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3057                 }
3058                 break;
3059 #if CMK_PERSISTENT_COMM
3060             case PUT_DONE_TAG:
3061                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3062                 if(status == GNI_RC_SUCCESS)
3063                 {
3064                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3065                 }
3066                 break;
3067 #endif
3068 #if CMK_DIRECT
3069             case DIRECT_PUT_DONE_TAG:
3070                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3071                 if(status == GNI_RC_SUCCESS)
3072                 {
3073                     free((CMK_DIRECT_HEADER*)ptr->msg);
3074                 }
3075                 break;
3076
3077 #endif
3078             default:
3079                 printf("Weird tag\n");
3080                 CmiAbort("should not happen\n");
3081             }       // end switch
3082             if(status == GNI_RC_SUCCESS)
3083             {
3084 #if PRINT_SYH
3085                 buffered_smsg_counter--;
3086                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3087 #endif
3088 #if !CMK_SMP
3089                 tmp_ptr = ptr;
3090                 if(pre)
3091                 {
3092                     ptr = pre ->next = ptr->next;
3093                 }else
3094                 {
3095                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3096                 }
3097                 FreeMsgList(tmp_ptr);
3098 #else
3099                 FreeMsgList(ptr);
3100 #endif
3101 #if CMI_EXERT_SEND_CAP
3102                 if(++sent_cnt == SEND_CAP) break;
3103 #endif
3104             }else {
3105 #if CMK_SMP
3106 #if ONE_SEND_QUEUE
3107                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3108 #else
3109                 PCQueuePush(current_queue, (char*)ptr);
3110 #endif
3111 #else
3112                 pre = ptr;
3113                 ptr=ptr->next;
3114 #endif
3115                 done = 0;
3116                 if(status == GNI_RC_ERROR_RESOURCE)
3117                 {
3118 #if CMK_SMP && ONE_SEND_QUEUE 
3119                     destpe_avail[ptr->destNode] = 1;
3120 #else
3121                     break;
3122 #endif
3123                 }
3124             } 
3125         } //end while
3126 #if !CMK_SMP
3127         if(ptr == 0)
3128             queue->smsg_msglist_index[index].tail = pre;
3129         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3130         {
3131             if(index_previous != -1)
3132                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3133             else
3134                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3135         }
3136         else {
3137             index_previous = index;
3138         }
3139         index = queue->smsg_msglist_index[index].next;
3140 #else
3141 #if !ONE_SEND_QUEUE && SMP_LOCKS
3142         CmiLock(current_list->lock);
3143         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3144         {
3145             current_list->pushed = 1;
3146             PCQueuePush(nonEmptyQueues, current_list);
3147         }
3148         CmiUnlock(current_list->lock); 
3149 #endif
3150 #endif
3151
3152 #if CMI_EXERT_SEND_CAP
3153         if(sent_cnt == SEND_CAP) {
3154             done = 0;
3155             break;
3156         }
3157 #endif
3158     }   // end pooling for all cores
3159     return done;
3160 }
3161
3162 static void ProcessDeadlock();
3163 void LrtsAdvanceCommunication(int whileidle)
3164 {
3165     static int count = 0;
3166     /*  Receive Msg first */
3167 #if CMK_SMP_TRACE_COMMTHREAD
3168     double startT, endT;
3169 #endif
3170     if (useDynamicSMSG && whileidle)
3171     {
3172 #if CMK_SMP_TRACE_COMMTHREAD
3173         startT = CmiWallTimer();
3174 #endif
3175         PumpDatagramConnection();
3176 #if CMK_SMP_TRACE_COMMTHREAD
3177         endT = CmiWallTimer();
3178         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3179 #endif
3180     }
3181
3182 #if CMK_SMP_TRACE_COMMTHREAD
3183     startT = CmiWallTimer();
3184 #endif
3185     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3186     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3187 #if CMK_SMP_TRACE_COMMTHREAD
3188     endT = CmiWallTimer();
3189     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3190 #endif
3191
3192 #if CMK_SMP_TRACE_COMMTHREAD
3193     startT = CmiWallTimer();
3194 #endif
3195     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3196     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3197 #if CMK_SMP_TRACE_COMMTHREAD
3198     endT = CmiWallTimer();
3199     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3200 #endif
3201
3202 #if CMK_SMP_TRACE_COMMTHREAD
3203     startT = CmiWallTimer();
3204 #endif
3205     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3206
3207 #if CQWRITE
3208     PumpCqWriteTransactions();
3209 #endif
3210
3211 #if REMOTE_EVENT
3212     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
3213 #endif
3214
3215     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3216 #if CMK_SMP_TRACE_COMMTHREAD
3217     endT = CmiWallTimer();
3218     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3219 #endif
3220  
3221 #if CMK_SMP_TRACE_COMMTHREAD
3222     startT = CmiWallTimer();
3223 #endif
3224     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3225     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3226 #if CMK_SMP_TRACE_COMMTHREAD
3227     endT = CmiWallTimer();
3228     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3229 #endif
3230
3231     /* Send buffered Message */
3232 #if CMK_SMP_TRACE_COMMTHREAD
3233     startT = CmiWallTimer();
3234 #endif
3235 #if CMK_USE_OOB
3236     if (SendBufferMsg(&smsg_oob_queue) == 1)
3237 #endif
3238     {
3239         STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
3240     }
3241     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3242 #if CMK_SMP_TRACE_COMMTHREAD
3243     endT = CmiWallTimer();
3244     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3245 #endif
3246
3247 #if CMK_SMP && ! LARGEPAGE
3248     if (_detected_hang)  ProcessDeadlock();
3249 #endif
3250 }
3251
3252 /* useDynamicSMSG */
3253 static void _init_dynamic_smsg()
3254 {
3255     gni_return_t status;
3256     uint32_t     vmdh_index = -1;
3257     int i;
3258
3259     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3260     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3261     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3262     for(i=0; i<mysize; i++) {
3263         smsg_connected_flag[i] = 0;
3264         smsg_attr_vector_local[i] = NULL;
3265         smsg_attr_vector_remote[i] = NULL;
3266     }
3267     if(mysize <=512)
3268     {
3269         SMSG_MAX_MSG = 4096;
3270     }else if (mysize <= 4096)
3271     {
3272         SMSG_MAX_MSG = 4096/mysize * 1024;
3273     }else if (mysize <= 16384)
3274     {
3275         SMSG_MAX_MSG = 512;
3276     }else {
3277         SMSG_MAX_MSG = 256;
3278     }
3279
3280     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3281     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3282     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3283     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3284     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3285
3286     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3287     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3288     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3289     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3290     mailbox_list->offset = 0;
3291     mailbox_list->next = 0;
3292     
3293     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3294         mailbox_list->size, smsg_rx_cqh,
3295         GNI_MEM_READWRITE,   
3296         vmdh_index,
3297         &(mailbox_list->mem_hndl));
3298     GNI_RC_CHECK("MEMORY registration for smsg", status);
3299
3300     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3301     GNI_RC_CHECK("Unbound EP", status);
3302     
3303     alloc_smsg_attr(&send_smsg_attr);
3304
3305     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3306     GNI_RC_CHECK("post unbound datagram", status);
3307
3308       /* always pre-connect to proc 0 */
3309     //if (myrank != 0) connect_to(0);
3310 }
3311
3312 static void _init_static_smsg()
3313 {
3314     gni_smsg_attr_t      *smsg_attr;
3315     gni_smsg_attr_t      remote_smsg_attr;
3316     gni_smsg_attr_t      *smsg_attr_vec;
3317     gni_mem_handle_t     my_smsg_mdh_mailbox;
3318     int      ret, i;
3319     gni_return_t status;
3320     uint32_t              vmdh_index = -1;
3321     mdh_addr_t            base_infor;
3322     mdh_addr_t            *base_addr_vec;
3323     char *env;
3324
3325     if(mysize <=512)
3326     {
3327         SMSG_MAX_MSG = 1024;
3328     }else if (mysize <= 4096)
3329     {
3330         SMSG_MAX_MSG = 1024;
3331     }else if (mysize <= 16384)
3332     {
3333         SMSG_MAX_MSG = 512;
3334     }else {
3335         SMSG_MAX_MSG = 256;
3336     }
3337     
3338     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3339     if (env) SMSG_MAX_MSG = atoi(env);
3340     CmiAssert(SMSG_MAX_MSG > 0);
3341
3342     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3343     
3344     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3345     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3346     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3347     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3348     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3349     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3350     CmiAssert(ret == 0);
3351     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3352     
3353     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3354             smsg_memlen*(mysize), smsg_rx_cqh,
3355             GNI_MEM_READWRITE,   
3356             vmdh_index,
3357             &my_smsg_mdh_mailbox);
3358     register_memory_size += smsg_memlen*(mysize);
3359     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3360
3361     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3362     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3363
3364     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3365     base_infor.mdh =  my_smsg_mdh_mailbox;
3366     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3367
3368     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3369  
3370     for(i=0; i<mysize; i++)
3371     {
3372         if(i==myrank)
3373             continue;
3374         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3375         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3376         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3377         smsg_attr[i].mbox_offset = i*smsg_memlen;
3378         smsg_attr[i].buff_size = smsg_memlen;
3379         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3380         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3381     }
3382
3383     for(i=0; i<mysize; i++)
3384     {
3385         if (myrank == i) continue;
3386
3387         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3388         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3389         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3390         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3391         remote_smsg_attr.buff_size = smsg_memlen;
3392         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3393         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3394
3395         /* initialize the smsg channel */
3396         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3397         GNI_RC_CHECK("SMSG Init", status);
3398     } //end initialization
3399
3400     free(base_addr_vec);
3401     free(smsg_attr);
3402
3403     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3404     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3405
3406
3407 inline
3408 static void _init_send_queue(SMSG_QUEUE *queue)
3409 {
3410      int i;
3411 #if ONE_SEND_QUEUE
3412      queue->sendMsgBuf = PCQueueCreate();
3413      destpe_avail = (char*)malloc(mysize * sizeof(char));
3414 #else
3415      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3416 #if CMK_SMP && SMP_LOCKS
3417      nonEmptyQueues = PCQueueCreate();
3418 #endif
3419      for(i =0; i<mysize; i++)
3420      {
3421 #if CMK_SMP
3422          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3423 #if SMP_LOCKS
3424          queue->smsg_msglist_index[i].pushed = 0;
3425          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3426 #endif
3427 #else
3428          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3429          queue->smsg_msglist_index[i].next = -1;
3430          queue->smsg_head_index = -1;
3431 #endif
3432         
3433      }
3434 #endif
3435 }
3436
3437 inline
3438 static void _init_smsg()
3439 {
3440     if(mysize > 1) {
3441         if (useDynamicSMSG)
3442             _init_dynamic_smsg();
3443         else
3444             _init_static_smsg();
3445     }
3446
3447     _init_send_queue(&smsg_queue);
3448 #if CMK_USE_OOB
3449     _init_send_queue(&smsg_oob_queue);
3450 #endif
3451 }
3452
3453 static void _init_static_msgq()
3454 {
3455     gni_return_t status;
3456     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3457     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3458     msgq_attrs.smsg_q_sz = 1;
3459     msgq_attrs.rcv_pool_sz = 1;
3460     msgq_attrs.num_msgq_eps = 2;
3461     msgq_attrs.nloc_insts = 8;
3462     msgq_attrs.modes = 0;
3463     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3464
3465     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3466     GNI_RC_CHECK("MSGQ Init", status);
3467
3468
3469 }
3470
3471
3472 static CmiUInt8 total_mempool_size = 0;
3473 static CmiUInt8 total_mempool_calls = 0;
3474
3475 #if USE_LRTS_MEMPOOL
3476 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3477 {
3478     void *pool;
3479     int ret;
3480     gni_return_t status = GNI_RC_SUCCESS;
3481
3482     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3483     if (*size < default_size) *size = default_size;
3484 #if LARGEPAGE
3485     // round up to be multiple of _tlbpagesize
3486     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3487     *size = ALIGNHUGEPAGE(*size);
3488 #endif
3489     total_mempool_size += *size;
3490     total_mempool_calls += 1;
3491 #if   !LARGEPAGE
3492     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3493     {
3494         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3495         CmiAbort("alloc_mempool_block");
3496     }
3497 #endif
3498 #if LARGEPAGE
3499     pool = my_get_huge_pages(*size);
3500     ret = pool==NULL;
3501 #else
3502     ret = posix_memalign(&pool, ALIGNBUF, *size);
3503 #endif
3504     if (ret != 0) {
3505 #if CMK_SMP && STEAL_MEMPOOL
3506       pool = steal_mempool_block(size, mem_hndl);
3507       if (pool != NULL) return pool;
3508 #endif
3509       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3510       if (ret == ENOMEM)
3511         CmiAbort("alloc_mempool_block: out of memory.");
3512       else
3513         CmiAbort("alloc_mempool_block: posix_memalign failed");
3514     }
3515 #if LARGEPAGE
3516     CmiMemLock();
3517     register_count++;
3518     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3519     CmiMemUnlock();
3520     if(status != GNI_RC_SUCCESS) {
3521         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3522 sweep_mempool(CpvAccess(mempool));
3523     }
3524     GNI_RC_CHECK("MEMORY_REGISTER", status);
3525 #else
3526     SetMemHndlZero((*mem_hndl));
3527 #endif
3528     return pool;
3529 }
3530
3531 // ptr is a block head pointer
3532 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3533 {
3534     if(!(IsMemHndlZero(mem_hndl)))
3535     {
3536         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3537     }
3538 #if LARGEPAGE
3539     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3540 #else
3541     free(ptr);
3542 #endif
3543 }
3544 #endif
3545
3546 void LrtsPreCommonInit(int everReturn){
3547 #if USE_LRTS_MEMPOOL
3548     CpvInitialize(mempool_type*, mempool);
3549     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3550     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3551 #endif
3552 }
3553
3554 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3555 {
3556     register int            i;
3557     int                     rc;
3558     int                     device_id = 0;
3559     unsigned int            remote_addr;
3560     gni_cdm_handle_t        cdm_hndl;
3561     gni_return_t            status = GNI_RC_SUCCESS;
3562     uint32_t                vmdh_index = -1;
3563     uint8_t                 ptag;
3564     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3565     int                     first_spawned;
3566     int                     physicalID;
3567     char                   *env;
3568
3569     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3570     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3571     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3572    
3573     status = PMI_Init(&first_spawned);
3574     GNI_RC_CHECK("PMI_Init", status);
3575
3576     status = PMI_Get_size(&mysize);
3577     GNI_RC_CHECK("PMI_Getsize", status);
3578
3579     status = PMI_Get_rank(&myrank);
3580     GNI_RC_CHECK("PMI_getrank", status);
3581
3582     //physicalID = CmiPhysicalNodeID(myrank);
3583     
3584     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3585
3586     *myNodeID = myrank;
3587     *numNodes = mysize;
3588   
3589 #if MULTI_THREAD_SEND
3590     /* Currently, we only consider the case that comm. thread will only recv msgs */
3591     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3592 #endif
3593
3594     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3595     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3596     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3597
3598     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3599     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3600     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3601
3602     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3603     if (env) useDynamicSMSG = 1;
3604     if (!useDynamicSMSG)
3605       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3606     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3607     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3608     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3609     
3610     if(myrank == 0)
3611     {
3612         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3613         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3614     }
3615 #ifdef USE_ONESIDED
3616     onesided_init(NULL, &onesided_hnd);
3617
3618     // this is a GNI test, so use the libonesided bypass functionality
3619     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3620     local_addr = gniGetNicAddress();
3621 #else
3622     ptag = get_ptag();
3623     cookie = get_cookie();
3624 #if 0
3625     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3626 #endif
3627     //Create and attach to the communication  domain */
3628     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3629     GNI_RC_CHECK("GNI_CdmCreate", status);
3630     //* device id The device id is the minor number for the device
3631     //that is assigned to the device by the system when the device is created.
3632     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3633     //where X is the device number 0 default 
3634     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3635     GNI_RC_CHECK("GNI_CdmAttach", status);
3636     local_addr = get_gni_nic_address(0);
3637 #endif
3638     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3639     _MEMCHECK(MPID_UGNI_AllAddr);
3640     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3641     /* create the local completion queue */
3642     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3643     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3644     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3645     
3646     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3647     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3648     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3649
3650     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3651     GNI_RC_CHECK("Create CQ (rx)", status);
3652     
3653     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3654     GNI_RC_CHECK("Create Post CQ (rx)", status);
3655     
3656     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3657     //GNI_RC_CHECK("Create BTE CQ", status);
3658
3659     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3660     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3661     _MEMCHECK(ep_hndl_array);
3662 #if MULTI_THREAD_SEND 
3663     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3664     //default_tx_cq_lock = CmiCreateLock();
3665     rdma_tx_cq_lock = CmiCreateLock();
3666     smsg_rx_cq_lock = CmiCreateLock();
3667     //global_gni_lock  = CmiCreateLock();
3668     //rx_cq_lock  = CmiCreateLock();
3669 #endif
3670     for (i=0; i<mysize; i++) {
3671         if(i == myrank) continue;
3672         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3673         GNI_RC_CHECK("GNI_EpCreate ", status);   
3674         remote_addr = MPID_UGNI_AllAddr[i];
3675         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3676         GNI_RC_CHECK("GNI_EpBind ", status);   
3677     }
3678
3679     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3680     _init_smsg();
3681     PMI_Barrier();
3682
3683 #if     USE_LRTS_MEMPOOL
3684     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3685     if (env) {
3686         _totalmem = CmiReadSize(env);
3687         if (myrank == 0)
3688             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3689     }
3690
3691     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3692     if (env) _mempool_size = CmiReadSize(env);
3693     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3694         _mempool_size = CmiReadSize(env);
3695
3696
3697     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3698     if (env) {
3699         MAX_REG_MEM = CmiReadSize(env);
3700         user_set_flag = 1;
3701     }
3702     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3703         MAX_REG_MEM = CmiReadSize(env);
3704         user_set_flag = 1;
3705     }
3706
3707     env = getenv("CHARM_UGNI_SEND_MAX");
3708     if (env) {
3709         MAX_BUFF_SEND = CmiReadSize(env);
3710         user_set_flag = 1;
3711     }
3712     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3713         MAX_BUFF_SEND = CmiReadSize(env);
3714         user_set_flag = 1;
3715     }
3716
3717     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3718     if (env) {
3719         _mempool_size_limit = CmiReadSize(env);
3720     }
3721
3722     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3723     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3724
3725     if (myrank==0) {
3726         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3727         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3728         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3729             /* memblock can expand to BIG_MSG * 2 size */
3730             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3731             CmiAbort("mempool maximum size is too small. \n");
3732         }
3733 #if MULTI_THREAD_SEND
3734         printf("Charm++> worker thread sending messages\n");
3735 #elif COMM_THREAD_SEND
3736         printf("Charm++> only comm thread send/recv messages\n");
3737 #endif
3738     }
3739
3740 #endif     /* end of USE_LRTS_MEMPOOL */
3741
3742     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3743     if (env) {
3744         BIG_MSG = CmiReadSize(env);
3745         if (BIG_MSG < ONE_SEG)
3746           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3747     }
3748     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3749     if (env) {
3750         BIG_MSG_PIPELINE = atoi(env);
3751     }
3752
3753     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3754     if (env) _checkProgress = 0;
3755     if (mysize == 1) _checkProgress = 0;
3756
3757     
3758     /*
3759     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3760     if (env) 
3761         _tlbpagesize = CmiReadSize(env);
3762     */
3763     /* real gethugepagesize() is only available when hugetlb module linked */
3764     _tlbpagesize = gethugepagesize();
3765     if (myrank == 0) {
3766         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3767     }
3768
3769 #if LARGEPAGE
3770     if (_tlbpagesize == 4096) {
3771         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3772     }
3773 #endif
3774
3775     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3776     
3777     stats_off = CmiGetArgFlag(*argv, "+stats_off");
3778
3779     /* init DMA buffer for medium message */
3780
3781     //_init_DMA_buffer();
3782     
3783     free(MPID_UGNI_AllAddr);
3784 #if CMK_SMP
3785     sendRdmaBuf = PCQueueCreate();
3786 #else
3787     sendRdmaBuf = 0;
3788 #endif
3789 #if MACHINE_DEBUG_LOG
3790     char ln[200];
3791     sprintf(ln,"debugLog.%d",myrank);
3792     debugLog=fopen(ln,"w");
3793 #endif
3794
3795 //    NTK_Init();
3796 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3797
3798 #if  REMOTE_EVENT
3799     IndexPool_init(&ackPool);
3800 #if CMK_PERSISTENT_COMM
3801     IndexPool_init(&persistPool);
3802 #endif
3803 #endif
3804
3805 #if CMK_WITH_STATS
3806     init_comm_stats();
3807 #endif
3808 }
3809
3810 void* LrtsAlloc(int n_bytes, int header)
3811 {
3812     void *ptr = NULL;
3813 #if 0
3814     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3815 #endif
3816     if(n_bytes <= SMSG_MAX_MSG)
3817     {
3818         int totalsize = n_bytes+header;
3819         ptr = malloc(totalsize);
3820     }
3821     else {
3822         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3823 #if     USE_LRTS_MEMPOOL
3824         n_bytes = ALIGN64(n_bytes);
3825         if(n_bytes < BIG_MSG)
3826         {
3827             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3828             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3829         }else 
3830         {
3831 #if LARGEPAGE
3832             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3833             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3834             char *res = my_get_huge_pages(n_bytes);
3835 #else
3836             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3837 #endif
3838             if (res) ptr = res + ALIGNBUF - header;
3839         }
3840 #else
3841         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3842         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3843         ptr = res + ALIGNBUF - header;
3844 #endif
3845     }
3846 #if CMK_PERSISTENT_COMM
3847     if (ptr) SetMemHndlZero(MEMHFIELD((char*)ptr+header));
3848 #endif
3849     return ptr;
3850 }
3851
3852 void  LrtsFree(void *msg)
3853 {
3854     CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3855 #if CMK_PERSISTENT_COMM
3856     if (!IsMemHndlZero(MEMHFIELD((char*)msg+sizeof(CmiChunkHeader)))) return;
3857 #endif
3858     if (size <= SMSG_MAX_MSG)
3859         free(msg);
3860     else {
3861         size = ALIGN64(size);
3862         if(size>=BIG_MSG)
3863         {
3864 #if LARGEPAGE
3865             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3866             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3867 #else
3868             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3869 #endif
3870         }
3871         else {
3872 #if    USE_LRTS_MEMPOOL
3873 #if CMK_SMP
3874             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3875 #else
3876             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3877 #endif
3878 #else
3879             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3880 #endif
3881         }
3882     }
3883 }
3884
3885 void LrtsExit()
3886 {
3887 #if CMK_WITH_STATS
3888 #if CMK_SMP
3889     if(CmiMyRank() == CmiMyNodeSize())
3890 #endif
3891     if (print_stats) print_comm_stats();
3892 #endif
3893     /* free memory ? */
3894 #if USE_LRTS_MEMPOOL
3895     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
3896     mempool_destroy(CpvAccess(mempool));
3897 #endif
3898     PMI_Finalize();
3899     exit(0);
3900 }
3901
3902 void LrtsDrainResources()
3903 {
3904     if(mysize == 1) return;
3905     while (
3906 #if CMK_USE_OOB
3907            !SendBufferMsg(&smsg_oob_queue) ||
3908 #endif
3909            !SendBufferMsg(&smsg_queue) 
3910           )
3911     {
3912         if (useDynamicSMSG)
3913             PumpDatagramConnection();
3914         PumpNetworkSmsg();
3915         PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3916         PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
3917         PumpRemoteTransactions();
3918         SendRdmaMsg();
3919     }
3920     PMI_Barrier();
3921 }
3922
3923 void LrtsAbort(const char *message) {
3924     fprintf(stderr, "[%d] CmiAbort: %s\n", myrank, message);
3925     CmiPrintStackTrace(0);
3926     PMI_Abort(-1, message);
3927 }
3928
3929 /**************************  TIMER FUNCTIONS **************************/
3930 #if CMK_TIMER_USE_SPECIAL
3931 /* MPI calls are not threadsafe, even the timer on some machines */
3932 static CmiNodeLock  timerLock = 0;
3933 static int _absoluteTime = 0;
3934 static int _is_global = 0;
3935 static struct timespec start_ts;
3936
3937 inline int CmiTimerIsSynchronized() {
3938     return 0;
3939 }
3940
3941 inline int CmiTimerAbsolute() {
3942     return _absoluteTime;
3943 }
3944
3945 double CmiStartTimer() {
3946     return 0.0;
3947 }
3948
3949 double CmiInitTime() {
3950     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3951 }
3952
3953 void CmiTimerInit(char **argv) {
3954     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3955     if (_absoluteTime && CmiMyPe() == 0)
3956         printf("Charm++> absolute  timer is used\n");
3957     
3958     _is_global = CmiTimerIsSynchronized();
3959
3960
3961     if (_is_global) {
3962         if (CmiMyRank() == 0) {
3963             clock_gettime(CLOCK_MONOTONIC, &start_ts);
3964         }
3965     } else { /* we don't have a synchronous timer, set our own start time */
3966         CmiBarrier();
3967         CmiBarrier();
3968         CmiBarrier();
3969         clock_gettime(CLOCK_MONOTONIC, &start_ts);
3970     }
3971     CmiNodeAllBarrier();          /* for smp */
3972 }
3973
3974 /**
3975  * Since the timerLock is never created, and is
3976  * always NULL, then all the if-condition inside
3977  * the timer functions could be disabled right
3978  * now in the case of SMP.
3979  */
3980 double CmiTimer(void) {
3981     struct timespec now_ts;
3982     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3983     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3984         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3985 }
3986
3987 double CmiWallTimer(void) {
3988     struct timespec now_ts;
3989     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3990     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3991         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
3992 }
3993
3994 double CmiCpuTimer(void) {
3995     struct timespec now_ts;
3996     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3997     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3998         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3999 }
4000
4001 #endif
4002 /************Barrier Related Functions****************/
4003
4004 int CmiBarrier()
4005 {
4006     gni_return_t status;
4007
4008 #if CMK_SMP
4009     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
4010     CmiNodeAllBarrier();
4011     if (CmiMyRank() == CmiMyNodeSize())
4012 #else
4013     if (CmiMyRank() == 0)
4014 #endif
4015     {
4016         /**
4017          *  The call of CmiBarrier is usually before the initialization
4018          *  of trace module of Charm++, therefore, the START_EVENT
4019          *  and END_EVENT are disabled here. -Chao Mei
4020          */
4021         /*START_EVENT();*/
4022         status = PMI_Barrier();
4023         GNI_RC_CHECK("PMI_Barrier", status);
4024         /*END_EVENT(10);*/
4025     }
4026     CmiNodeAllBarrier();
4027     return 0;
4028
4029 }
4030 #if CMK_DIRECT
4031 #include "machine-cmidirect.c"
4032 #endif
4033 #if CMK_PERSISTENT_COMM
4034 #include "machine-persistent.c"
4035 #endif
4036
4037